from datetime import date
from decimal import Decimal
import json

import bcrypt
from psycopg2 import sql


def fetch_table_as_json(cursor, table_name) -> list[dict]:
    """Return every row of ``table_name`` as a list of column->value dicts.

    Args:
        cursor: An open DB-API cursor used to run the query.
        table_name: Name of the table; it is quoted as an SQL identifier
            via ``sql.Identifier`` rather than interpolated as text.

    Returns:
        One dict per row, keyed by the column names reported in
        ``cursor.description``. Empty list when the table has no rows.
    """
    cursor.execute(
        sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name))
    )
    columns = [desc[0] for desc in cursor.description]  # column names
    rows = cursor.fetchall()

    # Convert each row tuple into a dict keyed by column name.
    return [dict(zip(columns, row)) for row in rows]


def fetch_row_as_json(cursor, table_name, row_id) -> dict | None:
    """Return the row of ``table_name`` whose ``id`` equals ``row_id``.

    Args:
        cursor: An open DB-API cursor used to run the query.
        table_name: Name of the table; quoted via ``sql.Identifier``.
        row_id: Value bound to the ``id = %s`` placeholder.

    Returns:
        A column->value dict for the first matching row, or ``None`` when
        no row matches.
    """
    query = sql.SQL("SELECT * FROM {} WHERE id = %s").format(
        sql.Identifier(table_name)
    )
    cursor.execute(query, (row_id,))
    columns = [desc[0] for desc in cursor.description]  # column names
    row = cursor.fetchone()  # fetch a single row

    if row is None:
        return None  # or raise an exception, if callers need that instead

    return dict(zip(columns, row))


def hash_password(plain_password: str) -> str:
    """Hash ``plain_password`` with bcrypt and return the hash as text.

    A new salt is generated with ``bcrypt.gensalt()`` on every call. The
    password is encoded as UTF-8 before hashing and the resulting bytes
    are decoded as UTF-8 for the return value.
    """
    hashed_bytes = bcrypt.hashpw(
        plain_password.encode('utf-8'), bcrypt.gensalt()
    )
    return hashed_bytes.decode('utf-8')


def check_token(cursor, token: str) -> bool:
    """Report whether ``token`` belongs to a user and has not expired.

    Args:
        cursor: An open DB-API cursor used to run the query.
        token: Token value looked up in ``users.token``.

    Returns:
        ``True`` only when a matching row exists and its
        ``token_expiry_date`` is strictly later than today's date.
        ``False`` when no user has this token or the stored expiry date
        is NULL.
    """
    cursor.execute(
        "SELECT token_expiry_date FROM users WHERE token = %s", (token,)
    )
    row = cursor.fetchone()

    # fetchone() yields None for an unknown token; treat that, and a NULL
    # expiry date, as an invalid token instead of failing with TypeError.
    if row is None or row[0] is None:
        return False

    # Strict comparison: a token whose expiry date is today is not valid.
    return row[0] > date.today()


def get_user_id(cursor, token: str) -> int:
    """Return the ``id`` of the user that owns ``token``.

    Args:
        cursor: An open DB-API cursor used to run the query.
        token: Token value looked up in ``users.token``.

    Returns:
        The first column (``id``) of the first matching row.

    Raises:
        TypeError: If no user has this token, because ``fetchone()``
            returns ``None`` and the result is subscripted directly.
    """
    cursor.execute("SELECT id FROM users WHERE token = %s", (token,))
    row = cursor.fetchone()
    return row[0]


def get_product_by_id(cursor, product_id):
    """Return the ``shop`` row whose ``id`` equals ``product_id``.

    Returns:
        A column->value dict for the product, or ``None`` when no row
        matches.
    """
    # The lookup result must be returned; without it callers always
    # receive None regardless of whether the product exists.
    return fetch_row_as_json(cursor, "shop", product_id)


def decimal_to_float(data: list[dict]) -> list[dict]:
    """Return a copy of ``data`` with top-level ``Decimal`` values as floats.

    Only values that are direct entries of each dict are converted;
    all other values are carried over unchanged. The input is not mutated.
    """
    return [
        {
            key: float(value) if isinstance(value, Decimal) else value
            for key, value in item.items()
        }
        for item in data
    ]