Add encryption module
This commit is contained in:
22
server/app/models.py
Normal file
22
server/app/models.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from banjo.models import (
|
||||
Model,
|
||||
StringField,
|
||||
ForeignKey,
|
||||
BooleanField,
|
||||
)
|
||||
|
||||
class User(Model):
|
||||
name = StringField(unique=True)
|
||||
public_key = StringField(unique=True)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'name': self.name,
|
||||
'public_key': self.public_key,
|
||||
}
|
||||
|
||||
class Message(Model):
|
||||
sender = ForeignKey(User, related_name="messages_sent")
|
||||
recipient = ForeignKey(User, related_name="messages_received")
|
||||
ciphertext = StringField()
|
||||
read = BooleanField()
|
62
server/app/views.py
Normal file
62
server/app/views.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from banjo.urls import route_get, route_post
|
||||
from app.models import User, Message
|
||||
from banjo.http import NotFound, NotAllowed
|
||||
from datetime import datetime
|
||||
import rsa
|
||||
|
||||
@route_post("users/new", args={'name': str, 'public_key': str})
|
||||
def create_user(params):
|
||||
"Creates a new user"
|
||||
try:
|
||||
new_user = User.from_dict(params)
|
||||
new_user.save()
|
||||
return new_user.to_dict()
|
||||
except:
|
||||
raise NotAllowed("Username and public key must be unique.")
|
||||
|
||||
@route_get("users", args={'name': str})
|
||||
def get_user(params):
|
||||
"Get a user's public key"
|
||||
try:
|
||||
user = User.objects.get(name=params['name'])
|
||||
return user.to_dict()
|
||||
except User.DoesNotExist:
|
||||
raise NotFound(f"There is no user named {params['name']}")
|
||||
|
||||
@route_get("messages", args={'name': str})
|
||||
def get_messages(params):
|
||||
"Return all the messages for a user"
|
||||
try:
|
||||
user = User.objects.get(name=params['name'])
|
||||
except User.DoesNotExist:
|
||||
raise NotFound(f"There is no user named {params['name']}")
|
||||
messages = Message.objects.filter(user=user)
|
||||
return {'messages': [m.to_dict() for m in messages]}
|
||||
|
||||
@route_get("messages/send", args={'sender': str, 'recipient': str, 'ciphertext': str,
|
||||
'time_sent': str, 'auth': str})
|
||||
def send_message(params):
|
||||
"""Securely sends an encrypted message from `sender` to `recipient`
|
||||
Sender and recipient should be recognized usernames.
|
||||
Time sent should be the time the message was sent in isoformat.
|
||||
Auth should be the time sent, encrypted with the sender's private key.
|
||||
The ciphertext should be encrypted with the recipient's public key.
|
||||
"""
|
||||
try:
|
||||
sender = User.objects.get(name=params['sender'])
|
||||
recipient = User.objects.get(name=['recipient'])
|
||||
except User.DoesNotExist:
|
||||
raise NotFound(f"There is no user named {params['name']}")
|
||||
try:
|
||||
time_sent = datetime.fromisoformat(params['time_sent'])
|
||||
except ValueError:
|
||||
raise NotAllowed(f"Time sent ({params['time_sent']}) must be in isoformat")
|
||||
if (datetime.now() - time_sent).seconds > 10:
|
||||
raise NotAllowed(f"The message is too old. Time sent must be within ten seconds")
|
||||
if not
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
167
server/encryption.py
Normal file
167
server/encryption.py
Normal file
@@ -0,0 +1,167 @@
|
||||
# This module provides higher-level wrappers around cryptography,
|
||||
# handling intermediate steps and selecting parameters. This module
|
||||
# also allows users to work with strings instead of bytes, as users
|
||||
# will not yet have learned about encoding and decoding strings.
|
||||
|
||||
# NOTE: DO NOT USE THESE CLASSES IN SERIOUS APPLICATIONS.
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from base64 import b64encode, b64decode
|
||||
from pathlib import Path
|
||||
|
||||
PUBLIC_EXPONENT = 65537
|
||||
KEY_SIZE = 2048
|
||||
|
||||
class PrivateKey:
|
||||
|
||||
@classmethod
|
||||
def load(cls, pem):
|
||||
"""Loads an existing private key.
|
||||
When `pem` is of type bytes, assumes it is a pem serialization.
|
||||
Otherwise, assumes `pem` is a path to a pem file.
|
||||
"""
|
||||
if isinstance(pem, bytes):
|
||||
key = serialization.load_pem_private_key(pem)
|
||||
elif isinstance(pem, (str, Path)):
|
||||
with open(pem, 'rb') as key_file:
|
||||
key = serialization.load_pem_private_key(
|
||||
key_file.read(),
|
||||
password=None
|
||||
)
|
||||
else:
|
||||
raise TypeError("PrivateKey.load requires pem bytes or a file path")
|
||||
return PrivateKey(key)
|
||||
|
||||
@classmethod
|
||||
def generate(cls):
|
||||
"Generates a new private key."
|
||||
key = rsa.generate_private_key(
|
||||
public_exponent=PUBLIC_EXPONENT,
|
||||
key_size=KEY_SIZE,
|
||||
)
|
||||
return PrivateKey(key)
|
||||
|
||||
def __init__(self, key):
|
||||
if not isinstance(key, rsa.RSAPrivateKey):
|
||||
err = (
|
||||
"PrivateKey is initialized with a rsa.RSAPrivateKey. " +
|
||||
"You probably want to use PrivateKey.load or PrivateKey.generate instead."
|
||||
)
|
||||
raise ValueError(err)
|
||||
self.key = key
|
||||
|
||||
def __str__(self):
|
||||
"Returns a string representation of the key in PEM format"
|
||||
return self.key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
).decode('utf8')
|
||||
|
||||
def __repr__(self):
|
||||
return "<PrivateKey>"
|
||||
|
||||
def save(self, filepath):
|
||||
"Saves the key as a pem file"
|
||||
Path(filepath).write_text(str(self))
|
||||
|
||||
def get_public_key(self):
|
||||
"Gets the matching public key."
|
||||
return PublicKey(self.key.public_key())
|
||||
|
||||
def sign(self, message):
|
||||
"""Create an encrypted signature of the message.
|
||||
Anyone with the public key can verify that the signer had the matching
|
||||
private key.
|
||||
"""
|
||||
message_bytes = message.encode('utf8')
|
||||
signature = self.key.sign(
|
||||
message_bytes,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
return b64encode(signature).decode('ascii')
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
"""Decrypts a message encrypted with the matching PublicKey.
|
||||
"""
|
||||
ciphertext_bytes = b64decode(ciphertext.encode('ascii'))
|
||||
plaintext = self.key.decrypt(
|
||||
ciphertext_bytes,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
return plaintext.decode('utf8')
|
||||
|
||||
class PublicKey:
|
||||
|
||||
@classmethod
|
||||
def load(self, pem):
|
||||
"""Loads an existing public key.
|
||||
When `pem` is of type bytes, assumes it is a pem serialization.
|
||||
Otherwise, assumes `pem` is a path to a pem file.
|
||||
"""
|
||||
if isinstance(pem, bytes):
|
||||
key = serialization.load_pem_public_key(pem)
|
||||
elif isinstance(pem, (str, Path)):
|
||||
with open(pem, 'rb') as key_file:
|
||||
key = serialization.load_pem_public_key(key_file.read())
|
||||
else:
|
||||
raise TypeError("PublicKey.load requires pem bytes or a file path")
|
||||
return PublicKey(key)
|
||||
|
||||
def __init__(self, key):
|
||||
self.key = key
|
||||
|
||||
def __str__(self):
|
||||
"Returns a string representation of the key in PEM format"
|
||||
return self.key.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
).decode('utf8')
|
||||
|
||||
def __repr__(self):
|
||||
return "<PublicKey>"
|
||||
|
||||
def save(self, filepath):
|
||||
"""Saves this key to a file in PEM format.
|
||||
"""
|
||||
Path(filepath).write_text(str(self))
|
||||
|
||||
def verify_signature(self, message, signature):
|
||||
"""Verifies that `message` was signed using the matching private key.
|
||||
"""
|
||||
message_bytes = message.encode('utf8')
|
||||
signature_bytes = b64decode(signature.encode('ascii'))
|
||||
self.key.verify(
|
||||
signature_bytes,
|
||||
message_bytes,
|
||||
padding.PSS(
|
||||
mgf=padding.MGF1(hashes.SHA256()),
|
||||
salt_length=padding.PSS.MAX_LENGTH
|
||||
),
|
||||
hashes.SHA256()
|
||||
)
|
||||
|
||||
def encrypt(self, message):
|
||||
"""Encrypts a message so it can be decrypted with the matching PrivateKey.
|
||||
If encryption fails, your message is probably too long.
|
||||
"""
|
||||
message_bytes = message.encode('utf8')
|
||||
ciphertext = self.key.encrypt(
|
||||
message_bytes,
|
||||
padding.OAEP(
|
||||
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||
algorithm=hashes.SHA256(),
|
||||
label=None
|
||||
)
|
||||
)
|
||||
return b64encode(ciphertext).decode('ascii')
|
Reference in New Issue
Block a user