add folder

This commit is contained in:
callebtc 2023-02-13 15:28:47 +01:00
parent e8c9950cb0
commit f03284d4ea
29 changed files with 1171 additions and 0 deletions

0
nostr/client/__init__.py Normal file
View file

Binary file not shown.

Binary file not shown.

Binary file not shown.

41
nostr/client/cbc.py Normal file
View file

@ -0,0 +1,41 @@
from Cryptodome import Random
from Cryptodome.Cipher import AES
plain_text = "This is the text to encrypts"
# encrypted = "7mH9jq3K9xNfWqIyu9gNpUz8qBvGwsrDJ+ACExdV1DvGgY8q39dkxVKeXD7LWCDrPnoD/ZFHJMRMis8v9lwHfNgJut8EVTMuJJi8oTgJevOBXl+E+bJPwej9hY3k20rgCQistNRtGHUzdWyOv7S1tg==".encode()
# iv = "GzDzqOVShWu3Pl2313FBpQ==".encode()
key = bytes.fromhex("3aa925cb69eb613e2928f8a18279c78b1dca04541dfd064df2eda66b59880795")
BLOCK_SIZE = 16
class AESCipher(object):
"""This class is compatible with crypto.createCipheriv('aes-256-cbc')
"""
def __init__(self, key=None):
self.key = key
def pad(self, data):
length = BLOCK_SIZE - (len(data) % BLOCK_SIZE)
return data + (chr(length) * length).encode()
def unpad(self, data):
return data[: -(data[-1] if type(data[-1]) == int else ord(data[-1]))]
def encrypt(self, plain_text):
cipher = AES.new(self.key, AES.MODE_CBC)
b = plain_text.encode("UTF-8")
return cipher.iv, cipher.encrypt(self.pad(b))
def decrypt(self, iv, enc_text):
cipher = AES.new(self.key, AES.MODE_CBC, iv=iv)
return self.unpad(cipher.decrypt(enc_text).decode("UTF-8"))
if __name__ == "__main__":
aes = AESCipher(key=key)
iv, enc_text = aes.encrypt(plain_text)
dec_text = aes.decrypt(iv, enc_text)
print(dec_text)

138
nostr/client/client.py Normal file
View file

@ -0,0 +1,138 @@
from typing import *
import ssl
import time
import json
import os
import base64
from ..event import Event
from ..relay_manager import RelayManager
from ..message_type import ClientMessageType
from ..key import PrivateKey, PublicKey
from ..filter import Filter, Filters
from ..event import Event, EventKind, EncryptedDirectMessage
from ..relay_manager import RelayManager
from ..message_type import ClientMessageType
# from aes import AESCipher
from . import cbc
class NostrClient:
relays = [
"wss://lnbits.link/nostrrelay/client",
"wss://nostr-pub.wellorder.net",
"wss://nostr.zebedee.cloud",
"wss://nodestr.fmt.wiz.biz",
] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr"
relay_manager = RelayManager()
private_key: PrivateKey
public_key: PublicKey
def __init__(self, privatekey_hex: str = "", relays: List[str] = [], connect=True):
self.generate_keys(privatekey_hex)
if len(relays):
self.relays = relays
if connect:
self.connect()
def connect(self):
for relay in self.relays:
self.relay_manager.add_relay(relay)
self.relay_manager.open_connections(
{"cert_reqs": ssl.CERT_NONE}
) # NOTE: This disables ssl certificate verification
def close(self):
self.relay_manager.close_connections()
def generate_keys(self, privatekey_hex: str = None):
pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None
self.private_key = PrivateKey(pk)
self.public_key = self.private_key.public_key
def post(self, message: str):
event = Event(message, self.public_key.hex(), kind=EventKind.TEXT_NOTE)
self.private_key.sign_event(event)
event_json = event.to_message()
# print("Publishing message:")
# print(event_json)
self.relay_manager.publish_message(event_json)
def get_post(
self, sender_publickey: PublicKey = None, callback_func=None, filter_kwargs={}
):
filter = Filter(
authors=[sender_publickey.hex()] if sender_publickey else None,
kinds=[EventKind.TEXT_NOTE],
**filter_kwargs,
)
filters = Filters([filter])
subscription_id = os.urandom(4).hex()
self.relay_manager.add_subscription(subscription_id, filters)
request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
self.relay_manager.publish_message(message)
while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
if callback_func:
callback_func(event_msg.event)
time.sleep(0.1)
def dm(self, message: str, to_pubkey: PublicKey):
dm = EncryptedDirectMessage(
recipient_pubkey=to_pubkey.hex(), cleartext_content=message
)
self.private_key.sign_event(dm)
self.relay_manager.publish_event(dm)
def get_dm(self, sender_publickey: PublicKey, callback_func=None):
filters = Filters(
[
Filter(
kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE],
pubkey_refs=[sender_publickey.hex()],
)
]
)
subscription_id = os.urandom(4).hex()
self.relay_manager.add_subscription(subscription_id, filters)
request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
self.relay_manager.publish_message(message)
while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
if "?iv=" in event_msg.event.content:
try:
shared_secret = self.private_key.compute_shared_secret(
event_msg.event.public_key
)
aes = cbc.AESCipher(key=shared_secret)
enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=")
iv = base64.decodebytes(iv_b64.encode("utf-8"))
enc_text = base64.decodebytes(enc_text_b64.encode("utf-8"))
dec_text = aes.decrypt(iv, enc_text)
if callback_func:
callback_func(event_msg.event, dec_text)
except:
pass
break
time.sleep(0.1)
def subscribe(self, callback_func=None):
while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
if callback_func:
callback_func(event_msg.event)
time.sleep(0.1)