Add cross-thread exception handling for event loop and stream callbacks

This commit is contained in:
jaseg 2023-02-26 19:46:59 +01:00
parent d7664eb671
commit f9a655e7ca

128
mpv.py
View file

@ -20,6 +20,7 @@
from ctypes import * from ctypes import *
import ctypes.util import ctypes.util
import threading import threading
import queue
import os import os
import sys import sys
from warnings import warn from warnings import warn
@ -880,6 +881,7 @@ class MPV(object):
self.register_stream_protocol('python', self._python_stream_open) self.register_stream_protocol('python', self._python_stream_open)
self._python_streams = {} self._python_streams = {}
self._python_stream_catchall = None self._python_stream_catchall = None
self._exception_futures = set()
self.overlay_ids = set() self.overlay_ids = set()
self.overlays = {} self.overlays = {}
if loglevel is not None or log_handler is not None: if loglevel is not None or log_handler is not None:
@ -891,6 +893,17 @@ class MPV(object):
else: else:
self._event_thread = None self._event_thread = None
@contextmanager
def _enqueue_exceptions(self):
try:
yield
except Exception as e:
try:
fut = next(iter(self._exception_futures))
fut.set_exception(e)
except StopIteration:
warn(f'Unhandled exception on python-mpv event loop: {e}\n{traceback.format_exc()}', RuntimeWarning)
def _loop(self): def _loop(self):
for event in _event_generator(self._event_handle): for event in _event_generator(self._event_handle):
try: try:
@ -901,45 +914,51 @@ class MPV(object):
self._core_shutdown = True self._core_shutdown = True
for callback in self._event_callbacks: for callback in self._event_callbacks:
callback(event) with self._enqueue_exceptions():
callback(event)
if eid == MpvEventID.PROPERTY_CHANGE: if eid == MpvEventID.PROPERTY_CHANGE:
pc = event.data pc = event.data
name, value, _fmt = pc.name, pc.value, pc.format name, value, _fmt = pc.name, pc.value, pc.format
for handler in self._property_handlers[name]: for handler in self._property_handlers[name]:
handler(name, value) with self._enqueue_exceptions():
handler(name, value)
if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None: if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None:
ev = event.data ev = event.data
self._log_handler(ev.level, ev.prefix, ev.text) with self._enqueue_exceptions():
self._log_handler(ev.level, ev.prefix, ev.text)
if eid == MpvEventID.CLIENT_MESSAGE: if eid == MpvEventID.CLIENT_MESSAGE:
# {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16} # {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16}
target, *args = event.data.args target, *args = event.data.args
target = target.decode("utf-8") target = target.decode("utf-8")
if target in self._message_handlers: if target in self._message_handlers:
self._message_handlers[target](*args) with self._enqueue_exceptions():
self._message_handlers[target](*args)
if eid == MpvEventID.COMMAND_REPLY: if eid == MpvEventID.COMMAND_REPLY:
key = event.reply_userdata key = event.reply_userdata
callback = self._command_reply_callbacks.pop(key, None) callback = self._command_reply_callbacks.pop(key, None)
if callback: if callback:
callback(ErrorCode.exception_for_ec(event.error), event.data) with self._enqueue_exceptions():
callback(ErrorCode.exception_for_ec(event.error), event.data)
if eid == MpvEventID.QUEUE_OVERFLOW: if eid == MpvEventID.QUEUE_OVERFLOW:
# cache list, since error handlers will unregister themselves # cache list, since error handlers will unregister themselves
for cb in list(self._command_reply_callbacks.values()): for cb in list(self._command_reply_callbacks.values()):
cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None) with self._enqueue_exceptions():
cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None)
if eid == MpvEventID.SHUTDOWN: if eid == MpvEventID.SHUTDOWN:
_mpv_destroy(self._event_handle) _mpv_destroy(self._event_handle)
for cb in list(self._command_reply_callbacks.values()): for cb in list(self._command_reply_callbacks.values()):
cb(ShutdownError('libmpv core has been shutdown'), None) with self._enqueue_exceptions():
cb(ShutdownError('libmpv core has been shutdown'), None)
return return
except Exception as e: except Exception as e:
print('Exception inside python-mpv event loop:', file=sys.stderr) warn(f'Unhandled {e} inside python-mpv event loop!\n{traceback.format_exc()}', RuntimeWarning)
traceback.print_exc()
@property @property
def core_shutdown(self): def core_shutdown(self):
@ -953,35 +972,35 @@ class MPV(object):
if self._core_shutdown: if self._core_shutdown:
raise ShutdownError('libmpv core has been shutdown') raise ShutdownError('libmpv core has been shutdown')
def wait_until_paused(self, timeout=None): def wait_until_paused(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while """Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while
waiting.""" waiting."""
self.wait_for_property('core-idle', timeout=timeout) self.wait_for_property('core-idle', timeout=timeout, catch_errors=catch_errors)
def wait_for_playback(self, timeout=None): def wait_for_playback(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while """Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while
waiting. waiting.
""" """
self.wait_for_event('end_file', timeout=timeout) self.wait_for_event('end_file', timeout=timeout, catch_errors=catch_errors)
def wait_until_playing(self, timeout=None): def wait_until_playing(self, timeout=None, catch_errors=True):
"""Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while """Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while
waiting.""" waiting."""
self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout) self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout, catch_errors=catch_errors)
def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None): def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True):
"""Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for """Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for
properties such as ``idle_active`` indicating the player is done with regular playback and just idling around. properties such as ``idle_active`` indicating the player is done with regular playback and just idling around.
Raises a ShutdownError when the core is shutdown while waiting. Raises a ShutdownError when the core is shutdown while waiting.
""" """
with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout) as result: with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout, catch_errors=catch_errors) as result:
pass pass
return result.result() return result.result()
def wait_for_shutdown(self, timeout=None): def wait_for_shutdown(self, timeout=None, catch_errors=True):
'''Wait for core to shutdown (e.g. through quit() or terminate()).''' '''Wait for core to shutdown (e.g. through quit() or terminate()).'''
try: try:
self.wait_for_event(None, timeout=timeout) self.wait_for_event(None, timeout=timeout, catch_errors=catch_errors)
except ShutdownError: except ShutdownError:
return return
@ -999,7 +1018,7 @@ class MPV(object):
return shutdown_handler.unregister_mpv_events return shutdown_handler.unregister_mpv_events
@contextmanager @contextmanager
def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None): def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True):
"""Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See """Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See
prepare_and_wait_for_event for usage. prepare_and_wait_for_event for usage.
Raises a ShutdownError when the core is shutdown while waiting. Re-raises any errors inside ``cond``. Raises a ShutdownError when the core is shutdown while waiting. Re-raises any errors inside ``cond``.
@ -1023,6 +1042,9 @@ class MPV(object):
try: try:
result.set_running_or_notify_cancel() result.set_running_or_notify_cancel()
if catch_errors:
self._exception_futures.add(result)
yield result yield result
rv = cond(getattr(self, name.replace('-', '_'))) rv = cond(getattr(self, name.replace('-', '_')))
@ -1035,18 +1057,19 @@ class MPV(object):
finally: finally:
err_unregister() err_unregister()
self.unobserve_property(name, observer) self.unobserve_property(name, observer)
self._exception_futures.discard(result)
def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None): def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True):
"""Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError """Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError
if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. Re-raises any error if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. Re-raises any error
inside ``cond``. inside ``cond``.
""" """
with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout) as result: with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout, catch_errors=catch_errors) as result:
pass pass
return result.result() return result.result()
@contextmanager @contextmanager
def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None): def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True):
"""Context manager that waits for the indicated event(s) like wait_for_event after running. If cond is given, """Context manager that waits for the indicated event(s) like wait_for_event after running. If cond is given,
waits until cond(event) is true. Raises a ShutdownError if the core is shutdown while waiting. This also happens waits until cond(event) is true. Raises a ShutdownError if the core is shutdown while waiting. This also happens
when 'shutdown' is in event_types. Re-raises any error inside ``cond``. when 'shutdown' is in event_types. Re-raises any error inside ``cond``.
@ -1081,13 +1104,18 @@ class MPV(object):
try: try:
result.set_running_or_notify_cancel() result.set_running_or_notify_cancel()
if catch_errors:
self._exception_futures.add(result)
yield result yield result
self.check_core_alive() self.check_core_alive()
result.result(timeout) result.result(timeout)
finally: finally:
err_unregister() err_unregister()
target_handler.unregister_mpv_events() target_handler.unregister_mpv_events()
self._exception_futures.discard(result)
def __del__(self): def __del__(self):
if self.handle: if self.handle:
@ -1772,32 +1800,60 @@ class MPV(object):
frontend = open_fn(uri.decode('utf-8')) frontend = open_fn(uri.decode('utf-8'))
except ValueError: except ValueError:
return ErrorCode.LOADING_FAILED return ErrorCode.LOADING_FAILED
except Exception as e:
try:
fut = next(iter(self._exception_futures))
fut.set_exception(e)
except StopIteration:
warnings.warn(f'Unhandled exception {e} inside stream open callback for URI {uri}\n{traceback.format_exc()}')
def read_backend(_userdata, buf, bufsize): return ErrorCode.LOADING_FAILED
data = frontend.read(bufsize)
for i in range(len(data)):
buf[i] = data[i]
return len(data)
cb_info.contents.cookie = None cb_info.contents.cookie = None
def read_backend(_userdata, buf, bufsize):
with self._enqueue_exceptions():
data = frontend.read(bufsize)
for i in range(len(data)):
buf[i] = data[i]
return len(data)
read = cb_info.contents.read = StreamReadFn(read_backend) read = cb_info.contents.read = StreamReadFn(read_backend)
close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close())
def close_backend(_userdata):
with self._enqueue_exceptions():
del self._stream_protocol_frontends[proto][uri]
if hasattr(frontend, 'close'):
frontend.close()
close = cb_info.contents.close = StreamCloseFn(close_backend)
seek, size, cancel = None, None, None seek, size, cancel = None, None, None
if hasattr(frontend, 'seek'):
seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx))
if hasattr(frontend, 'size') and frontend.size is not None:
size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size)
if hasattr(frontend, 'cancel'):
cancel = cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel())
# keep frontend and callbacks in memory forever (TODO) if hasattr(frontend, 'seek'):
def seek_backend(_userdata, offx):
with self._enqueue_exceptions():
return frontend.seek(offx)
seek = cb_info.contents.seek = StreamSeekFn(seek_backend)
if hasattr(frontend, 'size') and frontend.size is not None:
def size_backend(_userdata):
with self._enqueue_exceptions():
return frontend.size
size = cb_info.contents.size = StreamSizeFn(size_backend)
if hasattr(frontend, 'cancel'):
def cancel_backend(_userdata):
with self._enqueue_exceptions():
frontend.cancel()
cancel = cb_info.contents.cancel = StreamCancelFn(cancel_backend)
# keep frontend and callbacks in memory until closed
frontend._registered_callbacks = [read, close, seek, size, cancel] frontend._registered_callbacks = [read, close, seek, size, cancel]
self._stream_protocol_frontends[proto][uri] = frontend self._stream_protocol_frontends[proto][uri] = frontend
return 0 return 0
if proto in self._stream_protocol_cbs: if proto in self._stream_protocol_cbs:
raise KeyError('Stream protocol already registered') raise KeyError('Stream protocol already registered')
# keep backend in memory forever
self._stream_protocol_cbs[proto] = [open_backend] self._stream_protocol_cbs[proto] = [open_backend]
_mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend) _mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend)