import json from sqlite3 import IntegrityError from typing import Literal from fastapi import Depends, FastAPI, HTTPException, status from fastapi.encoders import jsonable_encoder from fastapi.middleware.cors import CORSMiddleware import queries from auth import Hasher, create_access_token, get_current_user from crypto import (deserialize_into_bytes, fernet_decrypt, fernet_encrypt, generate_random_encryption_key, generate_user_passkey, serialize_bytes) from models import Key, Secret, User, UserLogin app = FastAPI() origins = [ 'http://localhost', 'http://localhost:5173', "*" ] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=['*'], allow_headers=['*'] ) @app.get("/") async def root(): return {"'message'": "Hello World"} @app.post('/register') async def register(user: User): """Registers a user""" plain_text_password = user.password user.password = Hasher.get_password_hash(plain_text_password) try: user = queries.CREATE_USER(user.model_dump()) user_id = user['id'] except IntegrityError as e: raise HTTPException(status_code=400, detail="Username already in use") encryption_key = generate_random_encryption_key() salt, master_key = generate_user_passkey(plain_text_password) key = Key( user_id=user_id, encryption_key=fernet_encrypt(encryption_key, master_key).decode('utf-8'), encryption_key_salt=serialize_bytes(salt) ) try: queries.CREATE_KEY(key.model_dump()) except Exception as e: print('failed to create key', e) return {'user_id': user_id} @app.post('/login') async def login(user: UserLogin): """logs in the user""" login_exception = HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="username or password is incorrect" ) cur_user = queries.GET_USER_WITH_KEY(user.model_dump()) if cur_user is None: raise login_exception password_match = Hasher.verify_password(user.password, cur_user['password']) if not password_match: raise login_exception encrypted_encryption_key = cur_user['encryption_key'].encode() salt = deserialize_into_bytes(cur_user['encryption_key_salt']) _, master_key = generate_user_passkey(user.password, salt) encryption_key = fernet_decrypt(encrypted_encryption_key, master_key) access_token = create_access_token(subject=cur_user['id'], encryption_key=encryption_key) response = { 'message': 'authenticated', 'accessToken': access_token } return response @app.post("/secret") async def create_secret(secret: Secret, current_user: dict = Depends(get_current_user)): """ Stores an encrypted secret for the user. """ secret.user_id = current_user['id'] encryption_key = current_user['encryption_key'].encode() encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key) secret.data = encrypted_data.decode('utf-8') queries.CREATE_SECRET(secret.model_dump()) return secret @app.put("/secret") async def update_secret(secret: Secret, current_user: dict = Depends(get_current_user)): """ Updates an encrypted secret for the user. """ data = [] with open('database/secrets.json', 'r') as f: text = f.read() if text: data.extend(json.loads(text)) if secret.id is None: raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="Id must be passed for updating secret") secret.user_id = current_user['id'] found_secrets = [(i, j) for i, j in enumerate(data) if j['user_id'] == secret.user_id and j['id']==secret.id] if not found_secrets: raise HTTPException(status.HTTP_400_BAD_REQUEST, deatil="Secret with this Id not found for this user") secret_pos = found_secrets[0][0] encryption_key = current_user['encryption_key'].encode() encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key) secret.data = encrypted_data.decode('utf-8') data[secret_pos] = jsonable_encoder(secret) with open('database/secrets.json', 'w') as f: json.dump(data, f) return secret @app.get('/secret') async def list_secret(current_user: dict = Depends(get_current_user)): """Returns the encrypted secrets of the user.""" user_secrets = queries.GET_SECRETS({'user_id': current_user['id']}) encryption_key = current_user['encryption_key'].encode() for secret in user_secrets: cur_data = secret['data'] decrypted_data = fernet_decrypt(cur_data, encryption_key) secret['data'] = decrypted_data return user_secrets @app.get('/validate-token') async def validate_token(current_user: dict = Depends(get_current_user)): user_id = current_user['id'] if user_id is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) return {'message': 'authenticated'}