Source code for rubato.utils.rb_time
"""
A static time class to monitor time and to call functions in the future.
"""
from dataclasses import dataclass, field
from typing import Callable, List
import heapq
import sdl2
[docs]class Time():
    """
    The time class
    Attributes:
        frames (int): The total number of elapsed frames since the start of the game.
        fps (float): The current fps of this frame.
        target_fps (float): The fps that the game should try to run at. 0 means that the game's fps will not be capped.
            Defaults to 0.
        physics_fps (float): The fps that the physics should run at. Defaults to 60.
        delta_time (int): The number of seconds since the last frame.
        fixed_delta (int): The number of seconds since the last fixed update.
    """
    frames = 0
    sorted_frame_times: List[TimerTask] = []
    sorted_task_times: List[TimerTask] = []
    delta_time: float = 0.001
    fixed_delta: float = 0
    normal_delta: float = 0
    fps = 60
    physics_counter = 0
    _past_fps = [0] * 250
    target_fps = 0  # this means no cap
    capped = False
    physics_fps = 60
    @classmethod
    @property
    def smooth_fps(cls) -> float:
        """The average fps over the past 250 frames. This is a get-only property."""
        return sum(cls._past_fps) / 250
    @classmethod
    @property
    def now(cls) -> int:
        """The time since the start of the game, in milliseconds. This is a get-only property."""
        return sdl2.SDL_GetTicks64()
[docs]    @classmethod
    def delayed_call(cls, time_delta: int, func: Callable):
        """
        Calls the function func at a later time.
        Args:
            time_delta: The time from now (in milliseconds)
                to run the function at.
            func: The function to call.
        """
        heapq.heappush(cls.sorted_task_times, TimerTask(time_delta + cls.now, func))
[docs]    @classmethod
    def delayed_frames(cls, frames_delta: int, func: Callable):
        """
        Calls the function func at a later frame.
        Args:
            frames_delta: The number of frames to wait.
            func: The function to call
        """
        heapq.heappush(cls.sorted_frame_times, TimerTask(cls.frames + frames_delta, func))
[docs]    @classmethod
    def milli_to_sec(cls, milli: int) -> float:
        """
        Converts milliseconds to seconds.
        Args:
            milli: A number in milliseconds.
        Returns:
            float: The converted number in seconds.
        """
        return milli / 1000
[docs]    @classmethod
    def sec_to_milli(cls, sec: int) -> float:
        """
        Converts seconds to milliseconds.
        Args:
            sec: A number in seconds.
        Returns:
            float: The converted number in milliseconds.
        """
        return sec * 1000
[docs]    @classmethod
    def process_calls(cls):
        """Processes the delayed function call as needed"""
        cls.frames += 1
        cls.fps = 1 / cls.delta_time
        del cls._past_fps[0]
        cls._past_fps.append(cls.fps)
        # pylint: disable=comparison-with-callable
        processing = True
        while processing and cls.sorted_frame_times:
            if cls.sorted_frame_times[0].time <= cls.now:
                timer_task = heapq.heappop(cls.sorted_frame_times)
                timer_task.task()
            else:
                processing = False
        processing = True
        while processing and cls.sorted_task_times:
            if cls.sorted_task_times[0].time <= cls.now:
                timer_task = heapq.heappop(cls.sorted_task_times)
                timer_task.task()
            else:
                processing = False