Move test files into a 'tests' dir
This commit is contained in:
parent
e29e042998
commit
2b66d0d97b
5 changed files with 1 additions and 1 deletions
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
9
tests/sub_test.srt
Normal file
9
tests/sub_test.srt
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
1
|
||||
00:00:00,500 --> 00:00:01,000
|
||||
This is
|
||||
a subtitle test.
|
||||
|
||||
2
|
||||
00:00:01,000 --> 00:00:02,000
|
||||
This is the second subtitle line.
|
||||
|
||||
BIN
tests/test.webm
Normal file
BIN
tests/test.webm
Normal file
Binary file not shown.
752
tests/test_mpv.py
Executable file
752
tests/test_mpv.py
Executable file
|
|
@ -0,0 +1,752 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
# vim: ts=4 sw=4 et
|
||||
|
||||
import unittest
|
||||
from unittest import mock
|
||||
import math
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
import gc
|
||||
import os.path
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import io
|
||||
import platform
|
||||
import ctypes
|
||||
|
||||
import mpv
|
||||
|
||||
from xvfbwrapper import Xvfb
|
||||
|
||||
|
||||
# stdout magic to suppress useless libmpv warning messages in unittest output
|
||||
# See https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
|
||||
@contextmanager
|
||||
def devnull_libmpv():
|
||||
""" Redirect libmpv stdout into /dev/null while still allowing python to print to stdout as usual """
|
||||
if platform.system() != 'Linux':
|
||||
# This is only a linux-specific convenience function.
|
||||
yield
|
||||
return
|
||||
|
||||
libc = ctypes.CDLL("libc.so.6")
|
||||
|
||||
stderr_fd, stdout_fd = sys.stderr.fileno(), sys.stdout.fileno()
|
||||
sys.stderr.flush()
|
||||
sys.stdout.flush()
|
||||
libc.fflush(None)
|
||||
|
||||
# Preserve a copy so python can continue printing
|
||||
stderr_copy, stdout_copy = os.dup(stderr_fd), os.dup(stdout_fd)
|
||||
sys.stderr = io.TextIOWrapper(open(stderr_copy, 'wb', closefd=False), write_through=True)
|
||||
sys.stdout = io.TextIOWrapper(open(stdout_copy, 'wb', closefd=False))
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
os.dup2(devnull.fileno(), stderr_fd) # closes old stderr
|
||||
os.dup2(devnull.fileno(), stdout_fd) # closes old stdout
|
||||
|
||||
yield
|
||||
|
||||
sys.stderr.flush()
|
||||
sys.stdout.flush()
|
||||
libc.fflush(None)
|
||||
os.dup2(stderr_copy, stderr_fd)
|
||||
os.dup2(stdout_copy, stdout_fd)
|
||||
os.close(stderr_copy)
|
||||
os.close(stdout_copy)
|
||||
sys.stderr = io.TextIOWrapper(open(stderr_fd, 'wb', closefd=False), write_through=True)
|
||||
sys.stdout = io.TextIOWrapper(open(stdout_fd, 'wb', closefd=False))
|
||||
|
||||
TESTVID = os.path.join(os.path.dirname(__file__), 'test.webm')
|
||||
MPV_ERRORS = [ l(ec) for ec, l in mpv.ErrorCode.EXCEPTION_DICT.items() if l ]
|
||||
|
||||
class MpvTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
self.m = mpv.MPV(vo='x11')
|
||||
|
||||
def tearDown(self):
|
||||
self.m.terminate()
|
||||
self.disp.stop()
|
||||
|
||||
class TestProperties(MpvTestCase):
|
||||
@contextmanager
|
||||
def swallow_mpv_errors(self, exception_exceptions=[]):
|
||||
try:
|
||||
yield
|
||||
except Exception as e:
|
||||
if any(e.args[:2] == ex.args for ex in MPV_ERRORS):
|
||||
if e.args[1] not in exception_exceptions:
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_read(self):
|
||||
self.m.loop = 'inf'
|
||||
self.m.play(TESTVID)
|
||||
while self.m.core_idle:
|
||||
time.sleep(0.05)
|
||||
for name in sorted(self.m.property_list):
|
||||
name = name.replace('-', '_')
|
||||
with self.subTest(property_name=name), self.swallow_mpv_errors([
|
||||
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
|
||||
mpv.ErrorCode.PROPERTY_ERROR,
|
||||
mpv.ErrorCode.PROPERTY_NOT_FOUND]):
|
||||
getattr(self.m, name)
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_write(self):
|
||||
self.m.loop = 'inf'
|
||||
self.m.play(TESTVID)
|
||||
while self.m.core_idle:
|
||||
time.sleep(0.05)
|
||||
check_canaries = lambda: os.path.exists('100') or os.path.exists('foo')
|
||||
for name in sorted(self.m.property_list):
|
||||
# See issue #108 and upstream mpv issues #7919 and #7920.
|
||||
if name in ('demuxer', 'audio-demuxer', 'audio-files'):
|
||||
continue
|
||||
# These may cause files to be created
|
||||
if name in ('external-file', 'heartbeat-cmd', 'wid', 'dump-stats', 'log-file') or name.startswith('input-'):
|
||||
continue
|
||||
name = name.replace('-', '_')
|
||||
old_canaries = check_canaries()
|
||||
with self.subTest(property_name=name), self.swallow_mpv_errors([
|
||||
mpv.ErrorCode.PROPERTY_UNAVAILABLE,
|
||||
mpv.ErrorCode.PROPERTY_ERROR,
|
||||
mpv.ErrorCode.PROPERTY_FORMAT,
|
||||
mpv.ErrorCode.PROPERTY_NOT_FOUND]): # This is due to a bug with option-mapped properties in mpv 0.18.1
|
||||
setattr(self.m, name, 100)
|
||||
setattr(self.m, name, 1)
|
||||
setattr(self.m, name, 0)
|
||||
setattr(self.m, name, -1)
|
||||
setattr(self.m, name, 1)
|
||||
setattr(self.m, name, 1.0)
|
||||
setattr(self.m, name, 0.0)
|
||||
setattr(self.m, name, -1.0)
|
||||
setattr(self.m, name, float('nan'))
|
||||
setattr(self.m, name, 'foo')
|
||||
setattr(self.m, name, '')
|
||||
setattr(self.m, name, 'bazbazbaz'*1000)
|
||||
setattr(self.m, name, b'foo')
|
||||
setattr(self.m, name, b'')
|
||||
setattr(self.m, name, b'bazbazbaz'*1000)
|
||||
setattr(self.m, name, True)
|
||||
setattr(self.m, name, False)
|
||||
if not old_canaries and check_canaries():
|
||||
raise UserWarning('Property test for {} produced files on file system, might not be safe.'.format(name))
|
||||
|
||||
def test_property_bounce(self):
|
||||
self.m.aid = False
|
||||
self.assertEqual(self.m.audio, False)
|
||||
self.m.aid = 'auto'
|
||||
self.assertEqual(self.m.audio, 'auto')
|
||||
self.m.aid = 'no'
|
||||
self.assertEqual(self.m.audio, False)
|
||||
self.m.audio = 'auto'
|
||||
self.assertEqual(self.m.aid, 'auto')
|
||||
self.m.audio = False
|
||||
self.assertEqual(self.m.aid, False)
|
||||
self.m.audio = 'auto'
|
||||
self.assertEqual(self.m.aid, 'auto')
|
||||
self.m.audio = 'no'
|
||||
self.assertEqual(self.m.aid, False)
|
||||
|
||||
def test_array_property_bounce(self):
|
||||
self.m.alang = 'en'
|
||||
self.assertEqual(self.m.alang, ['en'])
|
||||
self.m.alang = 'de'
|
||||
self.assertEqual(self.m.alang, ['de'])
|
||||
self.m.alang = ['de', 'en']
|
||||
self.assertEqual(self.m.alang, ['de', 'en'])
|
||||
self.m.alang = 'de,en'
|
||||
self.assertEqual(self.m.alang, ['de', 'en'])
|
||||
self.m.alang = ['de,en']
|
||||
self.assertEqual(self.m.alang, ['de,en'])
|
||||
|
||||
def test_osd_property_bounce(self):
|
||||
self.m.alang = ['en']
|
||||
self.assertEqual(self.m.osd.alang, 'en')
|
||||
self.m.alang = ['de']
|
||||
self.assertEqual(self.m.osd.alang, 'de')
|
||||
self.m.alang = ['en', 'de']
|
||||
self.assertEqual(self.m.osd.alang, 'en,de')
|
||||
|
||||
def test_raw_property_bounce(self):
|
||||
self.m.alang = 'en'
|
||||
self.assertEqual(self.m.raw.alang, [b'en'])
|
||||
self.m.alang = 'de'
|
||||
self.assertEqual(self.m.raw.alang, [b'de'])
|
||||
self.m.alang = ['de', 'en']
|
||||
self.assertEqual(self.m.raw.alang, [b'de', b'en'])
|
||||
self.m.alang = 'de,en'
|
||||
self.assertEqual(self.m.raw.alang, [b'de', b'en'])
|
||||
self.m.alang = ['de,en']
|
||||
self.assertEqual(self.m.raw.alang, [b'de,en'])
|
||||
|
||||
def test_property_decoding_invalid_utf8(self):
|
||||
invalid_utf8 = b'foo\xc3\x28bar'
|
||||
self.m.alang = invalid_utf8
|
||||
self.assertEqual(self.m.raw.alang, [invalid_utf8])
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
self.m.strict.alang
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
# alang is considered safe and pasted straight into the OSD string. But OSD strings should always be valid
|
||||
# UTF-8. This test may be removed in case OSD encoding sanitization is handled differently in the future.
|
||||
self.m.osd.alang
|
||||
|
||||
def test_property_decoding_valid_utf8(self):
|
||||
valid_utf8 = 'pröpérty'
|
||||
self.m.alang = valid_utf8
|
||||
self.assertEqual(self.m.alang, [valid_utf8])
|
||||
self.assertEqual(self.m.raw.alang, [valid_utf8.encode('utf-8')])
|
||||
self.assertEqual(self.m.osd.alang, valid_utf8)
|
||||
self.assertEqual(self.m.strict.alang, [valid_utf8])
|
||||
|
||||
def test_property_decoding_multi(self):
|
||||
valid_utf8 = 'pröpérty'
|
||||
invalid_utf8 = b'foo\xc3\x28bar'
|
||||
self.m.alang = [valid_utf8, 'foo', invalid_utf8]
|
||||
self.assertEqual(self.m.alang, [valid_utf8, 'foo', invalid_utf8])
|
||||
self.assertEqual(self.m.raw.alang, [valid_utf8.encode('utf-8'), b'foo', invalid_utf8])
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
self.m.strict.alang
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
# See comment in test_property_decoding_invalid_utf8
|
||||
self.m.osd.alang
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_option_read(self):
|
||||
self.m.loop = 'inf'
|
||||
self.m.play(TESTVID)
|
||||
while self.m.core_idle:
|
||||
time.sleep(0.05)
|
||||
for name in sorted(self.m):
|
||||
with self.subTest(option_name=name), self.swallow_mpv_errors([
|
||||
mpv.ErrorCode.PROPERTY_UNAVAILABLE, mpv.ErrorCode.PROPERTY_NOT_FOUND, mpv.ErrorCode.PROPERTY_ERROR]):
|
||||
self.m[name]
|
||||
|
||||
def test_multivalued_option(self):
|
||||
self.m['external-files'] = ['test.webm', b'test.webm']
|
||||
self.assertEqual(self.m['external-files'], ['test.webm', 'test.webm'])
|
||||
|
||||
|
||||
class ObservePropertyTest(MpvTestCase):
|
||||
@devnull_libmpv()
|
||||
def test_observe_property(self):
|
||||
handler = mock.Mock()
|
||||
|
||||
m = self.m
|
||||
m.observe_property('vid', handler)
|
||||
|
||||
time.sleep(0.1)
|
||||
m.play(TESTVID)
|
||||
|
||||
time.sleep(0.5) #couple frames
|
||||
m.unobserve_property('vid', handler)
|
||||
|
||||
time.sleep(0.1) #couple frames
|
||||
m.terminate() # needed for synchronization of event thread
|
||||
handler.assert_has_calls([mock.call('vid', 'auto')])
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_property_observer_decorator(self):
|
||||
handler = mock.Mock()
|
||||
|
||||
m = self.m
|
||||
m.play(TESTVID)
|
||||
|
||||
m.slang = 'ru'
|
||||
m.mute = True
|
||||
|
||||
@m.property_observer('mute')
|
||||
@m.property_observer('slang')
|
||||
def foo(*args, **kwargs):
|
||||
handler(*args, **kwargs)
|
||||
|
||||
m.mute = False
|
||||
m.slang = 'jp'
|
||||
self.assertEqual(m.mute, False)
|
||||
self.assertEqual(m.slang, ['jp'])
|
||||
|
||||
# Wait for tick. AFAICT property events are only generated at regular
|
||||
# intervals, and if we change a property too fast we don't get any
|
||||
# events. This is a limitation of the upstream API.
|
||||
time.sleep(0.1)
|
||||
# Another API limitation is that the order of property change events on
|
||||
# different properties does not necessarily exactly match the order in
|
||||
# which these properties were previously accessed. Thus, any_order.
|
||||
handler.assert_has_calls([
|
||||
mock.call('mute', False),
|
||||
mock.call('slang', ['jp'])],
|
||||
any_order=True)
|
||||
handler.reset_mock()
|
||||
|
||||
m.mute = True
|
||||
m.slang = 'ru'
|
||||
self.assertEqual(m.mute, True)
|
||||
self.assertEqual(m.slang, ['ru'])
|
||||
|
||||
time.sleep(0.05)
|
||||
foo.unobserve_mpv_properties()
|
||||
|
||||
m.mute = False
|
||||
m.slang = 'jp'
|
||||
m.mute = True
|
||||
m.slang = 'ru'
|
||||
m.terminate() # needed for synchronization of event thread
|
||||
handler.assert_has_calls([
|
||||
mock.call('mute', True),
|
||||
mock.call('slang', ['ru'])],
|
||||
any_order=True)
|
||||
|
||||
class KeyBindingTest(MpvTestCase):
|
||||
def test_register_direct_cmd(self):
|
||||
self.m.register_key_binding('a', 'playlist-clear')
|
||||
self.assertEqual(self.m._key_binding_handlers, {})
|
||||
self.m.register_key_binding('Ctrl+Shift+a', 'playlist-clear')
|
||||
self.m.unregister_key_binding('a')
|
||||
self.m.unregister_key_binding('Ctrl+Shift+a')
|
||||
|
||||
def test_register_direct_fun(self):
|
||||
b = mpv.MPV._binding_name
|
||||
|
||||
def reg_test_fun(state, name, char):
|
||||
pass
|
||||
|
||||
self.m.register_key_binding('a', reg_test_fun)
|
||||
self.assertIn(b('a'), self.m._key_binding_handlers)
|
||||
self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun)
|
||||
|
||||
self.m.unregister_key_binding('a')
|
||||
self.assertNotIn(b('a'), self.m._key_binding_handlers)
|
||||
|
||||
def test_register_direct_bound_method(self):
|
||||
b = mpv.MPV._binding_name
|
||||
|
||||
class RegTestCls:
|
||||
def method(self, state, name, char):
|
||||
pass
|
||||
instance = RegTestCls()
|
||||
|
||||
self.m.register_key_binding('a', instance.method)
|
||||
self.assertIn(b('a'), self.m._key_binding_handlers)
|
||||
self.assertEqual(self.m._key_binding_handlers[b('a')], instance.method)
|
||||
|
||||
self.m.unregister_key_binding('a')
|
||||
self.assertNotIn(b('a'), self.m._key_binding_handlers)
|
||||
|
||||
def test_register_decorator_fun(self):
|
||||
b = mpv.MPV._binding_name
|
||||
|
||||
@self.m.key_binding('a')
|
||||
def reg_test_fun(state, name, char):
|
||||
pass
|
||||
self.assertEqual(reg_test_fun.mpv_key_bindings, ['a'])
|
||||
self.assertIn(b('a'), self.m._key_binding_handlers)
|
||||
self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun)
|
||||
|
||||
reg_test_fun.unregister_mpv_key_bindings()
|
||||
self.assertNotIn(b('a'), self.m._key_binding_handlers)
|
||||
|
||||
def test_register_decorator_fun_chaining(self):
|
||||
b = mpv.MPV._binding_name
|
||||
|
||||
@self.m.key_binding('a')
|
||||
@self.m.key_binding('b')
|
||||
def reg_test_fun(state, name, char):
|
||||
pass
|
||||
|
||||
@self.m.key_binding('c')
|
||||
def reg_test_fun_2_stay_intact(state, name, char):
|
||||
pass
|
||||
|
||||
self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a'])
|
||||
self.assertIn(b('a'), self.m._key_binding_handlers)
|
||||
self.assertIn(b('b'), self.m._key_binding_handlers)
|
||||
self.assertIn(b('c'), self.m._key_binding_handlers)
|
||||
self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun)
|
||||
self.assertEqual(self.m._key_binding_handlers[b('b')], reg_test_fun)
|
||||
|
||||
reg_test_fun.unregister_mpv_key_bindings()
|
||||
self.assertNotIn(b('a'), self.m._key_binding_handlers)
|
||||
self.assertNotIn(b('b'), self.m._key_binding_handlers)
|
||||
self.assertIn(b('c'), self.m._key_binding_handlers)
|
||||
|
||||
def test_register_simple_decorator_fun_chaining(self):
|
||||
self.m.loop = 'inf'
|
||||
self.m.play(TESTVID)
|
||||
self.m.wait_until_playing()
|
||||
|
||||
handler1, handler2 = mock.Mock(), mock.Mock()
|
||||
|
||||
@self.m.on_key_press('a')
|
||||
@self.m.on_key_press('b')
|
||||
def reg_test_fun(*args, **kwargs):
|
||||
handler1(*args, **kwargs)
|
||||
|
||||
@self.m.on_key_press('c')
|
||||
def reg_test_fun_2_stay_intact(*args, **kwargs):
|
||||
handler2(*args, **kwargs)
|
||||
|
||||
self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a'])
|
||||
|
||||
def keypress_and_sync(key):
|
||||
with self.m.prepare_and_wait_for_event('client_message'):
|
||||
self.m.keypress(key)
|
||||
|
||||
keypress_and_sync('a')
|
||||
handler1.assert_has_calls([ mock.call() ])
|
||||
handler2.assert_has_calls([])
|
||||
handler1.reset_mock()
|
||||
|
||||
self.m.keypress('x')
|
||||
self.m.keypress('X')
|
||||
keypress_and_sync('b')
|
||||
handler1.assert_has_calls([ mock.call() ])
|
||||
handler2.assert_has_calls([])
|
||||
handler1.reset_mock()
|
||||
|
||||
keypress_and_sync('c')
|
||||
self.m.keypress('B')
|
||||
handler1.assert_has_calls([])
|
||||
handler2.assert_has_calls([ mock.call() ])
|
||||
handler2.reset_mock()
|
||||
|
||||
reg_test_fun.unregister_mpv_key_bindings()
|
||||
self.m.keypress('a')
|
||||
keypress_and_sync('c')
|
||||
self.m.keypress('x')
|
||||
self.m.keypress('A')
|
||||
handler1.assert_has_calls([])
|
||||
handler2.assert_has_calls([ mock.call() ])
|
||||
|
||||
class TestStreams(unittest.TestCase):
|
||||
@devnull_libmpv()
|
||||
def test_python_stream(self):
|
||||
handler = mock.Mock()
|
||||
|
||||
disp = Xvfb()
|
||||
disp.start()
|
||||
m = mpv.MPV()
|
||||
m.register_event_callback(handler)
|
||||
|
||||
@m.python_stream('foo')
|
||||
def foo_gen():
|
||||
with open(TESTVID, 'rb') as f:
|
||||
yield f.read()
|
||||
|
||||
@m.python_stream('bar')
|
||||
def bar_gen():
|
||||
yield b''
|
||||
|
||||
m.play('python://foo')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('python://bar')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('python://baz')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('foo://foo')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
|
||||
handler.reset_mock()
|
||||
|
||||
foo_gen.unregister()
|
||||
|
||||
m.play('python://foo')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('python://bar')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.terminate()
|
||||
disp.stop()
|
||||
|
||||
def test_custom_stream(self):
|
||||
handler = mock.Mock()
|
||||
fail_mock = mock.Mock(side_effect=ValueError)
|
||||
stream_mock = mock.Mock()
|
||||
stream_mock.seek = mock.Mock(return_value=0)
|
||||
stream_mock.read = mock.Mock(return_value=b'')
|
||||
|
||||
disp = Xvfb()
|
||||
disp.start()
|
||||
m = mpv.MPV(video=False)
|
||||
m.register_event_callback(handler)
|
||||
|
||||
m.register_stream_protocol('pythonfail', fail_mock)
|
||||
|
||||
@m.register_stream_protocol('pythonsuccess')
|
||||
def open_fn(uri):
|
||||
self.assertEqual(uri, 'pythonsuccess://foo')
|
||||
return stream_mock
|
||||
|
||||
m.play('pythondoesnotexist://foo')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('pythonfail://foo')
|
||||
m.wait_for_playback()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
|
||||
handler.reset_mock()
|
||||
|
||||
m.play('pythonsuccess://foo')
|
||||
m.wait_for_playback()
|
||||
stream_mock.seek.assert_any_call(0)
|
||||
stream_mock.read.assert_called()
|
||||
handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
|
||||
|
||||
m.terminate()
|
||||
disp.stop()
|
||||
|
||||
class TestLifecycle(unittest.TestCase):
|
||||
def test_create_destroy(self):
|
||||
thread_names = lambda: [ t.name for t in threading.enumerate() ]
|
||||
self.assertNotIn('MPVEventHandlerThread', thread_names())
|
||||
m = mpv.MPV()
|
||||
self.assertIn('MPVEventHandlerThread', thread_names())
|
||||
m.terminate()
|
||||
self.assertNotIn('MPVEventHandlerThread', thread_names())
|
||||
|
||||
def test_flags(self):
|
||||
with self.assertRaises(AttributeError):
|
||||
mpv.MPV('this-option-does-not-exist')
|
||||
m = mpv.MPV('cursor-autohide-fs-only', 'fs', video=False)
|
||||
self.assertTrue(m.fullscreen)
|
||||
self.assertEqual(m.cursor_autohide, 1000)
|
||||
m.terminate()
|
||||
|
||||
def test_options(self):
|
||||
with self.assertRaises(AttributeError):
|
||||
mpv.MPV(this_option_does_not_exists=23)
|
||||
m = mpv.MPV(osd_level=0, loop='inf', deinterlace=False)
|
||||
self.assertEqual(m.osd_level, 0)
|
||||
# For compatibility with mpv master (v0.32.0-585-gfba1c681b8) accept both
|
||||
self.assertIn(m.loop, ['inf', True])
|
||||
self.assertEqual(m.deinterlace, False)
|
||||
m.terminate()
|
||||
|
||||
def test_event_callback(self):
|
||||
handler = mock.Mock()
|
||||
m = mpv.MPV(video=False)
|
||||
m.register_event_callback(handler)
|
||||
m.play(TESTVID)
|
||||
m.wait_for_playback()
|
||||
|
||||
m.unregister_event_callback(handler)
|
||||
handler.assert_has_calls([
|
||||
mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}),
|
||||
mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}})
|
||||
], any_order=True)
|
||||
handler.reset_mock()
|
||||
|
||||
m.terminate()
|
||||
handler.assert_not_called()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_wait_for_property_negative(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
m = mpv.MPV()
|
||||
m.play(TESTVID)
|
||||
def run():
|
||||
nonlocal self
|
||||
try:
|
||||
m.wait_for_property('mute')
|
||||
self.fail()
|
||||
except mpv.ShutdownError:
|
||||
pass
|
||||
t = threading.Thread(target=run, daemon=True)
|
||||
t.start()
|
||||
time.sleep(1)
|
||||
m.terminate()
|
||||
t.join()
|
||||
self.disp.stop()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_wait_for_property_positive(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
handler = mock.Mock()
|
||||
m = mpv.MPV()
|
||||
m.play(TESTVID)
|
||||
def run():
|
||||
nonlocal self
|
||||
m.wait_for_property('mute')
|
||||
handler()
|
||||
t = threading.Thread(target=run, daemon=True)
|
||||
t.start()
|
||||
m.wait_until_playing()
|
||||
m.mute = True
|
||||
t.join()
|
||||
m.terminate()
|
||||
handler.assert_called()
|
||||
self.disp.stop()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_wait_for_event(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
handler = mock.Mock()
|
||||
m = mpv.MPV()
|
||||
m.play(TESTVID)
|
||||
def run():
|
||||
nonlocal self
|
||||
try:
|
||||
m.wait_for_event('seek')
|
||||
self.fail()
|
||||
except mpv.ShutdownError:
|
||||
pass
|
||||
t = threading.Thread(target=run, daemon=True)
|
||||
t.start()
|
||||
time.sleep(1)
|
||||
m.terminate()
|
||||
t.join()
|
||||
self.disp.stop()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_wait_for_property_shutdown(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
handler = mock.Mock()
|
||||
m = mpv.MPV()
|
||||
m.play(TESTVID)
|
||||
with self.assertRaises(mpv.ShutdownError):
|
||||
# level_sensitive=false needed to prevent get_property on dead
|
||||
# handle
|
||||
with m.prepare_and_wait_for_property('mute', level_sensitive=False):
|
||||
m.terminate()
|
||||
self.disp.stop()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_wait_for_event_shutdown(self):
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
handler = mock.Mock()
|
||||
m = mpv.MPV()
|
||||
m.play(TESTVID)
|
||||
with self.assertRaises(mpv.ShutdownError):
|
||||
with m.prepare_and_wait_for_event('seek'):
|
||||
m.terminate()
|
||||
self.disp.stop()
|
||||
|
||||
@devnull_libmpv()
|
||||
def test_log_handler(self):
|
||||
handler = mock.Mock()
|
||||
self.disp = Xvfb()
|
||||
self.disp.start()
|
||||
m = mpv.MPV(log_handler=handler)
|
||||
m.play(TESTVID)
|
||||
# Wait for playback to start
|
||||
m.wait_until_playing()
|
||||
m.command("print-text", 'This is a python-mpv test')
|
||||
m.wait_for_playback()
|
||||
m.terminate()
|
||||
for call in handler.mock_calls:
|
||||
_1, (a, b, c), _2 = call
|
||||
if a == 'info' and b == 'cplayer' and 'This is a python-mpv test' in c:
|
||||
break
|
||||
else:
|
||||
self.fail('"Test log entry not found in log handler calls: '+','.join(repr(call) for call in handler.mock_calls))
|
||||
self.disp.stop()
|
||||
|
||||
|
||||
class CommandTests(MpvTestCase):
|
||||
|
||||
def test_loadfile_with_subtitles(self):
|
||||
handler = mock.Mock()
|
||||
self.m.property_observer('sub-text')(handler)
|
||||
|
||||
self.m.loadfile(TESTVID, sub_file='sub_test.srt')
|
||||
|
||||
self.m.wait_for_playback()
|
||||
handler.assert_any_call('sub-text', 'This is\na subtitle test.')
|
||||
handler.assert_any_call('sub-text', 'This is the second subtitle line.')
|
||||
|
||||
def test_sub_add(self):
|
||||
handler = mock.Mock()
|
||||
self.m.property_observer('sub-text')(handler)
|
||||
|
||||
self.m.loadfile(TESTVID)
|
||||
self.m.wait_until_playing()
|
||||
self.m.sub_add('sub_test.srt')
|
||||
|
||||
self.m.wait_for_playback()
|
||||
handler.assert_any_call('sub-text', 'This is\na subtitle test.')
|
||||
handler.assert_any_call('sub-text', 'This is the second subtitle line.')
|
||||
|
||||
|
||||
class RegressionTests(MpvTestCase):
|
||||
|
||||
def test_unobserve_property_runtime_error(self):
|
||||
"""
|
||||
Ensure a `RuntimeError` is not thrown within
|
||||
`unobserve_property`.
|
||||
"""
|
||||
handler = mock.Mock()
|
||||
|
||||
self.m.observe_property('loop', handler)
|
||||
|
||||
try:
|
||||
self.m.unobserve_property('loop', handler)
|
||||
except RuntimeError:
|
||||
self.fail(
|
||||
"""
|
||||
"RuntimeError" exception thrown within
|
||||
`unobserve_property`
|
||||
""",
|
||||
)
|
||||
|
||||
def test_instance_method_property_observer(self):
|
||||
"""
|
||||
Ensure that bound method objects can be used as property observers.
|
||||
See issue #26
|
||||
"""
|
||||
handler = mock.Mock()
|
||||
m = self.m
|
||||
|
||||
class T(object):
|
||||
def t(self, *args, **kw):
|
||||
handler(*args, **kw)
|
||||
t = T()
|
||||
|
||||
m.slang = 'ru'
|
||||
time.sleep(0.5)
|
||||
|
||||
m.observe_property('slang', t.t)
|
||||
time.sleep(0.5)
|
||||
|
||||
m.slang = 'jp'
|
||||
time.sleep(0.5)
|
||||
|
||||
m.slang = 'ru'
|
||||
time.sleep(0.5)
|
||||
|
||||
m.unobserve_property('slang', t.t)
|
||||
time.sleep(0.5)
|
||||
|
||||
m.slang = 'jp'
|
||||
m.slang = 'ru'
|
||||
m.terminate() # needed for synchronization of event thread
|
||||
handler.assert_has_calls([mock.call('slang', ['jp']), mock.call('slang', ['ru'])])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue