Added basic text rendering & TCP server
This commit is contained in:
parent
a95e0305ae
commit
7e2c51dc26
6 changed files with 114 additions and 45 deletions
|
|
@ -1,4 +1,3 @@
|
|||
import bdflib # Used to read the bitmap font
|
||||
|
||||
# Hard timeout in seconds after which (approximately) the rendering of a single item will be cut off
|
||||
RENDERER_TIMEOUT = 20.0
|
||||
|
|
@ -7,13 +6,7 @@ DEFAULT_IMAGE_DURATION = 10.0
|
|||
# Default scrolling speed in pixels/second
|
||||
DEFAULT_SCROLL_SPEED = 4
|
||||
# Pixels to leave blank between two letters
|
||||
LETTER_SPACING = 1
|
||||
|
||||
#FONT = bdflib.reader.read_bdf(open('fonts/5x8.bdf').readlines())
|
||||
#FONT_WIDTH = 5
|
||||
|
||||
# Computed value
|
||||
#FONT_PADDED_BINARY = ('{:0'+str(FONT_WIDTH+'b}').format
|
||||
LETTER_SPACING = 0
|
||||
|
||||
# Display geometry
|
||||
# ┌─────────┐ ┌───┬───┬ ⋯ ┬───┬───┐
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ dev = usb.core.find(idVendor=0x1cbe, idProduct=0x0003)
|
|||
def sendframe(framedata):
|
||||
# not isinstance(framedata, np.array) or
|
||||
if framedata.shape != (DISPLAY_HEIGHT, DISPLAY_WIDTH, 3) or framedata.dtype != np.uint8:
|
||||
raise ValueError('framedata must be a ({}, {}, 3)-numpy array of int8s. Got a {}-numpy array of {}'.format(DISPLAY_WIDTH, DISPLAY_HEIGHT, framedata.shape, framedata.dtype))
|
||||
raise ValueError('framedata must be a ({}, {}, 3)-numpy array of uint8s. Got a {}-numpy array of {}'.format(DISPLAY_HEIGHT, DISPLAY_WIDTH, framedata.shape, framedata.dtype))
|
||||
|
||||
for cy, cx in itertools.product(range(CRATES_Y), range(CRATES_X)):
|
||||
cratedata = framedata[cy*CRATE_HEIGHT:(cy+1)*CRATE_HEIGHT, cx*CRATE_WIDTH:(cx+1)*CRATE_WIDTH]
|
||||
|
|
|
|||
82
host/matelight/listeners.py
Normal file → Executable file
82
host/matelight/listeners.py
Normal file → Executable file
|
|
@ -1,19 +1,79 @@
|
|||
#!/usr/bin/env python3
|
||||
from socketserver import *
|
||||
import threading
|
||||
import zlib
|
||||
import struct
|
||||
import host
|
||||
import numpy as np
|
||||
import time
|
||||
import renderers
|
||||
from PIL import Image, ImageSequence
|
||||
from config import *
|
||||
# Loading frame (for the big font file)
|
||||
img = Image.open(open('../nyancat.png', 'rb'))
|
||||
frame = np.array(img.convert('RGB').getdata(), dtype=np.uint8).reshape((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3))
|
||||
host.sendframe(frame)
|
||||
from font import *
|
||||
|
||||
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
|
||||
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
|
||||
|
||||
class MateLightUDPHandler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
data = self.request[0].strip()
|
||||
if len(data) != FRAME_SIZE+4:
|
||||
raise ValueError('Invalid frame size: Expected {}, got {}'.format(FRAME_SIZE+4, len(frame)))
|
||||
frame = data[:-4]
|
||||
crc1, = struct.unpack('!I', data[-4:])
|
||||
crc2 = zlib.crc32(frame),
|
||||
if crc1 != crc2:
|
||||
raise ValueError('Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1))
|
||||
socket.sendto(b'ACK', self.client_address)
|
||||
default_renderer = renderers.TextRenderer('\x1B[91mFeed \x1B[92mme via \x1B[93mTCP on \x1B[94;101mml.jaseg.net:1337\x1B[0;91m ! \x1B[95mI can\x1B[96m parse\x1B[97m ANSI\x1B[92m color\x1B[93m codes\x1B[91m!\x1B[38;5;207m http://github.com/jaseg/matelight')
|
||||
global renderer, count
|
||||
renderer = default_renderer
|
||||
count = 0
|
||||
|
||||
class MateLightUDPHandler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
data = self.request[0].strip()
|
||||
if len(data) != FRAME_SIZE: #+4
|
||||
raise ValueError('Invalid frame size: Expected {}, got {}'.format(FRAME_SIZE, len(frame))) #+4
|
||||
frame = data[:-4]
|
||||
#crc1, = struct.unpack('!I', data[-4:])
|
||||
crc2 = zlib.crc32(frame),
|
||||
if crc1 != crc2:
|
||||
raise ValueError('Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1))
|
||||
#socket.sendto(b'ACK', self.client_address)
|
||||
a = np.array(frame, dtype=np.uint8)
|
||||
a.reshape((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3))
|
||||
host.sendframe(a)
|
||||
|
||||
class MateLightTCPTextHandler(BaseRequestHandler):
|
||||
def handle(self):
|
||||
try:
|
||||
data = str(self.request.recv(1024).strip(), 'UTF-8')
|
||||
if len(data) > 140:
|
||||
self.request.sendall('TOO MUCH INFORMATION!\n')
|
||||
return
|
||||
global renderer, count
|
||||
print(data+'\x1B[0m')
|
||||
renderer = renderers.TextRenderer(data)
|
||||
count = 3
|
||||
self.request.sendall(b'KTHXBYE!\n')
|
||||
except:
|
||||
pass
|
||||
|
||||
TCPServer.allow_reuse_address = True
|
||||
server = TCPServer(('', 1337), MateLightTCPTextHandler)
|
||||
t = threading.Thread(target=server.serve_forever)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
UDPServer.allow_reuse_address = True
|
||||
userver = UDPServer(('', 1337), MateLightUDPHandler)
|
||||
t = threading.Thread(target=userver.serve_forever)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
while True:
|
||||
global renderer, count
|
||||
foo = renderer
|
||||
if count == 0:
|
||||
renderer = default_renderer
|
||||
else:
|
||||
count = count - 1
|
||||
for frame, delay in foo.frames():
|
||||
#print(list(frame.flatten()))
|
||||
host.sendframe(np.swapaxes(frame, 0, 1))
|
||||
#time.sleep(delay)
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ bar = np.array(im.getdata(), dtype=np.uint8)
|
|||
foo = bar.reshape((DISPLAY_HEIGHT, 300, 3))
|
||||
|
||||
while True:
|
||||
for i in range(20):
|
||||
for i in range(60):
|
||||
for data in datas:
|
||||
host.sendframe(data)
|
||||
time.sleep(0.1)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
import numpy as np
|
||||
import time
|
||||
try:
|
||||
import re2 as re
|
||||
except ImportError:
|
||||
import re
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from pixelterm import xtermcolors
|
||||
from config import *
|
||||
from font import *
|
||||
|
||||
default_palette = [
|
||||
(0x00, 0x00, 0x00), # 0 normal colors
|
||||
|
|
@ -24,17 +26,19 @@ default_palette = [
|
|||
(0xff, 0x00, 0xff), # 13
|
||||
(0x00, 0xff, 0xff), # 14
|
||||
(0xff, 0xff, 0xff)] # 15
|
||||
default_colors = (default_palette[8], default_palette[0])
|
||||
|
||||
class CharGenerator:
|
||||
def __init__(self, seq=None, lg=None, text=''):
|
||||
settings = False, False, False, default_palette[8], default_palette[0]
|
||||
settings = False, False, False, default_colors
|
||||
if lg:
|
||||
settings = lg,bold, lg.blink, lg.underscore, lg.fg, lg.bg
|
||||
self.bold, self.blink, self.underscore, self.fg, self.bg = settings
|
||||
settings = lg.bold, lg.blink, lg.underscore, (lg.fg, lg.bg)
|
||||
self.bold, self.blink, self.underscore, (self.fg, self.bg) = settings
|
||||
self.text = text
|
||||
self.parse_escape_sequence(seq)
|
||||
if seq:
|
||||
self.parse_escape_sequence(seq)
|
||||
|
||||
def parse_escape_sequence(seq):
|
||||
def parse_escape_sequence(self, seq):
|
||||
codes = list(map(int, seq[2:-1].split(';')))
|
||||
fg, bg, reverse, i = self.fg, self.bg, False, 0
|
||||
while i<len(codes):
|
||||
|
|
@ -49,7 +53,7 @@ class CharGenerator:
|
|||
elif a == 49:
|
||||
bg = (0,0,0)
|
||||
elif a == 0:
|
||||
fg, bg = (0,0,0), (0,0,0)
|
||||
fg, bg = default_colors
|
||||
self.bold, self.blink, self.underscore = False, False, False
|
||||
elif a in range(30, 38):
|
||||
fg = default_palette[a-30]
|
||||
|
|
@ -57,7 +61,7 @@ class CharGenerator:
|
|||
fg = default_palette[a-90+8]
|
||||
elif a in range(40, 48):
|
||||
bg = default_palette[a-40]
|
||||
elif a in range(100, 108):
|
||||
elif a in range(101, 108):
|
||||
bg = default_palette[a-100+8]
|
||||
elif a == 7:
|
||||
reverse = True
|
||||
|
|
@ -68,20 +72,22 @@ class CharGenerator:
|
|||
elif a == 1: # Literally "bright", not bold.
|
||||
self.bold = True
|
||||
i += 1
|
||||
fg, bg = bg, fg if reverse else fg, bg
|
||||
fg, bg = (bg, fg) if reverse else (fg, bg)
|
||||
self.fg, self.bg = fg, bg
|
||||
|
||||
def generate_char(self, c, now):
|
||||
fg, bg = self.bg, self.bg if self.blink and now%1.0 < 0.3 else self.fg, self.bg
|
||||
glyph = font.glyphs_by_codepoint[c]
|
||||
fg, bg = (self.bg, self.fg) if self.blink and now%1.0 < 0.3 else (self.fg, self.bg)
|
||||
glyph = FONT.glyphs_by_codepoint[ord(c)]
|
||||
# Please forgive the string manipulation below.
|
||||
lookup = {'0': bg, '1': fg}
|
||||
return [ list(map(lookup.get, FONT_PADDED_BINARY(int(row, 16)))) for row in glyph.get_data() ]
|
||||
FONT_PADDED_BINARY = ('{:0'+str(glyph.bbW)+'b}').format
|
||||
FONT_Y_PAD = [[bg]*glyph.bbW]*(DISPLAY_HEIGHT-FONT_HEIGHT)
|
||||
return np.swapaxes(np.array([ list(map(lookup.get, FONT_PADDED_BINARY(int(row, 16))[:glyph.bbW])) for row in glyph.get_data() ] + FONT_Y_PAD, dtype=np.uint8), 0, 1)
|
||||
|
||||
def generate(self, now):
|
||||
chars = [self.generate_char(c, now) for c in self.text]
|
||||
# This refers to inter-letter spacing
|
||||
space = np.zeros((LETTER_SPACING, DISPLAY_HEIGHT, 3))
|
||||
space = np.zeros((LETTER_SPACING, DISPLAY_HEIGHT, 3), dtype=np.uint8)
|
||||
spaces = [space]*(len(chars)-1)
|
||||
everything = chars + spaces
|
||||
everything[::2] = chars
|
||||
|
|
@ -96,25 +102,26 @@ class TextRenderer:
|
|||
"""
|
||||
generators = []
|
||||
current_generator = CharGenerator()
|
||||
for esc, char in r'(\x1B\[[0-9;]+m)|(.)'.finditer(text):
|
||||
for match in re.finditer('(\x1B\[[0-9;]+m)|(.)', text):
|
||||
esc, char = match.groups()
|
||||
if esc:
|
||||
if current_generator.text != '':
|
||||
generators.append(current_generator)
|
||||
current_generator = CharGenerator(esc, current_generator)
|
||||
elif char:
|
||||
current_generator.text += char
|
||||
self.generators = generators + [current_generator]
|
||||
|
||||
def frames(self, start):
|
||||
now = time.time()
|
||||
zeros = [np.zeros((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3))]
|
||||
generators = generators + [current_generator]
|
||||
# Generate the actual frame buffer
|
||||
zeros = [np.zeros((DISPLAY_WIDTH, DISPLAY_HEIGHT, 3), dtype=np.uint8)]
|
||||
# Pad the array with one screen's worth of zeros on both sides so the text fully scrolls through.
|
||||
raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros])
|
||||
w,h,_ = raw.size
|
||||
for i in range(DISPLAY_WIDTH+w, 0, -1):
|
||||
frame = raw[i:i+DISPLAY_WIDTH, :, :]
|
||||
now = time.time()
|
||||
self.raw = np.concatenate(zeros+[g.generate(now) for g in generators]+zeros)
|
||||
|
||||
def frames(self):
|
||||
w,h,_ = self.raw.shape
|
||||
for i in range(0, w-DISPLAY_WIDTH, 2):
|
||||
frame = self.raw[i:i+DISPLAY_WIDTH, :, :]
|
||||
yield frame, 1/DEFAULT_SCROLL_SPEED
|
||||
raw = np.concatenate([zeros]+[g.generate(now) for g in generators]+[zeros])
|
||||
|
||||
class ImageRenderer:
|
||||
def __new__(cls, image_data):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue