generated from mwc/lab_subrosa
96 lines
3.4 KiB
Python
96 lines
3.4 KiB
Python
from banjo.urls import route_get, route_post
|
|
from app.models import User, Message
|
|
from banjo.http import NotFound, NotAllowed
|
|
from datetime import datetime
|
|
from cryptography.exceptions import InvalidSignature
|
|
import sys
|
|
sys.path.insert(0, "..")
|
|
from encryption import PrivateKey, PublicKey
|
|
|
|
def get_or_create_admin():
    """Return the `subrosa_admin` user, creating it first if it is missing.

    When no user with that name exists, a new key pair is generated and a
    user is saved with the string form of the public key. The generated
    private key is not stored anywhere by this function.
    """
    admin_name = 'subrosa_admin'
    admin_exists = User.objects.filter(name=admin_name).exists()
    if not admin_exists:
        key_pair = PrivateKey.generate()
        admin_public_key = key_pair.get_public_key()
        User(name=admin_name, public_key=str(admin_public_key)).save()
    # Always re-read from the database so both paths return the stored row.
    return User.objects.get(name=admin_name)
|
|
|
|
@route_post("users/new", args={'name': str, 'public_key': str})
def create_user(params):
    """Create a new user and store an encrypted welcome message for them.

    Args:
        params: Request arguments; must contain 'name' and 'public_key'
            (both strings, as declared on the route).

    Returns:
        The `to_dict()` representation of the newly saved user.

    Raises:
        NotAllowed: If the public key cannot be loaded, or if saving the
            user or the welcome message fails.
    """
    admin = get_or_create_admin()
    try:
        public_key = PublicKey.load(params['public_key'])
    except (ValueError, FileNotFoundError) as err:
        raise NotAllowed("Invalid public key") from err
    try:
        new_user = User.from_dict(params)
        new_user.save()
        welcome = "Welcome to SubRosa! Please be a good community member."
        # The welcome message is encrypted with the new user's own public
        # key and recorded as sent by the admin account.
        message = Message(
            sender=admin,
            recipient=new_user,
            ciphertext=public_key.encrypt(welcome),
        )
        message.save()
        return new_user.to_dict()
    except Exception as err:
        # `Exception` rather than a bare `except:` so SystemExit and
        # KeyboardInterrupt still propagate; the cause is chained so the
        # real failure remains visible in tracebacks.
        # NOTE(review): every failure in the block above is reported as a
        # duplicate username, including failures while building the welcome
        # message -- confirm that is intended.
        raise NotAllowed(f"Username {params['name']} is already in use.") from err
|
|
|
|
@route_get("users", args={'name': str})
def get_user(params):
    """Look up a user by name and return their `to_dict()` representation.

    Raises NotFound when no user has the requested name.
    """
    requested_name = params['name']
    try:
        return User.objects.get(name=requested_name).to_dict()
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {requested_name}")
|
|
|
|
@route_get("messages", args={'name': str})
def get_messages(params):
    """Return every message addressed to the named user.

    The result is a dict with a single 'messages' key holding the
    `to_dict()` form of each message whose recipient is that user.
    Raises NotFound when no user has the requested name.
    """
    username = params['name']
    try:
        owner = User.objects.get(name=username)
    except User.DoesNotExist:
        raise NotFound(f"There is no user named {username}")
    inbox = Message.objects.filter(recipient=owner)
    return {'messages': [msg.to_dict() for msg in inbox]}
|
|
|
|
@route_get("messages/send", args={'sender': str, 'recipient': str, 'ciphertext': str,
                                  'time_sent': str, 'time_sent_signature': str})
def send_message(params):
    """Store an encrypted message from `sender` to `recipient`.

    Sender and recipient should be recognized usernames.
    `time_sent` should be the time the message was sent, in ISO format. A
    value without a UTC offset is interpreted as UTC.
    `time_sent_signature` should be the sender's signature over the exact
    `time_sent` string; it is verified with the sender's stored public key.
    The ciphertext should be encrypted with the recipient's public key; it
    is stored as given.

    Returns:
        The `to_dict()` representation of the saved message.

    Raises:
        NotFound: If the sender or the recipient does not exist.
        NotAllowed: If `time_sent` is not ISO-formatted, is not within the
            last ten seconds, or the signature does not verify.
    """
    # Function-scope import: the file only imports `datetime` from this module.
    from datetime import timezone

    try:
        sender = User.objects.get(name=params['sender'])
        recipient = User.objects.get(name=params['recipient'])
    except User.DoesNotExist:
        raise NotFound("User not found.")
    try:
        time_sent = datetime.fromisoformat(params['time_sent'])
    except ValueError:
        raise NotAllowed(f"Time sent ({params['time_sent']}) must be in isoformat")
    # Compare aware datetimes on both sides: subtracting an offset-aware
    # timestamp from a naive "now" would raise TypeError.
    if time_sent.utcoffset() is None:
        time_sent = time_sent.replace(tzinfo=timezone.utc)
    # total_seconds() covers the whole interval. timedelta.seconds is only
    # the 0..86399 remainder after whole days, so a timestamp several days
    # old (or almost a day in the future) could otherwise pass the check and
    # allow a signed timestamp to be replayed.
    age_seconds = (datetime.now(timezone.utc) - time_sent).total_seconds()
    if not 0 <= age_seconds <= 10:
        raise NotAllowed("The message is too old. Time sent must be within ten seconds")
    sender_public_key = PublicKey.load(sender.public_key)
    try:
        # The signature is checked against the raw request string, not the
        # parsed datetime.
        sender_public_key.verify_signature(params['time_sent'], params['time_sent_signature'])
    except InvalidSignature:
        raise NotAllowed("Invalid signature.")
    message = Message(
        sender=sender,
        recipient=recipient,
        ciphertext=params['ciphertext'],
    )
    message.save()
    return message.to_dict()
|
|
|
|
|
|
|
|
|