Source code for rubato.utils.rb_time

"""
A static class to monitor time and to call functions at delay/interval.
"""
from dataclasses import dataclass, field
from typing import Callable
import heapq
import sdl2
from . import InitError


@dataclass(order=True)
class DelayedTask:
    """
    A task that is run after a specified number of seconds.

    Instances are ordered by ``next_run`` (their absolute due time), so a heap
    of DelayedTasks keeps the task that is due soonest at index 0.

    Args:
        task: The task to invoke.
        delay: The number of seconds to wait before invoking the task.
    """

    task: Callable[[], None] = field(compare=False)
    """The task to run."""
    # ``delay`` is relative to the moment of scheduling, so it must not take
    # part in ordering: a task scheduled late with a short delay can still be
    # due after a task scheduled early with a long delay.
    delay: float = field(compare=False)
    """The delay until the task is run, in seconds."""
    is_stopped: bool = field(init=False, default=False, compare=False)
    """Whether the DelayedTask is stopped."""
    # The only field used for comparison. It defaults to 0 so that an
    # unscheduled task can still be printed and compared.
    next_run: float = field(init=False, default=0)
    """The time at which the task will be run next, in seconds."""

    def stop(self):
        """Stop the DelayedTask from invoking."""
        self.is_stopped = True
@dataclass(order=True)
class FramesTask:
    """
    A task that is run after a specified number of frames.

    Instances are ordered by ``next_run`` (the absolute frame number at which
    they are due), so a heap of FramesTasks keeps the task that is due soonest
    at index 0.

    Args:
        task: The task to invoke.
        delay: The number of frames to wait before invoking the task.
    """

    task: Callable[[], None] = field(compare=False)
    """The task to run."""
    # ``delay`` is relative to the frame on which the task was scheduled, so
    # it is excluded from ordering; only the absolute due frame matters.
    delay: int = field(compare=False)
    """The delay until the task is run, in frames."""
    is_stopped: bool = field(init=False, default=False, compare=False)
    """Whether the FramesTask is stopped."""
    # The only field used for comparison. It defaults to 0 so that an
    # unscheduled task can still be printed and compared.
    next_run: int = field(init=False, default=0)
    """The frame at which the task will be run next."""

    def stop(self):
        """Stop the FramesTask from invoking."""
        self.is_stopped = True
[docs]@dataclass(order=True) class RecurrentTask: """ A task that is run every specified number of seconds. Args: task: The task to invoke. interval: The number of seconds between task invocations. delay: The number of seconds to wait before starting the invocations. """ task: Callable[[], None] | Callable[["RecurrentTask"], None] = field(compare=False) """The task to run.""" interval: float = field(compare=False) """The interval between task invocations, in seconds.""" delay: float = field(default=0) """The initial delay until the task is run, in seconds.""" is_stopped: bool = field(init=False, default=False, compare=False) """Whether the RecurrentTask is stopped.""" next_run: float = field(init=False, compare=False) """The time at which the task will be run next, in seconds."""
[docs] def stop(self): """Stop the RecurrentTask from invoking.""" self.is_stopped = True
# THIS IS A STATIC CLASS
class Time:
    """
    Implements time-related functions in rubato.

    The class is static: all state lives in class attributes, every method is a
    classmethod, and attempting to instantiate it raises an InitError.
    """

    frames: int = 0
    """The total number of elapsed frames since the start of the game."""

    fixed_delta: float = 0.1
    """The number of seconds since the last fixed update."""

    fps = 60
    """The fps estimate using the last frame."""

    # Heaps (see ``heapq``) of scheduled tasks, one per task type.
    _frame_queue: list[FramesTask] = []
    _task_queue: list[DelayedTask] = []
    _recurrent_queue: list[RecurrentTask] = []
    # Plain FIFO list of callbacks to run once on the next frame.
    _next_queue: list[Callable] = []

    # Frame timing bookkeeping, in milliseconds.
    _delta_time: int = 1
    _normal_delta: int = 0
    _frame_start: int = 0
    _physics_counter: float = 0

    # Ring buffer holding the fps estimate of the most recent 120 frames.
    _past_fps = [0] * 120
    _fps_index: int = 0

    target_fps = 0
    """The fps that the game should try to run at. 0 means that the game's fps will not be capped. Defaults to 0."""
    _physics_fps = 0
    """The fps that the physics should run at."""

    def __init__(self) -> None:
        raise InitError(self)

    # NOTE(review): stacking ``classmethod`` on ``property`` was deprecated in
    # Python 3.11 and no longer works from Python 3.13 on. The stacking is kept
    # so that ``Time.delta_time`` behaves exactly as before on the interpreters
    # that support it; code in this class reads ``_delta_time`` directly.
    @classmethod
    @property
    def delta_time(cls) -> float:
        """The number of seconds between the last frame and the current frame (get-only)."""
        return cls._delta_time / 1000

    @classmethod
    def smooth_fps(cls) -> int:
        """The average fps over the past 120 frames."""
        return int(sum(cls._past_fps) / len(cls._past_fps))

    @classmethod
    def frame_start(cls) -> float:
        """
        Time from the start of the game to the start of the current frame, in seconds.
        """
        # ``_frame_start`` is stored in milliseconds (it is set from ``_now()``),
        # so it has to be divided, not multiplied, to obtain seconds.
        return cls._frame_start / 1000

    @classmethod
    def _now(cls) -> int:
        """The time since the start of the game, in milliseconds."""
        return sdl2.SDL_GetTicks64()

    @classmethod
    def now(cls) -> float:
        """The time since the start of the game, in seconds."""
        return sdl2.SDL_GetTicks64() / 1000

    @classmethod
    def _start_frame(cls):
        """Records the current time as the start of the frame."""
        cls._frame_start = cls._now()

    @classmethod
    def _end_frame(cls):
        """Waits out the remainder of the frame if the fps is capped, then updates the frame delta."""
        if cls.target_fps != 0:
            delay = cls._normal_delta + cls._frame_start - cls._now()
            if delay > 0:
                sdl2.SDL_Delay(delay)

        # Guarantee that at least one millisecond has passed, so that the frame
        # delta is never 0 (the fps estimate divides by it).
        while cls._now() == cls._frame_start:
            sdl2.SDL_Delay(1)

        cls._delta_time = cls._now() - cls._frame_start

    @classmethod
    def next_frame(cls, func: Callable[[], None]):
        """
        Schedules the function func to be called on the next frame.

        Args:
            func: The function to call.
        """
        cls._next_queue.append(func)

    @classmethod
    def delayed_frames(cls, task: Callable[[], None], delay: int):
        """
        Schedules the function task to be called at a later frame.

        Args:
            task: The function to call
            delay: The number of frames to wait.
        """
        cls.schedule(FramesTask(task, delay))

    @classmethod
    def delayed_call(cls, task: Callable[[], None], delay: float):
        """
        Schedules the function task to be called at a later time.

        Args:
            task: The function to call.
            delay: The time from now (in seconds) to run the function at.
        """
        cls.schedule(DelayedTask(task, delay))

    @classmethod
    def recurrent_call(
        cls, task: Callable[[], None] | Callable[[RecurrentTask], None], interval: float, delay: float = 0
    ):
        """
        Schedules the function task to be repeatedly called every interval.

        Args:
            task: The function to call. This method may take a RecurrentTask as an argument, which will be passed to
                it when it is invoked.
            interval: The interval (in seconds) to run the function at. Should be positive: a due task is re-queued
                at ``next_run + interval`` and the queue is drained until nothing is due.
            delay: The delay (in seconds) to wait before starting the task.
        """
        cls.schedule(RecurrentTask(task, interval, delay))

    @classmethod
    def schedule(cls, task: DelayedTask | FramesTask | RecurrentTask):
        """
        Schedules a task for execution based on what type of task it is.

        Args:
            task: The task to queue.

        Raises:
            TypeError: If task is not a DelayedTask, FramesTask or RecurrentTask.
        """
        if isinstance(task, DelayedTask):
            task.next_run = cls.now() + task.delay
            heapq.heappush(cls._task_queue, task)
        elif isinstance(task, FramesTask):
            # Frame tasks are measured against the frame counter, not the clock.
            task.next_run = cls.frames + task.delay
            heapq.heappush(cls._frame_queue, task)
        elif isinstance(task, RecurrentTask):
            task.next_run = cls.now() + task.delay
            heapq.heappush(cls._recurrent_queue, task)
        else:
            raise TypeError("Task argument must be of type DelayedTask, FramesTask or RecurrentTask.")

    @classmethod
    def _process_calls(cls):
        """Advances the frame counter, updates the fps estimates and runs every scheduled call that is due."""
        cls.frames += 1
        # Same arithmetic as ``1 / delta_time``, but read from the raw
        # millisecond counter so it does not depend on the classmethod/property
        # stacking used by the public accessor.
        cls.fps = 1 / (cls._delta_time / 1000)

        cls._past_fps[cls._fps_index] = int(cls.fps)
        cls._fps_index = (cls._fps_index + 1) % len(cls._past_fps)

        if cls._next_queue:
            # Snapshot and empty the queue before running anything: a callback
            # that calls next_frame() must be deferred to the following frame
            # instead of being picked up by this very loop.
            pending = cls._next_queue[:]
            cls._next_queue.clear()
            for func in pending:
                func()

        # Each queue is a heap, so only its root needs to be inspected.
        while cls._frame_queue and cls._frame_queue[0].next_run <= cls.frames:
            frame_task: FramesTask = heapq.heappop(cls._frame_queue)
            if not frame_task.is_stopped:
                frame_task.task()

        while cls._task_queue and cls._task_queue[0].next_run <= cls.now():
            delayed_task: DelayedTask = heapq.heappop(cls._task_queue)
            if not delayed_task.is_stopped:
                delayed_task.task()

        while cls._recurrent_queue and cls._recurrent_queue[0].next_run <= cls.now():
            recurrent_task: RecurrentTask = heapq.heappop(cls._recurrent_queue)
            # A stopped task is simply dropped; it is not pushed back.
            if not recurrent_task.is_stopped:
                # NOTE(review): the fallback cannot tell "task takes no
                # argument" apart from a TypeError raised inside the task, in
                # which case the task is invoked a second time without args.
                try:
                    recurrent_task.task(recurrent_task)  # type: ignore
                except TypeError:
                    recurrent_task.task()  # type: ignore

                # Advance from the previous due time (not from now) so the
                # task does not drift; it may run again this frame if overdue.
                recurrent_task.next_run += recurrent_task.interval
                heapq.heappush(cls._recurrent_queue, recurrent_task)