import uuid import psycopg2 import psycopg2.extras from datetime import timedelta from psycopg2 import IntegrityError from collections import Counter from decimal import InvalidOperation from utils import * from type import * from consts import * def update_token_expiry_date(token: str, expiry_date: date) -> None: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() expiry_date_str = expiry_date.strftime("%Y.%m.%d") try: cursor.execute( "UPDATE users SET token_expiry_date = %s WHERE token = %s", (expiry_date_str, token) ) connection.commit() except Exception as e: connection.rollback() print("Ошибка при обновлении токена:", e) raise finally: cursor.close() connection.close() def validate_token(token: str) -> bool: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: return check_token(cursor, token) except Exception as e: connection.rollback() print("Ошибка при проверке токена:", e) raise finally: cursor.close() connection.close() def login(email: str, password: str) -> str: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: cursor.execute("SELECT password,token FROM users WHERE email = %s", (email,)) result = cursor.fetchone() if result is None: raise AuthError("Неверный email или пароль") print(result) stored_hash, token = result if not bcrypt.checkpw(password.encode('utf-8'), stored_hash.encode('utf-8')): raise AuthError("Неверный email или пароль") if token is None: # Опционально: можно сгенерировать новый токен здесь raise AuthError("У пользователя отсутствует токен") token_live_time = timedelta(days=TOKEN_LIVE_TIME) update_token_expiry_date(token, date.today() + token_live_time) return token finally: cursor.close() def logout(token) -> None: try: # Обновляем token_expiry_date на текущую дату (как в регистрации) today = date.today() update_token_expiry_date(token, today) print(f"Пользователь {token} вышел из системы") except Exception as e: print("Ошибка при выходе:", e) raise def registration(nickname: str, password: str, email: str) -> str: # Хэшируем пароль hashed = hash_password(password) token = str(uuid.uuid4()) today = date.today().strftime("%Y.%m.%d") token_live_time = timedelta(days=TOKEN_LIVE_TIME) token_expiry_date = date.today() + token_live_time token_expiry_date_str = token_expiry_date.strftime("%Y.%m.%d") money = "0" histories_id = "{}" connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: cursor.execute( "INSERT INTO users (nickname, email, password, token, token_expiry_date, money, histories_id) " "VALUES (%s, %s, %s, %s, %s, %s, %s)", (nickname, email, hashed, token, token_expiry_date_str, money, histories_id) ) connection.commit() print("Пользователь успешно создан") return token except IntegrityError as e: connection.rollback() if "email_uniq" in str(e) or "users_email_key" in str(e): print("Ошибка: пользователь с таким email уже существует") raise AuthError("Пользователь с таким email уже существует") from e else: print("Другая ошибка базы данных:", e) raise finally: cursor.close() def unregister(token: str) -> None: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: user_id: int = get_user_id(cursor, token) # Получаем массив histories_id пользователя cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,)) row = cursor.fetchone() if row is None: raise ValueError("Пользователь не найден") histories_id = row[0] # Может быть None или список # Удаляем все связанные истории, если они есть if histories_id and len(histories_id) > 0: # Используем ANY для удаления по массиву ID cursor.execute(""" DELETE FROM history WHERE id = ANY(%s::BIGINT[]) """, (histories_id,)) # Удаляем пользователя (корзина удалится автоматически через CASCADE) cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) if cursor.rowcount == 0: raise ValueError("Не удалось удалить пользователя") connection.commit() print("Пользователь успешно удалён") except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError as e: connection.rollback() raise finally: cursor.close() connection.close() def get_products() -> list[dict]: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() res = None try: res = fetch_table_as_json(cursor, "shop") except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) finally: cursor.close() connection.close() return res def get_product(product_id: int): connection = None cursor = None res = None try: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() res = get_product_by_id(cursor, product_id) except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) finally: cursor.close() connection.close() return res def get_basket(token: str): connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,)) res = cursor.fetchone() if res is None or res[0] is None: return json.dumps([]) products_id = res[0] # Это список, например: [1, 1, 3] if len(products_id) == 0: return json.dumps([]) # Распаковываем массив и соединяем каждый элемент с таблицей shop cursor.execute(""" SELECT json_agg(row_to_json(s)) AS result FROM unnest(%s) AS pid JOIN shop s ON s.id = pid """, (products_id,)) result = cursor.fetchone()[0] if result is None: return json.dumps([]) return result except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise finally: cursor.close() connection.close() def get_histories(token: str): connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,)) res = cursor.fetchone() if res is None or res[0] is None or len(res[0]) == 0: return json.dumps([]) histories_id = res[0] cursor.execute(""" SELECT json_agg(row_to_json(h)) AS result FROM unnest(%s::INTEGER[]) AS hid JOIN history h ON h.id = hid """, (histories_id,)) result = cursor.fetchone()[0] if result is None: return json.dumps([]) return result except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise finally: cursor.close() connection.close() def add_product_to_basket(token: str, product_id: int) -> None: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # Атомарно уменьшаем count и увеличиваем reserved, только если count > 0 cursor.execute(""" UPDATE shop SET count = count - 1, reserved = reserved + 1 WHERE id = %s AND count > 0 """, (product_id,)) if cursor.rowcount == 0: # Проверим, существует ли товар вообще cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,)) if cursor.fetchone() is None: raise ValueError("Товар не найден") else: raise ValueError("Недостаточно товара в наличии (count <= 0)") # Добавляем товар в корзину пользователя cursor.execute( "UPDATE basket SET products_id = array_append(products_id, %s) WHERE user_id = %s", (product_id, user_id) ) if cursor.rowcount == 0: # Откатываем транзакцию, так как корзина не обновлена connection.rollback() raise ValueError("Пользователь не найден") connection.commit() except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError: connection.rollback() raise finally: cursor.close() connection.close() def delete_product_from_basket(token: str, product_id: int) -> None: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # Получаем текущую корзину cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,)) row = cursor.fetchone() if row is None: raise ValueError("Пользователь не найден") products = list(row[0]) if product_id not in products: raise ValueError("Товар отсутствует в корзине") # Удаляем первое вхождение products.remove(product_id) # Возвращаем товар в наличие cursor.execute(""" UPDATE shop SET count = count + 1, reserved = reserved - 1 WHERE id = %s AND reserved > 0 """, (product_id,)) if cursor.rowcount == 0: # Откатываем изменения в корзине connection.rollback() cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,)) if cursor.fetchone() is None: raise ValueError("Товар не найден") else: raise ValueError("Невозможно вернуть товар: reserved <= 0") # Сохраняем обновлённую корзину cursor.execute( "UPDATE basket SET products_id = %s WHERE user_id = %s", (products, user_id) ) connection.commit() except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError: connection.rollback() raise finally: cursor.close() connection.close() def clear_basket(token: str) -> None: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # Получаем текущую корзину cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,)) row = cursor.fetchone() if row is None: raise ValueError("Пользователь не найден") products_id = row[0] # Это list[int] или None if not products_id: # Пустой или None # Нечего очищать — просто убедимся, что корзина пуста cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,)) connection.commit() return # Считаем количество каждого product_id product_counts = Counter(products_id) # {product_id: quantity} # Для каждого уникального товара возвращаем его на склад for product_id, quantity in product_counts.items(): cursor.execute(""" UPDATE shop SET count = count + %s, reserved = reserved - %s WHERE id = %s AND reserved >= %s """, (quantity, quantity, product_id, quantity)) if cursor.rowcount == 0: # Проверяем, существует ли товар cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,)) if cursor.fetchone() is None: raise ValueError(f"Товар с id={product_id} не найден") else: raise ValueError( f"Недостаточно зарезервированных товаров для id={product_id}: " f"попытка вернуть {quantity}, но reserved < {quantity}" ) # Полностью очищаем корзину cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,)) connection.commit() except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError: connection.rollback() raise finally: cursor.close() connection.close() def buy_products(token: str) -> int: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # 1. Получаем корзину cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,)) basket_row = cursor.fetchone() if basket_row is None: raise ValueError("Пользователь не найден") products_id = basket_row[0] if not products_id: raise ValueError("Корзина пуста") # 2. Получаем стоимость товаров и проверяем их наличие product_counts = Counter(products_id) total_cost = Decimal('0.00') for product_id in product_counts: cursor.execute("SELECT cost::NUMERIC FROM shop WHERE id = %s", (product_id,)) shop_row = cursor.fetchone() if shop_row is None: raise ValueError(f"Товар с id={product_id} не найден") cost = shop_row[0] # Decimal total_cost += cost * product_counts[product_id] # 3. Проверяем баланс пользователя cursor.execute("SELECT money::NUMERIC FROM users WHERE id = %s", (user_id,)) user_row = cursor.fetchone() if user_row is None: raise ValueError("Пользователь не найден") user_money = user_row[0] # Decimal if user_money < total_cost: raise ValueError("Недостаточно средств на счету") # 4. Списываем деньги new_balance = user_money - total_cost cursor.execute( "UPDATE users SET money = %s WHERE id = %s", (new_balance, user_id) ) # 5. Вставляем запись в history и получаем её id cursor.execute(""" INSERT INTO history (user_id, products_id, products_cost, date) VALUES (%s, %s, %s, CURRENT_DATE) RETURNING id """, (user_id, products_id, total_cost)) history_id = cursor.fetchone()[0] # Получаем сгенерированный id # 6. Добавляем history_id в users.histories_id cursor.execute(""" UPDATE users SET histories_id = array_append(COALESCE(histories_id, '{}'), %s) WHERE id = %s """, (history_id, user_id)) if cursor.rowcount == 0: raise ValueError("Не удалось обновить историю пользователя") # 7. Уменьшаем reserved в shop для каждого купленного товара for product_id, quantity in product_counts.items(): cursor.execute(""" UPDATE shop SET reserved = reserved - %s WHERE id = %s AND reserved >= %s """, (quantity, product_id, quantity)) if cursor.rowcount == 0: cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,)) if cursor.fetchone() is None: raise ValueError(f"Товар с id={product_id} не найден") else: raise ValueError( f"Недостаточно зарезервированных товаров для id={product_id}: " f"попытка списать {quantity}, но reserved < {quantity}" ) # 8. Очищаем корзину (products_id = пустой массив) cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,)) connection.commit() return history_id except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError: connection.rollback() raise finally: cursor.close() connection.close() def add_money(token: str, money: str) -> None: # Валидация и преобразование входной суммы try: # Удаляем возможные пробелы и заменяем запятую на точку (для удобства) money_clean = money.strip().replace(',', '.') amount = Decimal(money_clean) if amount < 0: raise ValueError("Сумма должна быть неотрицательной") except (InvalidOperation, ValueError) as e: raise ValueError("Некорректный формат суммы") from e connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # Обновляем баланс: money = money + amount cursor.execute(""" UPDATE users SET money = money + %s WHERE id = %s """, (amount, user_id)) if cursor.rowcount == 0: raise ValueError("Пользователь не найден") connection.commit() except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) connection.rollback() raise except ValueError: connection.rollback() raise finally: cursor.close() connection.close() def get_user(token: str) -> dict: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: # Проверяем, авторизован ли пользователь (опционально, но рекомендуется) if not check_token(cursor, token): raise AuthError("Пользователь не авторизован или токен истёк") # Получаем данные пользователя cursor.execute(""" SELECT id, nickname, email, token, token_expiry_date, money, histories_id FROM users WHERE token = %s """, (token,)) row = cursor.fetchone() if row is None: raise ValueError("Пользователь не найден") # Формируем словарь с именами столбцов columns = [desc[0] for desc in cursor.description] user_data = dict(zip(columns, row)) return user_data except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise except ValueError as e: raise finally: cursor.close() connection.close() def get_products_by_id(products_id: list[int]) -> list[dict]: if not products_id: return [] connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: # Считаем количество каждого product_id product_counts = Counter(products_id) # Получаем продукты из shop по id cursor.execute(""" SELECT id, name, cost, count, reserved, picture_url, description, type FROM shop WHERE id = ANY(%s) """, (products_id,)) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] result = [] for row in rows: product_dict = dict(zip(columns, row)) # Добавляем user_count — сколько раз этот товар встречается в корзине product_dict["user_count"] = product_counts.get(product_dict["id"], 0) result.append(product_dict) return result except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise finally: cursor.close() connection.close() def get_products_id(token: str, table_name: str) -> list[int]: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) query = sql.SQL("SELECT products_id FROM {} WHERE user_id = %s").format( sql.Identifier(table_name) ) cursor.execute(query, (user_id,)) res = cursor.fetchone() if res is None or res[0] is None: return [] products_id = res[0] # Это list[int], например: [1, 1, 3] return products_id except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise finally: cursor.close() connection.close() def get_histories_with_products(token: str) -> list[dict]: connection = psycopg2.connect(**DB_CONFIG) cursor = connection.cursor() try: if not check_token(cursor, token): raise AuthError("Пользователь не авторизован") user_id: int = get_user_id(cursor, token) # Получаем histories_id пользователя cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,)) res = cursor.fetchone() if res is None or res[0] is None or len(res[0]) == 0: return [] histories_id = res[0] # Получаем истории cursor.execute(""" SELECT id, user_id, products_id, date, products_cost FROM history WHERE id = ANY(%s::BIGINT[]) """, (histories_id,)) rows = cursor.fetchall() columns = [desc[0] for desc in cursor.description] result = [] for row in rows: history_dict = dict(zip(columns, row)) # Добавляем поле "products" — список товаров с user_count products_id = history_dict["products_id"] history_dict["products"] = get_products_by_id(products_id) # type: ignore result.append(history_dict) return result except psycopg2.Error as e: print("Ошибка при работе с PostgreSQL:", e) raise finally: cursor.close() connection.close()