From 4ef416f3eecf74292367396a85886ff04c17abc2 Mon Sep 17 00:00:00 2001 From: gourav Date: Fri, 21 Jun 2024 16:20:08 +0530 Subject: [PATCH] Queries in OOP structure --- queries.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 130 insertions(+), 15 deletions(-) diff --git a/queries.py b/queries.py index d391cce..eea72de 100644 --- a/queries.py +++ b/queries.py @@ -1,4 +1,6 @@ import sqlite3 +from dataclasses import dataclass +from typing import Literal def connect_db(): @@ -7,19 +9,132 @@ def connect_db(): return conn -CREATE_USER_QUERY = """ - INSERT INTO - users ( - name, email, username, password - ) - VALUES ( - :name, :email, :username, :password - ) - returning id -""" +class DbQuery: + def __init__( + self, + query: str, + type: Literal['read', 'write'], + returns: bool, + rows: Literal['single', 'multi'] + ): + self.query = query + self.type = type + self.returns = returns + self.rows = rows -CREATE_KEY_QUERY = """ - INSERT INTO - keys (user_id, encryption_key, encryption_key_salt) - VALUES (:user_id, :encryption_key, :encryption_key_salt) -""" \ No newline at end of file + def __str__(self) -> str: + return self.query + + def __repr__(self) -> str: + return f" list[dict]|dict: + with connect_db() as conn: + cur = conn.execute(self.query, params) + data = cur.fetchall() + if not data: + return None + keys = [i[0] for i in cur.description] + + data_dict = [dict(zip(keys, i)) for i in data] + + if self.rows == 'single': + if len(data_dict) > 1: + raise ValueError('Query returned more than one rows.') + + data_dict = data_dict[0] + + return data_dict + + def _write(self, params: dict) -> dict|bool: + if not isinstance(params, dict): + raise TypeError("Only dict type is supported for params") + + with connect_db() as conn: + cur = conn.execute(self.query, params) + if self.returns: + data = cur.fetchone() + keys = keys = [i[0] for i in cur.description] + data_dict = dict(zip(keys, data)) + conn.commit() + + if self.returns: + return data_dict + return True + + def __call__(self, params): + if self.type == 'read': + func = self._read + else: + func = self._write + + return func(params) + + +CREATE_USER = DbQuery( + query=""" + INSERT INTO + users (name, email, username, password) + VALUES (:name, :email, :username, :password) + RETURNING id + """, + type='write', + returns=True, + rows='single' +) + +CREATE_KEY = DbQuery( + query=""" + INSERT INTO + keys (user_id, encryption_key, encryption_key_salt) + VALUES (:user_id, :encryption_key, :encryption_key_salt) + """, + type='write', + returns=False, + rows=None +) + +GET_USER_WITH_KEY = DbQuery( + """ + SELECT * + FROM users u + JOIN keys k on u.id = k.user_id + WHERE u.username = :username + """, + type='read', + returns=True, + rows='single' +) + +GET_USER_BY_ID = DbQuery( + """ + SELECT * + FROM users + where id = :user_id + """, + type='read', + returns=True, + rows='single' +) + +CREATE_SECRET = DbQuery( + """ + INSERT INTO + secrets(user_id, data) + VALUES(:user_id, :data) + """, + type='write', + returns=False, + rows=None +) + +GET_SECRETS = DbQuery( + """ + SELECT * + FROM secrets + WHERE user_id = :user_id + """, + type='read', + returns=True, + rows=None +) \ No newline at end of file