nostrmarket/helpers.py
2023-03-02 10:14:11 +02:00

81 lines
2.5 KiB
Python

import base64
import json
import secrets
from typing import Optional
import secp256k1
from cffi import FFI
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
def get_shared_secret(privkey: str, pubkey: str):
point = secp256k1.PublicKey(bytes.fromhex("02" + pubkey), True)
return point.ecdh(bytes.fromhex(privkey), hashfn=copy_x)
def decrypt_message(encoded_message: str, encryption_key) -> str:
encoded_data = encoded_message.split("?iv=")
encoded_content, encoded_iv = encoded_data[0], encoded_data[1]
iv = base64.b64decode(encoded_iv)
cipher = Cipher(algorithms.AES(encryption_key), modes.CBC(iv))
encrypted_content = base64.b64decode(encoded_content)
decryptor = cipher.decryptor()
decrypted_message = decryptor.update(encrypted_content) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
unpadded_data = unpadder.update(decrypted_message) + unpadder.finalize()
return unpadded_data.decode()
def encrypt_message(message: str, encryption_key, iv: Optional[bytes]) -> str:
padder = padding.PKCS7(128).padder()
padded_data = padder.update(message.encode()) + padder.finalize()
iv = iv if iv else secrets.token_bytes(16)
cipher = Cipher(algorithms.AES(encryption_key), modes.CBC(iv))
encryptor = cipher.encryptor()
encrypted_message = encryptor.update(padded_data) + encryptor.finalize()
return f"{base64.b64encode(encrypted_message).decode()}?iv={base64.b64encode(iv).decode()}"
def sign_message_hash(private_key: str, hash: bytes) -> str:
privkey = secp256k1.PrivateKey(bytes.fromhex(private_key))
sig = privkey.schnorr_sign(hash, None, raw=True)
return sig.hex()
def test_decrypt_encrypt(encoded_message: str, encryption_key):
msg = decrypt_message(encoded_message, encryption_key)
# ecrypt using the same initialisation vector
iv = base64.b64decode(encoded_message.split("?iv=")[1])
ecrypted_msg = encrypt_message(msg, encryption_key, iv)
assert (
encoded_message == ecrypted_msg
), f"expected '{encoded_message}', but got '{ecrypted_msg}'"
print("### test_decrypt_encrypt", encoded_message == ecrypted_msg)
ffi = FFI()
@ffi.callback(
"int (unsigned char *, const unsigned char *, const unsigned char *, void *)"
)
def copy_x(output, x32, y32, data):
ffi.memmove(output, x32, 32)
return 1
def is_json(string: str):
try:
json.loads(string)
except ValueError as e:
return False
return True