buy_products

main
Stepan Pilipenko 2 months ago
parent b60b5840a1
commit b41a99fd21

@ -1,7 +1,10 @@
import uuid
import psycopg2
import psycopg2.extras
import json
from datetime import timedelta
from psycopg2 import IntegrityError
from collections import Counter
from utils import *
from type import *
@ -139,7 +142,7 @@ def get_product(product_id: int):
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
res = fetch_row_as_json(cursor, "shop", product_id)
res = get_product_by_id(cursor, product_id)
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
@ -153,12 +156,33 @@ def get_product(product_id: int):
def get_basket(token: str):
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
res = None
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
res = fetch_table_as_json(cursor, "basket")
user_id: int = get_user_id(cursor, token)
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
res = cursor.fetchone()
if res is None or res[0] is None:
return json.dumps([])
products_id = res[0] # Это список, например: [1, 1, 3]
# Распаковываем массив и соединяем каждый элемент с таблицей shop
cursor.execute("""
SELECT json_agg(row_to_json(s)) AS result
FROM unnest(%s) AS pid
JOIN shop s ON s.id = pid
""", (products_id,))
result = cursor.fetchone()[0]
if result is None:
return json.dumps([])
return result
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
@ -168,17 +192,35 @@ def get_basket(token: str):
cursor.close()
connection.close()
return res
def get_history(token: str):
def get_histories(token: str):
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
res = None
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
res = fetch_table_as_json(cursor, "history")
user_id: int = get_user_id(cursor, token)
cursor.execute("SELECT histories_id FROM users WHERE id = %s", (user_id,))
res = cursor.fetchone()
if res is None or res[0] is None or len(res[0]) == 0:
return json.dumps([])
histories_id = res[0]
cursor.execute("""
SELECT json_agg(row_to_json(h)) AS result
FROM unnest(%s::INTEGER[]) AS hid
JOIN history h ON h.id = hid
""", (histories_id,))
result = cursor.fetchone()[0]
if result is None:
return json.dumps([])
return result
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
@ -188,8 +230,6 @@ def get_history(token: str):
cursor.close()
connection.close()
return res
def add_product_to_basket(token: str, product_id: int) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
@ -199,16 +239,246 @@ def add_product_to_basket(token: str, product_id: int) -> None:
user_id: int = get_user_id(cursor, token)
# Атомарно уменьшаем count и увеличиваем reserved, только если count > 0
cursor.execute("""
UPDATE shop
SET count = count - 1, reserved = reserved + 1
WHERE id = %s AND count > 0
""", (product_id,))
if cursor.rowcount == 0:
# Проверим, существует ли товар вообще
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError("Товар не найден")
else:
raise ValueError("Недостаточно товара в наличии (count <= 0)")
# Добавляем товар в корзину пользователя
cursor.execute(
"UPDATE basket SET products_id = array_append(products_id, %s) WHERE user_id = %s",
(product_id, user_id)
)
if cursor.rowcount == 0:
# Откатываем транзакцию, так как корзина не обновлена
connection.rollback()
raise ValueError("Пользователь не найден")
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def delete_product_from_basket(token: str, product_id: int) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Получаем текущую корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
row = cursor.fetchone()
if row is None:
raise ValueError("Пользователь не найден")
products = list(row[0])
if product_id not in products:
raise ValueError("Товар отсутствует в корзине")
# Удаляем первое вхождение
products.remove(product_id)
# Возвращаем товар в наличие
cursor.execute("""
UPDATE shop
SET count = count + 1, reserved = reserved - 1
WHERE id = %s AND reserved > 0
""", (product_id,))
if cursor.rowcount == 0:
# Откатываем изменения в корзине
connection.rollback()
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError("Товар не найден")
else:
raise ValueError("Невозможно вернуть товар: reserved <= 0")
# Сохраняем обновлённую корзину
cursor.execute(
"UPDATE basket SET products_id = %s WHERE user_id = %s",
(products, user_id)
)
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def clear_basket(token: str) -> None:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# Получаем текущую корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
row = cursor.fetchone()
if row is None:
raise ValueError("Пользователь не найден")
products_id = row[0] # Это list[int] или None
if not products_id: # Пустой или None
# Нечего очищать — просто убедимся, что корзина пуста
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
return
# Считаем количество каждого product_id
product_counts = Counter(products_id) # {product_id: quantity}
# Для каждого уникального товара возвращаем его на склад
for product_id, quantity in product_counts.items():
cursor.execute("""
UPDATE shop
SET count = count + %s, reserved = reserved - %s
WHERE id = %s AND reserved >= %s
""", (quantity, quantity, product_id, quantity))
if cursor.rowcount == 0:
# Проверяем, существует ли товар
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError(f"Товар с id={product_id} не найден")
else:
raise ValueError(
f"Недостаточно зарезервированных товаров для id={product_id}: "
f"попытка вернуть {quantity}, но reserved < {quantity}"
)
# Полностью очищаем корзину
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:
cursor.close()
connection.close()
def buy_products(token: str) -> int:
connection = psycopg2.connect(**DB_CONFIG)
cursor = connection.cursor()
try:
if not check_token(cursor, token):
raise AuthError("Пользователь не авторизован")
user_id: int = get_user_id(cursor, token)
# 1. Получаем корзину
cursor.execute("SELECT products_id FROM basket WHERE user_id = %s", (user_id,))
basket_row = cursor.fetchone()
if basket_row is None:
raise ValueError("Пользователь не найден")
products_id = basket_row[0]
if not products_id:
raise ValueError("Корзина пуста")
# 2. Получаем стоимость товаров и проверяем их наличие
product_counts = Counter(products_id)
total_cost = 0
for product_id in product_counts:
cursor.execute("SELECT cost::NUMERIC FROM shop WHERE id = %s", (product_id,))
shop_row = cursor.fetchone()
if shop_row is None:
raise ValueError(f"Товар с id={product_id} не найден")
cost = shop_row[0]
total_cost += cost * product_counts[product_id]
# 3. Вставляем запись в history и получаем её id
cursor.execute("""
INSERT INTO history (user_id, products_id, products_cost, date)
VALUES (%s, %s, %s, CURRENT_DATE)
RETURNING id
""", (user_id, products_id, total_cost))
history_id = cursor.fetchone()[0] # Получаем сгенерированный id
# 4. Добавляем history_id в users.histories_id
cursor.execute("""
UPDATE users
SET histories_id = array_append(COALESCE(histories_id, '{}'), %s)
WHERE id = %s
""", (history_id, user_id))
if cursor.rowcount == 0:
raise ValueError("Не удалось обновить историю пользователя")
# 5. Уменьшаем reserved в shop для каждого купленного товара
for product_id, quantity in product_counts.items():
cursor.execute("""
UPDATE shop
SET reserved = reserved - %s
WHERE id = %s AND reserved >= %s
""", (quantity, product_id, quantity))
if cursor.rowcount == 0:
# Проверяем, существует ли товар
cursor.execute("SELECT id FROM shop WHERE id = %s", (product_id,))
if cursor.fetchone() is None:
raise ValueError(f"Товар с id={product_id} не найден")
else:
raise ValueError(
f"Недостаточно зарезервированных товаров для id={product_id}: "
f"попытка списать {quantity}, но reserved < {quantity}"
)
# 6. Очищаем корзину (products_id = пустой массив)
cursor.execute("UPDATE basket SET products_id = '{}' WHERE user_id = %s", (user_id,))
connection.commit()
return history_id # Можно вернуть id заказа, если нужно
except psycopg2.Error as e:
print("Ошибка при работе с PostgreSQL:", e)
connection.rollback()
raise
except ValueError:
connection.rollback()
raise
finally:

@ -21,4 +21,8 @@ if __name__ == "__main__":
# print(get_history(token))
token = login("vasili_pupkin@gmail.com", "Vasya2005")
add_product_to_basket(token, 1)
# add_product_to_basket(token, 1)
# add_product_to_basket(token, 1)
# add_product_to_basket(token, 1)
# add_product_to_basket(token, 2)
print(get_histories(token))

@ -47,3 +47,5 @@ def get_user_id(cursor, token: str) -> int:
result = cursor.fetchone()
return result[0]
def get_product_by_id(cursor, product_id):
fetch_row_as_json(cursor, "shop", product_id)

Loading…
Cancel
Save