Add event queue overflow handling
This commit is contained in:
parent
3cb1196621
commit
5f4cf600b5
2 changed files with 54 additions and 19 deletions
51
mpv.py
51
mpv.py
|
|
@ -58,6 +58,9 @@ else:
|
||||||
class ShutdownError(SystemError):
|
class ShutdownError(SystemError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class EventOverflowError(SystemError):
|
||||||
|
pass
|
||||||
|
|
||||||
class MpvHandle(c_void_p):
|
class MpvHandle(c_void_p):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -886,8 +889,6 @@ class MPV(object):
|
||||||
for event in _event_generator(self._event_handle):
|
for event in _event_generator(self._event_handle):
|
||||||
try:
|
try:
|
||||||
eid = event.event_id.value
|
eid = event.event_id.value
|
||||||
if not eid == MpvEventID.LOG_MESSAGE:
|
|
||||||
print(event)
|
|
||||||
|
|
||||||
with self._event_handler_lock:
|
with self._event_handler_lock:
|
||||||
if eid == MpvEventID.SHUTDOWN:
|
if eid == MpvEventID.SHUTDOWN:
|
||||||
|
|
@ -919,8 +920,15 @@ class MPV(object):
|
||||||
if callback:
|
if callback:
|
||||||
callback(ErrorCode.exception_for_ec(event.error), event.data)
|
callback(ErrorCode.exception_for_ec(event.error), event.data)
|
||||||
|
|
||||||
|
if eid == MpvEventID.QUEUE_OVERFLOW:
|
||||||
|
# cache list, since error handlers will unregister themselves
|
||||||
|
for cb in list(self._command_reply_callbacks.values()):
|
||||||
|
cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None)
|
||||||
|
|
||||||
if eid == MpvEventID.SHUTDOWN:
|
if eid == MpvEventID.SHUTDOWN:
|
||||||
_mpv_destroy(self._event_handle)
|
_mpv_destroy(self._event_handle)
|
||||||
|
for cb in list(self._command_reply_callbacks.values()):
|
||||||
|
cb(ShutdownError('libmpv core has been shutdown'), None)
|
||||||
return
|
return
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
@ -971,6 +979,19 @@ class MPV(object):
|
||||||
except ShutdownError:
|
except ShutdownError:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def _set_error_handler(self, future):
|
||||||
|
@self.event_callback('shutdown', 'queue-overflow')
|
||||||
|
def shutdown_handler(event):
|
||||||
|
nonlocal future
|
||||||
|
try:
|
||||||
|
if event.event_id.value == MpvEventID.SHUTDOWN:
|
||||||
|
future.set_exception(ShutdownError('libmpv core has been shutdown'))
|
||||||
|
else:
|
||||||
|
future.set_exception(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'))
|
||||||
|
except InvalidStateError:
|
||||||
|
pass
|
||||||
|
return shutdown_handler.unregister_mpv_events
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None):
|
def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None):
|
||||||
"""Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See
|
"""Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See
|
||||||
|
|
@ -992,13 +1013,7 @@ class MPV(object):
|
||||||
except InvalidStateError:
|
except InvalidStateError:
|
||||||
pass
|
pass
|
||||||
self.observe_property(name, observer)
|
self.observe_property(name, observer)
|
||||||
|
err_unregister = self._set_error_handler(result)
|
||||||
@self.event_callback('shutdown')
|
|
||||||
def shutdown_handler(event):
|
|
||||||
try:
|
|
||||||
result.set_exception(ShutdownError('libmpv core has been shutdown'))
|
|
||||||
except InvalidStateError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result.set_running_or_notify_cancel()
|
result.set_running_or_notify_cancel()
|
||||||
|
|
@ -1012,7 +1027,7 @@ class MPV(object):
|
||||||
self.check_core_alive()
|
self.check_core_alive()
|
||||||
result.result(timeout)
|
result.result(timeout)
|
||||||
finally:
|
finally:
|
||||||
shutdown_handler.unregister_mpv_events()
|
err_unregister()
|
||||||
self.unobserve_property(name, observer)
|
self.unobserve_property(name, observer)
|
||||||
|
|
||||||
def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None):
|
def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None):
|
||||||
|
|
@ -1041,13 +1056,6 @@ class MPV(object):
|
||||||
"""
|
"""
|
||||||
result = Future()
|
result = Future()
|
||||||
|
|
||||||
@self.event_callback('shutdown')
|
|
||||||
def shutdown_handler(event):
|
|
||||||
try:
|
|
||||||
result.set_exception(ShutdownError('libmpv core has been shutdown'))
|
|
||||||
except InvalidStateError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@self.event_callback(*event_types)
|
@self.event_callback(*event_types)
|
||||||
def target_handler(evt):
|
def target_handler(evt):
|
||||||
|
|
||||||
|
|
@ -1063,6 +1071,8 @@ class MPV(object):
|
||||||
except InvalidStateError:
|
except InvalidStateError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
err_unregister = self._set_error_handler(result)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result.set_running_or_notify_cancel()
|
result.set_running_or_notify_cancel()
|
||||||
yield result
|
yield result
|
||||||
|
|
@ -1070,7 +1080,7 @@ class MPV(object):
|
||||||
result.result(timeout)
|
result.result(timeout)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
shutdown_handler.unregister_mpv_events()
|
err_unregister()
|
||||||
target_handler.unregister_mpv_events()
|
target_handler.unregister_mpv_events()
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
|
|
@ -1138,7 +1148,10 @@ class MPV(object):
|
||||||
result = result.unpack(decoder)
|
result = result.unpack(decoder)
|
||||||
future.set_result(callback(error, result))
|
future.set_result(callback(error, result))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
future.set_exception(e)
|
try:
|
||||||
|
future.set_exception(e)
|
||||||
|
except InvalidStateError:
|
||||||
|
pass
|
||||||
|
|
||||||
def abort():
|
def abort():
|
||||||
_mpv_abort_async_command(self._event_handle, id(future))
|
_mpv_abort_async_command(self._event_handle, id(future))
|
||||||
|
|
|
||||||
|
|
@ -659,6 +659,28 @@ class TestLifecycle(unittest.TestCase):
|
||||||
m.terminate()
|
m.terminate()
|
||||||
self.disp.stop()
|
self.disp.stop()
|
||||||
|
|
||||||
|
def test_wait_for_prooperty_event_overflow(self):
|
||||||
|
self.disp = Xvfb()
|
||||||
|
self.disp.start()
|
||||||
|
handler = mock.Mock()
|
||||||
|
m = mpv.MPV(vo=testvo)
|
||||||
|
m.play(TESTVID)
|
||||||
|
with self.assertRaises(mpv.EventOverflowError):
|
||||||
|
# level_sensitive=false needed to prevent get_property on dead
|
||||||
|
# handle
|
||||||
|
with m.prepare_and_wait_for_property('mute', cond=lambda val: time.sleep(0.001)):
|
||||||
|
for i in range(10000):
|
||||||
|
try:
|
||||||
|
# We really have to try hard to fill up the queue here. Simple async commands will not work,
|
||||||
|
# since then command_async will throw a memory error first. Property changes also do not work,
|
||||||
|
# since they are only processsed when the event loop is idle. This here works reliably.
|
||||||
|
m.command_async('script-message', 'foo', 'bar')
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
self.disp.stop()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_wait_for_event_shutdown(self):
|
def test_wait_for_event_shutdown(self):
|
||||||
self.disp = Xvfb()
|
self.disp = Xvfb()
|
||||||
self.disp.start()
|
self.disp.start()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue