Source code for rubato.game

"""
The main game module. It controls everything in the game.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sdl2, sdl2.sdlttf
import sys

from . import Time, Display, Debug, Radio, Events, Font, PrintError, Camera, IdError, Draw, InitError, Input

if TYPE_CHECKING:
    from . import Scene


# THIS IS A STATIC CLASS
class _ClassProperty:
    """
    A read-only property that is resolved on the class itself instead of on an instance.

    This replaces stacking ``@classmethod`` on top of ``@property``. That combination was deprecated in
    Python 3.11 and removed in Python 3.13, where it no longer evaluates the getter.

    Args:
        fget: The getter. It is called with the owning class as its only argument.
    """

    def __init__(self, fget):
        self.fget = fget
        # Surface the getter's docstring so documentation tools still pick it up.
        self.__doc__ = fget.__doc__

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self.fget(owner)


class Game:
    """
    The main game class.

    This class is static: it is never instantiated and all of its state lives on the class itself.
    """

    RUNNING = 1
    STOPPED = 2
    PAUSED = 3

    debug: bool = False
    """Whether to use debug-mode."""
    show_fps: bool = False
    """Whether to show fps."""
    debug_font: Font
    """What font to draw debug text in."""

    state: int = STOPPED
    """
    The state of the game.

    The game states are::

        Game.RUNNING
        Game.STOPPED
        Game.PAUSED
    """

    _initialized = False

    # Registered scenes keyed by scene name.
    _scenes: dict[str, Scene] = {}
    # Counter used to build a fallback id for scenes that were created without a name.
    _scene_id: int = 0
    # Name of the scene that is currently active ("" until the first scene is added).
    _current: str = ""

    def __init__(self) -> None:
        # Game is a static class; instantiating it is always an error.
        raise InitError(self)

    @_ClassProperty
    def current(cls) -> Scene | None:  # test: skip  # pylint: disable=no-self-argument
        """
        The current scene. (getonly)

        Returns:
            The current scene, or None if no scene is registered under the current scene id.
        """
        return cls._scenes.get(cls._current)

    @classmethod
    def set_scene(cls, scene_id: str):  # test: skip
        """
        Changes the current scene. Takes effect on the next frame.

        Args:
            scene_id: The id of the new scene.
        """
        cls._current = scene_id

    @classmethod
    def _add(cls, scene: Scene):  # test: skip
        """
        Add a scene to the game. Also set the current scene if this is the first added scene.

        Args:
            scene: The scene to add.

        Raises:
            IdError: The given scene id is already used.
        """
        if scene.name is None:
            # NOTE(review): only `_id` is assigned here while every lookup below uses `scene.name`.
            # Presumably `Scene.name` is derived from `_id` -- confirm against the Scene class.
            scene._id = "scene" + str(cls._scene_id)  # pylint: disable=protected-access

        if scene.name in cls._scenes:
            raise IdError(f"A scene with name '{scene.name}' has already been added.")

        # The very first scene that gets registered automatically becomes the current one.
        if not cls._scenes:
            cls.set_scene(scene.name)

        cls._scenes[scene.name] = scene
        cls._scene_id += 1

    @_ClassProperty
    def camera(cls) -> Camera:  # test: skip  # pylint: disable=no-self-argument
        """
        A shortcut getter allowing easy access to the current camera. This is a get-only property.

        Note:
            Returns a pointer to the current camera object.
            This is so you can access/change the current camera properties faster, but you'd still need to
            use :func:`Game.current.camera <rubato.struct.scene.Scene.camera>` to access the camera directly.

        Returns:
            Camera: The current scene's camera

        Raises:
            AttributeError: There is no current scene.
        """
        return cls.current.camera

    @classmethod
    def quit(cls):  # test: skip
        """Quit the game and close the python process."""
        Radio.broadcast(Events.EXIT)
        cls.state = cls.STOPPED
        sys.stdout.flush()
        sdl2.sdlttf.TTF_Quit()
        sdl2.SDL_Quit()
        sys.exit(0)

    @classmethod
    def start(cls):  # test: skip
        """
        Starts the main game loop. Called automatically by :meth:`rubato.begin`.

        Raises:
            PrintError: Re-raised untouched when the loop raises one.
            Exception: Any other error raised by the loop. Where possible it is re-created with a hint
                pointing at the issue tracker appended to its message.
        """
        cls.state = cls.RUNNING
        try:
            cls.loop()
        except KeyboardInterrupt:
            cls.quit()
        except PrintError as e:
            sys.stdout.flush()
            raise e
        except Exception as e:
            sys.stdout.flush()
            hint = "\nRubato Error-ed. Was it our fault? Issue tracker: " "https://github.com/rubatopy/rubato/issues"
            try:
                annotated = type(e)(str(e) + hint)
            except Exception:  # pylint: disable=broad-except
                # Some exception types cannot be built from a single message (UnicodeDecodeError, for
                # example). Masking the real error with a constructor failure would be worse than
                # dropping the hint.
                annotated = None
            if annotated is None:
                raise
            raise annotated.with_traceback(e.__traceback__)
        finally:
            sys.stdout.flush()

    @classmethod
    def loop(cls):  # test: skip
        """
        Rubato's main game loop. Called automatically by :meth:`rubato.Game.start`.

        The loop never returns on its own; it only ends through :meth:`quit` or an exception.
        """
        while True:
            # start timing the update loop
            Time._frame_start = Time.now()  # pylint: disable= protected-access

            # a stopped game asks SDL to quit; the event is picked up by the event handling below
            if cls.state == cls.STOPPED:
                sdl2.SDL_PushEvent(sdl2.SDL_Event(sdl2.SDL_QUIT))

            # Pump SDL events
            sdl2.SDL_PumpEvents()

            # Event handling
            if Radio.handle():
                cls.quit()

            # Register controllers
            Input.update_controllers()

            # process delayed calls
            Time.process_calls()

            cls.update()

            curr = cls.current
            if curr:  # pylint: disable=using-constant-test
                if cls.state == Game.PAUSED:
                    # process user set pause update
                    curr.private_paused_update()
                else:
                    # normal update
                    curr.private_update()

                    # fixed update
                    Time.physics_counter += Time.delta_time

                    while Time.physics_counter >= Time.fixed_delta:
                        # the state is checked again on every step because an update may pause the game
                        if cls.state != Game.PAUSED:
                            curr.private_fixed_update()
                        Time.physics_counter -= Time.fixed_delta

                curr.private_draw()
            else:
                # no scene to draw, so just wipe the frame
                Draw.clear()

            cls.draw()
            Draw.dump()

            if cls.show_fps:
                Debug.draw_fps(cls.debug_font)

            # update renderers
            Display.renderer.present()

            # use delay to cap the fps if need be
            if Time.capped:
                delay = Time.normal_delta - (1000 * Time.delta_time)
                if delay > 0:
                    sdl2.SDL_Delay(int(delay))

            # dont allow updates to occur more than once in a millisecond
            # this will likely never occur but is a failsafe
            while Time.now() == Time.frame_start:  # pylint: disable= comparison-with-callable
                sdl2.SDL_Delay(1)

            # clock the time the update call took
            Time.delta_time = (Time.now() - Time.frame_start) / 1000  # pylint: disable= comparison-with-callable

    @staticmethod
    def update():  # test: skip
        """An overrideable method for updating the game. Called once per frame, before the current scene updates."""
        pass

    @staticmethod
    def draw():  # test: skip
        """An overrideable method for drawing the game. Called once per frame, after the current scene draws."""
        pass