"""JWT access-token helpers, FastAPI auth dependencies and bcrypt password hashing."""

import datetime
import json
import os
from typing import Any, Union

import bcrypt
import jwt
from fastapi import Cookie, Depends, HTTPException, status
from fastapi.security import (
    HTTPAuthorizationCredentials,
    HTTPBearer,
    OAuth2PasswordRequestForm,
)
from jwt.exceptions import InvalidTokenError

ACCESS_TOKEN_EXPIRE_MINUTES = 30  # 30 minutes
ALGORITHM = "HS256"
# The signing secret can be supplied through the JWT_SECRET_KEY environment
# variable. The literal fallback only preserves the previous behaviour.
# NOTE(review): the fallback is a well-known weak value; set the environment
# variable in any real deployment.
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY", 'abcdefghijklmnopqrstuvwxyz')

# Location of the JSON file holding the user records, relative to the
# process working directory.
_USERS_DB_PATH = 'database/users.json'

security = HTTPBearer()


def create_access_token(
    subject: str | int,
    encryption_key: str,
    expires_delta: datetime.timedelta | None = None,
) -> str:
    """Create a signed JWT for the logged-in user.

    Args:
        subject: User identifier stored in the ``sub`` claim.
        encryption_key: Value stored in the ``key`` claim.
        expires_delta: Token lifetime, added to the current UTC time. When
            omitted, ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes are used.

    Returns:
        The encoded token.

    NOTE(review): newer PyJWT releases appear to reject non-string ``sub``
    claims when decoding; confirm callers pass a ``str`` before upgrading.
    """
    if expires_delta is None:
        expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expires_at = datetime.datetime.now(datetime.timezone.utc) + expires_delta

    to_encode = {"exp": expires_at, "sub": subject, "key": encryption_key}
    return jwt.encode(to_encode, JWT_SECRET_KEY, ALGORITHM)


def _credentials_exception() -> HTTPException:
    """Build the 401 response raised for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={'WWW-Authenticate': 'Bearer'},
    )


def _load_users() -> list:
    """Read the user records from the JSON file; a blank file yields no users."""
    with open(_USERS_DB_PATH, 'r', encoding='utf-8') as f:
        text = f.read()
    return json.loads(text) if text.strip() else []


def _authenticate(raw_token: str | None) -> dict:
    """Validate ``raw_token`` and return the matching user.

    Shared by the header-based and cookie-based dependencies so both apply
    exactly the same checks.

    Raises:
        HTTPException: 401 when the token is missing, invalid or expired,
            lacks the ``sub`` or ``key`` claim, or names an unknown user.
    """
    unauthorized = _credentials_exception()
    if not raw_token:
        raise unauthorized

    try:
        payload = jwt.decode(raw_token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as err:
        raise unauthorized from err

    user_id = payload.get("sub")
    # A correctly signed token without the "key" claim is still unusable;
    # answer 401 instead of failing later with a KeyError (HTTP 500).
    if user_id is None or "key" not in payload:
        raise unauthorized

    record = next(
        (entry for entry in _load_users() if entry.get('id') == user_id),
        None,
    )
    if record is None:
        raise unauthorized

    return {
        'id': user_id,
        'username': record['username'],
        'encryption_key': payload['key'],
    }


def get_current_user(token: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """Authenticate the request from its ``Authorization: Bearer`` header.

    Returns:
        A dict with the keys ``id``, ``username`` and ``encryption_key``.

    Raises:
        HTTPException: 401 when the credentials cannot be validated.
    """
    return _authenticate(token.credentials)


def get_current_user2(token: str = Cookie(default=None)) -> dict:
    """Authenticate the request from the JWT stored in the ``token`` cookie.

    The token is deliberately not printed or logged: it is a bearer
    credential.

    Returns:
        A dict with the keys ``id``, ``username`` and ``encryption_key``.

    Raises:
        HTTPException: 401 when the cookie is absent or cannot be validated.
    """
    return _authenticate(token)


class Hasher:
    """Class for hashing and verifying passwords"""

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Return whether ``plain_password`` matches the bcrypt ``hashed_password``."""
        encoded_password = plain_password.encode('utf-8')
        encoded_hash = hashed_password.encode('utf-8')
        return bcrypt.checkpw(encoded_password, encoded_hash)

    @staticmethod
    def get_password_hash(password: str) -> str:
        """Hash ``password`` with a freshly generated salt and return it as text."""
        salt = bcrypt.gensalt()
        encoded_password = password.encode('utf-8')
        hashed = bcrypt.hashpw(encoded_password, salt)
        return hashed.decode('utf-8')