Moving on with the host program

This commit is contained in:
jaseg 2013-12-18 18:22:00 +01:00
parent 037b3fc1d4
commit 93592ee439
6 changed files with 257 additions and 32 deletions

View file

@ -1,32 +0,0 @@
from pyusb import usb
import colorsys
import numpy as np
CRATE_WIDTH = 5
CRATE_HEIGHT = 4
CRATES_X = 16
CRATES_Y = 2
DISPLAY_WIDTH = CRATES_X * CRATE_WIDTH
DISPLAY_HEIGHT = CRATES_Y * CRATE_HEIGHT
FRAME_SIZE = CRATE_WIDTH*CRATE_HEIGHT*3
dev = usb.core.find(idVendor=0x1cbe, idProduct=0x0003)
def sendframe(framedata):
""" Send a frame to the display
The argument contains a h * w array of 3-tuples of (r, g, b)-data
"""
def chunks(l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
for cx, cy in itertools.product(range(16), range(2)):
data = [ v for x in range(CRATE_WIDTH) for y in range(CRATE_HEIGHT) for v in framedata[cy*CRATE_HEIGHT + y][cx*CRATE_WIDTH + x] ]
if len(data) != FRAME_SIZE:
raise ValueError('Invalid frame data. Expected {} bytes, got {}.'.format(FRAME_SIZE, len(data)))
# Send framebuffer data
dev.write(0x01, bytes([0, x, y])+bytes(data))
# Send latch command
dev.write(0x01, b'\x01')

27
host/matelight/config.py Normal file
View file

@ -0,0 +1,27 @@
# Hard timeout in seconds after which (approximately) the rendering of a single item will be cut off
RENDERER_TIMEOUT = 20.0
# How long to show an image by default
DEFAULT_IMAGE_DURATION = 10.0
# Default scrolling speed in pixels/second
DEFAULT_SCROLL_SPEED = 4
# Pixels to leave blank between two letters
LETTER_SPACING = 1
# Display geometry
# ┌─────────┐ ┌──┬──┬──┬ ⋯ ┬──┬──┬──┐
# │1 o o o 5│ │ 1│ │ │ │ │ │16│
# │6 o o o o│ ├──┼──┼──┼ ⋯ ┼──┼──┼──┤
# │o o o o o│ │17│ │ │ │ │ │32│
# │o o o o20│ └──┴──┴──┴ ⋯ ┴──┴──┴──┘
# └─────────┘
CRATE_WIDTH = 5
CRATE_HEIGHT = 4
CRATES_X = 16
CRATES_Y = 2
# Computed values
DISPLAY_WIDTH = CRATES_X * CRATE_WIDTH
DISPLAY_HEIGHT = CRATES_Y * CRATE_HEIGHT
FRAME_SIZE = DISPLAY_WIDTH*DISPLAY_HEIGHT*3

16
host/matelight/host.py Normal file
View file

@ -0,0 +1,16 @@
from pyusb import usb
import colorsys
import numpy as np
dev = usb.core.find(idVendor=0x1cbe, idProduct=0x0003)
def sendframe(framedata):
if not isinstance(framedata, np.array) or framedata.shape != (DISPLAY_WIDTH, DISPLAY_HEIGHT, 3) or framedata.dtype != np.int8:
raise ValueError('framedata must be a ({}, {}, 3)-numpy array of int8s'.format(DISPLAY_WIDTH, DISPLAY_HEIGHT))
for cx, cy in itertools.product(range(16), range(2)):
cratedata = framedata[cx*CRATE_WIDTH:(cx+1)*CRATE_WIDTH, cy*CRATE_HEIGHT:(cy+1)*CRATE_HEIGHT]
# Send framebuffer data
dev.write(0x01, bytes([0, x, y])+bytes(list(cratedata.flatten())))
# Send latch command
dev.write(0x01, b'\x01')

View file

@ -0,0 +1,19 @@
from socketserver import *
import zlib
import struct
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class MateLightUDPHandler(BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
if len(data) != FRAME_SIZE+4:
raise ValueError('Invalid frame size: Expected {}, got {}'.format(FRAME_SIZE+4, len(frame)))
frame = data[:-4]
crc1, = struct.unpack('!I', data[-4:])
crc2 = zlib.crc32(frame),
if crc1 != crc2:
raise ValueError('Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1))
socket.sendto(b'ACK', self.client_address)

View file

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from renderers import TextRenderer, ImageRenderer
import host, config
import time, math
score = lambda now, last, lifetime, priority, item: priority*math.log(now-last)/(10.0+item.duration)
class FuzzyQueue:
def __init__(self, default):
self._default = default
self.put(default, 0.0, 0)
self._l = []
def put(self, item, priority=1.0, lifetime=0):
lifetime += time.time()
self._l.append((0, lifetime, priority, item))
def pop(self):
""" Get an item from the queue
NOTE: This is *not* a regular pop, as it does not necessarily remove the item from the queue.
"""
now = time.time()
# Choose item based on last used and priority
_, index, (_, lifetime, priority, item) = max(sorted([(score(now, *v), i, v) for i, v in self._l]))
# If item's lifetime is exceeded, remove
if lifetime < now and item is not self._default:
del self._l[index]
# Otherwise, set item's last played time
self._l[index] = (now, lifetime, prioity, item)
# Finally, return
return item
q = FuzzyQueue()
def insert_text(text, priority=1.0, lifetime=0, escapes=True):
q.put(TextRenderer(text, escapes), priority, lifetime)
def insert_image(image, priority=1.0, lifetime=0):
q.put(ImageRenderer(image), priority, lifetime)
def render_thread():
while True:
start = time.time()
for frame, delay in q.pop().frames(start):
then = time.time()
if then-start+delay > RENDERER_TIMEOUT:
break
sendframe(frame)
now = time.time()
time.sleep(min(RENDERER_TIMEOUT, delay - (now-then)))

143
host/matelight/renderers.py Normal file
View file

@ -0,0 +1,143 @@
import numpy as np
try:
import re2 as re
except ImportError:
import re
from PIL import Image
from pixelterm import xtermcolors
default_palette = [
(0x00, 0x00, 0x00), # 0 normal colors
(0xcd, 0x00, 0x00), # 1
(0x00, 0xcd, 0x00), # 2
(0xcd, 0xcd, 0x00), # 3
(0x00, 0x00, 0xee), # 4
(0xcd, 0x00, 0xcd), # 5
(0x00, 0xcd, 0xcd), # 6
(0xe5, 0xe5, 0xe5), # 7
(0x7f, 0x7f, 0x7f), # 8 bright colors
(0xff, 0x00, 0x00), # 9
(0x00, 0xff, 0x00), # 10
(0xff, 0xff, 0x00), # 11
(0x5c, 0x5c, 0xff), # 12
(0xff, 0x00, 0xff), # 13
(0x00, 0xff, 0xff), # 14
(0xff, 0xff, 0xff)] # 15
class CharGenerator:
def __init__(self, seq=None, lg=None, text=''):
settings = False, False, False, default_palette[8], default_palette[0]
if lg:
settings = lg,bold, lg.blink, lg.underscore, lg.fg, lg.bg
self.bold, self.blink, self.underscore, self.fg, self.bg = settings
self.text = text
self.parse_escape_sequence(seq)
def parse_escape_sequence(seq):
codes = list(map(int, seq[2:-1].split(';')))
fg, bg, reverse, i = self.fg, self.bg, False, 0
while i<len(codes):
a = codes[i]
if a in [38, 48]:
if codes[i+1] == 5:
c = xtermcolors.xterm_colors[codes[i+2]]
fg, bg = (c, bg) if a == 38 else (fg, c)
i += 2
elif a == 39:
fg = (0,0,0)
elif a == 49:
bg = (0,0,0)
elif a == 0:
fg, bg = (0,0,0), (0,0,0)
self.bold, self.blink, self.underscore = False, False, False
elif a in range(30, 38):
fg = default_palette[a-30]
elif a in range(90, 98):
fg = default_palette[a-90+8]
elif a in range(40, 48):
bg = default_palette[a-40]
elif a in range(100, 108):
bg = default_palette[a-100+8]
elif a == 7:
reverse = True
elif a == 5:
self.blink = True
elif a == 4:
self.underscore = True
elif a == 1: # Literally "bright", not bold.
self.bold = True
i += 1
fg, bg = bg, fg if reverse else fg, bg
self.fg, self.bg = fg, bg
def generate_char(self, c, now):
fg, bg = self.bg, self.bg if self.blink and now%1.0 < 0.3 else self.fg, self.bg
...
def generate(self, now):
chars = [self.generate_char(c, now) for c in self.text]
# This refers to inter-letter spacing
space = np.zeros((LETTER_SPACING, DISPLAY_HEIGHT, 3))
spaces = [space]*(len(chars)-1)
everything = chars + spaces
everything[::2] = chars
everything[1::2] = spaces
return np.concatenate(everything)
class TextRenderer:
def __init__(self, text, escapes=True):
"""Renders text into a frame buffer
"escapes" tells the renderer whether to interpret escape sequences (True) or not (False).
"""
generators = []
current_generator = CharGenerator()
for esc, char in r'(\x1B\[[0-9;]+m)|(.)'.finditer(text):
if esc:
if current_generator.text != '':
generators.append(current_generator)
current_generator = CharGenerator(esc, current_generator)
elif char:
current_generator.text += char
self.generators = generators + [current_generator]
def frames(self, start):
now = time.time()
zeros = [np.zeros((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3))]
# Pad the array with one screen's worth of zeros on both sides so the text fully scrolls through.
raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros])
w,h,_ = raw.size
for i in range(DISPLAY_WIDTH+w, 0, -1):
frame = raw[i:i+DISPLAY_WIDTH, :, :]
yield frame, 1/DEFAULT_SCROLL_SPEED
raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros])
class ImageRenderer:
def __new__(cls, image_data):
img = Image.open(io.BytesIO(image_data))
self.img = img
def frames(self):
img = self.img
palette = img.getpalette()
last_frame = Image.new("RGB", img.size)
# FIXME set delay to 1/10s if the image is animated, only use DEFAULT_IMAGE_DURATION for static images.
delay = img.info.get('duration', DEFAULT_IMAGE_DURATION*1000.0)/1000.0
for frame in ImageSequence.Iterator(img):
#This works around a known bug in Pillow
#See also: http://stackoverflow.com/questions/4904940/python-converting-gif-frames-to-png
frame.putpalette(palette)
c = frame.convert("RGB")
if img.info['background'] != img.info['transparency']:
last_frame.paste(c, c)
else:
last_frame = c
im = last_frame.copy()
im.thumbnail((DISPLAY_WIDTH, DISPLAY_HEIGHT), Image.NEAREST)
data = np.array(im.getdata(), dtype=np.int8)
data.reshape((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3))
yield data, delay