import json from sqlite3 import IntegrityError from typing import Literal from fastapi import Depends, FastAPI, HTTPException, status from fastapi.encoders import jsonable_encoder from fastapi.middleware.cors import CORSMiddleware import queries from auth import Hasher, create_access_token, get_current_user from crypto import (deserialize_into_bytes, fernet_decrypt, fernet_encrypt, generate_random_encryption_key, generate_user_passkey, serialize_bytes) from models import Key, Secret, User, UserLogin app = FastAPI() origins = [ 'http://localhost', 'http://localhost:5173', "*" ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=['*'], allow_headers=['*'] ) @app.get("/") async def root(): return {"'message'": "Hello World"} @app.post('/register') async def register(user: User): """Registers a user""" plain_text_password = user.password user.password = Hasher.get_password_hash(plain_text_password) try: user = queries.CREATE_USER(user.model_dump()) user_id = user['id'] except IntegrityError as e: raise HTTPException(status_code=400, detail="Username already in use") encryption_key = generate_random_encryption_key() salt, master_key = generate_user_passkey(plain_text_password) key = Key( user_id=user_id, encryption_key=fernet_encrypt(encryption_key, master_key).decode('utf-8'), encryption_key_salt=serialize_bytes(salt) ) try: queries.CREATE_KEY(key.model_dump()) except Exception as e: print('failed to create key', e) return {'user_id': user_id} @app.post('/login') async def login(user: UserLogin): """logs in the user""" login_exception = HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="username or password is incorrect" ) cur_user = queries.GET_USER_WITH_KEY(user.model_dump()) if cur_user is None: raise login_exception password_match = Hasher.verify_password(user.password, cur_user['password']) if not password_match: raise login_exception encrypted_encryption_key = cur_user['encryption_key'].encode() salt = deserialize_into_bytes(cur_user['encryption_key_salt']) _, master_key = generate_user_passkey(user.password, salt) encryption_key = fernet_decrypt(encrypted_encryption_key, master_key) access_token = create_access_token(subject=cur_user['id'], encryption_key=encryption_key) response = { 'message': 'authenticated', 'accessToken': access_token } return response @app.post("/secret") async def create_secret(secret: Secret, current_user: dict = Depends(get_current_user)): """ Stores an encrypted secret for the user. """ secret.user_id = current_user['id'] encryption_key = current_user['encryption_key'].encode() encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key) secret.data = encrypted_data.decode('utf-8') queries.CREATE_SECRET(secret.model_dump()) return secret @app.put("/secret") async def update_secret(secret: Secret, current_user: dict = Depends(get_current_user)): """ Updates an encrypted secret for the user. """ if secret.id is None: raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="Id must be passed for updating secret") secret.user_id = current_user['id'] encryption_key = current_user['encryption_key'].encode() encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key) secret.data = encrypted_data.decode('utf-8') token = queries.UPDATE_SECRET(secret.model_dump()) if not token: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Token not found for this user") return token @app.delete('/secret') async def delete_secret(secret: Secret, current_user: dict = Depends(get_current_user)): """Deletes the secret with the given id""" secret.user_id = current_user['id'] # print(secret.model_dump()) user_id, secret_id = queries.DELETE_SECRET(secret.model_dump()) @app.get('/secret') async def list_secret(current_user: dict = Depends(get_current_user)): """Returns the encrypted secrets of the user.""" user_secrets = queries.GET_SECRETS({'user_id': current_user['id']}) encryption_key = current_user['encryption_key'].encode() if not user_secrets: return {'message': 'No tokens stored yet'} for secret in user_secrets: cur_data = secret['data'] decrypted_data = fernet_decrypt(cur_data, encryption_key) secret['data'] = decrypted_data return user_secrets @app.get('/validate-token') async def validate_token(current_user: dict = Depends(get_current_user)): user_id = current_user['id'] if user_id is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) return {'message': 'authenticated'}