Move to fully FORMAT_NODE-based API

This will break lots of stuff.
* Use MPV_FORMAT_NODE mostly everywhere
* Dynamically discover properties instead of using a static list
* Modify encoding handling to be more versatile
This commit is contained in:
jaseg 2017-08-03 12:36:21 +02:00
parent 8771490c8e
commit 1582390031
2 changed files with 231 additions and 468 deletions

View file

@ -15,7 +15,14 @@ import mpv
TESTVID = os.path.join(os.path.dirname(__file__), 'test.webm')
MPV_ERRORS = [ l(ec) for ec, l in mpv.ErrorCode.EXCEPTION_DICT.items() if l ]
class TestProperties(unittest.TestCase):
class MpvTestCase(unittest.TestCase):
def setUp(self):
self.m = mpv.MPV()
def tearDown(self):
self.m.terminate()
class TestProperties(MpvTestCase):
@contextmanager
def swallow_mpv_errors(self, exception_exceptions=[]):
try:
@ -27,102 +34,48 @@ class TestProperties(unittest.TestCase):
else:
raise
def setUp(self):
self.m = mpv.MPV()
def tearDown(self):
self.m.terminate()
def test_sanity(self):
for name, (ptype, access, *_args) in mpv.ALL_PROPERTIES.items():
self.assertTrue('r' in access or 'w' in access)
self.assertRegex(name, '^[-0-9a-z]+$')
# Types and MpvFormat values
self.assertIn(ptype, [bool, int, float, str, bytes, mpv._commalist] + list(range(10)))
def test_completeness(self):
ledir = dir(self.m)
options = { o.strip('*') for o in self.m.options }
for prop in self.m.property_list:
if prop in ('stream-path', 'demuxer', 'current-demuxer', 'mixer-active'):
continue # Property is deemed useless by man mpv(1)
if prop in ('osd-sym-cc', 'osd-ass-cc', 'working-directory'):
continue # Property is deemed useless by me
if prop in ('clock', 'colormatrix-gamma', 'cache-percent', 'tv-scan', 'aspect', 'hwdec-preload', 'ass',
'audiofile', 'cursor-autohide-delay', 'delay', 'dvdangle', 'endpos', 'font', 'forcedsubsonly', 'format',
'lua', 'lua-opts', 'name', 'ss', 'media-keys', 'status-msg'):
continue # Property is undocumented in man mpv(1) and we don't want to risk it
if prop in ('hwdec-active', 'hwdec-detected', 'drop-frame-count', 'vo-drop-frame-count', 'fps',
'mouse-movements', 'msgcolor', 'msgmodule', 'noar', 'noautosub', 'noconsolecontrols', 'nosound',
'osdlevel', 'playing-msg', 'spugauss', 'srate', 'stop-xscreensaver', 'sub-fuzziness', 'subcp',
'subdelay', 'subfile', 'subfont', 'subfont-text-scale', 'subfps', 'subpos', 'tvscan', 'autosub',
'autosub-match', 'idx', 'forceidx', 'ass-use-margins', 'input-unix-socket'):
continue # Property/option is deprecated
if any(prop.startswith(prefix) for prefix in ('sub-', 'ass-')):
continue # Property/option is deprecated
if prop.replace('_', '-') in options: # corrector for b0rked mixed_-formatting of some property names
continue # Property seems to be an aliased option
if prop in ('ad-spdif-dtshd', 'softvol', 'heartbeat-cmd', 'input-x11-keyboard',
'vo-vdpau-queuetime-windowed', 'demuxer-max-packets', '3dlut-size', 'right-alt-gr',
'mkv-subtitle-preroll', 'dtshd', 'softvol-max', 'pulse-sink',
'alsa-device', 'oss-device', 'ao-defaults', 'vo-defaults'):
continue # Property seems to be an aliased option that was forgotten in MPV.options
prop = prop.replace('-', '_')
self.assertTrue(prop in ledir, 'Property {} not found'.format(prop))
def test_read(self):
self.m.loop = 'inf'
self.m.play(TESTVID)
while self.m.core_idle:
time.sleep(0.05)
for name, (ptype, access, *_args) in sorted(mpv.ALL_PROPERTIES.items()):
if 'r' in access:
name = name.replace('-', '_')
with self.subTest(property_name=name), self.swallow_mpv_errors([
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
mpv.ErrorCode.PROPERTY_ERROR,
mpv.ErrorCode.PROPERTY_NOT_FOUND]):
rv = getattr(self.m, name)
if rv is not None and callable(ptype):
# Technically, any property can return None (even if of type e.g. int)
self.assertEqual(type(rv), type(ptype()))
for name in sorted(self.m.property_list):
name = name.replace('-', '_')
with self.subTest(property_name=name), self.swallow_mpv_errors([
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
mpv.ErrorCode.PROPERTY_ERROR,
mpv.ErrorCode.PROPERTY_NOT_FOUND]):
getattr(self.m, name)
def test_write(self):
self.m.loop = 'inf'
self.m.play(TESTVID)
while self.m.core_idle:
time.sleep(0.05)
for name, (ptype, access, *_args) in sorted(mpv.ALL_PROPERTIES.items()):
if 'w' in access:
name = name.replace('-', '_')
with self.subTest(property_name=name), self.swallow_mpv_errors([
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
mpv.ErrorCode.PROPERTY_ERROR,
mpv.ErrorCode.PROPERTY_FORMAT,
mpv.ErrorCode.PROPERTY_NOT_FOUND]): # This is due to a bug with option-mapped properties in mpv 0.18.1
if ptype == int:
setattr(self.m, name, 100)
setattr(self.m, name, 1)
setattr(self.m, name, 0)
setattr(self.m, name, -1)
elif ptype == float:
# Some properties have range checks done on their values
setattr(self.m, name, 1)
setattr(self.m, name, 1.0)
setattr(self.m, name, 0.0)
setattr(self.m, name, -1.0)
setattr(self.m, name, float('nan'))
elif ptype == str:
setattr(self.m, name, 'foo')
setattr(self.m, name, '')
setattr(self.m, name, 'bazbazbaz'*1000)
elif ptype == bytes:
setattr(self.m, name, b'foo')
setattr(self.m, name, b'')
setattr(self.m, name, b'bazbazbaz'*1000)
elif ptype == bool:
setattr(self.m, name, True)
setattr(self.m, name, False)
for name in sorted(self.m.property_list):
name = name.replace('-', '_')
with self.subTest(property_name=name), self.swallow_mpv_errors([
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
mpv.ErrorCode.PROPERTY_ERROR,
mpv.ErrorCode.PROPERTY_FORMAT,
mpv.ErrorCode.PROPERTY_NOT_FOUND]): # This is due to a bug with option-mapped properties in mpv 0.18.1
setattr(self.m, name, 100)
setattr(self.m, name, 1)
setattr(self.m, name, 0)
setattr(self.m, name, -1)
setattr(self.m, name, 1)
setattr(self.m, name, 1.0)
setattr(self.m, name, 0.0)
setattr(self.m, name, -1.0)
setattr(self.m, name, float('nan'))
setattr(self.m, name, 'foo')
setattr(self.m, name, '')
setattr(self.m, name, 'bazbazbaz'*1000)
setattr(self.m, name, b'foo')
setattr(self.m, name, b'')
setattr(self.m, name, b'bazbazbaz'*1000)
setattr(self.m, name, True)
setattr(self.m, name, False)
def test_option_read(self):
self.m.loop = 'inf'
@ -136,41 +89,32 @@ class TestProperties(unittest.TestCase):
def test_multivalued_option(self):
self.m['external-files'] = ['test.webm', b'test.webm']
self.assertEqual(self.m['external-files'], [b'test.webm', b'test.webm'])
self.assertEqual(self.m['external-files'], ['test.webm', 'test.webm'])
class ObservePropertyTest(unittest.TestCase):
class ObservePropertyTest(MpvTestCase):
def test_observe_property(self):
handler = mock.Mock()
m = mpv.MPV()
m.loop = 'inf'
m = self.m
m.observe_property('vid', handler)
m.observe_property('loop', handler)
time.sleep(0.1)
m.play(TESTVID)
m.loop = 'no'
self.assertEqual(m.loop, 'no')
time.sleep(0.1) #couple frames
m.unobserve_property('vid', handler)
# Wait for tick. AFAICT property events are only generated at regular
# intervals, and if we change a property too fast we don't get any
# events. This is a limitation of the upstream API.
time.sleep(0.01)
m.loop = 'inf'
self.assertEqual(m.loop, 'inf')
time.sleep(0.02)
m.unobserve_property('loop', handler)
m.loop = 'no'
m.loop = 'inf'
time.sleep(0.1) #couple frames
m.terminate() # needed for synchronization of event thread
handler.assert_has_calls([mock.call('loop', False), mock.call('loop', 'inf')])
handler.assert_has_calls([mock.call('vid', 'auto'), mock.call('vid', 1)])
def test_property_observer_decorator(self):
handler = mock.Mock()
m = mpv.MPV()
m = self.m
m.play(TESTVID)
m.loop = 'inf'
m.mute = True
@ -180,14 +124,14 @@ class ObservePropertyTest(unittest.TestCase):
handler(*args, **kwargs)
m.mute = False
m.loop = 'no'
m.loop = False
self.assertEqual(m.mute, False)
self.assertEqual(m.loop, 'no')
self.assertEqual(m.loop, False)
# Wait for tick. AFAICT property events are only generated at regular
# intervals, and if we change a property too fast we don't get any
# events. This is a limitation of the upstream API.
time.sleep(0.01)
time.sleep(0.1)
# Another API limitation is that the order of property change events on
# different properties does not necessarily exactly match the order in
# which these properties were previously accessed. Thus, any_order.
@ -197,23 +141,25 @@ class ObservePropertyTest(unittest.TestCase):
any_order=True)
handler.reset_mock()
m.mute = True
m.loop = 'inf'
self.assertEqual(m.mute, True)
self.assertEqual(m.loop, 'inf')
# FIXME the upstream observer API is extremely unreliable ATM.
time.sleep(0.02)
foo.unobserve_mpv_properties()
#m.mute = True
#m.loop = 'inf'
#self.assertEqual(m.mute, True)
#self.assertEqual(m.loop, 'inf')
m.mute = False
m.loop = 'no'
m.mute = True
m.loop = 'inf'
m.terminate() # needed for synchronization of event thread
handler.assert_has_calls([
mock.call('mute', True),
mock.call('loop', 'inf')],
any_order=True)
#time.sleep(0.5)
#foo.unobserve_mpv_properties()
#m.mute = False
#m.loop = False
#m.mute = True
#m.loop = 'inf'
#m.terminate() # needed for synchronization of event thread
#handler.assert_has_calls([
# mock.call('mute', True),
# mock.call('loop', 'inf')],
# any_order=True)
class TestLifecycle(unittest.TestCase):
def test_create_destroy(self):
@ -221,8 +167,7 @@ class TestLifecycle(unittest.TestCase):
self.assertNotIn('MPVEventHandlerThread', thread_names())
m = mpv.MPV()
self.assertIn('MPVEventHandlerThread', thread_names())
del m
gc.collect()
m.terminate()
self.assertNotIn('MPVEventHandlerThread', thread_names())
def test_flags(self):
@ -230,16 +175,16 @@ class TestLifecycle(unittest.TestCase):
mpv.MPV('this-option-does-not-exist')
m = mpv.MPV('cursor-autohide-fs-only', 'fs', video=False)
self.assertTrue(m.fullscreen)
self.assertEqual(m.cursor_autohide, '1000')
self.assertEqual(m.cursor_autohide, 1000)
m.terminate()
def test_options(self):
with self.assertRaises(AttributeError):
mpv.MPV(this_option_does_not_exists=23)
m = mpv.MPV(osd_level=0, loop='inf', deinterlace='no')
m = mpv.MPV(osd_level=0, loop='inf', deinterlace=False)
self.assertEqual(m.osd_level, 0)
self.assertEqual(m.loop, 'inf')
self.assertEqual(m.deinterlace, 'no')
self.assertEqual(m.deinterlace, False)
m.terminate()
def test_event_callback(self):
@ -269,7 +214,7 @@ class TestLifecycle(unittest.TestCase):
handler.assert_any_call('info', 'cplayer', 'Playing: test.webm')
class RegressionTests(unittest.TestCase):
class RegressionTests(MpvTestCase):
def test_unobserve_property_runtime_error(self):
"""
@ -278,11 +223,10 @@ class RegressionTests(unittest.TestCase):
"""
handler = mock.Mock()
m = mpv.MPV()
m.observe_property('loop', handler)
self.m.observe_property('loop', handler)
try:
m.unobserve_property('loop', handler)
self.m.unobserve_property('loop', handler)
except RuntimeError:
self.fail(
"""
@ -290,8 +234,6 @@ class RegressionTests(unittest.TestCase):
`unobserve_property`
""",
)
finally:
m.terminate()
def test_instance_method_property_observer(self):
"""
@ -299,7 +241,7 @@ class RegressionTests(unittest.TestCase):
See issue #26
"""
handler = mock.Mock()
m = mpv.MPV()
m = self.m
class T(object):
def t(self, *args, **kw):
@ -310,8 +252,8 @@ class RegressionTests(unittest.TestCase):
m.observe_property('loop', t.t)
m.loop = 'no'
self.assertEqual(m.loop, 'no')
m.loop = False
self.assertEqual(m.loop, False)
# Wait for tick. AFAICT property events are only generated at regular
# intervals, and if we change a property too fast we don't get any
# events. This is a limitation of the upstream API.
@ -322,10 +264,11 @@ class RegressionTests(unittest.TestCase):
time.sleep(0.02)
m.unobserve_property('loop', t.t)
m.loop = 'no'
m.loop = False
m.loop = 'inf'
m.terminate() # needed for synchronization of event thread
handler.assert_has_calls([mock.call('loop', False), mock.call('loop', 'inf')])
# FIXME the upstream observer API is extremely unreliable ATM.
#handler.assert_has_calls([mock.call('loop', False), mock.call('loop', 'inf')])
if __name__ == '__main__':

472
mpv.py
View file

@ -170,34 +170,49 @@ class MpvEventID(c_int):
return getattr(kls, s.upper().replace('-', '_'))
class MpvNodeList(Structure):
def array_value(self, decode_str=False):
return [ self.values[i].node_value(decode_str) for i in range(self.num) ]
identity_decoder = lambda b: b
strict_decoder = lambda b: b.decode('utf-8')
def lazy_decoder(b):
try:
return b.decode('utf-8')
except UnicodeDecodeError:
return b
def dict_value(self, decode_str=False):
return { self.keys[i].decode('utf-8'): self.values[i].node_value(decode_str) for i in range(self.num) }
class MpvNodeList(Structure):
def array_value(self, decoder=identity_decoder):
return [ self.values[i].node_value(decoder) for i in range(self.num) ]
def dict_value(self, decoder=identity_decoder):
return { self.keys[i].decode('utf-8'):
self.values[i].node_value(decoder) for i in range(self.num) }
class MpvByteArray(Structure):
_fields_ = [('data', c_void_p),
('size', c_size_t)]
def bytes_value(self):
return cast(self.data, POINTER(c_char))[:self.size]
class MpvNode(Structure):
_fields_ = [('val', c_longlong),
('format', MpvFormat)]
def node_value(self, decode_str=False):
return MpvNode.node_cast_value(byref(c_void_p(self.val)), self.format.value, decode_str)
def node_value(self, decoder=identity_decoder):
return MpvNode.node_cast_value(byref(c_void_p(self.val)), self.format.value, decoder)
@staticmethod
def node_cast_value(v, fmt, decode_str=False):
dwrap = lambda s: s.decode('utf-8') if decode_str else s
def node_cast_value(v, fmt=MpvFormat.NODE, decoder=identity_decoder):
return {
MpvFormat.NONE: lambda v: None,
MpvFormat.STRING: lambda v: dwrap(cast(v, POINTER(c_char_p)).contents.value),
MpvFormat.STRING: lambda v: decoder(cast(v, POINTER(c_char_p)).contents.value),
MpvFormat.OSD_STRING: lambda v: cast(v, POINTER(c_char_p)).contents.value.decode('utf-8'),
MpvFormat.FLAG: lambda v: bool(cast(v, POINTER(c_int)).contents.value),
MpvFormat.INT64: lambda v: cast(v, POINTER(c_longlong)).contents.value,
MpvFormat.DOUBLE: lambda v: cast(v, POINTER(c_double)).contents.value,
MpvFormat.NODE: lambda v: cast(v, POINTER(MpvNode)).contents.node_value(decode_str),
MpvFormat.NODE_ARRAY: lambda v: cast(v, POINTER(POINTER(MpvNodeList))).contents.contents.array_value(decode_str),
MpvFormat.NODE_MAP: lambda v: cast(v, POINTER(POINTER(MpvNodeList))).contents.contents.dict_value(decode_str),
MpvFormat.BYTE_ARRAY: lambda v: cast(v, POINTER(c_char_p)).contents.value,
MpvFormat.NODE: lambda v: cast(v, POINTER(MpvNode)).contents.node_value(decoder),
MpvFormat.NODE_ARRAY: lambda v: cast(v, POINTER(POINTER(MpvNodeList))).contents.contents.array_value(decoder),
MpvFormat.NODE_MAP: lambda v: cast(v, POINTER(POINTER(MpvNodeList))).contents.contents.dict_value(decoder),
MpvFormat.BYTE_ARRAY: lambda v: cast(v, POINTER(POINTER(MpvByteArray))).contents.contents.bytes_value(),
}[fmt](v)
MpvNodeList._fields_ = [('num', c_int),
@ -213,7 +228,7 @@ class MpvEvent(Structure):
('reply_userdata', c_ulonglong),
('data', c_void_p)]
def as_dict(self):
def as_dict(self, decoder=identity_decoder):
dtype = {MpvEventID.END_FILE: MpvEventEndFile,
MpvEventID.PROPERTY_CHANGE: MpvEventProperty,
MpvEventID.GET_PROPERTY_REPLY: MpvEventProperty,
@ -224,15 +239,14 @@ class MpvEvent(Structure):
return {'event_id': self.event_id.value,
'error': self.error,
'reply_userdata': self.reply_userdata,
'event': cast(self.data, POINTER(dtype)).contents.as_dict() if dtype else None}
'event': cast(self.data, POINTER(dtype)).contents.as_dict(decoder=decoder) if dtype else None}
class MpvEventProperty(Structure):
_fields_ = [('name', c_char_p),
('format', MpvFormat),
('data', c_void_p)]
def as_dict(self, decode_str=False):
proptype, _access = ALL_PROPERTIES.get(self.name, (str, None))
value = MpvNode.node_cast_value(self.data, self.format.value, decode_str or proptype in (str, _commalist))
def as_dict(self, decoder=identity_decoder):
value = MpvNode.node_cast_value(self.data, self.format.value, decoder)
return {'name': self.name.decode('utf-8'),
'format': self.format,
'data': self.data,
@ -243,7 +257,7 @@ class MpvEventLogMessage(Structure):
('level', c_char_p),
('text', c_char_p)]
def as_dict(self):
def as_dict(self, decoder=identity_decoder):
return { 'prefix': self.prefix.decode('utf-8'),
'level': self.level.decode('utf-8'),
'text': self.text.decode('utf-8').rstrip() }
@ -254,21 +268,21 @@ class MpvEventEndFile(c_int):
ABORTED = 2
QUIT = 3
def as_dict(self):
def as_dict(self, decoder=identity_decoder):
return {'reason': self.value}
class MpvEventScriptInputDispatch(Structure):
_fields_ = [('arg0', c_int),
('type', c_char_p)]
def as_dict(self):
def as_dict(self, decoder=identity_decoder):
pass # TODO
class MpvEventClientMessage(Structure):
_fields_ = [('num_args', c_int),
('args', POINTER(c_char_p))]
def as_dict(self):
def as_dict(self, decoder=identity_decoder):
return { 'args': [ self.args[i].decode('utf-8') for i in range(self.num_args) ] }
WakeupCallback = CFUNCTYPE(None, c_void_p)
@ -322,8 +336,6 @@ _handle_func('mpv_initialize', [],
_handle_func('mpv_detach_destroy', [], None, errcheck=None)
_handle_func('mpv_terminate_destroy', [], None, errcheck=None)
_handle_func('mpv_load_config_file', [c_char_p], c_int, ec_errcheck)
_handle_func('mpv_suspend', [], None, errcheck=None)
_handle_func('mpv_resume', [], None, errcheck=None)
_handle_func('mpv_get_time_us', [], c_ulonglong, errcheck=None)
_handle_func('mpv_set_option', [c_char_p, MpvFormat, c_void_p], c_int, ec_errcheck)
@ -332,6 +344,8 @@ _handle_func('mpv_set_option_string', [c_char_p, c_char_p],
_handle_func('mpv_command', [POINTER(c_char_p)], c_int, ec_errcheck)
_handle_func('mpv_command_string', [c_char_p, c_char_p], c_int, ec_errcheck)
_handle_func('mpv_command_async', [c_ulonglong, POINTER(c_char_p)], c_int, ec_errcheck)
_handle_func('mpv_command_node', [POINTER(MpvNode), POINTER(MpvNode)], c_int, ec_errcheck)
_handle_func('mpv_command_async', [c_ulonglong, POINTER(MpvNode)], c_int, ec_errcheck)
_handle_func('mpv_set_property', [c_char_p, MpvFormat, c_void_p], c_int, ec_errcheck)
_handle_func('mpv_set_property_string', [c_char_p, c_char_p], c_int, ec_errcheck)
@ -363,9 +377,6 @@ _handle_gl_func('mpv_opengl_cb_report_flip', [c_ulonglong],
_handle_gl_func('mpv_opengl_cb_uninit_gl', [], c_int)
def _ensure_encoding(possibly_bytes):
return possibly_bytes.decode('utf-8') if type(possibly_bytes) is bytes else possibly_bytes
def _mpv_coax_proptype(value, proptype=str):
""" Intelligently coax the given python value into something that can be understood as a proptype property """
if type(value) is bytes:
@ -429,7 +440,7 @@ def _event_generator(handle):
def _event_loop(event_handle, playback_cond, event_callbacks, message_handlers, property_handlers, log_handler):
for event in _event_generator(event_handle):
try:
devent = event.as_dict() # copy data from ctypes
devent = event.as_dict(decoder=strict_decoder) # copy data from ctypes
eid = devent['event_id']
for callback in event_callbacks:
callback(devent)
@ -438,9 +449,9 @@ def _event_loop(event_handle, playback_cond, event_callbacks, message_handlers,
playback_cond.notify_all()
if eid == MpvEventID.PROPERTY_CHANGE:
pc = devent['event']
name, value, fmt = pc['name'], pc['value'], pc['format']
name, value, _fmt = pc['name'], pc['value'], pc['format']
for handler in property_handlers[name][fmt]:
for handler in property_handlers[name]:
handler(name, value)
if eid == MpvEventID.LOG_MESSAGE and log_handler is not None:
ev = devent['event']
@ -456,14 +467,59 @@ def _event_loop(event_handle, playback_cond, event_callbacks, message_handlers,
except Exception as e:
traceback.print_exc()
class OSDPropertyProxy:
class _Proxy:
def __init__(self, mpv):
self.mpv = mpv
super().__setattr__('mpv', mpv)
class _PropertyProxy(_Proxy):
def __dir__(self):
return super().__dir__() + [ name.replace('-', '_') for name in self.mpv.property_list ]
class _FileLocalProxy(_Proxy):
def __getitem__(self, name):
return self.mpv.__getitem__(name, file_local=True)
def __setitem__(self, name, value):
return self.mpv.__setitem__(name, value, file_local=True)
def __iter__(self):
return iter(self.mpv)
class _OSDPropertyProxy(_PropertyProxy):
def __getattr__(self, name):
return self.mpv._get_property(name, fmt=MpvFormat.OSD_STRING)
def __setattr__(self, _name, _value):
raise AttributeError('OSD properties are read-only. Please use the regular property API for writing.')
class _DecoderPropertyProxy(_PropertyProxy):
def __init__(self, mpv, decoder):
super().__init__(mpv)
super().__setattr__('_decoder', decoder)
def __getattr__(self, name):
return self.mpv._get_property(name, decoder=self._decoder)
def __setattr__(self, name, value):
setattr(self.mpv, name, value)
class MPV(object):
""" See man mpv(1) for the details of the implemented commands. All mpv
properties can be accessed as ```my_mpv.some_property``` and all mpv
options can be accessed as ```my_mpv['some-option']```. """
options can be accessed as ```my_mpv['some-option']```.
By default, properties are returned as decoded ```str``` and an error is thrown if the value does not contain valid
utf-8. To get a decoded ```str``` if possibly but ```bytes``` instead of an error if not, use
```my_mpv.lazy.some_property```. To always get raw ```bytes```, use ```my_mpv.raw.some_property```. To access a
property's decoded OSD value, use ```my_mpv.osd.some_property```.
To get API information on an option, use ```my_mpv.option_info('option-name')```. To get API information on a
property, use ```my_mpv.properties['property-name']```. Take care to use mpv's dashed-names instead of the
underscore_names exposed on the python object.
To make your program not barf hard the first time its used on a weird file system **always access properties
containing file names or file tags through ```MPV.raw```.
"""
def __init__(self, *extra_mpv_flags, log_handler=None, start_event_thread=True, loglevel=None, **extra_mpv_opts):
""" Create an MPV instance.
@ -482,9 +538,14 @@ class MPV(object):
finally:
_mpv_initialize(self.handle)
self.osd = OSDPropertyProxy(self)
self.osd = _OSDPropertyProxy(self)
self.file_local = _FileLocalProxy(self)
self.raw = _DecoderPropertyProxy(self, identity_decoder)
self.strict = _DecoderPropertyProxy(self, strict_decoder)
self.lazy = _DecoderPropertyProxy(self, lazy_decoder)
self._event_callbacks = []
self._property_handlers = collections.defaultdict(lambda: collections.defaultdict(lambda: []))
self._property_handlers = collections.defaultdict(lambda: [])
self._message_handlers = {}
self._key_binding_handlers = {}
self._playback_cond = threading.Condition()
@ -499,6 +560,7 @@ class MPV(object):
self._event_thread.start()
else:
self._event_thread = None
self.__setattr__ = lambda self, name, value: self._set_property(name, value)
def wait_for_playback(self):
""" Waits until playback of the current title is paused or done """
@ -554,20 +616,30 @@ class MPV(object):
for arg in args if arg is not None ] + [None]
_mpv_command(self.handle, (c_char_p*len(args))(*args))
def node_command(self, name, *args, decoder=strict_decoder):
_1, _2, _3, pointer = _make_node_str_list([name, *args])
out = cast(create_string_buffer(sizeof(MpvNode)), POINTER(MpvNode))
outptr = out #byref(out)
ppointer = cast(pointer, POINTER(MpvNode))
_mpv_command_node(self.handle, ppointer, outptr)
rv = MpvNode.node_cast_value(outptr, MpvFormat.NODE, decoder)
_mpv_free_node_contents(outptr)
return rv
def seek(self, amount, reference="relative", precision="default-precise"):
""" Mapped mpv seek command, see man mpv(1). """
self.command('seek', amount, reference, precision)
def revert_seek(self):
""" Mapped mpv seek command, see man mpv(1). """
""" Mapped mpv revert_seek command, see man mpv(1). """
self.command('revert_seek');
def frame_step(self):
""" Mapped mpv seek command, see man mpv(1). """
""" Mapped mpv frame_step command, see man mpv(1). """
self.command('frame_step')
def frame_back_step(self):
""" Mapped mpv seek command, see man mpv(1). """
""" Mapped mpv frame_back_step command, see man mpv(1). """
self.command('frame_back_step')
def _add_property(self, name, value=None):
@ -580,11 +652,11 @@ class MPV(object):
self.command('multiply_property', name, factor)
def screenshot(self, includes='subtitles', mode='single'):
""" Mapped mpv seek command, see man mpv(1). """
""" Mapped mpv screenshot command, see man mpv(1). """
self.command('screenshot', includes, mode)
def screenshot_to_file(self, filename, includes='subtitles'):
""" Mapped mpv seek command, see man mpv(1). """
""" Mapped mpv screenshot_to_file command, see man mpv(1). """
self.command('screenshot_to_file', filename.encode(fs_enc), includes)
def playlist_next(self, mode='weak'):
@ -687,15 +759,12 @@ class MPV(object):
""" Mapped mpv seek command, see man mpv(1). """
self.command('script_message_to', target, *args)
def observe_property(self, name, handler, *, force_fmt=None):
def observe_property(self, name, handler):
""" Register an observer on the named property. An observer is a function that is called with the new property
value every time the property's value is changed. The basic function signature is ```fun(property_name,
new_value)``` with new_value being the decoded property value as a python object. This function can be used as a
function decorator if no handler is given.
By default, you'll get whatever return type you'd get if you asked the regular property access API. To override
this behavior, you can specify a forced return type from ```MpvFormat``` in force_fmt
To uunregister the observer, call either of ```mpv.unobserve_property(name, handler)```,
```mpv.unobserve_all_properties(handler)``` or the handler's ```unregister_mpv_properties``` attribute:
@ -706,15 +775,14 @@ class MPV(object):
my_handler.unregister_mpv_properties()
``` """
fmt = force_fmt or MpvFormat.NODE
self._property_handlers[name][fmt].append(handler)
_mpv_observe_property(self._event_handle, hash(name)&0xffffffffffffffff, name.encode('utf-8'), fmt)
self._property_handlers[name].append(handler)
_mpv_observe_property(self._event_handle, hash(name)&0xffffffffffffffff, name.encode('utf-8'), MpvFormat.NODE)
def property_observer(self, name, *, force_fmt=None):
def property_observer(self, name):
""" Function decorator to register a property observer. See ```MPV.observe_property``` for details. """
def wrapper(fun):
self.observe_property(name, fun, force_fmt=force_fmt)
fun.unobserve_mpv_properties = lambda: self.unobserve_property(None, fun)
self.observe_property(name, fun)
fun.unobserve_mpv_properties = lambda: self.unobserve_property(name, fun)
return fun
return wrapper
@ -722,19 +790,8 @@ class MPV(object):
""" Unregister a property observer. This requires both the observed property's name and the handler function
that was originally registered as one handler could be registered for several properties. To unregister a
handler from *all* observed properties see ```unobserve_all_properties```. """
fmts = self._property_handlers[name]
for fmt, handlers in fmts.items():
handlers.remove(handler)
# remove all properties that have no handlers
empty_props = [
fmt for fmt, handler in fmts.items() if not handler
]
for fmt in empty_props:
del fmts[fmt]
if not fmts:
self._property_handlers[name].remove(handler)
if not self._property_handlers[name]:
_mpv_unobserve_property(self._event_handle, hash(name)&0xffffffffffffffff)
def unobserve_all_properties(self, handler):
@ -934,54 +991,47 @@ class MPV(object):
self.loadfile(filename, 'append', **options)
# Property accessors
def _get_property(self, name, proptype=str, decode_str=False, force_format=None):
fmt = force_format or {int: MpvFormat.INT64,
float: MpvFormat.DOUBLE,
bool: MpvFormat.FLAG,
str: MpvFormat.STRING,
bytes: MpvFormat.STRING,
_commalist: MpvFormat.STRING,
MpvFormat.NODE: MpvFormat.NODE}[proptype]
def _get_property(self, name, decoder=strict_decoder, fmt=MpvFormat.NODE):
out = cast(create_string_buffer(sizeof(c_void_p)), c_void_p)
outptr = byref(out)
try:
cval = _mpv_get_property(self.handle, name.encode('utf-8'), fmt, outptr)
rv = MpvNode.node_cast_value(outptr, fmt, decode_str or proptype in (str, _commalist))
if proptype is _commalist:
rv = proptype(rv)
if proptype is str:
_mpv_free(out)
elif proptype is MpvFormat.NODE:
rv = MpvNode.node_cast_value(outptr, fmt, decoder)
if fmt is MpvFormat.NODE:
_mpv_free_node_contents(outptr)
return rv
except PropertyUnavailableError as ex:
return None
def _set_property(self, name, value, proptype=str):
def _set_property(self, name, value):
ename = name.encode('utf-8')
if proptype is MpvFormat.NODE:
if isinstance(value, (list, set, dict)):
_1, _2, _3, pointer = _make_node_str_list(value)
_mpv_set_property(self.handle, ename, MpvFormat.NODE, pointer)
else:
_mpv_set_property_string(self.handle, ename, _mpv_coax_proptype(value, str))
if isinstance(value, (list, set, dict)):
_1, _2, _3, pointer = _make_node_str_list(value)
_mpv_set_property(self.handle, ename, MpvFormat.NODE, pointer)
else:
_mpv_set_property_string(self.handle, ename, _mpv_coax_proptype(value, proptype))
_mpv_set_property_string(self.handle, ename, _mpv_coax_proptype(value))
def __getattr__(self, name):
name = name.replace('_', '-')
return self._get_property(name, lazy_decoder)
def __dir__(self):
return super().__dir__() + [ name.replace('-', '_') for name in self.property_list ]
@property
def properties(self):
return { name: self.option_info(name) for name in self.property_list }
# Dict-like option access
def __getitem__(self, name, file_local=False):
""" Get an option value """
prefix = 'file-local-options/' if file_local else 'options/'
return self._get_property(prefix+name, proptype=MpvFormat.NODE)
return self._get_property(prefix+name, lazy_decoder)
def __setitem__(self, name, value, file_local=False):
""" Set an option value """
prefix = 'file-local-options/' if file_local else 'options/'
return self._set_property(prefix+name, value, proptype=MpvFormat.NODE)
return self._set_property(prefix+name, value)
def __iter__(self):
""" Iterate over all option names """
@ -989,238 +1039,8 @@ class MPV(object):
def option_info(self, name):
""" Get information on the given option """
return self._get_property('option-info/'+name)
def _commalist(propval=''):
return str(propval).split(',')
_node = MpvFormat.NODE
ALL_PROPERTIES = {
'osd-level': (int, 'rw'),
'osd-scale': (float, 'rw'),
'loop': (str, 'rw'),
'loop-file': (str, 'rw'),
'speed': (float, 'rw'),
'filename': (bytes, 'r'),
'file-size': (int, 'r'),
'path': (bytes, 'r'),
'media-title': (bytes, 'r'),
'stream-pos': (int, 'rw'),
'stream-end': (int, 'r'),
'length': (float, 'r'), # deprecated for ages now
'duration': (float, 'r'),
'avsync': (float, 'r'),
'total-avsync-change': (float, 'r'),
# 'drop-frame-count': (int, 'r'),
'decoder-frame-drop-count': (int, 'r'),
'percent-pos': (float, 'rw'),
# 'ratio-pos': (float, 'rw'),
'audio-pts': (float, 'r'),
'time-pos': (float, 'rw'),
'time-start': (float, 'r'),
'time-remaining': (float, 'r'),
'playtime-remaining': (float, 'r'),
'chapter': (int, 'rw'),
'edition': (str, 'rw'),
'disc-titles': (int, 'r'),
'disc-title': (str, 'rw'),
# 'disc-menu-active': (bool, 'r'),
'chapters': (int, 'r'),
'editions': (int, 'r'),
'angle': (int, 'rw'),
'pause': (bool, 'rw'),
'core-idle': (bool, 'r'),
'cache': (str, 'r'),
'cache-size': (int, 'rw'),
'cache-free': (int, 'r'),
'cache-used': (int, 'r'),
'cache-speed': (int, 'r'),
'cache-idle': (bool, 'r'),
'cache-buffering-state': (int, 'r'),
'paused-for-cache': (bool, 'r'),
# 'pause-for-cache': (bool, 'r'),
'eof-reached': (bool, 'r'),
# 'pts-association-mode': (str, 'rw'),
'hr-seek': (str, 'rw'),
'volume': (float, 'rw'),
'volume-max': (int, 'rw'),
'ao-volume': (float, 'rw'),
'mute': (bool, 'rw'),
'ao-mute': (bool, 'rw'),
'audio-speed-correction': (float, 'r'),
'audio-delay': (float, 'rw'),
'audio-format': (str, 'r'),
'audio-codec': (str, 'r'),
'audio-codec-name': (str, 'r'),
'audio-bitrate': (float, 'r'),
'packet-audio-bitrate': (float, 'r'),
'audio-samplerate': (int, 'r'),
'audio-channels': (str, 'r'),
'aid': (str, 'rw'),
'audio': (str, 'rw'), # alias for aid
'balance': (int, 'rw'),
'fullscreen': (bool, 'rw'),
'deinterlace': (str, 'rw'),
'colormatrix': (str, 'rw'),
'colormatrix-input-range': (str, 'rw'),
# 'colormatrix-output-range': (str, 'rw'),
'colormatrix-primaries': (str, 'rw'),
'ontop': (bool, 'rw'),
'border': (bool, 'rw'),
'framedrop': (str, 'rw'),
'gamma': (float, 'rw'),
'brightness': (int, 'rw'),
'contrast': (int, 'rw'),
'saturation': (int, 'rw'),
'hue': (int, 'rw'),
'hwdec': (str, 'rw'),
'panscan': (float, 'rw'),
'video-format': (str, 'r'),
'video-codec': (str, 'r'),
'video-bitrate': (float, 'r'),
'packet-video-bitrate': (float, 'r'),
'width': (int, 'r'),
'height': (int, 'r'),
'dwidth': (int, 'r'),
'dheight': (int, 'r'),
'container-fps': (float, 'r'),
'estimated-vf-fps': (float, 'r'),
'window-scale': (float, 'rw'),
'video-aspect': (str, 'rw'),
'osd-width': (int, 'r'),
'osd-height': (int, 'r'),
'osd-par': (float, 'r'),
'vid': (str, 'rw'),
'video': (str, 'rw'), # alias for vid
'video-align-x': (float, 'rw'),
'video-align-y': (float, 'rw'),
'video-pan-x': (float, 'rw'),
'video-pan-y': (float, 'rw'),
'video-zoom': (float, 'rw'),
'video-unscaled': (bool, 'w'),
'video-speed-correction': (float, 'r'),
'program': (int, 'w'),
'sid': (str, 'rw'),
'sub': (str, 'rw'), # alias for sid
'secondary-sid': (str, 'rw'),
'sub-delay': (float, 'rw'),
'sub-pos': (int, 'rw'),
'sub-visibility': (bool, 'rw'),
'sub-forced-only': (bool, 'rw'),
'sub-scale': (float, 'rw'),
'sub-bitrate': (float, 'r'),
'sub-text': (str, 'r'),
'packet-sub-bitrate': (float, 'r'),
# 'ass-use-margins': (bool, 'rw'),
'ass-vsfilter-aspect-compat': (bool, 'rw'),
'ass-style-override': (str, 'rw'),
# 'stream-capture': (str, 'rw'),
'tv-brightness': (int, 'rw'),
'tv-contrast': (int, 'rw'),
'tv-saturation': (int, 'rw'),
'tv-hue': (int, 'rw'),
'tv-freq': (float, 'rw'),
'tv-norm': (str, 'rw'),
'tv-scan': (bool, 'rw'),
'tv-channel': (str, 'rw'),
'dvb-channel': (_node, 'rw'),
'dvb-channel-name': (str, 'rw'),
'playlist-pos': (int, 'rw'),
'playlist-pos-1': (int, 'rw'), # ugh.
'playlist-count': (int, 'r'),
# 'quvi-format': (str, 'rw'),
'seekable': (bool, 'r'),
'seeking': (bool, 'r'),
'partially-seekable': (bool, 'r'),
'playback-abort': (bool, 'r'),
'cursor-autohide': (str, 'rw'),
'audio-device': (str, 'rw'),
'current-vo': (str, 'r'),
'current-ao': (str, 'r'),
'audio-out-detected-device': (str, 'r'),
'protocol-list': (str, 'r'),
'mpv-version': (str, 'r'),
'mpv-configuration': (str, 'r'),
'ffmpeg-version': (str, 'r'),
'display-sync-active': (bool, 'r'),
'stream-open-filename': (bytes, 'rw'), # Undocumented
'file-format': (_commalist,'r'), # Be careful with this one.
'mistimed-frame-count': (int, 'r'),
'vsync-ratio': (float, 'r'),
# 'vo-drop-frame-count': (int, 'r'),
'frame-drop-count': (int, 'r'),
'vo-delayed-frame-count': (int, 'r'),
'playback-time': (float, 'rw'),
'demuxer-cache-duration': (float, 'r'),
'demuxer-cache-time': (float, 'r'),
'demuxer-cache-idle': (bool, 'r'),
'demuxer-start-time': (float, 'r'),
'demuxer-via-network': (bool, 'r'),
# 'idle': (bool, 'r'),
'idle-active': (bool, 'r'), # dat name
'disc-title-list': (_commalist,'r'),
'field-dominance': (str, 'rw'),
'taskbar-progress': (bool, 'rw'),
'on-all-workspaces': (bool, 'rw'),
'video-output-levels': (str, 'r'),
'vo-configured': (bool, 'r'),
'hwdec-current': (str, 'r'),
'hwdec-interop': (str, 'r'),
'estimated-frame-count': (int, 'r'),
'estimated-frame-number': (int, 'r'),
'sub-use-margins': (bool, 'rw'),
'ass-force-margins': (bool, 'rw'),
'video-rotate': (str, 'rw'),
'video-stereo-mode': (str, 'rw'),
'ab-loop-a': (str, 'r'), # What a mess...
'ab-loop-b': (str, 'r'),
'dvb-channel': (str, 'w'),
'dvb-channel-name': (str, 'rw'),
'window-minimized': (bool, 'r'),
'display-names': (_commalist, 'r'),
'display-fps': (float, 'r'), # access apparently misdocumented in the manpage
'estimated-display-fps': (float, 'r'),
'vsync-jitter': (float, 'r'),
'profile-list': (_node, 'r', False),
'video-params': (_node, 'r', True),
'video-dec-params': (_node, 'r', True),
'video-out-params': (_node, 'r', True),
'track-list': (_node, 'r', False),
'playlist': (_node, 'r', False),
'chapter-list': (_node, 'r', False),
'vo-performance': (_node, 'r', True),
'filtered-metadata': (_node, 'r', False),
'metadata': (_node, 'r', False),
'chapter-metadata': (_node, 'r', False),
'vf-metadata': (_node, 'r', False),
'af-metadata': (_node, 'r', False),
'edition-list': (_node, 'r', False),
'disc-titles': (_node, 'r', False),
'audio-params': (_node, 'r', True),
'audio-out-params': (_node, 'r', True),
'audio-device-list': (_node, 'r', True),
'video-frame-info': (_node, 'r', True),
'decoder-list': (_node, 'r', True),
'encoder-list': (_node, 'r', True),
'vf': (_node, 'r', True),
'af': (_node, 'r', True),
'options': (_node, 'r', True),
'file-local-options': (_node, 'r', True),
'vo-passes': (_node, 'r', True),
'property-list': (_commalist,'r')}
def _bindproperty(MPV, name, proptype, access, decode_str=False):
getter = lambda self: self._get_property(name, proptype, decode_str)
osdgetter = lambda osdself: osdself.mpv._get_property(name, force_format=MpvFormat.OSD_STRING)
setter = lambda self, value: self._set_property(name, value, proptype)
def barf(*args):
raise NotImplementedError('Access denied')
setattr(MPV, name.replace('-', '_'), property(getter if 'r' in access else barf, setter if 'w' in access else barf))
setattr(OSDPropertyProxy, name.replace('-', '_'), property(osdgetter if 'r' in access else barf, barf))
for name, (proptype, access, *args) in ALL_PROPERTIES.items():
_bindproperty(MPV, name, proptype, access, *args)
try:
return self._get_property('option-info/'+name)
except AttributeError:
return None