Crypto v2 draft working

This commit is contained in:
jaseg 2020-03-10 12:20:55 +01:00
parent 6880468862
commit 0cd07d397f
8 changed files with 211 additions and 135 deletions

View file

@ -30,7 +30,7 @@ int main(int argc, char **argv) {
uint8_t auth_key[16];
for (size_t i=0; argv[1][i+0] != '\0' && argv[1][i+1] != '\0'; i+= 2) {
for (size_t i=0; argv[1][i+0] != '\0' && argv[1][i+1] != '\0' && i/2<sizeof(auth_key); i+= 2) {
char buf[3] = { argv[1][i+0], argv[1][i+1], 0};
char *endptr;
auth_key[i/2] = strtoul(buf, &endptr, 16);

View file

@ -0,0 +1,46 @@
#!/usr/bin/env python3
import subprocess
from os import path
import binascii
import re
import presig_gen
def do_test(domain, value, height, root_key, binary, expect_fail=False):
auth = presig_gen.gen_at_height(domain, value, height, root_key)
auth = binascii.hexlify(auth).decode()
output = subprocess.check_output([binary, auth])
*lines, rc_line = output.decode().splitlines()
rc = int(re.match('^rc=(\d+)$', rc_line).group(1))
assert expect_fail == (rc == 0)
def run_tests(root_key, max_height, binary):
for domain, value in {
'all': 'all',
'vendor': presig_gen.TEST_VENDOR,
'series': presig_gen.TEST_SERIES,
'country': presig_gen.TEST_COUNTRY,
'region': presig_gen.TEST_REGION,
}.items():
for height in range(max_height):
do_test(domain, value, height, root_key, binary)
do_test(domain, 'fail', height, root_key, binary, expect_fail=True)
do_test('fail', 'fail', height, root_key, binary, expect_fail=True)
do_test('', '', height, root_key, binary, expect_fail=True)
do_test(domain, value, max_height, root_key, binary, expect_fail=True)
do_test(domain, value, max_height+1, root_key, binary, expect_fail=True)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('keyfile', help='Root key file')
parser.add_argument('max_height', type=int, default=8, nargs='?', help='Height of generated prekeys')
default_binary = path.abspath(path.join(path.dirname(__file__), '../build/tools/crypto_test'))
parser.add_argument('binary', default=default_binary, nargs='?', help='crypto_test binary to use')
args = parser.parse_args()
with open(args.keyfile, 'r') as f:
root_key = binascii.unhexlify(f.read().strip())
run_tests(root_key, args.max_height, args.binary)

View file

@ -4,19 +4,14 @@ import os
import sys
import textwrap
import uuid
import hashlib
import hmac
import binascii
import sqlite3
import time
from datetime import datetime
import nacl.signing
import nacl.encoding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
LINKING_KEY_SIZE = 16
PRESIG_VERSION = '000.001'
DOMAINS = ['all', 'country', 'region', 'vendor', 'series']
def format_hex(data, indent=4, wrap=True):
indent = ' '*indent
@ -28,48 +23,47 @@ def format_hex(data, indent=4, wrap=True):
return f'{{\n{par}\n}}'
return par
def domain_string(domain_name, value, serial):
return f'smart reset domain string v{PRESIG_VERSION}: domain:{domain_name}={value}@{serial}'
def domain_string(domain, value):
return f'smart reset domain string v{PRESIG_VERSION}: domain:{domain}={value}'
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('keyfile', help='Key file to use')
parser.add_argument('presig_db', nargs='?', help='sqlite3 dbfile for generated presig authorization keys')
parser.add_argument('-g', '--generate', action='store_true', help='Generate signing keypair')
parser.add_argument('-v', '--vendor', type=str, default='Darthenschmidt Cyberei und Verschleierungstechnik GmbH', help='Vendor name for vendor domain')
parser.add_argument('-s', '--series', type=str, default='Frobnicator v0.23.7', help='Series identifier for series domain')
parser.add_argument('-r', '--region', type=str, default='Neuland', help='Region name for region domain')
parser.add_argument('-c', '--country', type=str, default='Germany', help='Country name for country domain')
parser.add_argument('-p', '--start-serial', type=int, default=0, help='First presig serial number to use')
parser.add_argument('-n', '--presig-count', type=int, default=3, help='Number of presigs to generate')
parser.add_argument('-i', '--iv', type=str, default='safety reset oob presig iv', help='IV for presig generation')
args = parser.parse_args()
def keygen_cmd(args):
if os.path.exists(args.keyfile) and not args.force:
print("Error: keyfile already exists. We won't overwrite it. Instead please remove it manually.",
file=sys.stderr)
return 1
if args.generate:
if os.path.exists(args.keyfile):
print("Error: keyfile already exists. We won't overwrite it. Instead please remove it manually.",
file=sys.stderr)
sys.exit(1)
signing_key = nacl.signing.SigningKey.generate()
with open(args.keyfile, 'wb') as f:
f.write(signing_key.encode(encoder=nacl.encoding.Base64Encoder))
f.write(b'\n')
sys.exit(0)
root_key = os.urandom(LINKING_KEY_SIZE)
with open(args.keyfile, 'wb') as f:
f.write(binascii.hexlify(root_key))
f.write(b'\n')
return 0
def gen_at_height(domain, value, height, key):
# nanananananana BLOCKCHAIN!
ds = domain_string(domain, value).encode('utf-8')
for height in range(height+1):
key = hmac.digest(key, ds, 'sha512')[:LINKING_KEY_SIZE]
return key
def auth_cmd(args):
with open(args.keyfile, 'r') as f:
signing_key = nacl.signing.SigningKey(f.read().strip(), encoder=nacl.encoding.Base64Encoder)
pubkey_bytes = signing_key.verify_key.encode(encoder=nacl.encoding.RawEncoder)
pubkey_hash = hashlib.sha512(pubkey_bytes).digest()[:16]
root_key = binascii.unhexlify(f.read().strip())
if not args.presig_db:
print('The presig_db parameter is required.', file=sys.stderr)
sys.exit(1)
vals = [ (domain, getattr(args, domain)) for domain in DOMAINS if getattr(args, domain) is not None ]
if not vals:
vals = [('all', 'all')]
for domain, value in vals:
auth = gen_at_height(domain, value, args.height, root_key)
print(f'{domain}="{value}" @{args.height}: {binascii.hexlify(auth).decode()}')
db = sqlite3.connect(args.presig_db)
db.execute('CREATE TABLE IF NOT EXISTS presig_authkey (timestamp, pubkey_hash, bundle_id, presig_ver, domain, value, serial, authkey)')
bundle_id = uuid.uuid4().bytes
def prekey_cmd(args):
with open(args.keyfile, 'r') as f:
root_key = binascii.unhexlify(f.read().strip())
print('#include <stdint.h>')
print('#include <assert.h>')
@ -77,67 +71,71 @@ if __name__ == '__main__':
print('#include "crypto.h"')
print()
bundle_id = uuid.uuid4().bytes
print(f'/* bundle id {binascii.hexlify(bundle_id).decode()} */')
print(f'uint8_t presig_bundle_id[16] = {format_hex(bundle_id)};')
print(f'int presig_first_serial = {args.start_serial};')
print()
print(f'/* generated on {datetime.now()} */')
print(f'uint64_t bundle_timestamp = {int(time.time())};')
print()
print(f'int presig_height = {args.max_height};')
print()
print(f'uint8_t oob_trigger_pubkey[crypto_sign_PUBLICKEYBYTES] = {format_hex(pubkey_bytes)};')
print()
print('uint8_t presig_messages[_TRIGGER_DOMAIN_COUNT][PRESIG_STORE_SIZE][PRESIG_MSG_LEN] = {')
device_domains = {
'all': 'all',
'country': args.country,
'region': args.region,
'vendor': args.vendor,
'series': args.series
}
presigs = { dom: [] for dom in device_domains }
for dom, val in device_domains.items():
print(' {')
for i in range(args.presig_count):
serial = args.start_serial + i
ds = domain_string(dom, val, serial)
ds_hash = hashlib.sha512(ds.encode()).digest()[:16]
presigs[dom].append((ds_hash, val, serial))
print(f' {{ /* "{ds}" */')
print(format_hex(ds_hash, indent=8, wrap=False))
print(f' }},')
print(' },')
print('const char *presig_domain_strings[_TRIGGER_DOMAIN_COUNT] = {')
for domain in DOMAINS:
ds = domain_string(domain, getattr(args, domain))
assert '"' not in ds
print(f' [TRIGGER_DOMAIN_{domain.upper()}] = "{ds}",')
print('};')
print()
presig_iv = hashlib.sha512(args.iv.encode()).digest()[:16]
print(f'uint8_t oob_presig_iv[16] = {{ /* sha512("{args.iv}")[:16] */')
print(format_hex(presig_iv, wrap=False))
print(f'}};')
print()
print('uint8_t presig_store[_TRIGGER_DOMAIN_COUNT][PRESIG_STORE_SIZE][crypto_sign_BYTES] = {')
for dom, hashes in presigs.items():
print(f' {{ /* domain {dom} */')
for ds_hash, val, serial in hashes:
authkey = os.urandom(16)
cipher = Cipher(algorithms.AES(authkey), modes.CTR(presig_iv), backend=default_backend())
enc = cipher.encryptor()
ciphertext = enc.update(ds_hash)
assert len(enc.finalize()) == 0
with db:
db.execute('INSERT INTO presig_authkey VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
(int(time.time()*1000), pubkey_hash, binascii.hexlify(bundle_id).decode(), PRESIG_VERSION, dom,
print(format_hex(ciphertext, indent=8, wrap=False))
print(f' }},')
print(f' }},')
print(f'}};')
print('uint8_t presig_keys[_TRIGGER_DOMAIN_COUNT][PRESIG_MSG_LEN] = {')
for domain in DOMAINS:
key = gen_at_height(domain, getattr(args, domain), args.max_height, root_key)
print(f' [TRIGGER_DOMAIN_{domain.upper()}] = {{{format_hex(key, indent=0, wrap=False)}}},')
print('};')
print()
print('static inline void __hack_asserts_only(void) {')
print(f' static_assert(_TRIGGER_DOMAIN_COUNT == {len(presigs)});')
print(f' static_assert(PRESIG_STORE_SIZE == {args.presig_count});')
print(f' static_assert(_TRIGGER_DOMAIN_COUNT == {len(DOMAINS)});')
print(f' static_assert(PRESIG_MSG_LEN == {LINKING_KEY_SIZE});')
print('}')
print()
TEST_VENDOR = 'Darthenschmidt Cyberei und Verschleierungstechnik GmbH'
TEST_SERIES = 'Frobnicator v0.23.7'
TEST_REGION = 'Neuland'
TEST_COUNTRY = 'Germany'
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('keyfile', help='Key file to use')
subparsers = parser.add_subparsers(title='subcommands')
keygen_parser = subparsers.add_parser('keygen', help='Generate a new key')
keygen_parser.add_argument('-f', '--force', action='store_true', help='Force overwriting existing keyfile')
keygen_parser.set_defaults(func=keygen_cmd)
auth_parser = subparsers.add_parser('auth', help='Generate one-time authentication string')
auth_parser.add_argument('height', type=int, help='Authentication string height, counting from 0 (root key)')
auth_parser.set_defaults(func=auth_cmd)
auth_parser.add_argument('-a', '--all', action='store_const', const='all', help='Vendor name for vendor domain')
auth_parser.add_argument('-v', '--vendor', type=str, nargs='?', const=test_vendor, help='Vendor name for vendor domain')
auth_parser.add_argument('-s', '--series', type=str, nargs='?', const=test_series, help='Series identifier for series domain')
auth_parser.add_argument('-r', '--region', type=str, nargs='?', const=test_region, help='Region name for region domain')
auth_parser.add_argument('-c', '--country', type=str, nargs='?', const=test_country, help='Country name for country domain')
prekey_parser = subparsers.add_parser('prekey', help='Generate prekey data .C source code file')
prekey_parser.add_argument('-m', '--max-height', type=int, default=8, help='Height of generated prekey')
prekey_parser.add_argument('-v', '--vendor', type=str, default=test_vendor, help='Vendor name for vendor domain')
prekey_parser.add_argument('-s', '--series', type=str, default=test_series, help='Series identifier for series domain')
prekey_parser.add_argument('-r', '--region', type=str, default=test_region, help='Region name for region domain')
prekey_parser.add_argument('-c', '--country', type=str, default=test_country, help='Country name for country domain')
prekey_parser.set_defaults(func=prekey_cmd, all='all')
args = parser.parse_args()
sys.exit(args.func(args))