generated from mwc/lab_subrosa
	
		
			
				
	
	
		
			96 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			96 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from banjo.urls import route_get, route_post
 | 
						|
from app.models import User, Message
 | 
						|
from banjo.http import NotFound, NotAllowed
 | 
						|
from datetime import datetime
 | 
						|
from cryptography.exceptions import InvalidSignature
 | 
						|
import sys
 | 
						|
sys.path.insert(0, "..")
 | 
						|
from encryption import PrivateKey, PublicKey
 | 
						|
 | 
						|
def get_or_create_admin():
    """Return the ``subrosa_admin`` user, creating the account if it is missing.

    When no user with that name exists, a new key pair is generated and the
    account is saved with the string form of its public key. The user is
    then read back from the database and returned.
    """
    admin_name = 'subrosa_admin'
    already_present = User.objects.filter(name=admin_name).exists()
    if not already_present:
        # NOTE(review): only the public half of the key pair is persisted;
        # the private key is dropped when this function returns.
        fresh_key = PrivateKey.generate()
        User(name=admin_name, public_key=str(fresh_key.get_public_key())).save()
    return User.objects.get(name=admin_name)
 | 
						|
 | 
						|
@route_post("users/new", args={'name': str, 'public_key': str})
def create_user(params):
    """Create a new user and leave a welcome message in their inbox.

    Args:
        params: Request arguments with ``name`` and ``public_key`` (both
            ``str``).

    Returns:
        The ``to_dict()`` representation of the newly saved user.

    Raises:
        NotAllowed: If ``public_key`` cannot be loaded, or if the user
            record cannot be built or saved (reported to the client as the
            username already being in use).
    """
    admin = get_or_create_admin()
    try:
        public_key = PublicKey.load(params['public_key'])
    except (ValueError, FileNotFoundError) as err:
        raise NotAllowed("Invalid public key") from err

    # Guard only the user creation. A bare `except:` around the whole flow
    # also swallowed KeyboardInterrupt/SystemExit and mislabelled failures
    # in the welcome-message step as a username collision.
    # NOTE(review): presumably a duplicate name surfaces here as a database
    # integrity error from save(); confirm against app.models.User.
    try:
        new_user = User.from_dict(params)
        new_user.save()
    except Exception as err:
        raise NotAllowed(f"Username {params['name']} is already in use.") from err

    welcome = "Welcome to SubRosa! Please be a good community member."
    message = Message(
        sender=admin,
        recipient=new_user,
        # Encrypted with the new user's own public key.
        ciphertext=public_key.encrypt(welcome),
    )
    message.save()
    return new_user.to_dict()
 | 
						|
 | 
						|
@route_get("users", args={'name': str})
def get_user(params):
    """Look up a user by name and return their ``to_dict()`` form.

    The original note describes this route as the way to get a user's
    public key. Raises ``NotFound`` when no user has the requested name.
    """
    requested_name = params['name']
    try:
        return User.objects.get(name=requested_name).to_dict()
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {params['name']}")
 | 
						|
 | 
						|
@route_get("messages", args={'name': str})
def get_messages(params):
    """Return every message addressed to the named user.

    The result is a dict with a single ``'messages'`` key holding the
    ``to_dict()`` form of each message whose recipient is that user.
    Raises ``NotFound`` when no user has the requested name.
    """
    try:
        inbox_owner = User.objects.get(name=params['name'])
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {params['name']}")

    inbox = []
    for received in Message.objects.filter(recipient=inbox_owner):
        inbox.append(received.to_dict())
    return {'messages': inbox}
 | 
						|
 | 
						|
@route_get("messages/send", args={'sender': str, 'recipient': str, 'ciphertext': str,
        'time_sent': str, 'time_sent_signature': str})
def send_message(params):
    """Store an encrypted message from `sender` to `recipient`.

    Sender and recipient should be recognized usernames.
    `time_sent` should be the time the message was sent, in isoformat. A
    value without a UTC offset is interpreted as UTC.
    `time_sent_signature` should be a signature over the `time_sent` string;
    it is verified against the sender's stored public key.
    The ciphertext should be encrypted with the recipient's public key; it
    is stored as received.

    Returns:
        The ``to_dict()`` representation of the saved message.

    Raises:
        NotFound: If the sender or the recipient does not exist.
        NotAllowed: If `time_sent` is not in isoformat, is not within the
            last ten seconds, or the signature does not verify.
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone

    try:
        sender = User.objects.get(name=params['sender'])
        recipient = User.objects.get(name=params['recipient'])
    except User.DoesNotExist:
        raise NotFound("User not found.")
    try:
        time_sent = datetime.fromisoformat(params['time_sent'])
    except ValueError:
        raise NotAllowed(f"Time sent ({params['time_sent']}) must be in isoformat")

    # Compare aware datetimes so that a timestamp carrying a UTC offset does
    # not raise TypeError when subtracted from the current time.
    if time_sent.utcoffset() is None:
        time_sent = time_sent.replace(tzinfo=timezone.utc)
    age_seconds = (datetime.now(timezone.utc) - time_sent).total_seconds()
    # `timedelta.seconds` is only the seconds component and ignores whole
    # days, so a timestamp N days old (or nearly a day in the future) used to
    # pass. `total_seconds()` with a two-sided bound closes that window.
    if not 0 <= age_seconds <= 10:
        raise NotAllowed("The message is too old. Time sent must be within ten seconds")

    sender_public_key = PublicKey.load(sender.public_key)
    try:
        sender_public_key.verify_signature(params['time_sent'], params['time_sent_signature'])
    except InvalidSignature:
        raise NotAllowed("Invalid signature.")
    # NOTE(review): the signature covers only `time_sent`, not the recipient
    # or the ciphertext; confirm whether that is intended.
    message = Message(
        sender=sender,
        recipient=recipient,
        ciphertext=params['ciphertext'],
    )
    message.save()
    return message.to_dict()
 | 
						|
 | 
						|
        
 | 
						|
 | 
						|
 |