Add stream protocol handling

This allows you to directly feed bytes into mpv without going through a
file, FIFO etc. first. The new API is:

@player.register_stream_protocol(name)
@player.python_stream(name, size)
@player.python_stream_catchall

See their docstrings for their usage.
This commit is contained in:
jaseg 2019-11-30 21:15:57 +01:00
parent 3d61558714
commit 49e6f6f6d0
3 changed files with 365 additions and 6 deletions

265
mpv.py
View file

@ -72,6 +72,14 @@ class ErrorCode(object):
PROPERTY_UNAVAILABLE = -10
PROPERTY_ERROR = -11
COMMAND = -12
LOADING_FAILED = -13
AO_INIT_FAILED = -14
VO_INIT_FAILED = -15
NOTHING_TO_PLAY = -16
UNKNOWN_FORMAT = -17
UNSUPPORTED = -18
NOT_IMPLEMENTED = -19
GENERIC = -20
EXCEPTION_DICT = {
0: None,
@ -88,7 +96,17 @@ class ErrorCode(object):
-9: lambda *a: TypeError('Tried to get/set mpv property using wrong format, or passed invalid value', *a),
-10: lambda *a: PropertyUnavailableError('mpv property is not available', *a),
-11: lambda *a: RuntimeError('Generic error getting or setting mpv property', *a),
-12: lambda *a: SystemError('Error running mpv command', *a) }
-12: lambda *a: SystemError('Error running mpv command', *a),
-14: lambda *a: RuntimeError('Initializing the audio output failed', *a),
-15: lambda *a: RuntimeError('Initializing the video output failed'),
-16: lambda *a: RuntimeError('There was no audio or video data to play. This also happens if the file '
'was recognized, but did not contain any audio or video streams, or no '
'streams were selected.'),
-17: lambda *a: RuntimeError('When trying to load the file, the file format could not be determined, '
'or the file was too broken to open it'),
-18: lambda *a: ValueError('Generic error for signaling that certain system requirements are not fulfilled'),
-19: lambda *a: NotImplementedError('The API function which was called is a stub only'),
-20: lambda *a: RuntimeError('Unspecified error') }
@staticmethod
def default_error_handler(ec, *args):
@ -282,14 +300,24 @@ class MpvEventLogMessage(Structure):
'level': self.level.decode('utf-8'),
'text': decoder(self.text).rstrip() }
class MpvEventEndFile(c_int):
EOF_OR_INIT_FAILURE = 0
class MpvEventEndFile(Structure):
_fields_ = [('reason', c_int),
('error', c_int)]
EOF = 0
RESTARTED = 1
ABORTED = 2
QUIT = 3
ERROR = 4
REDIRECT = 5
# For backwards-compatibility
@property
def value(self):
return self.reason
def as_dict(self, decoder=identity_decoder):
return {'reason': self.value}
return {'reason': self.reason, 'error': self.error}
class MpvEventScriptInputDispatch(Structure):
_fields_ = [('arg0', c_int),
@ -305,6 +333,22 @@ class MpvEventClientMessage(Structure):
def as_dict(self, decoder=identity_decoder):
return { 'args': [ self.args[i].decode('utf-8') for i in range(self.num_args) ] }
StreamReadFn = CFUNCTYPE(c_int64, c_void_p, POINTER(c_char), c_uint64)
StreamSeekFn = CFUNCTYPE(c_int64, c_void_p, c_int64)
StreamSizeFn = CFUNCTYPE(c_int64, c_void_p)
StreamCloseFn = CFUNCTYPE(None, c_void_p)
StreamCancelFn = CFUNCTYPE(None, c_void_p)
class StreamCallbackInfo(Structure):
_fields_ = [('cookie', c_void_p),
('read', StreamReadFn),
('seek', StreamSeekFn),
('size', StreamSizeFn),
('close', StreamCloseFn), ]
# ('cancel', StreamCancelFn)]
StreamOpenFn = CFUNCTYPE(c_int, c_void_p, c_char_p, POINTER(StreamCallbackInfo))
WakeupCallback = CFUNCTYPE(None, c_void_p)
OpenGlCbUpdateFn = CFUNCTYPE(None, c_void_p)
@ -387,6 +431,8 @@ _handle_func('mpv_wakeup', [],
_handle_func('mpv_set_wakeup_callback', [WakeupCallback, c_void_p], None, errcheck=None)
_handle_func('mpv_get_wakeup_pipe', [], c_int, errcheck=None)
_handle_func('mpv_stream_cb_add_ro', [c_char_p, c_void_p, StreamOpenFn], c_int, ec_errcheck)
_handle_func('mpv_get_sub_api', [MpvSubApi], c_void_p, notnull_errcheck)
_handle_gl_func('mpv_opengl_cb_set_update_callback', [OpenGlCbUpdateFn, c_void_p])
@ -517,6 +563,40 @@ class _DecoderPropertyProxy(_PropertyProxy):
def __setattr__(self, name, value):
setattr(self.mpv, _py_to_mpv(name), value)
class GeneratorStream:
"""Transform a python generator into an mpv-compatible stream object. This only supports size() and read(), and
does not support seek(), close() or cancel().
"""
def __init__(self, generator_fun, size=None):
self._generator_fun = generator_fun
self.size = size
def seek(self, offset):
self._read_iter = iter(self._generator_fun())
self._read_chunk = b''
return 0 # We only support seeking to the first byte atm
# implementation in case seeking to arbitrary offsets would be necessary
# while offset > 0:
# offset -= len(self.read(offset))
# return offset
def read(self, size):
if not self._read_chunk:
try:
self._read_chunk += next(self._read_iter)
except StopIteration:
return b''
rv, self._read_chunk = self._read_chunk[:size], self._read_chunk[size:]
return rv
def close(self):
self._read_iter = iter([]) # make next read() call return EOF
def cancel(self):
self._read_iter = iter([]) # make next read() call return EOF
# TODO?
class MPV(object):
"""See man mpv(1) for the details of the implemented commands. All mpv properties can be accessed as
``my_mpv.some_property`` and all mpv options can be accessed as ``my_mpv['some-option']``.
@ -565,6 +645,11 @@ class MPV(object):
self._event_handle = _mpv_create_client(self.handle, b'py_event_handler')
self._loop = partial(_event_loop, self._event_handle, self._playback_cond, self._event_callbacks,
self._message_handlers, self._property_handlers, log_handler)
self._stream_protocol_cbs = {}
self._stream_protocol_frontends = collections.defaultdict(lambda: {})
self.register_stream_protocol('python', self._python_stream_open)
self._python_streams = {}
self._python_stream_catchall = None
if loglevel is not None or log_handler is not None:
self.set_loglevel(loglevel or 'terminal-default')
if start_event_thread:
@ -1028,6 +1113,87 @@ class MPV(object):
if not self._key_binding_handlers:
self.unregister_message_handler('key-binding')
def register_stream_protocol(self, proto, open_fn=None):
""" Register a custom stream protocol as documented in libmpv/stream_cb.h:
https://github.com/mpv-player/mpv/blob/master/libmpv/stream_cb.h
proto is the protocol scheme, e.g. "foo" for "foo://" urls.
This function can either be used with two parameters or it can be used as a decorator on the target
function.
open_fn is a function taking an URI string and returning an mpv stream object.
open_fn may raise a ValueError to signal libmpv the URI could not be opened.
The mpv stream protocol is as follows:
class Stream:
@property
def size(self):
return None # unknown size
return size # int with size in bytes
def read(self, size):
...
return read # non-empty bytes object with input
return b'' # empty byte object signals permanent EOF
def seek(self, pos):
return new_offset # integer with new byte offset. The new offset may be before the requested offset
in case an exact seek is inconvenient.
def close(self):
...
# def cancel(self): (future API versions only)
# Abort a running read() or seek() operation
# ...
"""
def decorator(open_fn):
@StreamOpenFn
def open_backend(_userdata, uri, cb_info):
try:
frontend = open_fn(uri.decode('utf-8'))
except ValueError:
return ErrorCode.LOADING_FAILED
def read_backend(_userdata, buf, bufsize):
data = frontend.read(bufsize)
for i in range(len(data)):
buf[i] = data[i]
return len(data)
cb_info.contents.cookie = None
read = cb_info.contents.read = StreamReadFn(read_backend)
close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close())
seek, size, cancel = None, None, None
if hasattr(frontend, 'seek'):
seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx))
if hasattr(frontend, 'size') and frontend.size is not None:
size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size)
# Future API versions only
# if hasattr(frontend, 'cancel'):
# cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel())
# keep frontend and callbacks in memory forever (TODO)
frontend._registered_callbacks = [read, close, seek, size, cancel]
self._stream_protocol_frontends[proto][uri] = frontend
return 0
if proto in self._stream_protocol_cbs:
raise KeyError('Stream protocol already registered')
self._stream_protocol_cbs[proto] = [open_backend]
_mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend)
return open_fn
if open_fn is not None:
decorator(open_fn)
return decorator
# Convenience functions
def play(self, filename):
"""Play a path or URL (requires ``ytdl`` option to be set)."""
@ -1043,6 +1209,97 @@ class MPV(object):
``MPV.loadfile(filename, 'append-play')``."""
self.loadfile(filename, 'append', **options)
# "Python stream" logic. This is some porcelain for directly playing data from python generators.
def _python_stream_open(self, uri):
"""Internal handler for python:// protocol streams registered through @python_stream(...) and
@python_stream_catchall
"""
name, = re.fullmatch('python://(.*)', uri).groups()
if name in self._python_streams:
generator_fun, size = self._python_streams[name]
else:
if self._python_stream_catchall is not None:
generator_fun, size = self._python_stream_catchall(name)
else:
raise ValueError('Python stream name not found and no catch-all defined')
return GeneratorStream(generator_fun, size)
def python_stream(self, name=None, size=None):
"""Register a generator for the python stream with the given name.
name is the name, i.e. the part after the "python://" in the URI, that this generator is registered as.
size is the total number of bytes in the stream (if known).
Any given name can only be registered once. The catch-all can also only be registered once. To unregister a
stream, call the .unregister function set on the callback.
The generator signals EOF by returning, manually raising StopIteration or by yielding b'', an empty bytes
object.
The generator may be called multiple times if libmpv seeks or loops.
See also: @mpv.python_stream_catchall
@mpv.python_stream('foobar')
def reader():
for chunk in chunks:
yield chunk
mpv.play('python://foobar')
mpv.wait_for_playback()
reader.unregister()
"""
def register(cb):
if name in self._python_streams:
raise KeyError(f'Python stream name "{name}" is already registered')
self._python_streams[name] = (cb, size)
def unregister():
if name not in self._python_streams or\
self._python_streams[name][0] is not cb: # This is just a basic sanity check
raise RuntimeError('Python stream has already been unregistered')
del self._python_streams[name]
cb.unregister = unregister
return cb
return register
def python_stream_catchall(self, cb):
""" Register a catch-all python stream to be called when no name matches can be found. Use this decorator on a
function that takes a name argument and returns a (generator, size) tuple (with size being None if unknown).
An invalid URI can be signalled to libmpv by raising a ValueError inside the callback.
See also: @mpv.python_stream(name, size)
@mpv.python_stream_catchall
def catchall(name):
if not name.startswith('foo'):
raise ValueError('Unknown Name')
def foo_reader():
with open(name, 'rb') as f:
while True:
chunk = f.read(1024)
if not chunk:
break
yield chunk
return foo_reader, None
mpv.play('python://foo23')
mpv.wait_for_playback()
catchall.unregister()
"""
if self._python_stream_catchall is not None:
raise KeyError('A catch-all python stream is already registered')
self._python_stream_catchall = cb
def unregister():
if self._python_stream_catchall is not cb:
raise RuntimeError('This catch-all python stream has already been unregistered')
self._python_stream_catchall = None
cb.unregister = unregister
return cb
# Property accessors
def _get_property(self, name, decoder=strict_decoder, fmt=MpvFormat.NODE):
out = create_string_buffer(sizeof(MpvNode))