Host: Add console CRAP client
This commit is contained in:
parent
245fbf3d6b
commit
330e1eb20e
6 changed files with 237 additions and 192 deletions
56
host/bdf.py
Normal file
56
host/bdf.py
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
|
||||||
|
import config
|
||||||
|
import threading
|
||||||
|
import numpy
|
||||||
|
from ctypes import *
|
||||||
|
|
||||||
|
class COLOR(Structure):
|
||||||
|
_fields_ = [('r', c_uint8), ('g', c_uint8), ('b', c_uint8), ('a', c_uint8)]
|
||||||
|
|
||||||
|
class FRAMEBUFFER(Structure):
|
||||||
|
_fields_ = [('data', POINTER(COLOR)), ('w', c_size_t), ('h', c_size_t)]
|
||||||
|
|
||||||
|
lib = CDLL('./libbdf.so')
|
||||||
|
lib.read_bdf_file.restype = c_void_p
|
||||||
|
lib.framebuffer_render_text.restype = POINTER(FRAMEBUFFER)
|
||||||
|
lib.framebuffer_render_text.argtypes= [c_char_p, c_void_p, c_void_p, c_size_t, c_size_t, c_size_t]
|
||||||
|
|
||||||
|
dbuf = numpy.zeros(config.frame_size*4, dtype=numpy.uint8)
|
||||||
|
printlock = threading.Lock()
|
||||||
|
def printframe(fb):
|
||||||
|
with printlock:
|
||||||
|
print('\0337\033[H')
|
||||||
|
rgba = len(fb) == config.frame_size*4
|
||||||
|
ip = numpy.frombuffer(fb, dtype=numpy.uint8)
|
||||||
|
numpy.copyto(dbuf[0::4], ip[0::3+rgba])
|
||||||
|
numpy.copyto(dbuf[1::4], ip[1::3+rgba])
|
||||||
|
numpy.copyto(dbuf[2::4], ip[2::3+rgba])
|
||||||
|
lib.console_render_buffer(dbuf.ctypes.data_as(POINTER(c_uint8)), config.display_width, config.display_height)
|
||||||
|
|
||||||
|
|
||||||
|
class Font:
|
||||||
|
def __init__(self, fontfile='unifont.bdf'):
|
||||||
|
self.font = lib.read_bdf_file(fontfile)
|
||||||
|
assert self.font
|
||||||
|
# hack to prevent unlocalized memory leak arising from ctypes/numpy/cpython interaction
|
||||||
|
self.cbuf = create_string_buffer(config.frame_size*sizeof(COLOR))
|
||||||
|
self.cbuflock = threading.Lock()
|
||||||
|
|
||||||
|
def compute_text_bounds(text):
|
||||||
|
textbytes = text.encode()
|
||||||
|
textw, texth = c_size_t(0), c_size_t(0)
|
||||||
|
res = lib.framebuffer_get_text_bounds(textbytes, unifont, byref(textw), byref(texth))
|
||||||
|
if res:
|
||||||
|
raise ValueError('Invalid text')
|
||||||
|
return textw.value, texth.value
|
||||||
|
|
||||||
|
def render_text(text, offset):
|
||||||
|
with cbuflock:
|
||||||
|
textbytes = bytes(str(text), 'UTF-8')
|
||||||
|
res = lib.framebuffer_render_text(textbytes, self.font, self.cbuf,
|
||||||
|
config.display_width, config.display_height, offset)
|
||||||
|
if res:
|
||||||
|
raise ValueError('Invalid text')
|
||||||
|
return self.cbuf
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1,13 +1,4 @@
|
||||||
|
|
||||||
# Hard timeout in seconds after which (approximately) the rendering of a single item will be cut off
|
|
||||||
RENDERER_TIMEOUT = 20.0
|
|
||||||
# How long to show an image by default
|
|
||||||
DEFAULT_IMAGE_DURATION = 10.0
|
|
||||||
# Default scrolling speed in pixels/second
|
|
||||||
DEFAULT_SCROLL_SPEED = 4
|
|
||||||
# Pixels to leave blank between two letters
|
|
||||||
LETTER_SPACING = 0
|
|
||||||
|
|
||||||
# Display geometry
|
# Display geometry
|
||||||
# ┌─────────┐ ┌───┬───┬ ⋯ ┬───┬───┐
|
# ┌─────────┐ ┌───┬───┬ ⋯ ┬───┬───┐
|
||||||
# │1 o o o 5│ │ 1 │ │ │ │ 8│
|
# │1 o o o 5│ │ 1 │ │ │ │ 8│
|
||||||
|
|
@ -19,13 +10,34 @@ LETTER_SPACING = 0
|
||||||
# │25 │ │ │ │32 │
|
# │25 │ │ │ │32 │
|
||||||
# └───┴───┴ ⋯ ┴───┴───┘
|
# └───┴───┴ ⋯ ┴───┴───┘
|
||||||
|
|
||||||
CRATE_WIDTH = 5
|
# Physical display dimensions
|
||||||
CRATE_HEIGHT = 4
|
crate_width = 5
|
||||||
CRATES_X = 8
|
crate_height = 4
|
||||||
CRATES_Y = 4
|
crates_x = 8
|
||||||
|
crates_y = 4
|
||||||
|
|
||||||
# Computed values
|
# Computed values
|
||||||
DISPLAY_WIDTH = CRATES_X * CRATE_WIDTH
|
display_width = crates_x * crate_width
|
||||||
DISPLAY_HEIGHT = CRATES_Y * CRATE_HEIGHT
|
display_height = crates_y * crate_height
|
||||||
FRAME_SIZE = DISPLAY_WIDTH*DISPLAY_HEIGHT*3
|
crate_size = crate_width*crate_height
|
||||||
|
frame_size = display_width*display_height
|
||||||
|
|
||||||
|
# Display gamma factor
|
||||||
|
gamma = 2.5
|
||||||
|
|
||||||
|
# Brightness of the display. 0 to 1.0
|
||||||
|
brightness = 1.0
|
||||||
|
|
||||||
|
# Frame timeout for UDP clients
|
||||||
|
udp_timeout = 3.0
|
||||||
|
|
||||||
|
# Interval for rotation of multiple concurrent UDP clients
|
||||||
|
udp_switch_interval = 30.0
|
||||||
|
|
||||||
|
# Listening addr/port for UDP and TCP servers
|
||||||
|
udp_addr = tcp_addr = ''
|
||||||
|
udp_port = tcp_port = 1337
|
||||||
|
|
||||||
|
# Forward addr/port
|
||||||
|
crap_fw_addr, crap_fw_port = '127.0.0.1', 1338
|
||||||
|
|
||||||
|
|
|
||||||
68
host/crap.py
Normal file
68
host/crap.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
|
||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
import zlib
|
||||||
|
import io
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
import config
|
||||||
|
|
||||||
|
class CRAPClient:
|
||||||
|
def __init__(self, ip='127.0.0.1', port=1337):
|
||||||
|
self.ip, self.port = ip, port
|
||||||
|
self.sock = socket.Socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
self.close = self.sock.close
|
||||||
|
|
||||||
|
def sendframe(self, frame):
|
||||||
|
self.sock.sendto(frame, (self.ip, self.port))
|
||||||
|
|
||||||
|
|
||||||
|
def _timestamped_recv(sock):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data, addr = sock.recvfrom(config.frame_size*3+4)
|
||||||
|
except io.BlockingIOError as e:
|
||||||
|
raise StopIteration()
|
||||||
|
else:
|
||||||
|
yield time(), data, addr
|
||||||
|
|
||||||
|
|
||||||
|
class CRAPServer:
|
||||||
|
def __init__(self, ip='', port=1337, blocking=False, log=print):
|
||||||
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
self.sock.setblocking(blocking)
|
||||||
|
self.sock.bind((ip, port))
|
||||||
|
|
||||||
|
self.current_client = None
|
||||||
|
self.last_timestamp = 0
|
||||||
|
self.begin_timestamp = 0
|
||||||
|
self.log = log
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.sock.close()
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
for timestamp, data, (addr, sport) in _timestamped_recv(self.sock):
|
||||||
|
if data is None:
|
||||||
|
yield None
|
||||||
|
|
||||||
|
if timestamp - self.last_timestamp > config.udp_timeout\
|
||||||
|
or timestamp - self.begin_timestamp > config.udp_switch_interval:
|
||||||
|
self.current_client = addr
|
||||||
|
self.begin_timestamp = timestamp
|
||||||
|
self.log('\x1B[91mAccepting UDP data from\x1B[0m', addr)
|
||||||
|
|
||||||
|
if addr == self.current_client:
|
||||||
|
if len(data) == config.frame_size*3+4:
|
||||||
|
(crc1,), crc2 = struct.unpack('!I', data[-4:]), zlib.crc32(data, 0),
|
||||||
|
data = data[:-4] # crop CRC
|
||||||
|
if crc1 and crc1 != crc2: # crc1 zero-check for backward-compatibility
|
||||||
|
self.log('Error receiving UDP frame: Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1))
|
||||||
|
continue
|
||||||
|
elif len(data) != config.frame_size*3:
|
||||||
|
self.log('Error receiving UDP frame: Invalid frame size: {}'.format(len(data)))
|
||||||
|
self.last_timestamp = timestamp
|
||||||
|
yield data
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -4,21 +4,7 @@ from ctypes import c_size_t, c_uint8, c_void_p, c_float, CDLL, Structure, POINTE
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import time
|
import time
|
||||||
|
|
||||||
CRATE_WIDTH = 5
|
from config import *
|
||||||
CRATE_HEIGHT = 4
|
|
||||||
CRATES_X = 8
|
|
||||||
CRATES_Y = 4
|
|
||||||
|
|
||||||
DISPLAY_WIDTH = CRATES_X*CRATE_WIDTH
|
|
||||||
DISPLAY_HEIGHT = CRATES_Y*CRATE_HEIGHT
|
|
||||||
CRATE_SIZE = CRATE_WIDTH*CRATE_HEIGHT*3
|
|
||||||
FRAME_SIZE = DISPLAY_WIDTH*DISPLAY_HEIGHT
|
|
||||||
|
|
||||||
# Gamma factor
|
|
||||||
GAMMA = 2.5
|
|
||||||
|
|
||||||
# Brightness of the LEDs in percent. 1.0 means 100%.
|
|
||||||
BRIGHTNESS = 1.0
|
|
||||||
|
|
||||||
ml = CDLL('./libml.so')
|
ml = CDLL('./libml.so')
|
||||||
ml.matelight_open.restype = c_void_p
|
ml.matelight_open.restype = c_void_p
|
||||||
|
|
|
||||||
215
host/server.py
215
host/server.py
|
|
@ -1,191 +1,84 @@
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
from socketserver import *
|
|
||||||
import socket
|
import socket
|
||||||
import struct
|
from time import strftime
|
||||||
import zlib
|
|
||||||
from time import time, strftime, sleep
|
|
||||||
from collections import namedtuple, deque
|
|
||||||
import itertools
|
import itertools
|
||||||
import threading
|
|
||||||
import random
|
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
|
from contextlib import suppress
|
||||||
|
|
||||||
from ctypes import *
|
from config import *
|
||||||
|
|
||||||
from matelight import sendframe, DISPLAY_WIDTH, DISPLAY_HEIGHT, FRAME_SIZE
|
import matelight
|
||||||
|
import bdf
|
||||||
|
import crap
|
||||||
|
|
||||||
UDP_TIMEOUT = 3.0
|
|
||||||
UDP_SWITCH_INTERVAL = 30.0
|
|
||||||
|
|
||||||
class COLOR(Structure):
|
|
||||||
_fields_ = [('r', c_uint8), ('g', c_uint8), ('b', c_uint8), ('a', c_uint8)]
|
|
||||||
|
|
||||||
class FRAMEBUFFER(Structure):
|
|
||||||
_fields_ = [('data', POINTER(COLOR)), ('w', c_size_t), ('h', c_size_t)]
|
|
||||||
|
|
||||||
bdf = CDLL('./libbdf.so')
|
|
||||||
bdf.read_bdf_file.restype = c_void_p
|
|
||||||
bdf.framebuffer_render_text.restype = POINTER(FRAMEBUFFER)
|
|
||||||
bdf.framebuffer_render_text.argtypes= [c_char_p, c_void_p, c_void_p, c_size_t, c_size_t, c_size_t]
|
|
||||||
|
|
||||||
unifont = bdf.read_bdf_file('unifont.bdf')
|
|
||||||
|
|
||||||
def compute_text_bounds(text):
|
|
||||||
assert unifont
|
|
||||||
textbytes = bytes(str(text), 'UTF-8')
|
|
||||||
textw, texth = c_size_t(0), c_size_t(0)
|
|
||||||
res = bdf.framebuffer_get_text_bounds(textbytes, unifont, byref(textw), byref(texth))
|
|
||||||
if res:
|
|
||||||
raise ValueError('Invalid text')
|
|
||||||
return textw.value, texth.value
|
|
||||||
|
|
||||||
cbuf = create_string_buffer(FRAME_SIZE*sizeof(COLOR))
|
|
||||||
cbuflock = threading.Lock()
|
|
||||||
def render_text(text, offset):
|
|
||||||
global cbuf
|
|
||||||
cbuflock.acquire()
|
|
||||||
textbytes = bytes(str(text), 'UTF-8')
|
|
||||||
res = bdf.framebuffer_render_text(textbytes, unifont, cbuf, DISPLAY_WIDTH, DISPLAY_HEIGHT, offset)
|
|
||||||
if res:
|
|
||||||
raise ValueError('Invalid text')
|
|
||||||
cbuflock.release()
|
|
||||||
return cbuf
|
|
||||||
|
|
||||||
printlock = threading.Lock()
|
|
||||||
|
|
||||||
def printframe(fb):
|
|
||||||
printlock.acquire()
|
|
||||||
print('\0337\033[H', end='')
|
|
||||||
print('Rendering frame @{}'.format(time()))
|
|
||||||
bdf.console_render_buffer(fb, DISPLAY_WIDTH, DISPLAY_HEIGHT)
|
|
||||||
#print('\033[0m\033[KCurrently rendering', current_entry.entrytype, 'from', current_entry.remote, ':', current_entry.text, '\0338', end='')
|
|
||||||
printlock.release()
|
|
||||||
|
|
||||||
def log(*args):
|
def log(*args):
|
||||||
printlock.acquire()
|
|
||||||
print(strftime('\x1B[93m[%m-%d %H:%M:%S]\x1B[0m'), ' '.join(str(arg) for arg in args), '\x1B[0m')
|
print(strftime('\x1B[93m[%m-%d %H:%M:%S]\x1B[0m'), ' '.join(str(arg) for arg in args), '\x1B[0m')
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
printlock.release()
|
|
||||||
|
|
||||||
class TextRenderer:
|
class TextRenderer:
|
||||||
def __init__(self, text):
|
def __init__(self, text):
|
||||||
self.text = text
|
self.text = text
|
||||||
self.width, _ = compute_text_bounds(text)
|
self.width, _ = unifont.compute_text_bounds(text)
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
for i in range(-DISPLAY_WIDTH, self.width):
|
for i in range(-DISPLAY_WIDTH, self.width):
|
||||||
#print('Rendering text @ pos {}'.format(i))
|
|
||||||
yield render_text(self.text, i)
|
yield render_text(self.text, i)
|
||||||
|
|
||||||
class MateLightUDPServer:
|
class MatelightTCPServer:
|
||||||
def __init__(self, port=1337, ip=''):
|
def __init__(self, port, ip):
|
||||||
self.current_client = None
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
self.last_timestamp = 0
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
self.begin_timestamp = 0
|
self.sock.setblocking(blocking)
|
||||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
self.sock.bind((ip, port))
|
||||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
self.conns = set()
|
||||||
self.socket.bind((ip, port))
|
self.renderqueue = []
|
||||||
self.thread = threading.Thread(target = self.udp_receive)
|
|
||||||
self.thread.daemon = True
|
|
||||||
self.start = self.thread.start
|
|
||||||
self.frame_condition = threading.Condition()
|
|
||||||
self.frame = None
|
|
||||||
|
|
||||||
def frame_da(self):
|
|
||||||
return self.frame is not None
|
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
while True:
|
q, self.renderqueue = self.renderqueue, []
|
||||||
with self.frame_condition:
|
for frame in itertools.chain(*q):
|
||||||
if not self.frame_condition.wait_for(self.frame_da, timeout=UDP_TIMEOUT):
|
|
||||||
raise StopIteration()
|
|
||||||
frame, self.frame = self.frame, None
|
|
||||||
yield frame
|
yield frame
|
||||||
|
|
||||||
def udp_receive(self):
|
def handle_connections(self):
|
||||||
while True:
|
for conn in self.conns:
|
||||||
try:
|
try:
|
||||||
data, (addr, sport) = self.socket.recvfrom(FRAME_SIZE*3+4)
|
line = conn.recv(1024).decode('UTF-8').strip()
|
||||||
timestamp = time()
|
if len(data) > 140: # Unicode string length, *not* byte length of encoded UTF-8
|
||||||
if timestamp - self.last_timestamp > UDP_TIMEOUT \
|
conn.sendall(b'TOO MUCH INFORMATION!\n')
|
||||||
or timestamp - self.begin_timestamp > UDP_SWITCH_INTERVAL:
|
else:
|
||||||
self.current_client = addr
|
log('\x1B[95mText from\x1B[0m {}: {}\x1B[0m'.format(addr, data))
|
||||||
self.begin_timestamp = timestamp
|
renderqueue.append(TextRenderer(data))
|
||||||
log('\x1B[91mAccepting UDP data from\x1B[0m', addr)
|
conn.sendall(b'KTHXBYE!\n')
|
||||||
if addr == self.current_client:
|
except socket.error, e:
|
||||||
if len(data) == FRAME_SIZE*3+4:
|
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
|
||||||
frame = data[:-4]
|
continue
|
||||||
crc1, = struct.unpack('!I', data[-4:])
|
with suppress(socket.error):
|
||||||
if crc1:
|
conn.close()
|
||||||
crc2, = zlib.crc32(frame, 0),
|
self.conns.remove(conn)
|
||||||
if crc1 != crc2:
|
|
||||||
raise ValueError('Invalid frame CRC checksum: Expected {}, got {}'.format(crc2, crc1))
|
|
||||||
elif len(data) == FRAME_SIZE*3:
|
|
||||||
frame = data
|
|
||||||
else:
|
|
||||||
raise ValueError('Invalid frame size: {}'.format(len(data)))
|
|
||||||
self.last_timestamp = timestamp
|
|
||||||
with self.frame_condition:
|
|
||||||
self.frame = frame
|
|
||||||
self.frame_condition.notify()
|
|
||||||
except Exception as e:
|
|
||||||
log('Error receiving UDP frame:', e)
|
|
||||||
|
|
||||||
renderqueue = deque()
|
def _fallbackiter(it, fallback):
|
||||||
|
for fel in fallback:
|
||||||
class MateLightTCPTextHandler(BaseRequestHandler):
|
for el in it:
|
||||||
def handle(self):
|
yield el
|
||||||
global render_deque
|
yield fel
|
||||||
data = str(self.request.recv(1024).strip(), 'UTF-8')
|
|
||||||
addr = self.client_address[0]
|
|
||||||
if len(data) > 140:
|
|
||||||
self.request.sendall(b'TOO MUCH INFORMATION!\n')
|
|
||||||
return
|
|
||||||
log('\x1B[95mText from\x1B[0m {}: {}\x1B[0m'.format(addr, data))
|
|
||||||
renderqueue.append(TextRenderer(data))
|
|
||||||
self.request.sendall(b'KTHXBYE!\n')
|
|
||||||
|
|
||||||
TCPServer.allow_reuse_address = True
|
|
||||||
tserver = TCPServer(('', 1337), MateLightTCPTextHandler)
|
|
||||||
t = threading.Thread(target=tserver.serve_forever)
|
|
||||||
t.daemon = True
|
|
||||||
t.start()
|
|
||||||
|
|
||||||
userver = MateLightUDPServer()
|
|
||||||
userver.start()
|
|
||||||
|
|
||||||
defaultlines = [ TextRenderer(l[:-1].replace('\\x1B', '\x1B')) for l in open('default.lines').readlines() ]
|
|
||||||
#random.shuffle(defaultlines)
|
|
||||||
defaulttexts = itertools.chain(*defaultlines)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
print('\033[?1049h'+'\n'*9)
|
tcp_server = MatelightTCPServer(config.tcp_addr, config.tcp_port)
|
||||||
while True:
|
udp_server = crap.CRAPServer(config.udp_addr, config.udp_port)
|
||||||
if renderqueue:
|
forwarder = crap.CRAPClient(config.crap_fw_addr, config.crap_fw_port) if config.crap_fw_addr is not None else None
|
||||||
renderer = renderqueue.popleft()
|
|
||||||
elif userver.frame_da():
|
|
||||||
renderer = userver
|
|
||||||
else:
|
|
||||||
static_noise = time() % 300 < 60
|
|
||||||
if False:
|
|
||||||
foo = os.urandom(640)
|
|
||||||
frame = bytes([v for c in zip(list(foo), list(foo), list(foo)) for v in c ])
|
|
||||||
sleep(0.05)
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
frame = next(defaulttexts)
|
|
||||||
except StopIteration:
|
|
||||||
defaultlines = [ TextRenderer(l[:-1].replace('\\x1B', '\x1B')) for l in open('default.lines').readlines() ]
|
|
||||||
#random.shuffle(defaultlines)
|
|
||||||
defaulttexts = itertools.chain(*defaultlines)
|
|
||||||
sendframe(frame)
|
|
||||||
# printframe(frame)
|
|
||||||
continue
|
|
||||||
# sleep(0.1)
|
|
||||||
for frame in renderer:
|
|
||||||
sendframe(frame)
|
|
||||||
# printframe(frame)
|
|
||||||
# sleep(0.1)
|
|
||||||
|
|
||||||
|
def defaulttexts(filename='default.lines'):
|
||||||
|
with open(filename) as f:
|
||||||
|
return itertools.chain.from_iterable(( TextRenderer(l[:-1].replace('\\x1B', '\x1B')) for l in f.readlines() ))
|
||||||
|
|
||||||
|
with suppress(KeyboardInterrupt):
|
||||||
|
for renderer in _fallbackiter(tcp_server, defaulttexts()):
|
||||||
|
for frame in _fallbackiter(udp_server, renderer):
|
||||||
|
matelight.sendframe(frame)
|
||||||
|
if forwarder:
|
||||||
|
forwarder.sendframe(frame)
|
||||||
|
|
||||||
|
tcp_server.close()
|
||||||
|
udp_server.close()
|
||||||
|
forwarder.close()
|
||||||
|
|
|
||||||
30
host/viewer.py
Executable file
30
host/viewer.py
Executable file
|
|
@ -0,0 +1,30 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from time import time
|
||||||
|
import sys
|
||||||
|
from contextlib import suppress
|
||||||
|
import argparse
|
||||||
|
import atexit
|
||||||
|
|
||||||
|
import config
|
||||||
|
|
||||||
|
import bdf
|
||||||
|
import crap
|
||||||
|
|
||||||
|
atexit.register(print, '\033[?1049l') # Restore normal screen buffer at exit
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('addr', default='127.0.0.1', nargs='?')
|
||||||
|
parser.add_argument('port', type=int, default=1337, nargs='?')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
print('\033[?1049h'+'\n'*9)
|
||||||
|
udp_server = crap.CRAPServer(args.addr, args.port, blocking=True, log=lambda *_a: None)
|
||||||
|
|
||||||
|
with suppress(KeyboardInterrupt):
|
||||||
|
for frame in udp_server:
|
||||||
|
bdf.printframe(frame)
|
||||||
|
|
||||||
|
udp_server.close()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue