# Source code for pytb.schedule

"""
A simple task scheduling system to run periodic tasks
"""

from datetime import datetime, timedelta
from typing import Generator, Optional, Sequence, Callable, Any, Mapping
from threading import Event, Thread

# Type of the generators consumed by ``Schedule``: they yield the datetimes at which
# the schedule is due; nothing is sent into them and they return nothing.
ScheduleGenerator = Generator[datetime, None, None]


class Schedule(Thread):
    """
    This represents a reoccurring task, executing ``target`` every time the schedule is due.

    The target is run in an extra thread by default. If you stop the schedule while the
    target function is running, the thread is canceled after finishing its current run.

    :param target: The target function to execute each time the schedule is due
    :param interval: A generator yielding datetime objects that determine when the
        schedule is due. When this generator is exhausted, the schedule stops.
        Datetime objects in the past are simply ignored and the next value from the
        generator is used to schedule the job.
    """

    def __init__(
        self,
        target: Callable[..., Any],
        interval: ScheduleGenerator,
    ):
        super().__init__()
        self._target = target
        self._interval = interval
        # arguments forwarded to the target; replaced by start_schedule(), initialised
        # here so that run() also works when the thread is started via plain start()
        self._args: Sequence[Any] = ()
        self._kwargs: Mapping[str, Any] = {}
        self._stop_event = Event()
        # set while the target function is executing
        self.is_running = Event()

    def start_schedule(self, *args: Any, **kwargs: Any) -> None:
        """
        Start the scheduler and pass all supplied arguments to the target function
        each time the schedule is due
        """
        self._args = args
        self._kwargs = kwargs
        super().start()

    def run(self) -> None:
        """
        Body of the schedule thread.

        Repeatedly fetch the next due date, wait until it is reached and call the target
        with the arguments given to :meth:`start_schedule`. Due dates that are already in
        the past are skipped. The loop ends when the generator is exhausted or
        :meth:`stop` is called. An exception raised by the target ends the schedule and
        propagates out of the thread.
        """
        try:
            while not self._stop_event.is_set():
                try:
                    due = self.next_schedule()
                except StopIteration:
                    break
                if due < datetime.now():
                    # documented contract: past dates are ignored, not executed late
                    continue
                if not self._wait_until(due):
                    break
                self.is_running.set()
                try:
                    self._target(*self._args, **self._kwargs)
                finally:
                    # never leave is_running set if the target raised
                    self.is_running.clear()
        finally:
            # mark the schedule as stopped however the loop ended
            self.stop()

    def _wait_until(self, due: datetime) -> bool:
        """
        Block until the wall clock reaches ``due``.

        Event.wait may return slightly before the wall-clock due time, so the remaining
        time is re-checked in a loop; this prevents running the same due date twice.

        :return: ``True`` once ``due`` is reached, ``False`` if the schedule was stopped first
        """
        while not self._stop_event.is_set():
            remaining = (due - datetime.now()).total_seconds()
            if remaining <= 0:
                return True
            self._stop_event.wait(remaining)
        return False

    def next_schedule(self) -> datetime:
        """
        Return the datetime object this schedule is due

        :raises StopIteration: if the underlying generator is exhausted
        """
        return next(self._interval)

    def stop(self) -> None:
        """
        Stop the async execution of the schedule, cancel all future tasks.

        Blocks until a currently running target call has finished.
        """
        # signal the run loop (and any pending wait) to end
        self._stop_event.set()
        # wait for the run thread to finish
        try:
            self.join()
        except RuntimeError:
            # join() refuses to run from the schedule thread itself or before start()
            pass

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """
        Make calling the function possible from the decorated object
        """
        return self._target(*args, **kwargs)
def _cron_number(text: str, spec: str) -> int:
    """Convert one numeric field of the cron-like expression ``spec`` to an int."""
    text = text.strip()
    # isdecimal() rejects signs, empty strings and junk; every decimal char is int()-able
    if not text.isdecimal():
        raise ValueError(f"Cannot parse cron-like expression {spec}")
    return int(text)


def _parse_cron_part(part: str, spec: str, min_value: int, max_value: int) -> range:
    """Parse one comma-free element (``i``, ``*``, ``i-j``, ``*/s``, ``i-j/s``) of ``spec``."""
    body, slash, step_spec = part.partition("/")
    step = _cron_number(step_spec, spec) if slash else 1
    if step < 1:
        raise ValueError(f"Cannot parse cron-like expression {spec}")
    body = body.strip()
    if body == "*":
        values = range(min_value, max_value + 1, step)
    elif "-" in body:
        first, _, last = body.partition("-")
        values = range(_cron_number(first, spec), _cron_number(last, spec) + 1, step)
    elif not slash:
        number = _cron_number(body, spec)
        values = range(number, number + 1)
    else:
        # a step on a single value ("i/s") is not supported
        raise ValueError(f"Cannot parse cron-like expression {spec}")
    if not values:
        # e.g. a reversed range such as "5-1"
        raise ValueError(f"Cannot parse cron-like expression {spec}")
    return values


def parse_cron_spec(spec: str, max_value: int, min_value: int = 0) -> Sequence[int]:
    """
    Parse a string in a cron-like expression format to a sequence of accepted numbers.

    The expression needs to have one of the following forms:

    - ``i`` sequence contains only the element i
    - ``*`` indicates that all values possible for this part are included
    - ``*/s`` all possible values, using the step-size ``s``
    - ``i-j`` specifies a range of values *including* ``j``
    - ``i-j/s`` additionally specifies the step-size
    - ``a,b,c`` a comma-separated list of any of the forms above, e.g. ``1,3-5``

    :param spec: The cron-like expression to parse
    :param max_value: The maximum value allowed for this range. This is needed to
        specify the range using the '*' wildcard
    :param min_value: The minimum allowed value
    :return: the accepted values, in the order they appear in ``spec``
    :raises ValueError: if the expression cannot be parsed, selects no value, or
        contains a value outside ``[min_value, max_value]``

    Example:

    .. testsetup:: *

        from pytb.schedule import parse_cron_spec

    .. doctest::

        >>> list(parse_cron_spec('5', max_value=7,))
        [5]
        >>> list(parse_cron_spec('*', max_value=7,))
        [0, 1, 2, 3, 4, 5, 6, 7]
        >>> list(parse_cron_spec('1-4', max_value=7,))
        [1, 2, 3, 4]
        >>> list(parse_cron_spec('1-4/2', max_value=7,))
        [1, 3]
        >>> list(parse_cron_spec('*/3', max_value=7,))
        [0, 3, 6]
        >>> list(parse_cron_spec('0,2-3', max_value=7,))
        [0, 2, 3]
    """
    parsed_values = tuple(
        value
        for part in spec.split(",")
        for value in _parse_cron_part(part, spec, min_value, max_value)
    )
    # both bounds are inclusive
    if any(not min_value <= value <= max_value for value in parsed_values):
        raise ValueError(f"found out-of-bounds value for expression {spec}")
    if not parsed_values:
        raise ValueError(f"Cannot parse cron-like expression {spec}")
    return parsed_values
def at(  # pylint: disable=invalid-name
    minute: str = "*",
    hour: str = "*",
    day: str = "*",
    month: str = "*",
    weekday: str = "*",
) -> Callable[..., Schedule]:
    """
    run the task every time the current system-time matches the cron-like expression.

    Check the documentation for :func:`parse_cron_spec` for the supported syntax.
    All five fields have to match at the same time (unlike classic cron, ``day`` and
    ``weekday`` are combined with AND, not OR).

    :param minute: minutes of the hour, 0-59
    :param hour: hours of the day, 0-23
    :param day: days of the month, 1-31
    :param month: months of the year, 1-12
    :param weekday: days of the week, 0-7 following the cron convention:
        0 and 7 are Sunday, 1 is Monday, ..., 6 is Saturday
    :raises ValueError: immediately, if one of the expressions is invalid
    """
    selected_minutes = frozenset(parse_cron_spec(minute, max_value=59))
    selected_hours = frozenset(parse_cron_spec(hour, max_value=23))
    selected_days = frozenset(parse_cron_spec(day, min_value=1, max_value=31))
    selected_months = frozenset(parse_cron_spec(month, min_value=1, max_value=12))
    # cron numbering: 0 and 7 both denote Sunday; fold 7 onto 0
    selected_weekdays = frozenset(x % 7 for x in parse_cron_spec(weekday, max_value=7))

    def day_matches(date: datetime) -> bool:
        """Check the day, month and weekday fields for ``date``."""
        # isoweekday() is Monday=1 .. Sunday=7, so "% 7" yields the cron numbering
        return (
            date.day in selected_days
            and date.month in selected_months
            and date.isoweekday() % 7 in selected_weekdays
        )

    def find_next_match(after: datetime) -> datetime:
        """
        Return the first whole minute strictly after ``after`` matching all fields.

        :raises AssertionError: if no date can ever match (e.g. ``day='31', month='2'``)
        """
        candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
        # calendar and weekdays repeat every 400 Gregorian years (146097 days), so a
        # match exists within this window or never; this also covers Feb 29 schedules
        limit = candidate + timedelta(days=146097)
        while candidate < limit:
            if not day_matches(candidate):
                # no minute of this day can match: jump to the next midnight
                candidate = (candidate + timedelta(days=1)).replace(hour=0, minute=0)
            elif candidate.hour not in selected_hours:
                # no minute of this hour can match: jump to the next full hour
                candidate = (candidate + timedelta(hours=1)).replace(minute=0)
            elif candidate.minute not in selected_minutes:
                candidate += timedelta(minutes=1)
            else:
                return candidate
        raise AssertionError("Could not find a suitable date matching the condition.")

    def get_next_due_date() -> ScheduleGenerator:
        """
        Get the datetime where this schedule should be run next
        """
        next_schedule = datetime.min
        while True:
            now = datetime.now()
            # keep yielding the same date until it has passed, then search the next one
            if next_schedule < now:
                next_schedule = find_next_match(now)
            yield next_schedule

    def schedule_decorator(fun: Callable[..., Any]) -> Schedule:
        return Schedule(fun, get_next_due_date())

    return schedule_decorator
def every(
    interval: timedelta, start_at: Optional[datetime] = None
) -> Callable[..., Schedule]:
    """
    Run a task repeatedly at the given interval

    Due dates lie on the grid ``start + k * interval``. If a run takes longer than
    ``interval``, the grid points that passed in the meantime are skipped.

    :param interval: time between two consecutive due dates; must be positive
    :param start_at: run the command for the first time only after this date has passed.
        If not specified, the grid is anchored at the moment :func:`every` is called;
        there is no immediate run, the earliest run is one ``interval`` later.
    :raises ValueError: if ``interval`` is not positive
    """
    if interval <= timedelta(0):
        # a zero interval would divide by zero, a negative one would yield past dates
        raise ValueError(f"interval must be positive, got {interval}")
    start = datetime.now() if start_at is None else start_at

    def get_next_due_date() -> ScheduleGenerator:
        """
        Get the datetime where this schedule should be run next
        """
        while True:
            now = datetime.now()
            if now < start:
                # start_at lies in the future: the first run is exactly at start_at
                # (the modulo below would otherwise pick an earlier grid point)
                yield start
            else:
                elapsed_in_period = (now - start) % interval
                yield now + (interval - elapsed_in_period)

    def schedule_decorator(fun: Callable[..., Any]) -> Schedule:
        return Schedule(fun, get_next_due_date())

    return schedule_decorator