Source code for rubato.utils.rb_time

"""
A static time class to monitor time and to call functions in the future.

"""
from dataclasses import dataclass, field
from typing import Callable, List
import heapq
import sdl2


@dataclass(order=True)
class DelayedTask:
    """
    A one-shot task that is run after a specified number of milliseconds.

    Only ``delay`` takes part in comparisons, so tasks sort by when they are
    due. ``Time`` relies on this to keep them in a ``heapq`` priority queue.

    Attributes:
        delay (int): How long to wait, in milliseconds. ``Time.schedule`` adds
            the current time to this value when the task is queued.
        task (Callable): The function to invoke. It is called with no arguments.
        is_stopped (bool): Whether the task has been cancelled. Defaults to False.
    """
    delay: int
    task: Callable = field(compare=False)
    is_stopped: bool = field(compare=False, default=False)

    def stop(self):
        """Cancel the DelayedTask so that it is skipped rather than invoked."""
        self.is_stopped = True
@dataclass(order=True)
class FramesTask:
    """
    A one-shot task that is run after a specified number of frames.

    Only ``delay`` takes part in comparisons, so tasks sort by the frame they
    are due on. ``Time`` relies on this to keep them in a ``heapq`` priority
    queue.

    Attributes:
        delay (int): How many frames to wait. ``Time.schedule`` adds the current
            frame count to this value when the task is queued.
        task (Callable): The function to invoke. It is called with no arguments.
        is_stopped (bool): Whether the task has been cancelled. Defaults to False.
    """
    delay: int
    task: Callable = field(compare=False)
    is_stopped: bool = field(compare=False, default=False)

    def stop(self):
        """Cancel the FramesTask so that it is skipped rather than invoked."""
        self.is_stopped = True
@dataclass(order=True)
class ScheduledTask:
    """
    A repeating task that is run every specified number of milliseconds.

    Only ``delay`` takes part in comparisons, so tasks sort by when their next
    run is due. ``Time`` relies on this to keep them in a ``heapq`` priority
    queue.

    Attributes:
        interval (int): The time between runs, in milliseconds. After each run
            ``Time.process_calls`` adds it to ``delay``.
        task (Callable): The function to invoke. It is called with no arguments.
        delay (int): The wait before the first run, in milliseconds.
            ``Time.schedule`` adds the current time to this value when the task
            is queued. Defaults to 0.
        is_stopped (bool): Whether the task has been cancelled. Defaults to False.
    """
    interval: int = field(compare=False)
    task: Callable = field(compare=False)
    delay: int = 0
    is_stopped: bool = field(compare=False, default=False)

    def stop(self):
        """Cancel the ScheduledTask so that it is no longer invoked or re-queued."""
        self.is_stopped = True
class _ClassProperty:
    """Read-only property that lives on the class: ``Owner.name`` returns ``getter(Owner)``."""

    def __init__(self, getter: Callable):
        self._getter = getter
        self.__doc__ = getter.__doc__

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self._getter(owner)


class Time:
    """
    The time class

    Every member is class-level; the class is used directly and never needs to
    be instantiated.

    Attributes:
        frames (int): The total number of elapsed frames since the start of the game.

        fps (float): The current fps of this frame.

        target_fps (float): The fps that the game should try to run at. 0 means that the game's fps will not be
            capped. Defaults to 0.

        physics_fps (float): The fps that the physics should run at. Defaults to 60.

        delta_time (float): The number of seconds since the last frame.

        fixed_delta (float): The number of seconds since the last fixed update.
    """

    frames = 0

    # Priority queues (heapq) ordered by each task's ``delay``. The task-class
    # annotations are quoted so they are not evaluated at definition time.
    _frame_queue: "List[FramesTask]" = []
    _task_queue: "List[DelayedTask]" = []
    _schedule_queue: "List[ScheduledTask]" = []

    delta_time: float = 0.001
    fixed_delta: float = 0
    # NOTE(review): normal_delta, physics_counter and capped are never written
    # by this class; presumably the game loop maintains them - confirm.
    normal_delta: float = 0
    fps = 60

    _frame_start: int = 0

    physics_counter = 0

    # Number of most recent frames that smooth_fps averages over.
    _fps_history = 120
    _past_fps = [0] * _fps_history

    target_fps = 0  # this means no cap
    capped = False

    physics_fps = 60

    # Stacking @classmethod on @property is deprecated since Python 3.11 and
    # removed in 3.13, so the class-level properties use _ClassProperty instead.
    @_ClassProperty
    def smooth_fps(cls) -> float:
        """The average fps over the past 120 frames. This is a get-only property."""
        return sum(cls._past_fps) / cls._fps_history

    @classmethod
    def now(cls) -> int:
        """The time since the start of the game, in milliseconds."""
        return sdl2.SDL_GetTicks64()

    @_ClassProperty
    def frame_start(cls) -> int:
        """
        The time since the start of the game, in milliseconds, taken at the start of the frame.
        This is a get-only property.
        """
        return cls._frame_start

    @classmethod
    def schedule(cls, task: "DelayedTask | FramesTask | ScheduledTask"):
        """
        Schedules a task for delayed execution based on what type of task it is.

        The task's ``delay`` is modified in place: the current time (or the
        current frame count, for a FramesTask) is added to it before queueing.

        Args:
            task (DelayedTask | FramesTask | ScheduledTask): The task to queue.

        Raises:
            TypeError: If ``task`` is not one of the three task types.
        """
        if isinstance(task, DelayedTask):
            task.delay += cls.now()
            heapq.heappush(cls._task_queue, task)
        elif isinstance(task, FramesTask):
            task.delay += cls.frames
            heapq.heappush(cls._frame_queue, task)
        elif isinstance(task, ScheduledTask):
            task.delay += cls.now()
            heapq.heappush(cls._schedule_queue, task)
        else:
            raise TypeError("Task argument must be of type DelayedTask, FramesTask or ScheduledTask.")

    @classmethod
    def delayed_call(cls, delay: int, func: Callable):
        """
        Calls the function func at a later time.

        Args:
            delay: The time from now (in milliseconds) to run the function at.
            func: The function to call.
        """
        heapq.heappush(cls._task_queue, DelayedTask(delay + cls.now(), func))

    @classmethod
    def delayed_frames(cls, delay: int, func: Callable):
        """
        Calls the function func at a later frame.

        Args:
            delay: The number of frames to wait.
            func: The function to call
        """
        heapq.heappush(cls._frame_queue, FramesTask(cls.frames + delay, func))

    @classmethod
    def scheduled_call(cls, interval: int, func: Callable):
        """
        Calls the function func at a scheduled interval.

        The first call happens one interval from now.

        Args:
            interval: The interval (in milliseconds) to run the function at.
            func: The function to call.
        """
        heapq.heappush(cls._schedule_queue, ScheduledTask(interval, func, interval + cls.now()))

    @classmethod
    def milli_to_sec(cls, milli: int) -> float:
        """
        Converts milliseconds to seconds.

        Args:
            milli: A number in milliseconds.

        Returns:
            float: The converted number in seconds.
        """
        return milli / 1000

    @classmethod
    def sec_to_milli(cls, sec: float) -> float:
        """
        Converts seconds to milliseconds.

        Args:
            sec: A number in seconds.

        Returns:
            float: The converted number in milliseconds.
        """
        return sec * 1000

    @classmethod
    def process_calls(cls):
        """
        Advances the frame counter, updates the fps statistics and runs every queued task that is now due.

        Stopped tasks are dropped without being invoked. A ScheduledTask that is
        still active after running is queued again with its ``delay`` advanced
        by its ``interval``.
        """
        cls.frames += 1

        # A zero delta would raise ZeroDivisionError; keep the previous fps instead.
        if cls.delta_time:
            cls.fps = 1 / cls.delta_time
        del cls._past_fps[0]
        cls._past_fps.append(cls.fps)

        while cls._frame_queue and cls._frame_queue[0].delay <= cls.frames:
            frame_task = heapq.heappop(cls._frame_queue)
            if not frame_task.is_stopped:
                frame_task.task()

        while cls._task_queue and cls._task_queue[0].delay <= cls.now():
            timer_task = heapq.heappop(cls._task_queue)
            if not timer_task.is_stopped:
                timer_task.task()

        # A task with a non-positive interval would still be due right after it
        # ran, and re-queueing it immediately would spin this loop forever. Such
        # tasks are held back and re-queued afterwards, so they run once per call.
        held_back = []
        try:
            while cls._schedule_queue and cls._schedule_queue[0].delay <= cls.now():
                scheduled_task = heapq.heappop(cls._schedule_queue)
                if scheduled_task.is_stopped:
                    continue
                scheduled_task.task()
                # The callback may have stopped its own task.
                if scheduled_task.is_stopped:
                    continue
                scheduled_task.delay += scheduled_task.interval
                if scheduled_task.interval > 0:
                    heapq.heappush(cls._schedule_queue, scheduled_task)
                else:
                    held_back.append(scheduled_task)
        finally:
            for scheduled_task in held_back:
                heapq.heappush(cls._schedule_queue, scheduled_task)