"""SQLite database layer with FSRS spaced repetition integration.""" import json import sqlite3 from datetime import datetime, timedelta, timezone from pathlib import Path import fsrs DB_PATH = Path(__file__).parent / "data" / "progress.db" _conn = None _scheduler = fsrs.Scheduler() def get_connection(): """Return the shared SQLite connection (singleton).""" global _conn if _conn is None: DB_PATH.parent.mkdir(parents=True, exist_ok=True) _conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) _conn.row_factory = sqlite3.Row _conn.execute("PRAGMA journal_mode=WAL") return _conn def init_db(): """Create all tables if they don't exist. Called once at startup.""" conn = get_connection() conn.executescript(""" CREATE TABLE IF NOT EXISTS word_progress ( word_id TEXT PRIMARY KEY, fsrs_state TEXT, due TIMESTAMP, stability REAL, difficulty REAL, reps INTEGER DEFAULT 0, lapses INTEGER DEFAULT 0, last_review TIMESTAMP ); CREATE TABLE IF NOT EXISTS quiz_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, category TEXT, total_questions INTEGER, correct INTEGER, duration_seconds INTEGER ); CREATE TABLE IF NOT EXISTS essays ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, essay_text TEXT, grade TEXT, feedback TEXT, theme TEXT ); CREATE TABLE IF NOT EXISTS tutor_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, theme TEXT, messages TEXT, duration_seconds INTEGER ); """) conn.commit() def get_word_progress(word_id): """Return learning state for one word, or None if never reviewed.""" conn = get_connection() row = conn.execute( "SELECT * FROM word_progress WHERE word_id = ?", (word_id,) ).fetchone() return dict(row) if row else None def update_word_progress(word_id, rating): """Run FSRS algorithm, update due date/stability/difficulty. Args: word_id: Vocabulary entry ID. rating: fsrs.Rating value (Again=1, Hard=2, Good=3, Easy=4). """ conn = get_connection() existing = get_word_progress(word_id) if existing and existing["fsrs_state"]: card = fsrs.Card.from_dict(json.loads(existing["fsrs_state"])) else: card = fsrs.Card() card, review_log = _scheduler.review_card(card, rating) now = datetime.now(timezone.utc).isoformat() card_json = json.dumps(card.to_dict(), default=str) conn.execute( """INSERT OR REPLACE INTO word_progress (word_id, fsrs_state, due, stability, difficulty, reps, lapses, last_review) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( word_id, card_json, card.due.isoformat(), card.stability, card.difficulty, (existing["reps"] + 1) if existing else 1, existing["lapses"] if existing else 0, now, ), ) conn.commit() return card def get_due_words(limit=20): """Return word IDs where due <= now, ordered by due date.""" conn = get_connection() now = datetime.now(timezone.utc).isoformat() rows = conn.execute( "SELECT word_id FROM word_progress WHERE due <= ? ORDER BY due LIMIT ?", (now, limit), ).fetchall() return [row["word_id"] for row in rows] def get_word_counts(total_vocab_size=0): """Return dict with total/seen/mastered/due counts for dashboard.""" conn = get_connection() now = datetime.now(timezone.utc).isoformat() seen = conn.execute("SELECT COUNT(*) FROM word_progress").fetchone()[0] mastered = conn.execute( "SELECT COUNT(*) FROM word_progress WHERE stability > 10" ).fetchone()[0] due = conn.execute( "SELECT COUNT(*) FROM word_progress WHERE due <= ?", (now,) ).fetchone()[0] return { "total": total_vocab_size, "seen": seen, "mastered": mastered, "due": due, } def get_all_word_progress(): """Return all word progress as a dict of word_id -> progress dict.""" conn = get_connection() rows = conn.execute("SELECT * FROM word_progress").fetchall() return {row["word_id"]: dict(row) for row in rows} def record_quiz_session(category, total_questions, correct, duration_seconds): """Log a completed flashcard session.""" conn = get_connection() conn.execute( "INSERT INTO quiz_sessions (category, total_questions, correct, duration_seconds) VALUES (?, ?, ?, ?)", (category, total_questions, correct, duration_seconds), ) conn.commit() def save_essay(essay_text, grade, feedback, theme): """Save an essay + AI feedback.""" conn = get_connection() conn.execute( "INSERT INTO essays (essay_text, grade, feedback, theme) VALUES (?, ?, ?, ?)", (essay_text, grade, feedback, theme), ) conn.commit() def save_tutor_session(theme, messages, duration_seconds): """Save a tutor conversation.""" conn = get_connection() conn.execute( "INSERT INTO tutor_sessions (theme, messages, duration_seconds) VALUES (?, ?, ?)", (theme, json.dumps(messages, ensure_ascii=False), duration_seconds), ) conn.commit() def get_stats(): """Aggregate data for the dashboard.""" conn = get_connection() recent_quizzes = conn.execute( "SELECT * FROM quiz_sessions ORDER BY timestamp DESC LIMIT 10" ).fetchall() total_reviews = conn.execute( "SELECT COALESCE(SUM(reps), 0) FROM word_progress" ).fetchone()[0] total_quizzes = conn.execute( "SELECT COUNT(*) FROM quiz_sessions" ).fetchone()[0] # Streak: count consecutive days with activity days = conn.execute( "SELECT DISTINCT DATE(last_review) as d FROM word_progress WHERE last_review IS NOT NULL ORDER BY d DESC" ).fetchall() streak = 0 today = datetime.now(timezone.utc).date() for i, row in enumerate(days): day = datetime.fromisoformat(row["d"]).date() if isinstance(row["d"], str) else row["d"] expected = today - timedelta(days=i) if day == expected: streak += 1 else: break return { "recent_quizzes": [dict(r) for r in recent_quizzes], "total_reviews": total_reviews, "total_quizzes": total_quizzes, "streak": streak, } def get_recent_essays(limit=10): """Return recent essays for the essay history view.""" conn = get_connection() rows = conn.execute( "SELECT * FROM essays ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall() return [dict(r) for r in rows] def close(): """Close the database connection.""" global _conn if _conn: _conn.close() _conn = None