FastAuth/backend/queries.py

168 lines
3.5 KiB
Python

import sqlite3
from dataclasses import dataclass
from typing import Literal
def connect_db() -> sqlite3.Connection:
    """Open a new connection to the application's SQLite database.

    Returns:
        A fresh ``sqlite3.Connection`` to ``database/database.sqlite`` with
        the ``foreign_keys`` pragma switched on. The path is relative, so it
        is resolved against the process's current working directory.
    """
    connection = sqlite3.connect('database/database.sqlite')
    # The pragma is issued on every new connection so each one enforces
    # foreign-key constraints.
    connection.execute("PRAGMA foreign_keys = 1")
    return connection
class DbQuery:
def __init__(
self,
query: str,
type: Literal['read', 'write'],
returns: bool,
rows: Literal['single', 'multi']
):
self.query = query
self.type = type
self.returns = returns
self.rows = rows
def __str__(self) -> str:
return self.query
def __repr__(self) -> str:
return f"<query: '{self.query}, {self.type}, returns {self.returns}, {self.rows} row(s)"
def _read(self, params: dict) -> list[dict]|dict:
with connect_db() as conn:
cur = conn.execute(self.query, params)
data = cur.fetchall()
if not data:
return None
keys = [i[0] for i in cur.description]
data_dict = [dict(zip(keys, i)) for i in data]
if self.rows == 'single':
if len(data_dict) > 1:
raise ValueError('Query returned more than one rows.')
data_dict = data_dict[0]
return data_dict
def _write(self, params: dict) -> dict|bool:
if not isinstance(params, dict):
raise TypeError("Only dict type is supported for params")
with connect_db() as conn:
cur = conn.execute(self.query, params)
if self.returns:
data = cur.fetchone()
keys = keys = [i[0] for i in cur.description]
data_dict = dict(zip(keys, data))
conn.commit()
if self.returns:
return data_dict
return True
def __call__(self, params):
if self.type == 'read':
func = self._read
else:
func = self._write
return func(params)
# Insert one row into `users` and return the new row's `id`.
# Expects params: name, email, username, password.
CREATE_USER = DbQuery(
    query="""
INSERT INTO
users (name, email, username, password)
VALUES (:name, :email, :username, :password)
RETURNING id
""",
    type='write',
    returns=True,
    rows='single',
)
# Insert one row into `keys` for a user; nothing is returned from the
# statement. Expects params: user_id, encryption_key, encryption_key_salt.
CREATE_KEY = DbQuery(
    query="""
INSERT INTO
keys (user_id, encryption_key, encryption_key_salt)
VALUES (:user_id, :encryption_key, :encryption_key_salt)
""",
    type='write',
    returns=False,
    rows=None,
)
# Look up a user by username together with the joined `keys` row.
# Expects params: username.
# NOTE(review): `SELECT *` over a join is turned into a dict keyed by column
# name, so if `users` and `keys` share a column name the later column's value
# wins. Confirm against the schema.
GET_USER_WITH_KEY = DbQuery(
    query="""
SELECT *
FROM users u
JOIN keys k on u.id = k.user_id
WHERE u.username = :username
""",
    type='read',
    returns=True,
    rows='single',
)
# Fetch the `users` row whose id matches. Expects params: user_id.
GET_USER_BY_ID = DbQuery(
    query="""
SELECT *
FROM users
where id = :user_id
""",
    type='read',
    returns=True,
    rows='single',
)
# Insert one row into `secrets`; nothing is returned from the statement.
# Expects params: user_id, data.
CREATE_SECRET = DbQuery(
    query="""
INSERT INTO
secrets(user_id, data)
VALUES(:user_id, :data)
""",
    type='write',
    returns=False,
    rows=None,
)
# Replace a secret's `data`, stamp `modified_on`, and return the updated row.
# Expects params: data, id.
# NOTE(review): the row is selected by `id` alone, whereas DELETE_SECRET also
# requires a matching `user_id`. Confirm callers verify ownership first.
UPDATE_SECRET = DbQuery(
    query="""
UPDATE secrets
SET
data = :data,
modified_on = CURRENT_TIMESTAMP
WHERE
id = :id
RETURNING *
""",
    type='write',
    returns=True,
    rows='single',
)
# Delete the secret matching both `user_id` and `id`, returning those two
# columns of the deleted row. Expects params: user_id, id.
DELETE_SECRET = DbQuery(
    query="""
DELETE from secrets
WHERE
user_id = :user_id
and id = :id
RETURNING user_id, id
""",
    type='write',
    returns=True,
    rows='single',
)
# Fetch every `secrets` row belonging to a user as a list of dicts
# (or None when the user has no secrets). Expects params: user_id.
GET_SECRETS = DbQuery(
    query="""
SELECT *
FROM secrets
WHERE user_id = :user_id
""",
    type='read',
    returns=True,
    # 'multi' is the declared value for list results; the previous None fell
    # outside the Literal['single', 'multi'] annotation but behaved the same.
    rows='multi',
)