You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

595 lines
21 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import uuid
import psycopg2
import psycopg2.extras
import json
from datetime import timedelta
from psycopg2 import IntegrityError
from collections import Counter
from decimal import Decimal, InvalidOperation
from utils import *
from type import *
from const import *
def update_token_expiry_date(token: str, expiry_date: date) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
expiry_date_str = expiry_date.strftime("%Y.%m.%d")
try:
cursor.execute(
"UPDATE users SET token_expiry_date = %s WHERE token = %s",
(expiry_date_str, token)
)
connection.commit()
except Exception as e:
connection.rollback()
print("Ошибка при обновлении токена:", e)
raise
finally:
cursor.close()
connection.close()
def validate_token(token: str) -> bool:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
return check_token(cursor, token)
except Exception as e:
connection.rollback()
print("Ошибка при проверке токена:", e)
raise
finally:
cursor.close()
connection.close()
def login(email: str, password: str) :
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
cursor.execute("SELECT password,token FROM users WHERE email = %s", (email,))
result = cursor.fetchone()
if result is None:
raise AuthError("Неверный email или пароль")
print(result)
stored_hash, token = result
if not bcrypt.checkpw(password.encode('utf-8'), stored_hash.encode('utf-8')):
raise AuthError("Неверный email или пароль")
if token is None:
# Опционально: можно сгенерировать новый токен здесь
raise AuthError("У пользователя отсутствует токен")
token_live_time = timedelta(days=TOKEN_LIVE_TIME)
update_token_expiry_date(token, date.today() + token_live_time)
return token
finally:
cursor.close()
def logout(token):
try:
# Обновляем token_expiry_date на текущую дату (как в регистрации)
today = date.today()
update_token_expiry_date(token, today)
print(f"Пользователь {token} вышел из системы")
except Exception as e:
print("Ошибка при выходе:", e)
raise
def registration(nickname: str, password: str, email: str) -> str:
# Хэшируем пароль
hashed = hash_password(password)
token = str(uuid.uuid4())
today = date.today().strftime("%Y.%m.%d")
token_live_time = timedelta(days=TOKEN_LIVE_TIME)
token_expiry_date = date.today() + token_live_time
token_expiry_date_str = token_expiry_date.strftime("%Y.%m.%d")
money = "0"
histories_id = "{}"
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
cursor.execute(
"INSERT INTO users (nickname, email, password, token, token_expiry_date, money, histories_id) "
"VALUES (%s, %s, %s, %s, %s, %s, %s)",
(nickname, email, hashed, token, token_expiry_date_str, money, histories_id)
)
connection.commit()
print("Пользователь успешно создан")
return token
except IntegrityError as e:
connection.rollback()
if "email_uniq" in str(e) or "users_email_key" in str(e):
print("Ошибка: пользователь с таким email уже существует")
raise AuthError("Пользователь с таким email уже существует") from e
else:
print("Другая ошибка базы данных:", e)
raise
finally:
cursor.close()
def unregister(token: str) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Получаем массив histories_id пользователя
cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,))
row = cursor.fetchone()
if row is None:
raise ValueError("Пользователь не найден")
histories_id = row[0] # Может быть None или список
# Удаляем все связанные истории, если они есть
if histories_id and len(histories_id) > 0:
# Используем ANY для удаления по массиву ID
cursor.execute("""
DELETE FROM history
WHERE id = ANY(%s::BIGINT[])
""", (histories_id,))
# Удаляем пользователя (корзина удалится автоматически через CASCADE)
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
if cursor.rowcount == 0:
raise ValueError("Не удалось удалить пользователя")
connection.commit()
print("Пользователь успешно удалён")
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError as e:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def get_products() -> list[dict]:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
res = None
try:
res = fetch_table_as_json(cursor, "shop")
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
finally:
cursor.close()
connection.close()
return res
def get_product(product_id: int):
connection = None
cursor = None
res = None
try:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
res = get_product_by_id(cursor, product_id)
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
finally:
cursor.close()
connection.close()
return res
def get_basket(token: str):
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
res = cursor.fetchone()
if res is None or res[0] is None:
return json.dumps([])
products_id = res[0] # Это список, например: [1, 1, 3]
if len(products_id) == 0:
return json.dumps([])
# Распаковываем массив и соединяем каждый элемент с таблицей shop
cursor.execute("""
SELECT json_agg(row_to_json(s)) AS result
FROM unnest(%s) AS pid
JOIN shop s ON s.id = pid
""", (products_id,))
result = cursor.fetchone()[0]
if result is None:
return json.dumps([])
return result
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
raise
finally:
cursor.close()
connection.close()
def get_histories(token: str):
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,))
res = cursor.fetchone()
if res is None or res[0] is None or len(res[0]) == 0:
return json.dumps([])
histories_id = res[0]
cursor.execute("""
SELECT json_agg(row_to_json(h)) AS result
FROM unnest(%s::INTEGER[]) AS hid
JOIN history h ON h.id = hid
""", (histories_id,))
result = cursor.fetchone()[0]
if result is None:
return json.dumps([])
return result
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
raise
finally:
cursor.close()
connection.close()
def add_product_to_basket(token: str, product_id: int) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Атомарно уменьшаем count и увеличиваем reserved, только если count > 0
cursor.execute("""
UPDATE shop
SET count = count - 1, reserved = reserved + 1
WHERE id = %s AND count > 0
""", (product_id,))
if cursor.rowcount == 0:
# Проверим, существует ли товар вообще
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError("Товар не найден")
else:
raise ValueError("Недостаточно товара в наличии (count <= 0)")
# Добавляем товар в корзину пользователя
cursor.execute(
"UPDATE basket SET products_id = array_append(products_id, %s) WHERE user_id = %s",
(product_id, user_id)
)
if cursor.rowcount == 0:
# Откатываем транзакцию, так как корзина не обновлена
connection.rollback()
raise ValueError("Пользователь не найден")
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def delete_product_from_basket(token: str, product_id: int) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Получаем текущую корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
row = cursor.fetchone()
if row is None:
raise ValueError("Пользователь не найден")
products = list(row[0])
if product_id not in products:
raise ValueError("Товар отсутствует в корзине")
# Удаляем первое вхождение
products.remove(product_id)
# Возвращаем товар в наличие
cursor.execute("""
UPDATE shop
SET count = count + 1, reserved = reserved - 1
WHERE id = %s AND reserved > 0
""", (product_id,))
if cursor.rowcount == 0:
# Откатываем изменения в корзине
connection.rollback()
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError("Товар не найден")
else:
raise ValueError("Невозможно вернуть товар: reserved <= 0")
# Сохраняем обновлённую корзину
cursor.execute(
"UPDATE basket SET products_id = %s WHERE user_id = %s",
(products, user_id)
)
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def clear_basket(token: str) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Получаем текущую корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
row = cursor.fetchone()
if row is None:
raise ValueError("Пользователь не найден")
products_id = row[0] # Это list[int] или None
if not products_id: # Пустой или None
# Нечего очищать — просто убедимся, что корзина пуста
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
return
# Считаем количество каждого product_id
product_counts = Counter(products_id) # {product_id: quantity}
# Для каждого уникального товара возвращаем его на склад
for product_id, quantity in product_counts.items():
cursor.execute("""
UPDATE shop
SET count = count + %s, reserved = reserved - %s
WHERE id = %s AND reserved >= %s
""", (quantity, quantity, product_id, quantity))
if cursor.rowcount == 0:
# Проверяем, существует ли товар
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError(f"Товар с id={product_id} не найден")
else:
raise ValueError(
f"Недостаточно зарезервированных товаров для id={product_id}: "
f"попытка вернуть {quantity}, но reserved < {quantity}"
)
# Полностью очищаем корзину
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def buy_products(token: str) -> int:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# 1. Получаем корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
basket_row = cursor.fetchone()
if basket_row is None:
raise ValueError("Пользователь не найден")
products_id = basket_row[0]
if not products_id:
raise ValueError("Корзина пуста")
# 2. Получаем стоимость товаров и проверяем их наличие
product_counts = Counter(products_id)
total_cost = Decimal('0.00')
for product_id in product_counts:
cursor.execute("SELECT cost::NUMERIC FROM shop WHERE id = %s", (product_id,))
shop_row = cursor.fetchone()
if shop_row is None:
raise ValueError(f"Товар с id={product_id} не найден")
cost = shop_row[0] # Decimal
total_cost += cost * product_counts[product_id]
# 3. Проверяем баланс пользователя
cursor.execute("SELECT money::NUMERIC FROM users WHERE id = %s", (user_id,))
user_row = cursor.fetchone()
if user_row is None:
raise ValueError("Пользователь не найден")
user_money = user_row[0] # Decimal
if user_money < total_cost:
raise ValueError("Недостаточно средств на счету")
# 4. Списываем деньги
new_balance = user_money - total_cost
cursor.execute(
"UPDATE users SET money = %s WHERE id = %s",
(new_balance, user_id)
)
# 5. Вставляем запись в history и получаем её id
cursor.execute("""
INSERT INTO history (user_id, products_id, products_cost, date)
VALUES (%s, %s, %s, CURRENT_DATE)
RETURNING id
""", (user_id, products_id, total_cost))
history_id = cursor.fetchone()[0] # Получаем сгенерированный id
# 6. Добавляем history_id в users.histories_id
cursor.execute("""
UPDATE users
SET histories_id = array_append(COALESCE(histories_id, '{}'), %s)
WHERE id = %s
""", (history_id, user_id))
if cursor.rowcount == 0:
raise ValueError("Не удалось обновить историю пользователя")
# 7. Уменьшаем reserved в shop для каждого купленного товара
for product_id, quantity in product_counts.items():
cursor.execute("""
UPDATE shop
SET reserved = reserved - %s
WHERE id = %s AND reserved >= %s
""", (quantity, product_id, quantity))
if cursor.rowcount == 0:
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError(f"Товар с id={product_id} не найден")
else:
raise ValueError(
f"Недостаточно зарезервированных товаров для id={product_id}: "
f"попытка списать {quantity}, но reserved < {quantity}"
)
# 8. Очищаем корзину (products_id = пустой массив)
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
return history_id
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def add_money(token: str, money: str) -> None:
# Валидация и преобразование входной суммы
try:
# Удаляем возможные пробелы и заменяем запятую на точку (для удобства)
money_clean = money.strip().replace(',', '.')
amount = Decimal(money_clean)
if amount < 0:
raise ValueError("Сумма должна быть неотрицательной")
except (InvalidOperation, ValueError) as e:
raise ValueError("Некорректный формат суммы") from e
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Обновляем баланс: money = money + amount
cursor.execute("""
UPDATE users
SET money = money + %s
WHERE id = %s
""", (amount, user_id))
if cursor.rowcount == 0:
raise ValueError("Пользователь не найден")
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()