"""FastAPI application exposing user registration, login and per-user secrets.

Users and secrets are persisted as JSON arrays in ``database/users.json`` and
``database/secrets.json``.  Secret payloads are encrypted with a per-user
encryption key; that key is itself stored encrypted under a key produced by
``generate_user_passkey`` from the user's password.
"""
import json
from typing import Any, Dict, List

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer

from auth import Hasher, create_access_token, get_current_user
from crypto import (
    deserialize_into_bytes,
    fernet_decrypt,
    fernet_encrypt,
    generate_random_encryption_key,
    generate_user_passkey,
    serialize_bytes,
)
from models import Secret, User, UserLogin

# Locations of the JSON files used as the data store.
USERS_DB_PATH = 'database/users.json'
SECRETS_DB_PATH = 'database/secrets.json'

app = FastAPI()

# NOTE(review): the "*" entry together with allow_credentials=True lets any
# origin make credentialed requests. Left unchanged so existing clients keep
# working -- confirm whether the wildcard is really intended.
origins = [
    'http://localhost',
    'http://localhost:5173',
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)


def _load_records(path: str) -> List[Dict[str, Any]]:
    """Return the JSON array stored at *path*, or ``[]`` if missing or blank."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            text = f.read()
    except FileNotFoundError:
        # A store that has not been written yet is simply empty.
        return []
    if not text.strip():
        return []
    return json.loads(text)


def _save_records(path: str, records: List[Dict[str, Any]]) -> None:
    """Overwrite the file at *path* with *records* serialized as JSON."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(records, f)


def _next_id(records: List[Dict[str, Any]]) -> int:
    """Return one more than the largest ``id`` in *records* (0 when empty)."""
    return max((record['id'] for record in records), default=-1) + 1


@app.get("/")
async def root():
    """Return a static greeting."""
    return {"message": "Hello World"}


@app.post('/register')
async def register(user: User):
    """Register a new user and return the generated user id.

    Raises:
        HTTPException: 400 if the request carries an ``id`` or the username
            is already present in the users store.
    """
    if user.id is not None:
        raise HTTPException(
            status_code=400,
            detail="User id shall be auto generated, cannot be provided in request"
        )

    users = _load_records(USERS_DB_PATH)
    if any(existing['username'] == user.username for existing in users):
        raise HTTPException(status_code=400, detail="Username already in use")

    user.id = _next_id(users)

    # A fresh random key is what encrypts the user's secrets; it is stored
    # only in encrypted form, wrapped with the key returned by
    # generate_user_passkey for the plaintext password.
    encryption_key = generate_random_encryption_key()
    salt, master_key = generate_user_passkey(user.password)
    encrypted_encryption_key = fernet_encrypt(encryption_key, master_key)

    # The plaintext password must be consumed above before it is replaced
    # by its hash here.
    user.password = Hasher.get_password_hash(user.password)
    user.encryption_key = encrypted_encryption_key.decode('utf-8')
    user.salt = serialize_bytes(salt)

    users.append(jsonable_encoder(user))
    _save_records(USERS_DB_PATH, users)
    return {'user_id': user.id}


@app.post('/login')
async def login(user: UserLogin):
    """Authenticate a user and return an access token.

    Raises:
        HTTPException: 400 if the username is unknown or the password does
            not verify against the stored hash.
    """
    users = _load_records(USERS_DB_PATH)
    matches = [i for i in users if i['username'] == user.username]
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="username or password is incorrect"
        )
    cur_user = matches[0]

    if not Hasher.verify_password(user.password, cur_user['password']):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="username or password is incorrect"
        )

    # Rebuild the wrapping key from the password and the stored salt, then
    # unwrap the user's encryption key so it can be passed to the token factory.
    encrypted_encryption_key = cur_user['encryption_key'].encode()
    salt = deserialize_into_bytes(cur_user['salt'])
    _, master_key = generate_user_passkey(user.password, salt)
    encryption_key = fernet_decrypt(encrypted_encryption_key, master_key)

    access_token = create_access_token(
        subject=cur_user['id'],
        encryption_key=encryption_key,
    )
    return {
        'message': 'authenticated',
        'accessToken': access_token
    }


@app.post("/secret")
async def create_secret(secret: Secret, current_user: dict = Depends(get_current_user)):
    """Store an encrypted secret for the authenticated user.

    The secret's ``id`` and ``user_id`` are always assigned by the server;
    any values sent by the client are overwritten.  Returns the stored
    secret, whose ``data`` field holds the ciphertext.
    """
    data = _load_records(SECRETS_DB_PATH)

    # default=-1 inside _next_id makes the very first secret get id 0 instead
    # of max() raising ValueError on an empty store.
    secret.id = _next_id(data)
    secret.user_id = current_user['id']

    encryption_key = current_user['encryption_key'].encode()
    encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key)
    secret.data = encrypted_data.decode('utf-8')

    data.append(jsonable_encoder(secret))
    _save_records(SECRETS_DB_PATH, data)
    return secret


@app.put("/secret")
async def update_secret(secret: Secret, current_user: dict = Depends(get_current_user)):
    """Replace an existing secret owned by the authenticated user.

    Raises:
        HTTPException: 400 if no ``id`` is supplied, or if no secret with
            that id belongs to the current user.
    """
    if secret.id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Id must be passed for updating secret"
        )

    data = _load_records(SECRETS_DB_PATH)
    secret.user_id = current_user['id']

    # Match on both owner and id so a user cannot overwrite another user's
    # secret by guessing its id.
    secret_pos = next(
        (
            pos for pos, record in enumerate(data)
            if record['user_id'] == secret.user_id and record['id'] == secret.id
        ),
        None,
    )
    if secret_pos is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Secret with this Id not found for this user"
        )

    encryption_key = current_user['encryption_key'].encode()
    encrypted_data = fernet_encrypt(secret.data.encode(), encryption_key)
    secret.data = encrypted_data.decode('utf-8')

    data[secret_pos] = jsonable_encoder(secret)
    _save_records(SECRETS_DB_PATH, data)
    return secret


@app.get('/secret')
async def list_secret(current_user: dict = Depends(get_current_user)):
    """Return the authenticated user's active secrets with ``data`` decrypted."""
    data = _load_records(SECRETS_DB_PATH)
    user_id = current_user['id']
    encryption_key = current_user['encryption_key'].encode()

    user_secrets = [i for i in data if i['user_id'] == user_id and i['active']]
    for secret in user_secrets:
        # The stored ciphertext is passed to fernet_decrypt as a str and the
        # result is returned as-is.
        # NOTE(review): confirm fernet_decrypt accepts str tokens and that its
        # return value is JSON-serializable by the framework.
        secret['data'] = fernet_decrypt(secret['data'], encryption_key)
    return user_secrets


@app.get('/validate-token')
async def validate_token(current_user: dict = Depends(get_current_user)):
    """Confirm that the caller's token resolves to a user.

    Raises:
        HTTPException: 401 if the resolved user has no id.
    """
    if current_user['id'] is not None:
        return {'message': 'authenticated'}
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)