# Liga-System/data/data_api.py
#
# 669 lines
# 21 KiB
# Python

import sqlite3
import random
from wood import logger
from data.setup_database import DB_PATH
def validate_user_session(db_id, discord_id):
    """Check whether a session cookie still matches the current database.

    Args:
        db_id: Value looked up in ``players.id``.
        discord_id: Discord account ID the session claims to belong to.

    Returns:
        True if a player row with ``db_id`` exists and its ``discord_id``
        equals the given one (both compared as strings), otherwise False.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT discord_id FROM players WHERE id = ?", (db_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()

    # Case 1: the ID no longer exists in the database.
    if row is None:
        return False
    # Case 2: the ID now belongs to a different Discord account (the database
    # was reset). Case 3: everything matches.
    # Both sides are stringified so an int/str mismatch does not fail the check.
    return str(row[0]) == str(discord_id)
def change_display_name(player_id, new_name):
    """Set ``players.display_name`` for one player and commit the change.

    Args:
        player_id: Value matched against ``players.id``.
        new_name: New display name to store.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute(
            "UPDATE players SET display_name = ? WHERE id = ?",
            (new_name, player_id),
        )
        connection.commit()
    finally:
        # Release the connection even if the update or commit raises.
        connection.close()
def generate_silly_name():
    """Return a random placeholder name of the form ``"<adjective> <noun>"``."""
    first_words = (
        "Verwirrter", "Blinder", "Heulender", "Zorniger", "Chaos",
        "Verzweifelter", "Schreiender", "Stolpernder", "Schwitzender",
    )
    second_words = (
        "Grot", "Kultist", "Servitor", "Snotling", "Guardmen",
        "Würfellecker", "Regelvergesser", "Meta-Chaser", "Klebschnüffler",
    )
    # The tuple is evaluated left to right: adjective is drawn first, then noun.
    return " ".join((random.choice(first_words), random.choice(second_words)))
def get_or_create_player(discord_id, discord_name, avatar_url):
    """Fetch the player for a Discord account, creating or refreshing the row.

    If no player with ``discord_id`` exists, a row is inserted with a randomly
    generated display name (to nudge the user into entering a real one).
    Otherwise the stored Discord name and avatar URL are overwritten with the
    given values.

    Args:
        discord_id: Value matched against ``players.discord_id``.
        discord_name: Current Discord user name.
        avatar_url: Current Discord avatar URL.

    Returns:
        The row ``(id, discord_name, display_name, discord_avatar_url)`` as
        read back after the write.
    """
    select_sql = (
        "SELECT id, discord_name, display_name, discord_avatar_url "
        "FROM players WHERE discord_id = ?"
    )
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute(select_sql, (discord_id,))
        is_new_player = cursor.fetchone() is None

        if is_new_player:
            cursor.execute(
                "INSERT INTO players (discord_id, discord_name, display_name, discord_avatar_url) "
                "VALUES (?, ?, ?, ?)",
                (discord_id, discord_name, generate_silly_name(), avatar_url),
            )
        else:
            # Name or avatar may have changed on Discord; refresh both.
            cursor.execute(
                "UPDATE players SET discord_name = ?, discord_avatar_url = ? "
                "WHERE discord_id = ?",
                (discord_name, avatar_url, discord_id),
            )
        connection.commit()

        if is_new_player:
            # Logged only after the commit, and formatted with an f-string so a
            # non-string discord_name cannot raise between INSERT and commit.
            logger.log("NEW PLAYER", f"Ein neuer Spieler wurde angelegt - {discord_name}")

        cursor.execute(select_sql, (discord_id,))
        return cursor.fetchone()
    finally:
        connection.close()
def get_all_players():
    """Return all players, ordered by MMR descending, as a list of dicts.

    Returns:
        One dict per row with the selected column names as keys
        (``id``, ``name``, ``mmr``, ``points``, ``games``).
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        # NOTE(review): other queries in this module read display_name /
        # discord_name from players and mmr from player_game_statistic;
        # confirm that these columns really exist on the players table.
        cursor.execute("SELECT id, name, mmr, points, games FROM players ORDER BY mmr DESC")
        rows = cursor.fetchall()
    finally:
        connection.close()
    # Convert the rows into plain dictionaries for the web GUI.
    return [dict(row) for row in rows]
def get_all_players_from_system(system_name):
    """Return every player that has a statistics row in the named game system.

    Args:
        system_name: Value matched exactly against ``gamesystems.name``.

    Returns:
        List of dicts with the keys ``player_id``, ``display_name`` and
        ``discord_name``, ordered by ``display_name`` ascending.
    """
    # DISTINCT makes sure each player appears only once.
    query = """
        SELECT DISTINCT
            p.id AS player_id,
            p.display_name,
            p.discord_name
        FROM players p
        JOIN player_game_statistic stat ON p.id = stat.player_id
        JOIN gamesystems sys ON stat.gamesystem_id = sys.id
        WHERE sys.name = ?
        ORDER BY p.display_name ASC
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (system_name,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def get_gamesystem_id_by_name(system_name):
    """Look up the internal ID of a game system by its exact name.

    Args:
        system_name: Value matched exactly against ``gamesystems.name``.

    Returns:
        The ``id`` of the matching row, or None if no such system exists.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT id FROM gamesystems WHERE name = ?", (system_name,))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    # The first (and only) selected column is the ID.
    return row[0] if row else None
def get_player_statistics(player_id):
    """Return one entry per game system with the player's statistics in it.

    Because of the LEFT JOIN every row of ``gamesystems`` is returned; the
    statistic columns are None for systems in which the player has no
    ``player_game_statistic`` row.

    Args:
        player_id: Value matched against ``player_game_statistic.player_id``.

    Returns:
        List of dicts containing ``gamesystem_id``, ``gamesystem_name``, all
        columns of ``gamesystems``, and ``mmr``, ``games_in_system``,
        ``points``.
    """
    query = """
        SELECT
            sys.id AS gamesystem_id,
            sys.name AS gamesystem_name,
            sys.*,
            stat.mmr,
            stat.games_in_system,
            stat.points
        FROM gamesystems sys
        LEFT JOIN player_game_statistic stat
            ON sys.id = stat.gamesystem_id AND stat.player_id = ?
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (player_id,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def join_league(player_id, gamesystem_id):
    """Register a player in a game system by inserting a statistics row.

    Only the two IDs are inserted; all other columns of
    ``player_game_statistic`` are left to the table's DEFAULT values.

    Args:
        player_id: ID of the joining player.
        gamesystem_id: ID of the game system to join.
    """
    query = """
        INSERT INTO player_game_statistic (player_id, gamesystem_id)
        VALUES (?, ?)
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute(query, (player_id, gamesystem_id))
        connection.commit()
    finally:
        connection.close()
    # Logged only after a successful commit, so a failed insert is never
    # reported as a join.
    logger.log("DataAPI", f"{get_player_name(player_id)} joined {gamesystem_id}", player_id)
def get_recent_matches_for_player(player_id):
    """Return the ten most recent matches in which the player took part.

    A match qualifies if the player is either ``player1_id`` or
    ``player2_id``; results are ordered by ``played_at`` descending.

    Args:
        player_id: ID of the player.

    Returns:
        List of at most 10 dicts with the keys ``match_id``,
        ``gamesystem_name``, ``player1_id``, ``p1_display``, ``p1_discord``,
        ``score_player1``, ``player2_id``, ``p2_display``, ``p2_discord``,
        ``score_player2`` and ``played_at``.
    """
    query = """
        SELECT
            m.id AS match_id,
            sys.name AS gamesystem_name,
            m.player1_id,
            p1.display_name AS p1_display,
            p1.discord_name AS p1_discord,
            m.score_player1,
            m.player2_id,
            p2.display_name AS p2_display,
            p2.discord_name AS p2_discord,
            m.score_player2,
            m.played_at
        FROM matches m
        JOIN gamesystems sys ON m.gamesystem_id = sys.id
        JOIN players p1 ON m.player1_id = p1.id
        JOIN players p2 ON m.player2_id = p2.id
        WHERE m.player1_id = ? OR m.player2_id = ?
        ORDER BY m.played_at DESC
        LIMIT 10
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (player_id, player_id))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def add_new_match(system_name, player1_id, player2_id, score_p1, score_p2):
    """Insert a new match for the named game system.

    ``played_at`` is not set here; it is left to the column default.

    Args:
        system_name: Value matched exactly against ``gamesystems.name``.
        player1_id: ID of the player who posts the match.
        player2_id: ID of the opponent.
        score_p1: Score of player 1.
        score_p2: Score of player 2.

    Returns:
        The row ID of the new match, or None if the game system is unknown.
    """
    insert_sql = """
        INSERT INTO matches (gamesystem_id, player1_id, player2_id, score_player1, score_player2)
        VALUES (?, ?, ?, ?, ?)
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        # 1. Resolve the internal ID of the game system from its name.
        cursor.execute("SELECT id FROM gamesystems WHERE name = ?", (system_name,))
        sys_row = cursor.fetchone()
        # Safety check (should not normally happen).
        if not sys_row:
            return None

        # 2. Insert the match.
        cursor.execute(insert_sql, (sys_row[0], player1_id, player2_id, score_p1, score_p2))
        new_match_id = cursor.lastrowid
        connection.commit()
    finally:
        connection.close()

    # Logged only after the commit succeeded, so the log never reports a match
    # that was not stored.
    logger.log("DataAPI", f"{get_player_name(player1_id)}:({score_p1}) posted Match. System: {system_name}, {get_player_name(player2_id)}:({score_p2})", player1_id)
    return new_match_id
def save_calculated_match(calc_results: dict):
    """Persist a calculated match and update both players' statistics.

    All writes happen in one transaction. On any exception the transaction is
    rolled back, the error is printed, and False is returned.

    Args:
        calc_results: Calculation output. Keys read here: ``"match_id"``,
            ``"winner_id"``, ``"looser_id"``, ``"rust_factor"``,
            ``"elo_factor"``, ``"point_inflation"``, plus one entry per
            player keyed by the player ID (int, or its string form). Each
            player entry provides ``"base"``, ``"khorne"``, ``"slaanesh"``,
            ``"tzeentch"`` and ``"total"``.

    Returns:
        True if everything was committed, False otherwise.
    """

    def _player_result(player_id):
        # Per-player entries may be keyed by the int ID or by its string form;
        # try both. A miss still raises KeyError and aborts the save.
        if player_id in calc_results:
            return calc_results[player_id]
        return calc_results[str(player_id)]

    connection = sqlite3.connect(DB_PATH)
    cursor = connection.cursor()
    try:
        match_id = calc_results["match_id"]
        # winner_id / looser_id are not used below, but a missing key must
        # still abort the save (KeyError -> rollback -> False).
        for required_key in ("winner_id", "looser_id"):
            if required_key not in calc_results:
                raise KeyError(required_key)

        # Read the match, including the scores needed for the statistics.
        cursor.execute(
            "SELECT player1_id, player2_id, gamesystem_id, "
            "score_player1, score_player2 FROM matches WHERE id = ?",
            (match_id,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Match ID {match_id} nicht in der Datenbank gefunden.")
        player1_id, player2_id, gamesystem_id, score_p1, score_p2 = row

        p1 = _player_result(player1_id)
        p2 = _player_result(player2_id)

        # 1. Store the calculation details on the match and mark it counted.
        # NOTE(review): nothing here checks whether match_is_counted is
        # already 1, so calling this twice applies the MMR change twice.
        match_query = """
            UPDATE matches
            SET
                player1_base_change = ?, player1_khorne = ?, player1_slaanesh = ?,
                player1_tzeentch = ?, player1_mmr_change = ?,
                player2_base_change = ?, player2_khorne = ?, player2_slaanesh = ?,
                player2_tzeentch = ?, player2_mmr_change = ?,
                rust_factor = ?, elo_factor = ?, point_inflation = ?,
                match_is_counted = 1
            WHERE id = ?
        """
        cursor.execute(match_query, (
            p1["base"], p1["khorne"], p1["slaanesh"], p1["tzeentch"], p1["total"],
            p2["base"], p2["khorne"], p2["slaanesh"], p2["tzeentch"], p2["total"],
            calc_results["rust_factor"], calc_results["elo_factor"], calc_results["point_inflation"],
            match_id,
        ))

        # 2. Update each player's statistics in this game system.
        # SQLite evaluates the right-hand sides against the row's values from
        # before the update, so avv_points is (old points + score) divided by
        # (old games + 1), i.e. the new average.
        # NOTE(review): if points and the score are both integers, SQLite's
        # "/" truncates; confirm the column types if fractions are expected.
        stat_query = """
            UPDATE player_game_statistic
            SET
                mmr = mmr + ?,
                games_in_system = games_in_system + 1,
                points = points + ?,
                avv_points = (points + ?) / (games_in_system + 1),
                last_played = CURRENT_TIMESTAMP
            WHERE player_id = ? AND gamesystem_id = ?
        """
        for result, score, pid in ((p1, score_p1, player1_id), (p2, score_p2, player2_id)):
            cursor.execute(stat_query, (result["total"], score, score, pid, gamesystem_id))

        connection.commit()
        logger.log("MATCH_CALC", f"Match ID {match_id} wurde komplett berechnet und verbucht.")
        return True
    except Exception as e:
        # Top-level boundary: undo every partial write and report failure.
        connection.rollback()
        print(f"KRITISCHER FEHLER beim Speichern des Matches: {e}")
        return False
    finally:
        connection.close()
# -----------------------------------------------------
# Data retrieval functions
# -----------------------------------------------------
def get_gamesystem_data(system_id):
    """Return all columns of one game system as a dict.

    Args:
        system_id: Value matched against ``gamesystems.id``.

    Returns:
        Dict mapping column name to value, or None if no such row exists.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        # sqlite3.Row gives name-based access, so the dict is built the same
        # way as in the other getters of this module.
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM gamesystems WHERE id = ?", (system_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return dict(row) if row else None
def get_gamesystems_data():
    """Return every row of ``gamesystems`` as a list of dicts.

    Returns:
        One dict per game system, mapping column name to value.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM gamesystems")
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    # Convert the SQLite rows into plain Python dictionaries.
    return [dict(row) for row in rows]
def get_leaderboard(system_name):
    """Return the ranking of a game system, ordered by MMR descending.

    Players with zero games in the system are excluded by the query itself.

    Args:
        system_name: Value matched exactly against ``gamesystems.name``.

    Returns:
        List of dicts with the keys ``id``, ``display_name``,
        ``discord_name`` and ``mmr``.
    """
    # "stat.games_in_system > 0" filters out accounts without any game
    # directly in the database.
    query = """
        SELECT p.id, p.display_name, p.discord_name, stat.mmr
        FROM players p
        JOIN player_game_statistic stat ON p.id = stat.player_id
        JOIN gamesystems sys ON stat.gamesystem_id = sys.id
        WHERE sys.name = ? AND stat.games_in_system > 0
        ORDER BY stat.mmr DESC
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (system_name,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def get_match_by_id(match_id: int) -> dict | None:
    """Return the core data of one match, including the game system name.

    Args:
        match_id: Value matched against ``matches.id``.

    Returns:
        Dict with the keys ``id``, ``gamesystem_id``, ``gamesystem_name``,
        ``player1_id``, ``score_player1``, ``player2_id``, ``score_player2``
        and ``played_at``; None if the match does not exist or the query
        failed (the error is printed in that case).
    """
    query = """
        SELECT
            m.id,
            m.gamesystem_id,
            g.name AS gamesystem_name,
            m.player1_id,
            m.score_player1,
            m.player2_id,
            m.score_player2,
            m.played_at
        FROM matches m
        JOIN gamesystems g ON m.gamesystem_id = g.id
        WHERE m.id = ?
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        # Name-based rows, as in the other getters of this module, instead of
        # zipping cursor.description by hand.
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (match_id,))
        row = cursor.fetchone()
        return dict(row) if row else None
    except Exception as e:
        # Best-effort lookup: report the problem and signal "no match".
        print(f"Fehler beim Laden des Matches: {e}")
        return None
    finally:
        connection.close()
def get_player_name(player_id):
    """Return a player's name formatted as ``"display_name (discord_name)"``.

    Args:
        player_id: Value matched against ``players.id``; may be None.

    Returns:
        The formatted name, ``"Unbekannter Spieler"`` if ``player_id`` is
        None, or ``"Gelöschter Spieler"`` if no such player exists.
    """
    # Safety check: no ID was passed at all.
    if player_id is None:
        return "Unbekannter Spieler"

    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT display_name, discord_name FROM players WHERE id = ?", (player_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()

    if not row:
        return "Gelöschter Spieler"
    display_name, discord_name = row
    return f"{display_name} ({discord_name})"
def get_system_name(sys_id):
    """Return the name of a game system.

    Args:
        sys_id: Value matched against ``gamesystems.id``; may be None.

    Returns:
        The system name, ``"Unbekanntes System"`` if ``sys_id`` is None, or
        ``"Gelöschtes System"`` if no such system exists.
    """
    if sys_id is None:
        return "Unbekanntes System"

    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT name FROM gamesystems WHERE id = ?", (sys_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()

    return row[0] if row else "Gelöschtes System"
def get_days_since_last_system_game(player_id, gamesystem_id):
    """Return how many whole days ago the player last played in a system.

    Args:
        player_id: ID of the player.
        gamesystem_id: ID of the game system.

    Returns:
        The day difference between now and ``last_played`` cast to an
        integer, or 0 if there is no statistics row or ``last_played`` is
        NULL (first game).
    """
    # julianday() yields a day number, so the difference is in days.
    query = """
        SELECT CAST(julianday('now') - julianday(last_played) AS INTEGER)
        FROM player_game_statistic
        WHERE player_id = ? AND gamesystem_id = ?
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute(query, (player_id, gamesystem_id))
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        connection.close()

    if row and row[0] is not None:
        return row[0]
    return 0
# -----------------------------------------------------
# Confirming, deleting and calculating matches
# -----------------------------------------------------
def get_unconfirmed_matches(player_id):
    """Return all open matches the player still has to confirm.

    These are matches in which the player is ``player2_id`` and
    ``player2_check`` is 0.

    Args:
        player_id: ID of the player who has to confirm.

    Returns:
        List of dicts with the keys ``match_id``, ``score_player1``,
        ``score_player2``, ``played_at``, ``system_name`` and ``p1_name``
        (display name of the opponent who posted the match).
    """
    # players is joined for the opponent's name, gamesystems for the system name.
    query = """
        SELECT m.id AS match_id, m.score_player1, m.score_player2, m.played_at,
            sys.name AS system_name,
            p1.display_name AS p1_name
        FROM matches m
        JOIN gamesystems sys ON m.gamesystem_id = sys.id
        JOIN players p1 ON m.player1_id = p1.id
        WHERE m.player2_id = ? AND m.player2_check = 0
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (player_id,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def delete_match(match_id, player_id):
    """Delete a match row by its ID.

    Args:
        match_id: Value matched against ``matches.id``.
        player_id: ID of the acting user. It is only written to the log;
            NOTE(review): no ownership check is performed here.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        cursor.execute("DELETE FROM matches WHERE id = ?", (match_id,))
        connection.commit()
    finally:
        connection.close()
    # Logged only after the commit, so a failed delete is never reported.
    logger.log("data_api.delete_match", f"Match mit ID{match_id} von User gelöscht.", f"{player_id}")
def confirm_match(match_id):
    """Mark a match as confirmed by player 2 (``player2_check = 1``).

    Args:
        match_id: Value matched against ``matches.id``.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        # Only the player2_check column is changed.
        cursor.execute("UPDATE matches SET player2_check = 1 WHERE id = ?", (match_id,))
        connection.commit()
    finally:
        connection.close()
    # Logged only after the commit, so a failed update is never reported.
    logger.log("data_api.confirm_match", f"Match mit ID{match_id} von Player2 bestätigt.")
def set_match_counted(match_id):
    """Flag a match as counted towards the MMR (``match_is_counted = 1``).

    Args:
        match_id: Value matched against ``matches.id``.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        # Only the match_is_counted column is changed.
        cursor.execute("UPDATE matches SET match_is_counted = 1 WHERE id = ?", (match_id,))
        connection.commit()
    finally:
        connection.close()
    # Logged only after the commit, so a failed update is never reported.
    logger.log("data_api.set_match_counted", f"Match mit ID: {match_id} berechnet.")
def get_submitted_matches(player_id):
    """Return the open matches the player posted that await confirmation.

    These are matches in which the player is ``player1_id`` and
    ``player2_check`` is still 0.

    Args:
        player_id: ID of the player who posted the matches.

    Returns:
        List of dicts with the keys ``match_id``, ``score_player1``,
        ``score_player2``, ``played_at``, ``system_name`` and ``p2_name``
        (display name of the opponent).
    """
    # Player 2 is joined here because the opponent's name is what gets shown.
    query = """
        SELECT m.id AS match_id, m.score_player1, m.score_player2, m.played_at,
            sys.name AS system_name,
            p2.display_name AS p2_name
        FROM matches m
        JOIN gamesystems sys ON m.gamesystem_id = sys.id
        JOIN players p2 ON m.player2_id = p2.id
        WHERE m.player1_id = ? AND m.player2_check = 0
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (player_id,))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
def get_match_history_log(player_id):
    """Return all matches of a player, including MMR changes, newest first.

    A match qualifies if the player is either ``player1_id`` or
    ``player2_id``.

    Args:
        player_id: ID of the player.

    Returns:
        List of dicts with the keys ``played_at``, ``gamesystem_name``,
        ``player1_id``, ``p1_display``, ``p1_discord``, ``score_player1``,
        ``player1_mmr_change``, ``player2_id``, ``p2_display``,
        ``p2_discord``, ``score_player2``, ``player2_mmr_change``,
        ``player2_check`` and ``match_is_counted``.
    """
    query = """
        SELECT m.played_at, sys.name AS gamesystem_name,
            m.player1_id, p1.display_name AS p1_display, p1.discord_name AS p1_discord, m.score_player1, m.player1_mmr_change,
            m.player2_id, p2.display_name AS p2_display, p2.discord_name AS p2_discord, m.score_player2, m.player2_mmr_change,
            m.player2_check, m.match_is_counted
        FROM matches m
        JOIN gamesystems sys ON m.gamesystem_id = sys.id
        JOIN players p1 ON m.player1_id = p1.id
        JOIN players p2 ON m.player2_id = p2.id
        WHERE m.player1_id = ? OR m.player2_id = ?
        ORDER BY m.played_at DESC
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(query, (player_id, player_id))
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        connection.close()
    return [dict(row) for row in rows]
# -----------------------------------------------------
# Testing and Prototyping
# -----------------------------------------------------
def create_random_dummy_match(player_id, gamesystem_id=1):
    """Insert an unconfirmed test match between the dummy player and a user.

    The dummy is stored as player 1 and the given player as player 2, with
    ``player2_check = 0``. Both scores are random integers from 0 to 100.

    Args:
        player_id: ID of the user who becomes player 2 of the match.
        gamesystem_id: Game system the match is booked in (defaults to 1).

    Returns:
        True if the match was inserted, False if no dummy player was found.
    """
    insert_sql = """
        INSERT INTO matches (gamesystem_id, player2_id, player1_id, score_player1, score_player2, player2_check)
        VALUES (?, ?, ?, ?, ?, 0)
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        cursor = connection.cursor()
        # 1. Find the dummy by name (first player whose display or Discord
        #    name contains 'Dummy').
        cursor.execute("SELECT id FROM players WHERE display_name LIKE '%Dummy%' OR discord_name LIKE '%Dummy%' LIMIT 1")
        dummy_row = cursor.fetchone()
        if not dummy_row:
            print("Fehler: Kein Dummy in der Datenbank gefunden!")
            return False
        dummy_id = dummy_row[0]

        # 2. Random scores; score_p1 belongs to player 1, i.e. the dummy.
        score_p1 = random.randint(0, 100)
        score_p2 = random.randint(0, 100)

        # 3. Insert the match. The column list names player2_id before
        #    player1_id, hence the (player_id, dummy_id) order.
        cursor.execute(insert_sql, (gamesystem_id, player_id, dummy_id, score_p1, score_p2))
        connection.commit()
    finally:
        connection.close()

    player_name = get_player_name(player_id)
    dummy_name = get_player_name(dummy_id)
    sys_name = get_system_name(gamesystem_id)
    # Each score is paired with the player it was stored for: the dummy is
    # player 1 (score_p1), the user is player 2 (score_p2).
    logger.log("TEST_MATCH", f"Zufallsspiel generiert. [{sys_name}]: {dummy_name} ({score_p1}) vs. {player_name} ({score_p2})", player_id)
    return True