Sprinkle some thread safety over event loop, add *wait_for_event

This commit is contained in:
jaseg 2020-07-18 14:21:31 +02:00
parent ad68ec5927
commit 846f2a65ae
2 changed files with 99 additions and 65 deletions

View file

@ -395,33 +395,32 @@ class KeyBindingTest(MpvTestCase):
self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a']) self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a'])
def keypress_and_sync(key): def keypress_and_sync(key):
self.m.keypress(key) with self.m.prepare_and_wait_for_event('client_message'):
self.m.frame_step() self.m.keypress(key)
self.m.wait_until_playing()
keypress_and_sync('a') keypress_and_sync('a')
handler1.assert_has_calls([ mock.call() ]) handler1.assert_has_calls([ mock.call() ])
handler2.assert_has_calls([]) handler2.assert_has_calls([])
handler1.reset_mock() handler1.reset_mock()
keypress_and_sync('x') self.m.keypress('x')
keypress_and_sync('X') self.m.keypress('X')
keypress_and_sync('b') keypress_and_sync('b')
handler1.assert_has_calls([ mock.call() ]) handler1.assert_has_calls([ mock.call() ])
handler2.assert_has_calls([]) handler2.assert_has_calls([])
handler1.reset_mock() handler1.reset_mock()
keypress_and_sync('c') keypress_and_sync('c')
keypress_and_sync('B') self.m.keypress('B')
handler1.assert_has_calls([]) handler1.assert_has_calls([])
handler2.assert_has_calls([ mock.call() ]) handler2.assert_has_calls([ mock.call() ])
handler2.reset_mock() handler2.reset_mock()
reg_test_fun.unregister_mpv_key_bindings() reg_test_fun.unregister_mpv_key_bindings()
keypress_and_sync('a') self.m.keypress('a')
keypress_and_sync('c') keypress_and_sync('c')
keypress_and_sync('x') self.m.keypress('x')
keypress_and_sync('A') self.m.keypress('A')
handler1.assert_has_calls([]) handler1.assert_has_calls([])
handler2.assert_has_calls([ mock.call() ]) handler2.assert_has_calls([ mock.call() ])

147
mpv.py
View file

@ -23,6 +23,7 @@ import os
import sys import sys
from warnings import warn from warnings import warn
from functools import partial, wraps from functools import partial, wraps
from contextlib import contextmanager
import collections import collections
import re import re
import traceback import traceback
@ -636,37 +637,6 @@ def _event_generator(handle):
yield event yield event
def _event_loop(event_handle, event_callbacks, message_handlers, property_handlers, log_handler):
for event in _event_generator(event_handle):
try:
devent = event.as_dict(decoder=lazy_decoder) # copy data from ctypes
eid = devent['event_id']
for callback in event_callbacks:
callback(devent)
if eid == MpvEventID.PROPERTY_CHANGE:
pc = devent['event']
name, value, _fmt = pc['name'], pc['value'], pc['format']
for handler in property_handlers[name]:
handler(name, value)
if eid == MpvEventID.LOG_MESSAGE and log_handler is not None:
ev = devent['event']
log_handler(ev['level'], ev['prefix'], ev['text'])
if eid == MpvEventID.CLIENT_MESSAGE:
# {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16}
target, *args = devent['event']['args']
if target in message_handlers:
message_handlers[target](*args)
if eid == MpvEventID.SHUTDOWN:
_mpv_detach_destroy(event_handle)
return
except Exception as e:
traceback.print_exc()
_py_to_mpv = lambda name: name.replace('_', '-') _py_to_mpv = lambda name: name.replace('_', '-')
_mpv_to_py = lambda name: name.replace('-', '_') _mpv_to_py = lambda name: name.replace('-', '_')
@ -857,13 +827,13 @@ class MPV(object):
self.lazy = _DecoderPropertyProxy(self, lazy_decoder) self.lazy = _DecoderPropertyProxy(self, lazy_decoder)
self._event_callbacks = [] self._event_callbacks = []
self._event_handler_lock = threading.Lock()
self._property_handlers = collections.defaultdict(lambda: []) self._property_handlers = collections.defaultdict(lambda: [])
self._quit_handlers = set() self._quit_handlers = set()
self._message_handlers = {} self._message_handlers = {}
self._key_binding_handlers = {} self._key_binding_handlers = {}
self._event_handle = _mpv_create_client(self.handle, b'py_event_handler') self._event_handle = _mpv_create_client(self.handle, b'py_event_handler')
self._loop = partial(_event_loop, self._event_handle, self._event_callbacks, self._log_handler = log_handler
self._message_handlers, self._property_handlers, log_handler)
self._stream_protocol_cbs = {} self._stream_protocol_cbs = {}
self._stream_protocol_frontends = collections.defaultdict(lambda: {}) self._stream_protocol_frontends = collections.defaultdict(lambda: {})
self.register_stream_protocol('python', self._python_stream_open) self.register_stream_protocol('python', self._python_stream_open)
@ -881,36 +851,68 @@ class MPV(object):
self._event_thread = None self._event_thread = None
self._core_shutdown = False self._core_shutdown = False
# This is the first callback in line, so other event callback-based mechanisms can use core_shutdown
@self.event_callback('shutdown') def _loop(self):
def shutdown_event_callback(event): for event in _event_generator(self._event_handle):
nonlocal self try:
self._core_shutdown = True devent = event.as_dict(decoder=lazy_decoder) # copy data from ctypes
eid = devent['event_id']
with self._event_handler_lock:
if eid == MpvEventID.SHUTDOWN:
self._core_shutdown = True
for callback in self._event_callbacks:
callback(devent)
if eid == MpvEventID.PROPERTY_CHANGE:
pc = devent['event']
name, value, _fmt = pc['name'], pc['value'], pc['format']
for handler in self._property_handlers[name]:
handler(name, value)
if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None:
ev = devent['event']
self._log_handler(ev['level'], ev['prefix'], ev['text'])
if eid == MpvEventID.CLIENT_MESSAGE:
# {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16}
target, *args = devent['event']['args']
if target in self._message_handlers:
self._message_handlers[target](*args)
if eid == MpvEventID.SHUTDOWN:
_mpv_detach_destroy(self._event_handle)
return
except Exception as e:
print('Exception inside python-mpv event loop:', file=sys.stderr)
traceback.print_exc()
@property @property
def core_shutdown(self): def core_shutdown(self):
return self._core_shutdown return self._core_shutdown
def wait_until_paused(self): def wait_until_paused(self):
"""Waits until playback of the current title is paused or done.""" """Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while
waiting."""
self.wait_for_property('core-idle') self.wait_for_property('core-idle')
def wait_for_playback(self): def wait_for_playback(self):
"""Waits until playback of the current title is paused or done. """Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while
waiting.
NOTE: This function changed from an event-based implementation to a property observer-based implementation in """
v0.5.0. This may cause different results in certain cases. If you find one such case, for documentation please self.wait_for_event('end_file')
tell the world in an issue on the github project."""
self.wait_until_playing()
self.wait_until_paused()
def wait_until_playing(self): def wait_until_playing(self):
"""Waits until playback of the current title has started.""" """Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while
waiting."""
self.wait_for_property('core-idle', lambda idle: not idle) self.wait_for_property('core-idle', lambda idle: not idle)
def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True): def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True):
"""Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for """Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for
properties such as ``idle_active`` indicating the player is done with regular playback and just idling around properties such as ``idle_active`` indicating the player is done with regular playback and just idling around.
Raises a ShutdownError when the core is shutdown while waiting.
""" """
sema = threading.Semaphore(value=0) sema = threading.Semaphore(value=0)
@ -929,8 +931,38 @@ class MPV(object):
if self._core_shutdown: if self._core_shutdown:
raise ShutdownError('libmpv core has been shutdown') raise ShutdownError('libmpv core has been shutdown')
shutdown_handler.unregister_mpv_events()
self.unobserve_property(name, observer) self.unobserve_property(name, observer)
def wait_for_event(self, *event_types, cond=lambda evt: True):
with self.prepare_and_wait_for_event(*event_types, cond=cond):
pass
@contextmanager
def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True):
"""Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError
if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types.
"""
sema = threading.Semaphore(value=0)
@self.event_callback('shutdown')
def shutdown_handler(event):
sema.release()
@self.event_callback(*event_types)
def target_handler(evt):
if cond(evt):
sema.release()
yield
sema.acquire()
if self._core_shutdown:
raise ShutdownError('libmpv core has been shutdown')
shutdown_handler.unregister_mpv_events()
target_handler.unregister_mpv_events()
def __del__(self): def __del__(self):
if self.handle: if self.handle:
self.terminate() self.terminate()
@ -1369,14 +1401,17 @@ class MPV(object):
my_handler.unregister_mpv_events() my_handler.unregister_mpv_events()
""" """
def register(callback): def register(callback):
types = [MpvEventID.from_str(t) if isinstance(t, str) else t for t in event_types] or MpvEventID.ANY with self._event_handler_lock:
@wraps(callback) if self._core_shutdown:
def wrapper(event, *args, **kwargs): raise ShutdownError('libmpv core has been shutdown')
if event['event_id'] in types: types = [MpvEventID.from_str(t) if isinstance(t, str) else t for t in event_types] or MpvEventID.ANY
callback(event, *args, **kwargs) @wraps(callback)
self._event_callbacks.append(wrapper) def wrapper(event, *args, **kwargs):
wrapper.unregister_mpv_events = partial(self.unregister_event_callback, wrapper) if event['event_id'] in types:
return wrapper callback(event, *args, **kwargs)
self._event_callbacks.append(wrapper)
wrapper.unregister_mpv_events = partial(self.unregister_event_callback, wrapper)
return wrapper
return register return register
@staticmethod @staticmethod