import sys
from datetime import datetime, timezone

from banjo.urls import route_get, route_post
from banjo.http import NotFound, NotAllowed
from cryptography.exceptions import InvalidSignature

from app.models import User, Message

# The parent of the working directory is put on the import path before
# `encryption` is imported (presumably that is where the module lives).
sys.path.insert(0, "..")
from encryption import PrivateKey, PublicKey

# Name of the built-in account that sends the welcome message.
ADMIN_NAME = 'subrosa_admin'

# Text encrypted and delivered to every newly registered user.
WELCOME_MESSAGE = "Welcome to SubRosa! Please be a good community member."

# A signed `time_sent` older than this many seconds is rejected.
MAX_MESSAGE_AGE_SECONDS = 10


def get_or_create_admin():
    """Return the admin user, creating it on first use.

    NOTE(review): the private key generated here is never stored, so
    nothing can later sign or decrypt on behalf of the admin account.
    """
    try:
        return User.objects.get(name=ADMIN_NAME)
    except User.DoesNotExist:
        private_key = PrivateKey.generate()
        public_key = private_key.get_public_key()
        admin = User(name=ADMIN_NAME, public_key=str(public_key))
        admin.save()
        return User.objects.get(name=ADMIN_NAME)


@route_post("users/new", args={'name': str, 'public_key': str})
def create_user(params):
    """Create a new user and send them an encrypted welcome message.

    Args:
        params: request arguments with the keys 'name' and 'public_key'.

    Returns:
        The new user as a dict (`User.to_dict()`).

    Raises:
        NotAllowed: if the public key cannot be loaded or used, or if the
            user cannot be created (reported as the name being in use).
    """
    admin = get_or_create_admin()

    try:
        public_key = PublicKey.load(params['public_key'])
    except (ValueError, FileNotFoundError) as err:
        raise NotAllowed("Invalid public key") from err

    # Encrypt the welcome text *before* touching the database: a key that
    # loads but cannot encrypt must not leave a half-registered user behind.
    try:
        ciphertext = public_key.encrypt(WELCOME_MESSAGE)
    except Exception as err:
        raise NotAllowed("Invalid public key") from err

    # Only user creation is reported as a name clash. The concrete exception
    # raised for a duplicate name depends on the model, which is not visible
    # from this file, so `Exception` is caught rather than a bare `except:`
    # (which would also trap KeyboardInterrupt / SystemExit).
    try:
        new_user = User.from_dict(params)
        new_user.save()
    except Exception as err:
        raise NotAllowed(
            f"Username {params['name']} is already in use."
        ) from err

    message = Message(
        sender=admin,
        recipient=new_user,
        ciphertext=ciphertext,
    )
    message.save()
    return new_user.to_dict()


@route_get("users", args={'name': str})
def get_user(params):
    """Look up a user by name and return them as a dict.

    Raises:
        NotFound: if no user has the requested name.
    """
    try:
        user = User.objects.get(name=params['name'])
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {params['name']}")
    return user.to_dict()


@route_get("messages", args={'name': str})
def get_messages(params):
    """Return every message addressed to the named user.

    Returns:
        ``{'messages': [...]}`` with one dict per message.

    Raises:
        NotFound: if no user has the requested name.
    """
    try:
        recipient = User.objects.get(name=params['name'])
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {params['name']}")
    inbox = Message.objects.filter(recipient=recipient)
    return {'messages': [message.to_dict() for message in inbox]}


# NOTE(review): this endpoint stores data but is registered with route_get;
# the HTTP method is left unchanged so existing clients keep working.
@route_get("messages/send", args={'sender': str, 'recipient': str,
                                  'ciphertext': str, 'time_sent': str,
                                  'time_sent_signature': str})
def send_message(params):
    """Store an encrypted message from `sender` to `recipient`.

    Sender and recipient must be existing usernames. `time_sent` is the
    time the message was sent, in ISO format; a value without a UTC offset
    is interpreted as UTC. `time_sent_signature` is the sender's signature
    over the exact `time_sent` string and is verified against the sender's
    stored public key. The ciphertext should be encrypted with the
    recipient's public key; it is stored as received.

    NOTE(review): the signature covers only `time_sent`, not the recipient
    or the ciphertext, so a captured (time_sent, signature) pair can be
    reused for other messages while it is still fresh.

    Returns:
        The stored message as a dict (`Message.to_dict()`).

    Raises:
        NotFound: if the sender or the recipient does not exist.
        NotAllowed: if `time_sent` is malformed, lies in the future, is
            older than MAX_MESSAGE_AGE_SECONDS, or the signature is invalid.
    """
    try:
        sender = User.objects.get(name=params['sender'])
        recipient = User.objects.get(name=params['recipient'])
    except User.DoesNotExist:
        raise NotFound("User not found.")

    try:
        time_sent = datetime.fromisoformat(params['time_sent'])
    except ValueError:
        raise NotAllowed(
            f"Time sent ({params['time_sent']}) must be in isoformat"
        )

    # Compare timezone-aware values so a timestamp carrying a UTC offset
    # does not blow up with "can't subtract offset-naive and offset-aware".
    if time_sent.tzinfo is None:
        time_sent = time_sent.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)

    # total_seconds() covers the whole interval. timedelta.seconds is only
    # the seconds *component*, so a timestamp one day and three seconds old
    # would have looked three seconds old. Future timestamps stay rejected.
    age = (now - time_sent).total_seconds()
    if not 0 <= age <= MAX_MESSAGE_AGE_SECONDS:
        raise NotAllowed(
            "The message is too old. Time sent must be within ten seconds"
        )

    sender_public_key = PublicKey.load(sender.public_key)
    try:
        sender_public_key.verify_signature(
            params['time_sent'],
            params['time_sent_signature'],
        )
    except InvalidSignature:
        raise NotAllowed("Invalid signature.")

    message = Message(
        sender=sender,
        recipient=recipient,
        ciphertext=params['ciphertext'],
    )
    message.save()
    return message.to_dict()