"""
The asset loading system.
"""
import abc
import collections
import concurrent.futures
import logging
import queue
import sys
import threading
import weakref
import ppb.vfs as vfs
import ppb.events as events
from ppb.systemslib import System
__all__ = (
'AssetLoadingSystem',
'AbstractAsset', 'BackgroundMixin', 'ChainingMixin', 'FreeingMixin',
'Asset',
)
logger = logging.getLogger(__name__)
class DelayedThreadExecutor(concurrent.futures.ThreadPoolExecutor):
"""
Same as ThreadPoolExecutor, but doesn't start immediately.
Context manager. On exit, cancels all futures.
Also does stuff with events.
"""
# Note that this reaches through all kinds of internals, but they're pretty stable
_started = 0
_finished = 0
def __init__(self, *p, **kw):
super().__init__(*p, **kw)
self._actual_max_workers = self._max_workers
self._max_workers = 0
if hasattr(queue, 'SimpleQueue'): # 3.7
self._event_queue = queue.SimpleQueue()
else:
self._event_queue = queue.Queue()
def __enter__(self):
self._max_workers = self._actual_max_workers
self._adjust_thread_count()
return self
def __exit__(self, *exc):
if sys.version_info >= (3, 9):
self.shutdown(wait=False, cancel_futures=True)
else:
import queue
# Backport of 3.9 future cancelling code
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()
self.shutdown(wait=False)
def running(self):
return (self._max_workers > 0) and (not self._shutdown)
def submit(self, fn, *args, _asset=None, **kwargs):
if _asset is not None:
self._started += 1
fut = super().submit(fn, *args, **kwargs)
if _asset is not None:
fut.__asset = weakref.ref(_asset)
fut.add_done_callback(self._finish)
return fut
def gather(self, futures, callback, *pargs, **kwargs):
mock = MockFuture()
def waiter():
done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
for f in done:
exc = f.exception()
if exc is not None:
mock.set_exception(exc)
break
else:
assert not not_done
newfut = self.submit(callback, *pargs, **kwargs)
mock.handoff(newfut)
threading.Thread(target=waiter).start()
return mock
def _finish(self, fut):
asset = fut.__asset()
if asset is not None:
self._finished += 1
self._event_queue.put(events.AssetLoaded(
asset=asset,
total_loaded=self._finished,
total_queued=self._started - self._finished,
))
def queued_events(self):
while True:
try:
yield self._event_queue.get_nowait()
except queue.Empty:
break
_executor = DelayedThreadExecutor()
class MockFuture(concurrent.futures.Future):
"""
Acts as a Future's understudy until the real future is available.
"""
_handed_off = False
def handoff(self, fut):
"""
Gives our state to the real future
"""
with self._condition:
if self._handed_off:
raise concurrent.futures.InvalidStateError(f"{self!r} already handed off")
self._handed_off = True
# Add the callbacks
with self._condition:
callbacks, self._done_callbacks = self._done_callbacks, []
for fn in callbacks:
fut.add_done_callback(fn)
# Apply cancellation
if self.cancelled():
fut.cancel()
else:
fut.add_done_callback(self._pass_on_result)
def _pass_on_result(self, fut):
try:
result = fut.result()
except BaseException as exc:
self.set_exception(exc)
else:
self.set_result(result)
[docs]
class AbstractAsset(abc.ABC):
"""
The asset interface.
This defines the common interface for virtual assets, proxy assets, and
real/file assets.
"""
#: Can be overridden on child classes to add an additional message to show after "File not
#: found: %r.". See Image.not_found_message for an example.
not_found_message = ""
[docs]
@abc.abstractmethod
def load(self, timeout: float = None):
"""
Get the data of this asset, in the appropriate form.
"""
[docs]
def is_loaded(self):
"""
Returns if the data is ready now or if :py:meth:`load()` will block.
"""
return True
class BackgroundMixin:
"""
Asset that does stuff in the background.
"""
_future = None
def _start(self):
"""
Queue the background stuff to run.
Call at the end of __init__().
"""
self._future = _executor.submit(self._background, _asset=self)
def _background(self):
"""
The background processing.
Override me.
"""
def is_loaded(self):
"""
Returns if the data has been loaded and parsed.
"""
return self._future is not None and self._future.done()
def load(self, timeout: float = None):
"""
Gets the parsed data.
Will block until the data is loaded.
"""
# NOTE: This is called by FreeingMixin.__del__()
if not self.is_loaded() and not _executor.running():
logger.warning(f"Waited on {self!r} outside of the engine")
return self._future.result(timeout)
class ChainingMixin(BackgroundMixin):
"""
Asset that does stuff in the background, after other assets have loaded.
"""
def _start(self, *assets):
"""
Queue the background stuff to run.
Call at the end of __init__().
"""
self._future = _executor.gather([
asset._future
for asset in assets
if hasattr(asset, '_future')
], self._background, _asset=self)
class FreeingMixin:
"""
Asset that supports freeing
"""
def free(self, object):
"""
Called by :py:meth:`__del__()` if the data was loaded.
Meant to free any resources held outside of Python.
"""
def __del__(self):
# This should only be called after the background threads and other
# processing has finished.
# NOTE: This isn't super great, but there isn't a better way without
# knowing what we've been mixed with.
if self.is_loaded():
try:
data = self.load()
except BaseException:
pass
else:
self.free(data)
_asset_cache = weakref.WeakValueDictionary()
[docs]
class Asset(BackgroundMixin, FreeingMixin, AbstractAsset):
"""
A resource to be loaded from the filesystem and used.
Meant to be subclassed, but in specific ways.
"""
def __new__(cls, name):
clsname = f"{cls.__module__}:{cls.__qualname__}"
try:
return _asset_cache[(clsname, name)]
except KeyError:
self = super().__new__(cls)
self.name = str(name)
_asset_cache[(clsname, name)] = self
self._start()
return self
def __repr__(self):
return f"<{type(self).__name__} name={self.name!r}{' loaded' if self.is_loaded() else ''} at 0x{id(self):x}>"
def _background(self):
# Called in background thread
try:
file = vfs.open(self.name)
except FileNotFoundError:
if hasattr(self, 'file_missing'):
logger.warning("File not found: %r. %s", self.name, self.not_found_message)
return self.file_missing()
else:
raise
else:
with file:
raw = file.read()
return self.background_parse(raw)
[docs]
def background_parse(self, data: bytes):
"""
Takes the data loaded from the file and returns the parsed data.
Subclasses probably want to override this.
Called in the background thread.
"""
return data
class AssetLoadingSystem(System):
"""
Connects the asset system to PPB, managing lifecycles and such.
To minimize the chance of a race condition around initialization, place at
the end of the list of systems.
"""
def __init__(self, *, engine, **_):
super().__init__(**_)
self.engine = engine
self._event_queue = collections.deque()
def __enter__(self):
global _executor
_executor.__enter__()
def __exit__(self, *exc):
global _executor, _asset_cache
# Clean everything out
_executor.__exit__(*exc)
_asset_cache.clear()
_executor = DelayedThreadExecutor()
def on_idle(self, event, signal):
for event in _executor.queued_events():
signal(event)