Initial commit

This commit is contained in:
jaseg 2020-01-20 11:52:25 +01:00
commit 5291970420
4 changed files with 1888 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
*.mpy
main.py
.ipynb_checkpoints

41
Makefile Normal file
View file

@ -0,0 +1,41 @@
ESPTOOL = esptool.py
# TOOL can be 'mpfshell' or 'ampy'
TOOL = mpfshell
FIRMWARE = esp32-idf3-20191220-v1.12.bin
PORT = /dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0
SHELL = /bin/sh
MPY := $(patsubst %.py,%.mpy,$(wildcard *.py))
.PHONY: erase flash all install clean repl help sanitize
all: $(MPY)
%.mpy: %.py ## Compile all python files
mpy-cross -march=xtensa -o $@ $<
install: $(MPY) ## Copy all compiled python files to the ESP32
if [ "${TOOL}" = "mpfshell" ]; then ${TOOL} -n -c "open ${PORT:/dev/%=%}; mput ${MPY}"; fi
if [ "${TOOL}" = "ampy" ]; then ${TOOL} -p ${PORT} put *.mpy; fi
sanitize: main.py.example
%.py.example: %.py
sed -n '/#### CONFIG ####/,/#### END ####/{/^#/p;d};p' $< > $@
clean: ## Delete all compiled python files
rm -f *.mpy
erase: ## Erase the flash of the ESP32
${ESPTOOL} --chip esp32 --port ${PORT} erase_flash
flash: ## Download micropython firmware and flash to ESP32
curl -sSLO https://micropython.org/resources/firmware/${FIRMWARE}
${ESPTOOL} --chip esp32 --port ${PORT} --baud 460800 write_flash -z 0x1000 ${FIRMWARE}
repl: ## Start repl shell
if [ "${TOOL}" = "mpfshell" ]; then ${TOOL} -n -o ${BOARD} -c repl; fi
if [ "${TOOL}" != "mpfshell" ]; then screen /dev/${BOARD} ; fi
help: ## Show this help
grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

125
main.py.example Normal file
View file

@ -0,0 +1,125 @@
from machine import Pin, ADC, Timer
import network
import time
import ujson
from uhashlib import sha256
import binascii
import math
import urequests
######################################################## CONFIG ########################################################
# Wifi settings:
# WIFI_ESSID = 'Your wifi ESSID (name)'
# WIFI_PSK = 'Your wifi key'
#
# Notification settings:
# NOTIFICATION_URL = 'https://automation.jaseg.de/notify/klingel'
# NOTIFICATION_SECRET = b'Your notification proxy secret for this endpoint'
#
# NOTIFICATION_COOLDOWN = 60 # how long to wait after sending a notification before sending the next, in seconds
#
# Detection settings
# MEAN_LEN = 8 # Window length for DC offset determination in seconds (1024ms to be exact)
# RMS_THRESHOLD = 1000 # Threshold for rms detection threshold over 1s window in ADC counts
######################################################### END ##########################################################
def wifi_connect():
iface = network.WLAN(network.STA_IF)
if not iface.isconnected():
print('Connecting to wifi... ')
iface.active(True)
iface.connect(WIFI_ESSID, WIFI_PSK)
for i in range(20):
if iface.isconnected():
print('Wifi connected. IP config: ', iface.ifconfig())
break
time.sleep(0.5)
else:
print("Couldn't connect to wifi.")
buf = [0] * 1024
capture = None
mean = 0
rms = 0
sample_tim = Timer(-1)
def start_sampling():
global sample_tim
buf_pos = 0
buf_sum = 0
mean_acc = []
adc = ADC(Pin(34))
adc.atten(ADC.ATTN_11DB)
def sample_cb(tim):
global buf, mean, rms, capture
nonlocal adc, buf_pos, buf_sum, mean_acc
val = adc.read()
buf[buf_pos] = val
buf_sum += val
buf_pos += 1
if buf_pos == len(buf):
buf_pos = 0
mean_acc = [buf_sum/len(buf)] + mean_acc[:MEAN_LEN-1]
mean = sum(mean_acc)/len(mean_acc)
buf_sum = 0
rms = math.sqrt( sum( (x-mean)**2 for x in buf )/len(buf) )
capture = list(buf) # Make a copy
sample_tim.init(period=1, mode=Timer.PERIODIC, callback=sample_cb) # period in ms
def uhmac(key, data):
blocksize = 64
key += bytes(64 - len(key))
tx = lambda s, x: bytes( b ^ x for b in s )
outer = sha256(tx(key, 0x5C))
inner = sha256(tx(key, 0x36))
inner.update(data)
outer.update(inner.digest())
return outer.digest()
def usign(secret, payload=None, seq=None):
payload = {'time': int(time.time()), 'd': payload}
if seq is not None:
payload['seq'] = seq
payload = ujson.dumps(payload).encode()
auth = binascii.hexlify(uhmac(secret, payload))
return ujson.dumps({'payload': payload, 'auth': auth})
def notify(**kwargs):
data = usign(NOTIFICATION_SECRET, kwargs)
print(time.time(), 'Notifying', NOTIFICATION_URL)
urequests.post(NOTIFICATION_URL, data=data, headers={'Content-Type': 'application/json'})
def klingel_notify(rms, capture):
notify(rms=rms, capture=capture)
def loop():
global rms, capture
while True:
if rms > RMS_THRESHOLD:
wifi_connect()
old_capture = capture
rms = 0
while rms == 0:
time.sleep(0.1)
rms = 0
klingel_notify(rms, [old_capture, capture])
time.sleep(NOTIFICATION_COOLDOWN)
time.sleep(0.1)
wifi_connect()
start_sampling()
loop()

1719
viz.ipynb Normal file

File diff suppressed because one or more lines are too long