"""
The asset loading system.
"""
import abc
import collections
import concurrent.futures
from functools import partial
import logging
import threading
import weakref
import ppb.vfs as vfs
import ppb.events as events
from ppb.systemslib import System
__all__ = 'AbstractAsset', 'Asset', 'AssetLoadingSystem',
logger = logging.getLogger(__name__)
[docs]class AbstractAsset(abc.ABC):
"""
The asset interface.
This defines the common interface for virtual assets, proxy assets, and
real/file assets.
"""
[docs] @abc.abstractmethod
def load(self):
"""
Get the data of this asset, in the appropriate form.
"""
[docs] def is_loaded(self):
"""
Returns if the data is ready now or if :py:meth:`load()` will block.
"""
return True
_asset_cache = weakref.WeakValueDictionary()
[docs]class Asset(AbstractAsset):
"""
A resource to be loaded from the filesystem and used.
Meant to be subclassed, but in specific ways.
"""
def __new__(cls, name):
clsname = f"{cls.__module__}:{cls.__qualname__}"
try:
return _asset_cache[(clsname, name)]
except KeyError:
inst = super().__new__(cls)
_asset_cache[(clsname, name)] = inst
return inst
def __init__(self, name):
self.name = str(name)
self._finished = threading.Event()
_hint(self.name, self._finished_background)
def __repr__(self):
return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''}>"
def _finished_background(self, fut):
# Internal
# Called in background thread
try:
try:
raw = fut.result()
except FileNotFoundError:
if hasattr(self, 'file_missing'):
logger.warning("File not found: %r", self.name)
self._data = self.file_missing()
if _finished is not None:
_finished(self)
else:
raise
else:
self._data = self.background_parse(raw)
if _finished is not None:
_finished(self)
except Exception as exc:
import traceback
traceback.print_exc()
# Save unhandled exceptions to be raised in the main thread
self._raise_error = exc
finally:
# This always needs to happen so the main thread isn't just blocked
self._finished.set()
[docs] def background_parse(self, data: bytes):
"""
Takes the data loaded from the file and returns the parsed data.
Subclasses probably want to override this.
Called in the background thread.
"""
return data
[docs] def is_loaded(self):
"""
Returns if the data has been loaded and parsed.
"""
return self._finished.is_set()
[docs] def load(self, timeout: float = None):
"""
Gets the parsed data.
Will block until the data is loaded.
"""
if _hint is _default_hint:
logger.warning(f"Waited on {self!r} before the engine began")
self._finished.wait(timeout)
if hasattr(self, '_raise_error'):
raise self._raise_error
else:
return self._data
def force_background_thread(func, *pargs, **kwargs):
"""
Calls the given function from not the main thread.
If already not the main thread, calls it syncronously.
If this is the main thread, creates a new thread to call it.
"""
if threading.current_thread() is threading.main_thread():
t = threading.Thread(target=func, args=pargs, kwargs=kwargs, daemon=True)
t.start()
else:
func(*pargs, **kwargs)
class AssetLoadingSystem(System):
def __init__(self, *, engine, **_):
super().__init__(**_)
self.engine = engine
self._executor = concurrent.futures.ThreadPoolExecutor()
self._queue = weakref.WeakValueDictionary() # maps names to futures
self._began = 0
self._ended = 0
self._event_queue = collections.deque()
def __enter__(self):
# 1. Register ourselves as the hint provider
global _hint, _finished, _backlog
assert _hint is _default_hint
_hint = self._hint
_finished = self._finished
# 2. Grab-n-clear the backlog (atomically?)
queue, _backlog = _backlog, []
# 3. Process the backlog
for filename, callback in queue:
self._hint(filename, callback)
def __exit__(self, exc_type, exc_val, exc_tb):
# Reset the hint provider
global _hint, _finished
_hint = _default_hint
_finished = None
def _hint(self, filename, callback=None):
self._began += 1
try:
fut = self._queue[filename]
except KeyError:
fut = self._queue[filename] = self._executor.submit(self._load, filename)
if callback is not None:
# There are circumstances where Future will call us syncronously
# In which case, redirect to a fresh background thread.
fut.add_done_callback(partial(force_background_thread, callback))
@staticmethod
def _load(filename):
with vfs.open(filename) as file:
return file.read()
def _finished(self, asset):
self._ended += 1
self._event_queue.append(asset)
def on_idle(self, event, signal):
while self._event_queue:
asset = self._event_queue.popleft()
signal(events.AssetLoaded(
asset=asset,
total_loaded=self._ended,
total_queued=self._began - self._ended,
))
_backlog = []
def _default_hint(filename, callback=None):
_backlog.append((filename, callback))
_hint = _default_hint
_finished = None