Newer
Older
cortex-hub / test_venv / lib / python3.9 / site-packages / croniter / croniter.py
#!/usr/bin/env python
import binascii
import calendar
import copy
import datetime
import math
import platform
import random
import re
import struct
import sys
import traceback as _traceback
from time import time
from typing import Any, Literal, Optional, Union

from dateutil.relativedelta import relativedelta
from dateutil.tz import datetime_exists, tzutc

ExpandedExpression = list[Union[int, Literal["*", "l"]]]


def is_32bit() -> bool:
    """
    Detect if Python is running in 32-bit mode.
    Returns True if running on 32-bit Python, False for 64-bit.
    """
    # Method 1: Check pointer size
    bits = struct.calcsize("P") * 8

    # Method 2: Check platform architecture string
    try:
        architecture = platform.architecture()[0]
    except RuntimeError:
        architecture = None

    # Method 3: Check maxsize
    is_small_maxsize = sys.maxsize <= 2**32

    # Evaluate all available methods
    is_32 = False

    if bits == 32:
        is_32 = True
    elif architecture and "32" in architecture:
        is_32 = True
    elif is_small_maxsize:
        is_32 = True

    return is_32


try:
    # https://github.com/python/cpython/issues/101069 detection
    if is_32bit():
        datetime.datetime.fromtimestamp(3999999999)
    OVERFLOW32B_MODE = False
except OverflowError:
    OVERFLOW32B_MODE = True


UTC_DT = datetime.timezone.utc
EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)

M_ALPHAS: dict[str, Union[int, str]] = {
    "jan": 1,
    "feb": 2,
    "mar": 3,
    "apr": 4,
    "may": 5,
    "jun": 6,
    "jul": 7,
    "aug": 8,
    "sep": 9,
    "oct": 10,
    "nov": 11,
    "dec": 12,
}
DOW_ALPHAS: dict[str, Union[int, str]] = {
    "sun": 0,
    "mon": 1,
    "tue": 2,
    "wed": 3,
    "thu": 4,
    "fri": 5,
    "sat": 6,
}

MINUTE_FIELD = 0
HOUR_FIELD = 1
DAY_FIELD = 2
MONTH_FIELD = 3
DOW_FIELD = 4
SECOND_FIELD = 5
YEAR_FIELD = 6

UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD)
SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD)
YEAR_FIELDS = (
    MINUTE_FIELD,
    HOUR_FIELD,
    DAY_FIELD,
    MONTH_FIELD,
    DOW_FIELD,
    SECOND_FIELD,
    YEAR_FIELD,
)

step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
only_int_re = re.compile(r"^\d+$")

DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
WEEKDAYS = "|".join(DOW_ALPHAS.keys())
MONTHS = "|".join(M_ALPHAS.keys())
star_or_int_re = re.compile(r"^(\d+|\*)$")
special_dow_re = re.compile(
    rf"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)"
    rf"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$"
)
nearest_weekday_re = re.compile(r"^(?:(\d+)w|w(\d+))$")
re_star = re.compile("[*]")
hash_expression_re = re.compile(
    r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
)

CRON_FIELDS = {
    "unix": UNIX_FIELDS,
    "second": SECOND_FIELDS,
    "year": YEAR_FIELDS,
    len(UNIX_FIELDS): UNIX_FIELDS,
    len(SECOND_FIELDS): SECOND_FIELDS,
    len(YEAR_FIELDS): YEAR_FIELDS,
}
UNIX_CRON_LEN = len(UNIX_FIELDS)
SECOND_CRON_LEN = len(SECOND_FIELDS)
YEAR_CRON_LEN = len(YEAR_FIELDS)
# retrocompat
VALID_LEN_EXPRESSION = {a for a in CRON_FIELDS if isinstance(a, int)}

MARKER = object()


def datetime_to_timestamp(d):
    if d.tzinfo is not None:
        d = d.replace(tzinfo=None) - d.utcoffset()

    return (d - datetime.datetime(1970, 1, 1)).total_seconds()


def _is_leap(year: int) -> bool:
    return year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)


def _last_day_of_month(year: int, month: int) -> int:
    """Calculate the last day of the given month (honor leap years)."""
    last_day = DAYS[month - 1]
    if month == 2 and _is_leap(year):
        last_day += 1
    return last_day


def _is_successor(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> bool:
    """Check if the given date is a successor (after/before) of the previous date."""
    if is_prev:
        return date.astimezone(UTC_DT) < previous_date.astimezone(UTC_DT)
    return date.astimezone(UTC_DT) > previous_date.astimezone(UTC_DT)


def _timezone_delta(date1: datetime.datetime, date2: datetime.datetime) -> datetime.timedelta:
    """Calculate the timezone difference of the given dates."""
    offset1 = date1.utcoffset()
    offset2 = date2.utcoffset()
    assert offset1 is not None
    assert offset2 is not None
    return offset2 - offset1


def _add_tzinfo(
    date: datetime.datetime, previous_date: datetime.datetime, is_prev: bool
) -> tuple[datetime.datetime, bool]:
    """Add the tzinfo from the previous date to the given date.

    In case the new date is ambiguous, determine the correct date
    based on it being closer to the previous date but still a successor
    (after/before based on `is_prev`).

    In case the date does not exist, jump forward to the next existing date.
    """
    localize = getattr(previous_date.tzinfo, "localize", None)
    if localize is not None:
        # pylint: disable-next=import-outside-toplevel
        import pytz

        try:
            result = localize(date, is_dst=None)
        except pytz.NonExistentTimeError:
            while True:
                date += datetime.timedelta(minutes=1)
                try:
                    result = localize(date, is_dst=None)
                except pytz.NonExistentTimeError:
                    continue
                break
            return result, False
        except pytz.AmbiguousTimeError:
            closer = localize(date, is_dst=not is_prev)
            farther = localize(date, is_dst=is_prev)
            # TODO: Check negative DST
            assert (closer.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
            if _is_successor(closer, previous_date, is_prev):
                result = closer
            else:
                assert _is_successor(farther, previous_date, is_prev)
                result = farther
        return result, True

    result = date.replace(fold=1 if is_prev else 0, tzinfo=previous_date.tzinfo)
    if not datetime_exists(result):
        while not datetime_exists(result):
            result += datetime.timedelta(minutes=1)
        return result, False

    # result is closer to the previous date
    farther = date.replace(fold=0 if is_prev else 1, tzinfo=previous_date.tzinfo)
    # Comparing the UTC offsets in the check for the date being ambiguous.
    if result.utcoffset() != farther.utcoffset():
        # TODO: Check negative DST
        assert (result.astimezone(UTC_DT) > farther.astimezone(UTC_DT)) == is_prev
        if not _is_successor(result, previous_date, is_prev):
            assert _is_successor(farther, previous_date, is_prev)
            result = farther
    return result, True


class CroniterError(ValueError):
    """General top-level Croniter base exception"""


class CroniterBadTypeRangeError(TypeError):
    """."""


class CroniterBadCronError(CroniterError):
    """Syntax, unknown value, or range error within a cron expression"""


class CroniterUnsupportedSyntaxError(CroniterBadCronError):
    """Valid cron syntax, but likely to produce inaccurate results"""

    # Extending CroniterBadCronError, which may be contridatory, but this allows
    # catching both errors with a single exception.  From a user perspective
    # these will likely be handled the same way.


class CroniterBadDateError(CroniterError):
    """Unable to find next/prev timestamp match"""


class CroniterNotAlphaError(CroniterBadCronError):
    """Cron syntax contains an invalid day or month abbreviation"""


class croniter:
    MONTHS_IN_YEAR = 12

    # This helps with expanding `*` fields into `lower-upper` ranges. Each item
    # in this tuple maps to the corresponding field index
    RANGES = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099))

    ALPHACONV: tuple[dict[str, Union[int, str]], ...] = (
        {},  # 0: min
        {},  # 1: hour
        {"l": "l"},  # 2: dom
        # 3: mon
        copy.deepcopy(M_ALPHAS),
        # 4: dow
        copy.deepcopy(DOW_ALPHAS),
        # 5: second
        {},
        # 6: year
        {},
    )

    LOWMAP: tuple[dict[int, int], ...] = ({}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {})

    LEN_MEANS_ALL = (60, 24, 31, 12, 7, 60, 130)

    def __init__(
        self,
        expr_format: str,
        start_time: Optional[Union[datetime.datetime, float]] = None,
        ret_type: type = float,
        day_or: bool = True,
        max_years_between_matches: Optional[int] = None,
        is_prev: bool = False,
        hash_id: Optional[Union[bytes, str]] = None,
        implement_cron_bug: bool = False,
        second_at_beginning: bool = False,
        expand_from_start_time: bool = False,
    ) -> None:
        self._ret_type = ret_type
        self._day_or = day_or
        self._implement_cron_bug = implement_cron_bug
        self.second_at_beginning = bool(second_at_beginning)
        self._expand_from_start_time = expand_from_start_time

        if hash_id is not None:
            if not isinstance(hash_id, (bytes, str)):
                raise TypeError("hash_id must be bytes or UTF-8 string")
            if not isinstance(hash_id, bytes):
                hash_id = hash_id.encode("UTF-8")

        self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
        if max_years_between_matches is None:
            max_years_between_matches = 50
        self._max_years_between_matches = max(int(max_years_between_matches), 1)

        if start_time is None:
            start_time = time()

        self.tzinfo: Optional[datetime.tzinfo] = None

        self.start_time = 0.0
        self.dst_start_time = 0.0
        self.cur = 0.0
        self.set_current(start_time, force=True)

        self.expanded, self.nth_weekday_of_month, self.expressions, self.nearest_weekday = self._expand(
            expr_format,
            hash_id=hash_id,
            from_timestamp=self.dst_start_time if self._expand_from_start_time else None,
            second_at_beginning=second_at_beginning,
        )
        self.fields = CRON_FIELDS[len(self.expanded)]
        self._is_prev = is_prev

    @classmethod
    def _alphaconv(cls, index, key, expressions):
        try:
            return cls.ALPHACONV[index][key]
        except KeyError:
            raise CroniterNotAlphaError(f"[{' '.join(expressions)}] is not acceptable")

    def get_next(self, ret_type=None, start_time=None, update_current=True):
        if start_time and self._expand_from_start_time:
            raise ValueError(
                "start_time is not supported when using expand_from_start_time = True."
            )
        return self._get_next(
            ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current
        )

    def get_prev(self, ret_type=None, start_time=None, update_current=True):
        return self._get_next(
            ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current
        )

    def get_current(self, ret_type=None):
        ret_type = ret_type or self._ret_type
        if issubclass(ret_type, datetime.datetime):
            return self.timestamp_to_datetime(self.cur)
        return self.cur

    def set_current(
        self, start_time: Optional[Union[datetime.datetime, float]], force: bool = True
    ) -> float:
        if (force or (self.cur is None)) and start_time is not None:
            if isinstance(start_time, datetime.datetime):
                self.tzinfo = start_time.tzinfo
                start_time = self.datetime_to_timestamp(start_time)

            self.start_time = start_time
            self.dst_start_time = start_time
            self.cur = start_time
        return self.cur

    @staticmethod
    def datetime_to_timestamp(d: datetime.datetime) -> float:
        """
        Converts a `datetime` object `d` into a UNIX timestamp.
        """
        return datetime_to_timestamp(d)

    _datetime_to_timestamp = datetime_to_timestamp  # retrocompat

    def timestamp_to_datetime(self, timestamp: float, tzinfo: Any = MARKER) -> datetime.datetime:
        """
        Converts a UNIX `timestamp` into a `datetime` object.
        """
        if tzinfo is MARKER:  # allow to give tzinfo=None even if self.tzinfo is set
            tzinfo = self.tzinfo
        if OVERFLOW32B_MODE:
            # degraded mode to workaround Y2038
            # see https://github.com/python/cpython/issues/101069
            result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
        else:
            result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
        if tzinfo:
            result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo)
        return result

    _timestamp_to_datetime = timestamp_to_datetime  # retrocompat

    def _get_next(self, ret_type=None, start_time=None, is_prev=None, update_current=None):
        if update_current is None:
            update_current = True
        self.set_current(start_time, force=True)
        if is_prev is None:
            is_prev = self._is_prev
        self._is_prev = is_prev

        ret_type = ret_type or self._ret_type

        if not issubclass(ret_type, (float, datetime.datetime)):
            raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")

        result = self._calc_next(is_prev)
        timestamp = self.datetime_to_timestamp(result)
        if update_current:
            self.cur = timestamp
        if issubclass(ret_type, datetime.datetime):
            return result
        return timestamp

    # iterator protocol, to enable direct use of croniter
    # objects in a loop, like "for dt in croniter("5 0 * * *'): ..."
    # or for combining multiple croniters into single
    # dates feed using 'itertools' module
    def all_next(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding consecutive dates.

        May be used instead of an implicit call to __iter__ whenever a
        non-default `ret_type` needs to be specified.
        """
        # In a Python 3.7+ world:  contextlib.suppress and contextlib.nullcontext could
        # be used instead
        try:
            while True:
                self._is_prev = False
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise

    def all_prev(self, ret_type=None, start_time=None, update_current=None):
        """
        Returns a generator yielding previous dates.
        """
        try:
            while True:
                self._is_prev = True
                yield self._get_next(
                    ret_type=ret_type, start_time=start_time, update_current=update_current
                )
                start_time = None
        except CroniterBadDateError:
            if self._max_years_btw_matches_explicitly_set:
                return
            raise

    def iter(self, *args, **kwargs):
        return self.all_prev if self._is_prev else self.all_next

    def __iter__(self):
        return self

    __next__ = next = _get_next

    def _calc_next(self, is_prev: bool) -> datetime.datetime:
        current = self.timestamp_to_datetime(self.cur)
        expanded = self.expanded[:]
        nth_weekday_of_month = self.nth_weekday_of_month.copy()

        # exception to support day of month and day of week as defined in cron
        if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or:
            # If requested, handle a bug in vixie cron/ISC cron where day_of_month and
            # day_of_week form an intersection (AND) instead of a union (OR) if either
            # field is an asterisk or starts with an asterisk (https://crontab.guru/cron-bug.html)
            if self._implement_cron_bug and (
                re_star.match(self.expressions[DAY_FIELD])
                or re_star.match(self.expressions[DOW_FIELD])
            ):
                # To produce a schedule identical to the cron bug, we'll bypass the code
                # that makes a union of DOM and DOW, and instead skip to the code that
                # does an intersect instead
                pass
            else:
                bak = expanded[DOW_FIELD]
                expanded[DOW_FIELD] = ["*"]
                t1 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
                expanded[DOW_FIELD] = bak
                expanded[DAY_FIELD] = ["*"]

                t2 = self._calc(current, expanded, nth_weekday_of_month, is_prev)
                if is_prev:
                    return t1 if t1 > t2 else t2
                return t1 if t1 < t2 else t2

        return self._calc(current, expanded, nth_weekday_of_month, is_prev)

    def _calc(
        self,
        now: datetime.datetime,
        expanded: list[ExpandedExpression],
        nth_weekday_of_month: dict[int, set[int]],
        is_prev: bool,
    ) -> datetime.datetime:
        if is_prev:
            nearest_diff_method = self._get_prev_nearest_diff
            offset = relativedelta(microseconds=-1)
        else:
            nearest_diff_method = self._get_next_nearest_diff
            if len(expanded) > UNIX_CRON_LEN:
                offset = relativedelta(seconds=1)
            else:
                offset = relativedelta(minutes=1)
        # Calculate the next cron time in local time a.k.a. timezone unaware time.
        unaware_time = now.replace(tzinfo=None) + offset
        if len(expanded) > UNIX_CRON_LEN:
            unaware_time = unaware_time.replace(microsecond=0)
        else:
            unaware_time = unaware_time.replace(second=0, microsecond=0)

        month = unaware_time.month
        year = current_year = unaware_time.year

        def proc_year(d):
            if len(expanded) == YEAR_CRON_LEN:
                try:
                    expanded[YEAR_FIELD].index("*")
                except ValueError:
                    # use None as range_val to indicate no loop
                    diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
                    if diff_year is None:
                        return None, d
                    if diff_year != 0:
                        if is_prev:
                            d += relativedelta(
                                years=diff_year, month=12, day=31, hour=23, minute=59, second=59
                            )
                        else:
                            d += relativedelta(
                                years=diff_year, month=1, day=1, hour=0, minute=0, second=0
                            )
                        return True, d
            return False, d

        def proc_month(d):
            try:
                expanded[MONTH_FIELD].index("*")
            except ValueError:
                diff_month = nearest_diff_method(
                    d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR
                )
                reset_day = 1

                if diff_month is not None and diff_month != 0:
                    if is_prev:
                        d += relativedelta(months=diff_month)
                        reset_day = _last_day_of_month(d.year, d.month)
                        d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(
                            months=diff_month, day=reset_day, hour=0, minute=0, second=0
                        )
                    return True, d
            return False, d

        def proc_day_of_month(d):
            try:
                expanded[DAY_FIELD].index("*")
            except ValueError:
                days = _last_day_of_month(year, month)
                if "l" in expanded[DAY_FIELD] and days == d.day:
                    return False, d

                if is_prev:
                    prev_month = (month - 2) % self.MONTHS_IN_YEAR + 1
                    prev_year = year - 1 if month == 1 else year
                    days_in_prev_month = _last_day_of_month(prev_year, prev_month)
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
                else:
                    diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)

                if diff_day is not None and diff_day != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week(d):
            try:
                expanded[DOW_FIELD].index("*")
            except ValueError:
                diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
                if diff_day_of_week is not None and diff_day_of_week != 0:
                    if is_prev:
                        d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
                    else:
                        d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
                    return True, d
            return False, d

        def proc_day_of_week_nth(d):
            if "*" in nth_weekday_of_month:
                s = nth_weekday_of_month["*"]
                for i in range(0, 7):
                    if i in nth_weekday_of_month:
                        nth_weekday_of_month[i].update(s)
                    else:
                        nth_weekday_of_month[i] = s
                del nth_weekday_of_month["*"]

            candidates = []
            for wday, nth in nth_weekday_of_month.items():
                c = self._get_nth_weekday_of_month(d.year, d.month, wday)
                for n in nth:
                    if n == "l":
                        candidate = c[-1]
                    elif len(c) < n:
                        continue
                    else:
                        candidate = c[n - 1]
                    if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                        candidates.append(candidate)

            if not candidates:
                if is_prev:
                    d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
                else:
                    days = _last_day_of_month(year, month)
                    d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
                return True, d

            candidates.sort()
            diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
            if diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
            return False, d

        def proc_nearest_weekday(d):
            """Process W (nearest weekday) day-of-month entries."""
            candidates = []
            for w_day in self.nearest_weekday:
                candidate = self._get_nearest_weekday(d.year, d.month, w_day)
                if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
                    candidates.append(candidate)

            if not candidates:
                if is_prev:
                    d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
                else:
                    days = _last_day_of_month(year, month)
                    d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
                return True, d

            candidates.sort()
            diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
            if diff_day != 0:
                if is_prev:
                    d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
                else:
                    d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
                return True, d
            return False, d

        def proc_hour(d):
            try:
                expanded[HOUR_FIELD].index("*")
            except ValueError:
                diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
                if diff_hour is not None and diff_hour != 0:
                    if is_prev:
                        d += relativedelta(hours=diff_hour, minute=59, second=59)
                    else:
                        d += relativedelta(hours=diff_hour, minute=0, second=0)
                    return True, d
            return False, d

        def proc_minute(d):
            try:
                expanded[MINUTE_FIELD].index("*")
            except ValueError:
                diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
                if diff_min is not None and diff_min != 0:
                    if is_prev:
                        d += relativedelta(minutes=diff_min, second=59)
                    else:
                        d += relativedelta(minutes=diff_min, second=0)
                    return True, d
            return False, d

        def proc_second(d):
            if len(expanded) > UNIX_CRON_LEN:
                try:
                    expanded[SECOND_FIELD].index("*")
                except ValueError:
                    diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
                    if diff_sec is not None and diff_sec != 0:
                        d += relativedelta(seconds=diff_sec)
                        return True, d
            else:
                d += relativedelta(second=0)
            return False, d

        procs = [
            proc_year,
            proc_month,
            (proc_nearest_weekday if self.nearest_weekday else proc_day_of_month),
            (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
            proc_hour,
            proc_minute,
            proc_second,
        ]

        while abs(year - current_year) <= self._max_years_between_matches:
            next = False
            stop = False
            for proc in procs:
                (changed, unaware_time) = proc(unaware_time)
                # `None` can be set mostly for year processing
                # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
                if changed is None:
                    stop = True
                    break
                if changed:
                    month, year = unaware_time.month, unaware_time.year
                    next = True
                    break
            if stop:
                break
            if next:
                continue

            unaware_time = unaware_time.replace(microsecond=0)
            if now.tzinfo is None:
                return unaware_time

            # Add timezone information back and handle DST changes
            aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            if not exists and (
                not _is_successor(aware_time, now, is_prev) or "*" in expanded[HOUR_FIELD]
            ):
                # The calculated local date does not exist and moving the time forward
                # to the next valid time isn't the correct solution. Search for the
                # next matching cron time that exists.
                while not exists:
                    unaware_time = self._calc(
                        unaware_time, expanded, nth_weekday_of_month, is_prev
                    )
                    aware_time, exists = _add_tzinfo(unaware_time, now, is_prev)

            offset_delta = _timezone_delta(now, aware_time)
            if not offset_delta:
                # There was no DST change.
                return aware_time

            # There was a DST change. So check if there is a alternative cron time
            # for the other UTC offset.
            alternative_unaware_time = now.replace(tzinfo=None) + offset_delta
            alternative_unaware_time = self._calc(
                alternative_unaware_time, expanded, nth_weekday_of_month, is_prev
            )
            alternative_aware_time, exists = _add_tzinfo(alternative_unaware_time, now, is_prev)

            if not _is_successor(alternative_aware_time, now, is_prev):
                # The alternative time is an ancestor of now. Thus it is not an alternative.
                return aware_time

            if _is_successor(aware_time, alternative_aware_time, is_prev):
                return alternative_aware_time

            return aware_time

        if is_prev:
            raise CroniterBadDateError("failed to find prev date")
        raise CroniterBadDateError("failed to find next date")

    @staticmethod
    def _get_next_nearest_diff(x, to_check, range_val):
        """
        `range_val` is the range of a field.
        If no available time, we can move to next loop(like next month).
        `range_val` can also be set to `None` to indicate that there is no loop.
        ( Currently, should only used for `year` field )
        """
        for i, d in enumerate(to_check):
            if range_val is not None:
                if d == "l":
                    # if 'l' then it is the last day of month
                    # => its value of range_val
                    d = range_val
                elif d > range_val:
                    continue
            if d >= x:
                return d - x
        # When range_val is None and x not exists in to_check,
        # `None` will be returned to suggest no more available time
        if range_val is None:
            return None
        return to_check[0] - x + range_val

    @staticmethod
    def _get_prev_nearest_diff(x, to_check, range_val):
        """
        `range_val` is the range of a field.
        If no available time, we can move to previous loop(like previous month).
        Range_val can also be set to `None` to indicate that there is no loop.
        ( Currently should only used for `year` field )
        """
        candidates = to_check[:]
        candidates.reverse()
        for d in candidates:
            if d != "l" and d <= x:
                return d - x
        if "l" in candidates:
            return -x
        # When range_val is None and x not exists in to_check,
        # `None` will be returned to suggest no more available time
        if range_val is None:
            return None
        candidate = candidates[0]
        for c in candidates:
            # fixed: c < range_val
            # this code will reject all 31 day of month, 12 month, 59 second,
            # 23 hour and so on.
            # if candidates has just a element, this will not harmful.
            # but candidates have multiple elements, then values equal to
            # range_val will rejected.
            if c <= range_val:
                candidate = c
                break
        # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error
        # return 2021-03-02 06:00:00
        if candidate > range_val:
            return -range_val
        return candidate - x - range_val

    @staticmethod
    def _get_nth_weekday_of_month(year: int, month: int, day_of_week: int) -> tuple[int, ...]:
        """For a given year/month return a list of days in nth-day-of-month order.
        The last weekday of the month is always [-1].
        """
        w = (day_of_week + 6) % 7
        c = calendar.Calendar(w).monthdayscalendar(year, month)
        if c[0][0] == 0:
            c.pop(0)
        return tuple(i[0] for i in c)

    @staticmethod
    def _get_nearest_weekday(year, month, day):
        """Get the nearest weekday (Mon-Fri) to the given day in the given month.

        Rules:
        - If the day is a weekday, return it.
        - If Saturday, return Friday (day-1), unless that crosses into previous month,
          then return Monday (day+2).
        - If Sunday, return Monday (day+1), unless that crosses into next month,
          then return Friday (day-2).
        """
        last_day = _last_day_of_month(year, month)
        day = min(day, last_day)
        weekday = calendar.weekday(year, month, day)  # 0=Mon, 6=Sun
        if weekday < 5:  # Mon-Fri
            return day
        if weekday == 5:  # Saturday
            if day > 1:
                return day - 1  # Friday
            else:
                return day + 2  # Monday (1st is Sat, so 3rd is Mon)
        # Sunday
        if day < last_day:
            return day + 1  # Monday
        else:
            return day - 2  # Friday (last day is Sun, go back to Fri)

    @classmethod
    def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
        if isinstance(len_expressions, (list, dict, tuple, set)):
            len_expressions = len(len_expressions)
        if val in cls.LOWMAP[field_index] and not (
            # do not support 0 as a month either for classical 5 fields cron,
            # 6fields second repeat form or 7 fields year form
            # but still let conversion happen if day field is shifted
            (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN)
            or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN)
            or (
                field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD]
                and len_expressions == YEAR_CRON_LEN
            )
        ):
            val = cls.LOWMAP[field_index][val]
        return val

    # Maximum days in each month (non-leap year for Feb)
    DAYS_IN_MONTH = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}

    @classmethod
    def _expand(cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, strict=False, strict_year=None):
        # Split the expression in components, and normalize L -> l, MON -> mon,
        # etc. Keep expr_format untouched so we can use it in the exception
        # messages.
        expr_aliases = {
            "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
            "@hourly": ("0 * * * *", "h * * * * h"),
            "@daily": ("0 0 * * *", "h h * * * h"),
            "@weekly": ("0 0 * * 0", "h h * * h h"),
            "@monthly": ("0 0 1 * *", "h h h * * h"),
            "@yearly": ("0 0 1 1 *", "h h h h * h"),
            "@annually": ("0 0 1 1 *", "h h h h * h"),
        }

        efl = expr_format.lower()
        hash_id_expr = 1 if hash_id is not None else 0
        try:
            efl = expr_aliases[efl][hash_id_expr]
        except KeyError:
            pass

        expressions = efl.split()

        if len(expressions) not in VALID_LEN_EXPRESSION:
            raise CroniterBadCronError(
                "Exactly 5, 6 or 7 columns has to be specified for iterator expression."
            )

        if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
            # move second to it's own(6th) field to process by same logical
            expressions.insert(SECOND_FIELD, expressions.pop(0))

        expanded = []
        nth_weekday_of_month = {}
        nearest_weekday = set()

        for field_index, expr in enumerate(expressions):
            for expanderid, expander in EXPANDERS.items():
                expr = expander(cls).expand(
                    efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp
                )

            if "?" in expr:
                if expr != "?":
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable."
                        f" Question mark can not used with other characters"
                    )
                if field_index not in [DAY_FIELD, DOW_FIELD]:
                    raise CroniterBadCronError(
                        f"[{expr_format}] is not acceptable. "
                        f"Question mark can only used in day_of_month or day_of_week"
                    )
                # currently just trade `?` as `*`
                expr = "*"

            e_list = expr.split(",")
            res = []

            while len(e_list) > 0:
                e = e_list.pop()
                nth = None

                if field_index == DOW_FIELD:
                    # Handle special case in the dow expression: 2#3, l3
                    special_dow_rem = special_dow_re.match(str(e))
                    if special_dow_rem:
                        g = special_dow_rem.groupdict()
                        he, last = g.get("he", ""), g.get("last", "")
                        if he:
                            e = he
                            try:
                                nth = int(last)
                                assert 5 >= nth >= 1
                            except (KeyError, ValueError, AssertionError):
                                raise CroniterBadCronError(
                                    f"[{expr_format}] is not acceptable."
                                    f" Invalid day_of_week value: '{nth}'"
                                )
                        elif last:
                            e = last
                            nth = g["pre"]  # 'l'

                if field_index == DAY_FIELD:
                    # Handle W (nearest weekday) in day-of-month: 15w, w15
                    w_match = nearest_weekday_re.match(str(e))
                    if w_match:
                        w_day = int(w_match.group(1) or w_match.group(2))
                        if w_day < 1 or w_day > 31:
                            raise CroniterBadCronError(
                                f"[{expr_format}] is not acceptable,"
                                f" nearest weekday day value '{w_day}' out of range"
                            )
                        if len(e_list) > 0 or len(res) > 0:
                            raise CroniterBadCronError(
                                f"[{expr_format}] is not acceptable."
                                f" 'W' can only be used with a single day value,"
                                f" not in a list or range"
                            )
                        nearest_weekday.add(w_day)
                        res.append(w_day)
                        continue

                # Before matching step_search_re, normalize "*" to "{min}-{max}".
                # Example: in the minute field, "*/5" normalizes to "0-59/5"
                t = re.sub(
                    r"^\*(\/.+)$",
                    r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
                    str(e),
                )
                m = step_search_re.search(t)

                if not m:
                    # Before matching step_search_re,
                    # normalize "{start}/{step}" to "{start}-{max}/{step}".
                    # Example: in the minute field, "10/5" normalizes to "10-59/5"
                    t = re.sub(r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e))
                    m = step_search_re.search(t)

                if m:
                    # early abort if low/high are out of bounds
                    (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
                    if field_index == DAY_FIELD and high == "l":
                        high = "31"

                    if not only_int_re.search(low):
                        low = str(cls._alphaconv(field_index, low, expressions))

                    if not only_int_re.search(high):
                        high = str(cls._alphaconv(field_index, high, expressions))

                    # normally, it's already guarded by the RE that should not accept
                    # not-int values.
                    if not only_int_re.search(str(step)):
                        raise CroniterBadCronError(
                            f"[{expr_format}] step '{step}'"
                            f" in field {field_index} is not acceptable"
                        )
                    step = int(step)

                    for band in low, high:
                        if not only_int_re.search(str(band)):
                            raise CroniterBadCronError(
                                f"[{expr_format}] bands '{low}-{high}'"
                                f" in field {field_index} are not acceptable"
                            )

                    low, high = (
                        cls.value_alias(int(_val), field_index, expressions)
                        for _val in (low, high)
                    )

                    if max(low, high) > max(
                        cls.RANGES[field_index][0], cls.RANGES[field_index][1]
                    ):
                        raise CroniterBadCronError(f"{expr_format} is out of bands")

                    if from_timestamp:
                        low = cls._get_low_from_current_date_number(
                            field_index, int(step), int(from_timestamp)
                        )

                    # Handle when the second bound of the range is in backtracking order:
                    # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
                    if low > high:
                        whole_field_range = list(
                            range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1)
                        )
                        # Add FirstBound -> ENDRANGE, respecting step
                        rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
                        # Then 0 -> SecondBound, but skipping n first occurences according to step
                        # EG to respect such expressions : Apr-Jan/3
                        to_skip = 0
                        if rng:
                            already_skipped = list(reversed(whole_field_range)).index(rng[-1])
                            curpos = whole_field_range.index(rng[-1])
                            if ((curpos + step) > len(whole_field_range)) and (
                                already_skipped < step
                            ):
                                to_skip = step - already_skipped
                        rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
                    # if we include a range type: Jan-Jan, or Sun-Sun,
                    #  it means the whole cycle (all days of week, # all monthes of year, etc)
                    elif low == high:
                        rng = list(
                            range(cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step)
                        )
                    else:
                        try:
                            rng = list(range(low, high + 1, step))
                        except ValueError as exc:
                            raise CroniterBadCronError(f"invalid range: {exc}")

                    if field_index == DOW_FIELD and nth and nth != "l":
                        rng = [f"{item}#{nth}" for item in rng]
                    e_list += [a for a in rng if a not in e_list]
                else:
                    if t.startswith("-"):
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable, negative numbers not allowed"
                        )
                    if not star_or_int_re.search(t):
                        t = cls._alphaconv(field_index, t, expressions)

                    try:
                        t = int(t)
                    except ValueError:
                        pass

                    t = cls.value_alias(t, field_index, expressions)

                    if t not in ["*", "l"] and (
                        int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
                    ):
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable, out of range"
                        )

                    res.append(t)

                    if field_index == DOW_FIELD and nth:
                        if t not in nth_weekday_of_month:
                            nth_weekday_of_month[t] = set()
                        nth_weekday_of_month[t].add(nth)

            res = set(res)
            res = sorted(res, key=lambda i: f"{i:02}" if isinstance(i, int) else i)
            if len(res) == cls.LEN_MEANS_ALL[field_index]:
                # Make sure the wildcard is used in the correct way (avoid over-optimization)
                if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
                    field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
                ):
                    pass
                else:
                    res = ["*"]

            expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)

        # Check to make sure the dow combo in use is supported
        if nth_weekday_of_month:
            dow_expanded_set = set(expanded[DOW_FIELD])
            dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
            dow_expanded_set.discard("*")
            # Skip: if it's all weeks instead of wildcard
            if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
                raise CroniterUnsupportedSyntaxError(
                    f"day-of-week field does not support mixing literal values and nth"
                    f" day of week syntax.  Cron: '{expr_format}'"
                    f"    dow={dow_expanded_set} vs nth={nth_weekday_of_month}"
                )

        if strict:
            # Cross-validate day-of-month against month (and optionally year)
            # to reject impossible combinations like "0 0 31 2 *" (Feb 31st).
            days = expanded[DAY_FIELD]
            months = expanded[MONTH_FIELD]
            if days != ["*"] and days != ["l"] and months != ["*"]:
                int_days = [d for d in days if isinstance(d, int)]
                int_months = [m for m in months if isinstance(m, int)]
                if int_days and int_months:
                    # Determine max days per month, accounting for leap years
                    days_in_month = dict(cls.DAYS_IN_MONTH)
                    if 2 in int_months:
                        has_leap_year = True  # assume possible by default
                        if strict_year is not None:
                            # Year explicitly provided as parameter
                            if isinstance(strict_year, int):
                                has_leap_year = calendar.isleap(strict_year)
                            else:
                                has_leap_year = any(calendar.isleap(y) for y in strict_year)
                        elif len(expanded) > YEAR_FIELD:
                            years = expanded[YEAR_FIELD]
                            if years != ["*"]:
                                int_years = [y for y in years if isinstance(y, int)]
                                if int_years:
                                    has_leap_year = any(calendar.isleap(y) for y in int_years)
                        if has_leap_year:
                            days_in_month[2] = 29
                    min_day = min(int_days)
                    max_possible = max(days_in_month[m] for m in int_months)
                    if min_day > max_possible:
                        raise CroniterBadCronError(
                            f"[{expr_format}] is not acceptable. Day(s) {int_days}"
                            f" can never occur in month(s) {int_months}"
                        )

        return expanded, nth_weekday_of_month, expressions, nearest_weekday

    @classmethod
    def expand(
        cls,
        expr_format: str,
        hash_id: Optional[Union[bytes, str]] = None,
        second_at_beginning: bool = False,
        from_timestamp: Optional[float] = None,
        strict: bool = False,
        strict_year: Optional[Union[int, list[int]]] = None,
    ) -> tuple[list[ExpandedExpression], dict[int, set[int]]]:
        """
        Expand a cron expression format into a noramlized format of
        list[list[int | 'l' | '*']]. The first list representing each element
        of the epxression, and each sub-list representing the allowed values
        for that expression component.

        A tuple is returned, the first value being the expanded epxression
        list, and the second being a `nth_weekday_of_month` mapping.

        Examples:

        # Every minute
        >>> croniter.expand('* * * * *')
        ([['*'], ['*'], ['*'], ['*'], ['*']], {})

        # On the hour
        >>> croniter.expand('0 0 * * *')
        ([[0], [0], ['*'], ['*'], ['*']], {})

        # Hours 0-5 and 10 monday through friday
        >>> croniter.expand('0-5,10 * * * mon-fri')
        ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})

        Note that some special values such as nth day of week are expanded to a
        special mapping format for later processing:

        # Every minute on the 3rd tuesday of the month
        >>> croniter.expand('* * * * 2#3')
        ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})

        # Every hour on the last day of the month
        >>> croniter.expand('0 * l * *')
        ([[0], ['*'], ['l'], ['*'], ['*']], {})

        # On the hour every 15 seconds
        >>> croniter.expand('0 0 * * * */15')
        ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
        """
        try:
            expanded, nth_weekday_of_month, _expressions, _nearest_weekday = cls._expand(
                expr_format,
                hash_id=hash_id,
                second_at_beginning=second_at_beginning,
                from_timestamp=from_timestamp,
                strict=strict,
                strict_year=strict_year,
            )
            return expanded, nth_weekday_of_month
        except (ValueError,) as exc:
            if isinstance(exc, CroniterError):
                raise
            trace = _traceback.format_exc()
            raise CroniterBadCronError(trace)

    @classmethod
    def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
        dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
        if field_index == MINUTE_FIELD:
            return dt.minute % step
        if field_index == HOUR_FIELD:
            return dt.hour % step
        if field_index == DAY_FIELD:
            return ((dt.day - 1) % step) + 1
        if field_index == MONTH_FIELD:
            return dt.month % step
        if field_index == DOW_FIELD:
            return (dt.weekday() + 1) % step

        raise ValueError("Can't get current date number for index larger than 4")

    @classmethod
    def is_valid(cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False, strict=False, strict_year=None):
        if hash_id:
            if not isinstance(hash_id, (bytes, str)):
                raise TypeError("hash_id must be bytes or UTF-8 string")
            if not isinstance(hash_id, bytes):
                hash_id = hash_id.encode(encoding)
        try:
            cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning, strict=strict, strict_year=strict_year)
        except CroniterError:
            return False
        return True

    @classmethod
    def match(
        cls,
        cron_expression,
        testdate,
        day_or=True,
        second_at_beginning=False,
        precision_in_seconds=None,
    ):
        return cls.match_range(
            cron_expression, testdate, testdate, day_or, second_at_beginning, precision_in_seconds
        )

    @classmethod
    def match_range(
        cls,
        cron_expression,
        from_datetime,
        to_datetime,
        day_or=True,
        second_at_beginning=False,
        precision_in_seconds=None,
    ):
        cron = cls(
            cron_expression,
            to_datetime,
            ret_type=datetime.datetime,
            day_or=day_or,
            second_at_beginning=second_at_beginning,
        )
        tdp = cron.get_current(datetime.datetime)
        if not tdp.microsecond:
            tdp += relativedelta(microseconds=1)
        cron.set_current(tdp, force=True)
        try:
            tdt = cron.get_prev()
        except CroniterBadDateError:
            return False
        if precision_in_seconds is None:
            precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60
        duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
        return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second


def croniter_range(
    start,
    stop,
    expr_format,
    ret_type=None,
    day_or=True,
    exclude_ends=False,
    _croniter=None,
    second_at_beginning=False,
    expand_from_start_time=False,
):
    """
    Generator that provides all times from start to stop matching the given cron expression.
    If the cron expression matches either 'start' and/or 'stop', those times will be returned as
    well unless 'exclude_ends=True' is passed.

    You can think of this function as sibling to the builtin range function for datetime objects.
    Like range(start,stop,step), except that here 'step' is a cron expression.
    """
    _croniter = _croniter or croniter
    auto_rt = datetime.datetime
    # type is used in first if branch for perfs reasons
    if type(start) is not type(stop) and not (
        isinstance(start, type(stop)) or isinstance(stop, type(start))
    ):
        raise CroniterBadTypeRangeError(
            f"The start and stop must be same type.  {type(start)} != {type(stop)}"
        )
    if isinstance(start, (float, int)):
        start, stop = (
            datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop)
        )
        auto_rt = float
    if ret_type is None:
        ret_type = auto_rt
    if not exclude_ends:
        ms1 = relativedelta(microseconds=1)
        if start < stop:  # Forward (normal) time order
            start -= ms1
            stop += ms1
        else:  # Reverse time order
            start += ms1
            stop -= ms1
    year_span = math.floor(abs(stop.year - start.year)) + 1
    ic = _croniter(
        expr_format,
        start,
        ret_type=datetime.datetime,
        day_or=day_or,
        max_years_between_matches=year_span,
        second_at_beginning=second_at_beginning,
        expand_from_start_time=expand_from_start_time,
    )
    # define a continue (cont) condition function and step function for the main while loop
    if start < stop:  # Forward

        def cont(v):
            return v < stop

        step = ic.get_next
    else:  # Reverse

        def cont(v):
            return v > stop

        step = ic.get_prev
    try:
        dt = step()
        while cont(dt):
            if ret_type is float:
                yield ic.get_current(float)
            else:
                yield dt
            dt = step()
    except CroniterBadDateError:
        # Stop iteration when this exception is raised; no match found within the given year range
        return


class HashExpander:
    def __init__(self, cronit):
        self.cron = cronit

    def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
        """Return a hashed/random integer given range/hash information"""
        if range_end is None:
            range_end = self.cron.RANGES[idx][1]
        if range_begin is None:
            range_begin = self.cron.RANGES[idx][0]
        if hash_type == "r":
            crc = random.randint(0, 0xFFFFFFFF)
        else:
            crc = binascii.crc32(hash_id) & 0xFFFFFFFF
        return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin

    def match(self, efl, idx, expr, hash_id=None, **kw):
        return hash_expression_re.match(expr)

    def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
        """Expand a hashed/random expression to its normal representation"""
        if match == "":
            match = self.match(efl, idx, expr, hash_id, **kw)
        if not match:
            return expr
        m = match.groupdict()

        if m["hash_type"] == "h" and hash_id is None:
            raise CroniterBadCronError("Hashed definitions must include hash_id")

        if m["range_begin"] and m["range_end"]:
            if int(m["range_begin"]) >= int(m["range_end"]):
                raise CroniterBadCronError("Range end must be greater than range begin")

        if m["range_begin"] and m["range_end"] and m["divisor"]:
            # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
            if int(m["divisor"]) == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            x = self.do(
                idx,
                hash_type=m["hash_type"],
                hash_id=hash_id,
                range_begin=int(m["range_begin"]),
                range_end=int(m["divisor"]) - 1 + int(m["range_begin"]),
            )
            return f"{x}-{int(m['range_end'])}/{int(m['divisor'])}"
        if m["range_begin"] and m["range_end"]:
            # Example: H(0-29) -> 12
            return str(
                self.do(
                    idx,
                    hash_type=m["hash_type"],
                    hash_id=hash_id,
                    range_end=int(m["range_end"]),
                    range_begin=int(m["range_begin"]),
                )
            )
        if m["divisor"]:
            # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52)
            if int(m["divisor"]) == 0:
                raise CroniterBadCronError(f"Bad expression: {expr}")

            x = self.do(
                idx,
                hash_type=m["hash_type"],
                hash_id=hash_id,
                range_begin=self.cron.RANGES[idx][0],
                range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0],
            )
            return f"{x}-{self.cron.RANGES[idx][1]}/{int(m['divisor'])}"

        # Example: H -> 32
        return str(self.do(idx, hash_type=m["hash_type"], hash_id=hash_id))


EXPANDERS = {"hash": HashExpander}