Add persian-tutor: Gradio-based GCSE Persian language learning app

Vocabulary study with FSRS spaced repetition, AI tutoring (Ollama/Claude),
essay marking, idioms browser, Anki export, and dashboard. 918 vocabulary
entries across 39 categories. 41 tests passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Branch: local · 2026-02-08 01:57:44 +00:00 · parent 104da381fb · commit 2e8c2c11d0 · 22 changed files with 10664 additions and 0 deletions

# Persian Language Tutor
## Overview
Gradio-based Persian (Farsi) language learning app for English speakers, using GCSE Persian vocabulary (Pearson spec) as seed data.
## Tech Stack
- **Frontend**: Gradio (browser handles RTL natively)
- **Spaced repetition**: py-fsrs (same algorithm as Anki)
- **AI**: Ollama (fast, local) + Claude CLI (smart, subprocess)
- **STT**: faster-whisper via sttlib from tool-speechtotext
- **Anki export**: genanki for .apkg generation
- **Database**: SQLite (file-based, data/progress.db)
- **Environment**: `whisper-ollama` conda env
## Running
```bash
mamba run -n whisper-ollama python app.py
```
## Testing
```bash
mamba run -n whisper-ollama python -m pytest tests/
```
## Key Paths
- `data/vocabulary.json` — GCSE vocabulary data
- `data/progress.db` — SQLite database (auto-created)
- `app.py` — Gradio entry point
- `db.py` — Database layer with FSRS integration
- `ai.py` — Dual AI backend (Ollama + Claude)
- `stt.py` — Persian speech-to-text wrapper
- `modules/` — Feature modules (vocab, dashboard, essay, tutor, idioms)
## Architecture
- Single-process Gradio app with shared SQLite connection
- FSRS Card objects serialized as JSON in SQLite TEXT columns
- Timestamps stored as ISO-8601 strings
- sttlib imported via sys.path from tool-speechtotext project
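The FSRS-state-as-JSON approach can be sketched with the stdlib alone (a plain dict stands in for an `fsrs.Card`; the real code round-trips via `card.to_dict()` / `Card.from_dict()`, and the in-memory DB stands in for `data/progress.db`):

```python
import json
import sqlite3

# In-memory DB stands in for data/progress.db
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE word_progress (word_id TEXT PRIMARY KEY, fsrs_state TEXT)")

# A dict stands in for fsrs.Card.to_dict(); timestamps are ISO-8601 strings
card_state = {"due": "2026-02-09T01:57:44+00:00", "stability": 1.2, "difficulty": 5.0}
conn.execute(
    "INSERT OR REPLACE INTO word_progress VALUES (?, ?)",
    ("word-001", json.dumps(card_state)),
)

# Reading it back restores the exact scheduling state
row = conn.execute(
    "SELECT fsrs_state FROM word_progress WHERE word_id = ?", ("word-001",)
).fetchone()
restored = json.loads(row[0])
assert restored == card_state
```

Because the whole card lives in one TEXT column, no schema migration is needed when py-fsrs adds fields.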

# Persian Language Tutor
A Gradio-based Persian (Farsi) language learning app for English speakers, built around GCSE Persian vocabulary (Pearson specification).
## Features
- **Vocabulary Study** — Search, browse, and study 918 GCSE Persian words across 39 categories
- **Flashcards with FSRS** — Spaced repetition scheduling (same algorithm as Anki)
- **Idioms & Expressions** — 25 Persian social conventions with cultural context
- **AI Tutor** — Conversational Persian lessons by GCSE theme (via Ollama)
- **Essay Marking** — Write Persian essays, get AI feedback and grading (via Claude)
- **Dashboard** — Track progress, streaks, and mastery
- **Anki Export** — Generate .apkg decks for offline study
- **Voice Input** — Speak Persian via microphone (Whisper STT) in the Tutor tab
## Prerequisites
- `whisper-ollama` conda environment with Python 3.10+
- Ollama running locally with `qwen2.5:7b` (or another model)
- Claude CLI installed (for essay marking / smart mode)
## Setup
```bash
/home/ys/miniforge3/envs/whisper-ollama/bin/pip install gradio genanki fsrs
```
## Running the app
```bash
cd /home/ys/family-repo/Code/python/persian-tutor
/home/ys/miniforge3/envs/whisper-ollama/bin/python app.py
```
Then open http://localhost:7860 in your browser.
## Running tests
```bash
cd /home/ys/family-repo/Code/python/persian-tutor
/home/ys/miniforge3/envs/whisper-ollama/bin/python -m pytest tests/ -v
```
41 tests covering db, vocab, ai, and anki_export modules.
## Expanding vocabulary
The vocabulary can be expanded by editing `data/vocabulary.json` directly or by updating `scripts/build_vocab.py` and re-running it:
```bash
/home/ys/miniforge3/envs/whisper-ollama/bin/python scripts/build_vocab.py
```
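Editing the JSON directly can be sketched as below (the entry and its `id` are hypothetical; field names are inferred from what `anki_export.py` reads, and a temp file stands in for `data/vocabulary.json`):

```python
import json
import os
import tempfile

# Temp file stands in for data/vocabulary.json
path = os.path.join(tempfile.gettempdir(), "vocabulary-demo.json")
with open(path, "w", encoding="utf-8") as f:
    json.dump([], f)

# Hypothetical new entry; fields match those used by anki_export.py
entry = {
    "id": "food-001",
    "english": "bread",
    "persian": "نان",
    "finglish": "nân",
    "category": "Food and drink",
}

# Load, append, and write back with ensure_ascii=False so Persian stays readable
with open(path, encoding="utf-8") as f:
    vocab = json.load(f)
vocab.append(entry)
with open(path, "w", encoding="utf-8") as f:
    json.dump(vocab, f, ensure_ascii=False, indent=2)
```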
## TODO
- [ ] Voice-based vocabulary testing — answer flashcard prompts by speaking Persian
- [ ] Improved UI theme and layout polish

python/persian-tutor/ai.py (44 lines)
"""Dual AI backend: Ollama (fast/local) and Claude CLI (smart)."""
import subprocess
import ollama
DEFAULT_OLLAMA_MODEL = "qwen2.5:7b"
def ask_ollama(prompt, system=None, model=DEFAULT_OLLAMA_MODEL):
"""Query Ollama with an optional system prompt."""
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
response = ollama.chat(model=model, messages=messages)
return response.message.content
def ask_claude(prompt):
"""Query Claude via the CLI subprocess."""
result = subprocess.run(
["claude", "-p", prompt],
capture_output=True,
text=True,
)
return result.stdout.strip()
def ask(prompt, system=None, quality="fast"):
"""Unified interface. quality='fast' uses Ollama, 'smart' uses Claude."""
if quality == "smart":
return ask_claude(prompt)
return ask_ollama(prompt, system=system)
def chat_ollama(messages, system=None, model=DEFAULT_OLLAMA_MODEL):
"""Multi-turn conversation with Ollama."""
all_messages = []
if system:
all_messages.append({"role": "system", "content": system})
all_messages.extend(messages)
response = ollama.chat(model=model, messages=all_messages)
return response.message.content
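The quality-based routing in `ask()` can be exercised without either backend by substituting stubs (a sketch; the real functions call Ollama and the Claude CLI):

```python
# Stub backends stand in for the real ask_ollama / ask_claude
def ask_ollama(prompt, system=None):
    return f"[fast] {prompt}"

def ask_claude(prompt):
    return f"[smart] {prompt}"

def ask(prompt, system=None, quality="fast"):
    # Same dispatch logic as ai.py: 'smart' routes to Claude, anything else to Ollama
    if quality == "smart":
        return ask_claude(prompt)
    return ask_ollama(prompt, system=system)

assert ask("salam") == "[fast] salam"
assert ask("salam", quality="smart") == "[smart] salam"
```

Note that the `system` prompt is only honoured on the Ollama path; the Claude CLI path receives the bare prompt.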

python/persian-tutor/anki_export.py (76 lines)
"""Generate Anki .apkg decks from vocabulary data."""
import genanki
# Stable model/deck IDs (generated once, kept constant)
_MODEL_ID = 1607392319
_DECK_ID = 2059400110
def _make_model():
"""Create an Anki note model with two card templates."""
return genanki.Model(
_MODEL_ID,
"GCSE Persian",
fields=[
{"name": "English"},
{"name": "Persian"},
{"name": "Finglish"},
{"name": "Category"},
],
templates=[
{
"name": "English → Persian",
"qfmt": '<div style="font-size:1.5em">{{English}}</div>'
'<br><small>{{Category}}</small>',
"afmt": '{{FrontSide}}<hr id="answer">'
'<div dir="rtl" style="font-size:2em">{{Persian}}</div>'
"<br><div>{{Finglish}}</div>",
},
{
"name": "Persian → English",
"qfmt": '<div dir="rtl" style="font-size:2em">{{Persian}}</div>'
'<br><small>{{Category}}</small>',
"afmt": '{{FrontSide}}<hr id="answer">'
'<div style="font-size:1.5em">{{English}}</div>'
"<br><div>{{Finglish}}</div>",
},
],
css=".card { font-family: arial; text-align: center; }",
)
def export_deck(vocab, categories=None, output_path="gcse-persian.apkg"):
"""Generate an Anki .apkg deck from vocabulary entries.
Args:
vocab: List of vocabulary entries (dicts with english, persian, finglish, category).
categories: Optional list of categories to include. None = all.
output_path: Where to save the .apkg file.
Returns:
Path to the generated .apkg file.
"""
model = _make_model()
deck = genanki.Deck(_DECK_ID, "GCSE Persian")
for entry in vocab:
if categories and entry.get("category") not in categories:
continue
note = genanki.Note(
model=model,
fields=[
entry.get("english", ""),
entry.get("persian", ""),
entry.get("finglish", ""),
entry.get("category", ""),
],
guid=genanki.guid_for(entry.get("id", entry["english"])),
)
deck.add_note(note)
package = genanki.Package(deck)
package.write_to_file(output_path)
return output_path
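genanki derives note GUIDs from the values passed to `guid_for`, so re-exporting the same vocabulary updates existing notes in Anki instead of duplicating them. The idea can be sketched with hashlib (an approximation, not genanki's exact implementation):

```python
import hashlib

def guid_for(*values):
    # Hash the stringified values; same input always yields the same GUID
    # (approximation of genanki.guid_for's stable-hash behaviour)
    joined = "__".join(str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()[:10]

# Stable entry ids mean a re-export updates rather than duplicates notes
assert guid_for("food-001") == guid_for("food-001")
assert guid_for("food-001") != guid_for("food-002")
```

This is why `export_deck` keys the GUID on `entry["id"]` when available, falling back to the English text.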

python/persian-tutor/app.py (511 lines)
"""Persian Language Tutor — Gradio UI."""
import json
import os
import tempfile
import time
import gradio as gr
import db
from modules import vocab, dashboard, essay, tutor, idioms
from modules.essay import GCSE_THEMES
from modules.tutor import THEME_PROMPTS
from anki_export import export_deck
# ---------- Initialise ----------
db.init_db()
vocabulary = vocab.load_vocab()
categories = ["All"] + vocab.get_categories()
# ---------- Helper ----------
def _rtl(text, size="2em"):
return f'<div dir="rtl" style="font-size:{size}; text-align:center">{text}</div>'
# ================================================================
# TAB HANDLERS
# ================================================================
# ---------- Dashboard ----------
def refresh_dashboard():
overview_md = dashboard.format_overview_markdown()
cat_data = dashboard.get_category_breakdown()
quiz_data = dashboard.get_recent_quizzes()
return overview_md, cat_data, quiz_data
# ---------- Vocabulary Search ----------
def do_search(query, category):
results = vocab.search(query)
if category and category != "All":
results = [r for r in results if r["category"] == category]
if not results:
return "No results found."
lines = []
for r in results:
status = vocab.get_word_status(r["id"])
icon = {"new": "", "learning": "🟨", "mastered": "🟩"}.get(status, "")
lines.append(
f'{icon} **{r["english"]}** — '
f'<span dir="rtl">{r["persian"]}</span>'
f' ({r.get("finglish", "")})'
)
return "\n\n".join(lines)
def do_random_word(category, transliteration):
entry = vocab.get_random_word(category=category)
if not entry:
return "No words found."
return vocab.format_word_card(entry, show_transliteration=transliteration)
# ---------- Flashcards ----------
def start_flashcards(category, direction):
batch = vocab.get_flashcard_batch(count=10, category=category)
if not batch:
return "No words available.", [], 0, 0, "", gr.update(visible=False)
first = batch[0]
if direction == "English → Persian":
prompt = f'<div style="font-size:2em; text-align:center">{first["english"]}</div>'
else:
prompt = _rtl(first["persian"])
return (
prompt, # card_display
batch, # batch state
0, # current index
0, # score
"", # answer_box cleared
gr.update(visible=True), # answer_area visible
)
def submit_answer(user_answer, batch, index, score, direction, transliteration):
if not batch or index >= len(batch):
return "Session complete!", batch, index, score, "", gr.update(visible=False), ""
entry = batch[index]
dir_key = "en_to_fa" if direction == "English → Persian" else "fa_to_en"
is_correct, correct_answer, _ = vocab.check_answer(entry["id"], user_answer, direction=dir_key)
if is_correct:
score += 1
result = "✅ **Correct!**"
else:
result = "❌ **Incorrect.** The answer is: "
if dir_key == "en_to_fa":
result += f'<span dir="rtl">{correct_answer}</span>'
else:
result += correct_answer
card_info = vocab.format_word_card(entry, show_transliteration=transliteration)
feedback = f"{result}\n\n{card_info}\n\n---\n*Rate your recall to continue:*"
return feedback, batch, index, score, "", gr.update(visible=True), ""
def rate_and_next(rating_str, batch, index, score, direction):
if not batch or index >= len(batch):
return "Session complete!", batch, index, score, gr.update(visible=False)
import fsrs as fsrs_mod
rating_map = {
"Again": fsrs_mod.Rating.Again,
"Hard": fsrs_mod.Rating.Hard,
"Good": fsrs_mod.Rating.Good,
"Easy": fsrs_mod.Rating.Easy,
}
rating = rating_map.get(rating_str, fsrs_mod.Rating.Good)
entry = batch[index]
db.update_word_progress(entry["id"], rating)
index += 1
if index >= len(batch):
summary = f"## Session Complete!\n\n**Score:** {score}/{len(batch)}\n\n"
summary += f"**Accuracy:** {score/len(batch)*100:.0f}%"
return summary, batch, index, score, gr.update(visible=False)
next_entry = batch[index]
if direction == "English → Persian":
prompt = f'<div style="font-size:2em; text-align:center">{next_entry["english"]}</div>'
else:
prompt = _rtl(next_entry["persian"])
return prompt, batch, index, score, gr.update(visible=True)
# ---------- Idioms ----------
def show_random_idiom(transliteration):
expr = idioms.get_random_expression()
return idioms.format_expression(expr, show_transliteration=transliteration), expr
def explain_idiom(expr_state):
if not expr_state:
return "Pick an idiom first."
return idioms.explain_expression(expr_state)
def browse_idioms(transliteration):
exprs = idioms.get_all_expressions()
lines = []
for e in exprs:
line = f'**<span dir="rtl">{e["persian"]}</span>** — {e["english"]}'
if transliteration != "off":
line += f' *({e["finglish"]})*'
lines.append(line)
return "\n\n".join(lines)
# ---------- Tutor ----------
def start_tutor_lesson(theme):
response, messages, system = tutor.start_lesson(theme)
chat_history = [{"role": "assistant", "content": response}]
return chat_history, messages, system, time.time()
def send_tutor_message(user_msg, chat_history, messages, system, audio_input):
# Use STT if audio provided and no text
if audio_input is not None and (not user_msg or not user_msg.strip()):
try:
from stt import transcribe_persian
user_msg = transcribe_persian(audio_input)
except Exception:
user_msg = ""
if not user_msg or not user_msg.strip():
return chat_history, messages, "", None
response, messages = tutor.process_response(user_msg, messages, system=system)
chat_history.append({"role": "user", "content": user_msg})
chat_history.append({"role": "assistant", "content": response})
return chat_history, messages, "", None
def save_tutor(theme, messages, start_time):
if messages and len(messages) > 1:
tutor.save_session(theme, messages, start_time)
return "Session saved!"
return "Nothing to save."
# ---------- Essay ----------
def submit_essay(text, theme):
if not text or not text.strip():
return "Please write an essay first."
return essay.mark_essay(text, theme)
def load_essay_history():
return essay.get_essay_history()
# ---------- Settings / Export ----------
def do_anki_export(cats_selected):
v = vocab.load_vocab()
cats = cats_selected if cats_selected else None
path = os.path.join(tempfile.gettempdir(), "gcse-persian.apkg")
export_deck(v, categories=cats, output_path=path)
return path
def reset_progress():
conn = db.get_connection()
conn.execute("DELETE FROM word_progress")
conn.execute("DELETE FROM quiz_sessions")
conn.execute("DELETE FROM essays")
conn.execute("DELETE FROM tutor_sessions")
conn.commit()
return "Progress reset."
# ================================================================
# GRADIO UI
# ================================================================
with gr.Blocks(title="Persian Language Tutor", theme=gr.themes.Soft()) as app:
gr.Markdown("# 🇮🇷 Persian Language Tutor\n*GCSE Persian vocabulary with spaced repetition*")
# Shared state
transliteration_state = gr.State(value="Finglish")
with gr.Tabs():
# ==================== DASHBOARD ====================
with gr.Tab("📊 Dashboard"):
overview_md = gr.Markdown("Loading...")
with gr.Row():
cat_table = gr.Dataframe(
headers=["Category", "Total", "Seen", "Mastered", "Progress"],
label="Category Breakdown",
)
quiz_table = gr.Dataframe(
headers=["Date", "Category", "Score", "Duration"],
label="Recent Quizzes",
)
refresh_btn = gr.Button("Refresh", variant="secondary")
refresh_btn.click(
fn=refresh_dashboard,
outputs=[overview_md, cat_table, quiz_table],
)
# ==================== VOCABULARY ====================
with gr.Tab("📚 Vocabulary"):
with gr.Row():
search_box = gr.Textbox(
label="Search (English or Persian)",
placeholder="Type to search...",
)
vocab_cat = gr.Dropdown(
choices=categories, value="All", label="Category"
)
search_btn = gr.Button("Search", variant="primary")
random_btn = gr.Button("Random Word")
search_results = gr.Markdown("Search for a word above.")
search_btn.click(
fn=do_search,
inputs=[search_box, vocab_cat],
outputs=[search_results],
)
search_box.submit(
fn=do_search,
inputs=[search_box, vocab_cat],
outputs=[search_results],
)
random_btn.click(
fn=do_random_word,
inputs=[vocab_cat, transliteration_state],
outputs=[search_results],
)
# ==================== FLASHCARDS ====================
with gr.Tab("🃏 Flashcards"):
with gr.Row():
fc_category = gr.Dropdown(
choices=categories, value="All", label="Category"
)
fc_direction = gr.Radio(
["English → Persian", "Persian → English"],
value="English → Persian",
label="Direction",
)
start_fc_btn = gr.Button("Start Session", variant="primary")
card_display = gr.Markdown("Press 'Start Session' to begin.")
# Hidden states
fc_batch = gr.State([])
fc_index = gr.State(0)
fc_score = gr.State(0)
with gr.Group(visible=False) as answer_area:
answer_box = gr.Textbox(
label="Your answer",
placeholder="Type your answer...",
rtl=True,
)
submit_ans_btn = gr.Button("Submit Answer", variant="primary")
answer_feedback = gr.Markdown("")
with gr.Row():
btn_again = gr.Button("Again", variant="stop")
btn_hard = gr.Button("Hard", variant="secondary")
btn_good = gr.Button("Good", variant="primary")
btn_easy = gr.Button("Easy", variant="secondary")
start_fc_btn.click(
fn=start_flashcards,
inputs=[fc_category, fc_direction],
outputs=[card_display, fc_batch, fc_index, fc_score, answer_box, answer_area],
)
submit_ans_btn.click(
fn=submit_answer,
inputs=[answer_box, fc_batch, fc_index, fc_score, fc_direction, transliteration_state],
outputs=[card_display, fc_batch, fc_index, fc_score, answer_box, answer_area, answer_feedback],
)
answer_box.submit(
fn=submit_answer,
inputs=[answer_box, fc_batch, fc_index, fc_score, fc_direction, transliteration_state],
outputs=[card_display, fc_batch, fc_index, fc_score, answer_box, answer_area, answer_feedback],
)
for btn, label in [(btn_again, "Again"), (btn_hard, "Hard"), (btn_good, "Good"), (btn_easy, "Easy")]:
btn.click(
fn=rate_and_next,
inputs=[gr.State(label), fc_batch, fc_index, fc_score, fc_direction],
outputs=[card_display, fc_batch, fc_index, fc_score, answer_area],
)
# ==================== IDIOMS ====================
with gr.Tab("💬 Idioms & Expressions"):
idiom_display = gr.Markdown("Click 'Random Idiom' or browse below.")
idiom_state = gr.State(None)
with gr.Row():
random_idiom_btn = gr.Button("Random Idiom", variant="primary")
explain_idiom_btn = gr.Button("Explain Usage")
browse_idiom_btn = gr.Button("Browse All")
idiom_explanation = gr.Markdown("")
random_idiom_btn.click(
fn=show_random_idiom,
inputs=[transliteration_state],
outputs=[idiom_display, idiom_state],
)
explain_idiom_btn.click(
fn=explain_idiom,
inputs=[idiom_state],
outputs=[idiom_explanation],
)
browse_idiom_btn.click(
fn=browse_idioms,
inputs=[transliteration_state],
outputs=[idiom_display],
)
# ==================== TUTOR ====================
with gr.Tab("🎓 Tutor"):
tutor_theme = gr.Dropdown(
choices=list(THEME_PROMPTS.keys()),
value="Identity and culture",
label="Theme",
)
start_lesson_btn = gr.Button("New Lesson", variant="primary")
chatbot = gr.Chatbot(label="Conversation", type="messages")
# Tutor states
tutor_messages = gr.State([])
tutor_system = gr.State("")
tutor_start_time = gr.State(0)
with gr.Row():
tutor_input = gr.Textbox(
label="Your message",
placeholder="Type in English or Persian...",
scale=3,
)
tutor_mic = gr.Audio(
sources=["microphone"],
type="numpy",
label="Speak",
scale=1,
)
send_btn = gr.Button("Send", variant="primary")
save_btn = gr.Button("Save Session", variant="secondary")
save_status = gr.Markdown("")
start_lesson_btn.click(
fn=start_tutor_lesson,
inputs=[tutor_theme],
outputs=[chatbot, tutor_messages, tutor_system, tutor_start_time],
)
send_btn.click(
fn=send_tutor_message,
inputs=[tutor_input, chatbot, tutor_messages, tutor_system, tutor_mic],
outputs=[chatbot, tutor_messages, tutor_input, tutor_mic],
)
tutor_input.submit(
fn=send_tutor_message,
inputs=[tutor_input, chatbot, tutor_messages, tutor_system, tutor_mic],
outputs=[chatbot, tutor_messages, tutor_input, tutor_mic],
)
save_btn.click(
fn=save_tutor,
inputs=[tutor_theme, tutor_messages, tutor_start_time],
outputs=[save_status],
)
# ==================== ESSAY ====================
with gr.Tab("✍️ Essay"):
essay_theme = gr.Dropdown(
choices=GCSE_THEMES,
value="Identity and culture",
label="Theme",
)
essay_input = gr.Textbox(
label="Write your essay in Persian",
lines=10,
rtl=True,
placeholder="اینجا بنویسید...",
)
submit_essay_btn = gr.Button("Submit for Marking", variant="primary")
essay_feedback = gr.Markdown("Write an essay and submit for AI marking.")
gr.Markdown("### Essay History")
essay_history_table = gr.Dataframe(
headers=["Date", "Theme", "Grade", "Preview"],
label="Past Essays",
)
refresh_essays_btn = gr.Button("Refresh History")
submit_essay_btn.click(
fn=submit_essay,
inputs=[essay_input, essay_theme],
outputs=[essay_feedback],
)
refresh_essays_btn.click(
fn=load_essay_history,
outputs=[essay_history_table],
)
# ==================== SETTINGS ====================
with gr.Tab("⚙️ Settings"):
gr.Markdown("## Settings")
transliteration_radio = gr.Radio(
["off", "Finglish", "Academic"],
value="Finglish",
label="Transliteration",
)
ollama_model = gr.Textbox(
label="Ollama Model",
value="qwen2.5:7b",
info="Model used for fast AI responses",
)
whisper_size = gr.Dropdown(
choices=["tiny", "base", "small", "medium", "large-v3"],
value="medium",
label="Whisper Model Size",
)
gr.Markdown("### Anki Export")
export_cats = gr.Dropdown(
choices=vocab.get_categories(),
multiselect=True,
label="Categories to export (empty = all)",
)
export_btn = gr.Button("Export to Anki (.apkg)", variant="primary")
export_file = gr.File(label="Download")
export_btn.click(fn=do_anki_export, inputs=[export_cats], outputs=[export_file])
gr.Markdown("### Reset")
reset_btn = gr.Button("Reset All Progress", variant="stop")
reset_status = gr.Markdown("")
reset_btn.click(fn=reset_progress, outputs=[reset_status])
# Wire transliteration state
transliteration_radio.change(
fn=lambda x: x,
inputs=[transliteration_radio],
outputs=[transliteration_state],
)
# Load dashboard on app start
app.load(fn=refresh_dashboard, outputs=[overview_md, cat_table, quiz_table])
if __name__ == "__main__":
app.launch()

(One file's diff suppressed because it is too large.)

python/persian-tutor/db.py (234 lines)
"""SQLite database layer with FSRS spaced repetition integration."""
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
import fsrs
DB_PATH = Path(__file__).parent / "data" / "progress.db"
_conn = None
_scheduler = fsrs.Scheduler()
def get_connection():
"""Return the shared SQLite connection (singleton)."""
global _conn
if _conn is None:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
_conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
_conn.row_factory = sqlite3.Row
_conn.execute("PRAGMA journal_mode=WAL")
return _conn
def init_db():
"""Create all tables if they don't exist. Called once at startup."""
conn = get_connection()
conn.executescript("""
CREATE TABLE IF NOT EXISTS word_progress (
word_id TEXT PRIMARY KEY,
fsrs_state TEXT,
due TIMESTAMP,
stability REAL,
difficulty REAL,
reps INTEGER DEFAULT 0,
lapses INTEGER DEFAULT 0,
last_review TIMESTAMP
);
CREATE TABLE IF NOT EXISTS quiz_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
category TEXT,
total_questions INTEGER,
correct INTEGER,
duration_seconds INTEGER
);
CREATE TABLE IF NOT EXISTS essays (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
essay_text TEXT,
grade TEXT,
feedback TEXT,
theme TEXT
);
CREATE TABLE IF NOT EXISTS tutor_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
theme TEXT,
messages TEXT,
duration_seconds INTEGER
);
""")
conn.commit()
def get_word_progress(word_id):
"""Return learning state for one word, or None if never reviewed."""
conn = get_connection()
row = conn.execute(
"SELECT * FROM word_progress WHERE word_id = ?", (word_id,)
).fetchone()
return dict(row) if row else None
def update_word_progress(word_id, rating):
"""Run FSRS algorithm, update due date/stability/difficulty.
Args:
word_id: Vocabulary entry ID.
rating: fsrs.Rating value (Again=1, Hard=2, Good=3, Easy=4).
"""
conn = get_connection()
existing = get_word_progress(word_id)
if existing and existing["fsrs_state"]:
card = fsrs.Card.from_dict(json.loads(existing["fsrs_state"]))
else:
card = fsrs.Card()
card, review_log = _scheduler.review_card(card, rating)
now = datetime.now(timezone.utc).isoformat()
card_json = json.dumps(card.to_dict(), default=str)
conn.execute(
"""INSERT OR REPLACE INTO word_progress
(word_id, fsrs_state, due, stability, difficulty, reps, lapses, last_review)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
word_id,
card_json,
card.due.isoformat(),
card.stability,
card.difficulty,
(existing["reps"] + 1) if existing else 1,
existing["lapses"] if existing else 0,
now,
),
)
conn.commit()
return card
def get_due_words(limit=20):
"""Return word IDs where due <= now, ordered by due date."""
conn = get_connection()
now = datetime.now(timezone.utc).isoformat()
rows = conn.execute(
"SELECT word_id FROM word_progress WHERE due <= ? ORDER BY due LIMIT ?",
(now, limit),
).fetchall()
return [row["word_id"] for row in rows]
def get_word_counts(total_vocab_size=0):
"""Return dict with total/seen/mastered/due counts for dashboard."""
conn = get_connection()
now = datetime.now(timezone.utc).isoformat()
seen = conn.execute("SELECT COUNT(*) FROM word_progress").fetchone()[0]
mastered = conn.execute(
"SELECT COUNT(*) FROM word_progress WHERE stability > 10"
).fetchone()[0]
due = conn.execute(
"SELECT COUNT(*) FROM word_progress WHERE due <= ?", (now,)
).fetchone()[0]
return {
"total": total_vocab_size,
"seen": seen,
"mastered": mastered,
"due": due,
}
def record_quiz_session(category, total_questions, correct, duration_seconds):
"""Log a completed flashcard session."""
conn = get_connection()
conn.execute(
"INSERT INTO quiz_sessions (category, total_questions, correct, duration_seconds) VALUES (?, ?, ?, ?)",
(category, total_questions, correct, duration_seconds),
)
conn.commit()
def save_essay(essay_text, grade, feedback, theme):
"""Save an essay + AI feedback."""
conn = get_connection()
conn.execute(
"INSERT INTO essays (essay_text, grade, feedback, theme) VALUES (?, ?, ?, ?)",
(essay_text, grade, feedback, theme),
)
conn.commit()
def save_tutor_session(theme, messages, duration_seconds):
"""Save a tutor conversation."""
conn = get_connection()
conn.execute(
"INSERT INTO tutor_sessions (theme, messages, duration_seconds) VALUES (?, ?, ?)",
(theme, json.dumps(messages, ensure_ascii=False), duration_seconds),
)
conn.commit()
def get_stats():
"""Aggregate data for the dashboard."""
conn = get_connection()
recent_quizzes = conn.execute(
"SELECT * FROM quiz_sessions ORDER BY timestamp DESC LIMIT 10"
).fetchall()
total_reviews = conn.execute(
"SELECT COALESCE(SUM(reps), 0) FROM word_progress"
).fetchone()[0]
total_quizzes = conn.execute(
"SELECT COUNT(*) FROM quiz_sessions"
).fetchone()[0]
# Streak: count consecutive days with activity
days = conn.execute(
"SELECT DISTINCT DATE(last_review) as d FROM word_progress WHERE last_review IS NOT NULL ORDER BY d DESC"
).fetchall()
from datetime import timedelta  # local import replaces the __import__("datetime") hack
streak = 0
today = datetime.now(timezone.utc).date()
for i, row in enumerate(days):
day = datetime.fromisoformat(row["d"]).date() if isinstance(row["d"], str) else row["d"]
expected = today - timedelta(days=i)
if day == expected:
streak += 1
else:
break
return {
"recent_quizzes": [dict(r) for r in recent_quizzes],
"total_reviews": total_reviews,
"total_quizzes": total_quizzes,
"streak": streak,
}
def get_recent_essays(limit=10):
"""Return recent essays for the essay history view."""
conn = get_connection()
rows = conn.execute(
"SELECT * FROM essays ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
def close():
"""Close the database connection."""
global _conn
if _conn:
_conn.close()
_conn = None
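Storing due dates as ISO-8601 UTC strings is what makes the `WHERE due <= ?` TEXT comparison in `get_due_words` correct: for timestamps sharing the same UTC offset, lexicographic order matches chronological order. A quick stdlib check:

```python
from datetime import datetime, timedelta, timezone

earlier = datetime(2026, 2, 8, 1, 0, tzinfo=timezone.utc)
later = earlier + timedelta(hours=3)

# String comparison agrees with datetime comparison for same-offset ISO-8601
assert earlier.isoformat() < later.isoformat()
assert (earlier < later) == (earlier.isoformat() < later.isoformat())
```

This property would break if timestamps mixed offsets, which is why everything here is normalised to `timezone.utc` before `.isoformat()`.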

python/persian-tutor/modules/dashboard.py (84 lines)
"""Dashboard: progress stats, charts, and overview."""
import db
from modules.vocab import load_vocab, get_categories
def get_overview():
"""Return overview stats: total words, seen, mastered, due today."""
vocab = load_vocab()
counts = db.get_word_counts(total_vocab_size=len(vocab))
stats = db.get_stats()
counts["streak"] = stats["streak"]
counts["total_reviews"] = stats["total_reviews"]
counts["total_quizzes"] = stats["total_quizzes"]
return counts
def get_category_breakdown():
"""Return progress per category as list of dicts."""
vocab = load_vocab()
categories = get_categories()
breakdown = []
for cat in categories:
cat_words = [e for e in vocab if e["category"] == cat]
cat_ids = {e["id"] for e in cat_words}
total = len(cat_words)
seen = 0
mastered = 0
for wid in cat_ids:
progress = db.get_word_progress(wid)
if progress:
seen += 1
if progress["stability"] and progress["stability"] > 10:
mastered += 1
breakdown.append({
"Category": cat,
"Total": total,
"Seen": seen,
"Mastered": mastered,
"Progress": f"{seen}/{total}" if total > 0 else "0/0",
})
return breakdown
def get_recent_quizzes(limit=10):
"""Return recent quiz results as list of dicts for display."""
stats = db.get_stats()
quizzes = stats["recent_quizzes"][:limit]
result = []
for q in quizzes:
result.append({
"Date": q["timestamp"],
"Category": q["category"] or "All",
"Score": f"{q['correct']}/{q['total_questions']}",
"Duration": f"{q['duration_seconds'] or 0}s",
})
return result
def format_overview_markdown():
"""Format overview stats as a markdown string for display."""
o = get_overview()
pct = (o["seen"] / o["total"] * 100) if o["total"] > 0 else 0
bar_filled = int(pct / 5)  # 20-cell bar, 5% per cell
bar_empty = 20 - bar_filled
progress_bar = "█" * bar_filled + "░" * bar_empty
lines = [
"## Dashboard",
"",
f"**Words studied:** {o['seen']} / {o['total']} ({pct:.0f}%)",
f"`{progress_bar}`",
"",
f"**Due today:** {o['due']}",
f"**Mastered:** {o['mastered']}",
f"**Daily streak:** {o['streak']} day{'s' if o['streak'] != 1 else ''}",
f"**Total reviews:** {o['total_reviews']}",
f"**Quiz sessions:** {o['total_quizzes']}",
]
return "\n".join(lines)
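The text progress bar maps a percentage onto 20 character cells (5% per cell). A standalone sketch (the block glyphs are a common choice, not mandated by the source):

```python
def progress_bar(seen, total, width=20):
    # Percentage of vocabulary seen, guarding against an empty vocabulary
    pct = (seen / total * 100) if total > 0 else 0
    filled = int(pct / (100 / width))  # 5% per cell when width=20
    return "█" * filled + "░" * (width - filled)

bar = progress_bar(459, 918)  # half of a 918-word vocabulary seen
assert len(bar) == 20
assert bar.count("█") == 10
```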

python/persian-tutor/modules/essay.py (78 lines)
"""Essay writing and AI marking."""
import db
from ai import ask
MARKING_SYSTEM_PROMPT = """You are an expert Persian (Farsi) language teacher marking a GCSE-level essay.
You write in English but can read and correct Persian text.
Always provide constructive, encouraging feedback suitable for a language learner."""
MARKING_PROMPT_TEMPLATE = """Please mark this Persian essay written by a GCSE student.
Theme: {theme}
Student's essay:
{essay_text}
Please provide your response in this exact format:
**Grade:** [Give a grade from 1-9 matching GCSE grading, or a descriptive level like A2/B1]
**Summary:** [1-2 sentence overview of the essay quality]
**Corrections:**
[List specific errors with corrections. For each error, show the original text and the corrected version in Persian, with an English explanation]
**Improved version:**
[Rewrite the essay in corrected Persian]
**Tips for improvement:**
[3-5 specific, actionable tips for the student]"""
GCSE_THEMES = [
"Identity and culture",
"Local area and environment",
"School and work",
"Travel and tourism",
"International and global dimension",
]
def mark_essay(essay_text, theme="General"):
"""Send essay to AI for marking. Returns structured feedback."""
if not essay_text or not essay_text.strip():
return "Please write an essay first."
prompt = MARKING_PROMPT_TEMPLATE.format(
theme=theme,
essay_text=essay_text.strip(),
)
feedback = ask(prompt, system=MARKING_SYSTEM_PROMPT, quality="smart")
# Extract grade from feedback (best-effort)
grade = ""
for line in feedback.split("\n"):
if line.strip().startswith("**Grade:**"):
grade = line.replace("**Grade:**", "").strip()
break
# Save to database
db.save_essay(essay_text.strip(), grade, feedback, theme)
return feedback
def get_essay_history(limit=10):
"""Return recent essays for the history view."""
essays = db.get_recent_essays(limit)
result = []
for e in essays:
result.append({
"Date": e["timestamp"],
"Theme": e["theme"] or "General",
"Grade": e["grade"] or "-",
"Preview": (e["essay_text"] or "")[:50] + "...",
})
return result
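The best-effort grade extraction in `mark_essay` scans the AI feedback for the first `**Grade:**` line. As a standalone sketch:

```python
def extract_grade(feedback):
    # Mirrors the scan in mark_essay: first line starting with **Grade:**
    for line in feedback.split("\n"):
        if line.strip().startswith("**Grade:**"):
            return line.replace("**Grade:**", "").strip()
    return ""

sample = "**Grade:** 7\n**Summary:** Good effort."
assert extract_grade(sample) == "7"
assert extract_grade("no grade line") == ""
```

Because the parse is best-effort, a feedback response that deviates from the prompted format simply stores an empty grade rather than failing.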

python/persian-tutor/modules/idioms.py (200 lines)
"""Persian idioms, expressions, and social conventions."""
from ai import ask
# Built-in collection of common Persian expressions and idioms
EXPRESSIONS = [
{
"persian": "سلام علیکم",
"finglish": "salâm aleykom",
"english": "Peace be upon you (formal greeting)",
"context": "Formal greeting, especially with elders",
},
{
"persian": "خسته نباشید",
"finglish": "khaste nabâshid",
"english": "May you not be tired",
"context": "Common greeting to someone who has been working. Used as 'hello' in shops, offices, etc.",
},
{
"persian": "دستت درد نکنه",
"finglish": "dastet dard nakone",
"english": "May your hand not hurt",
"context": "Thank you for your effort (after someone does something for you)",
},
{
"persian": "قابلی نداره",
"finglish": "ghâbeli nadâre",
"english": "It's not worthy (of you)",
"context": "You're welcome / Don't mention it — said when giving a gift or doing a favour",
},
{
"persian": "تعارف نکن",
"finglish": "ta'ârof nakon",
"english": "Don't do ta'arof",
"context": "Stop being politely modest — please accept! Part of Persian ta'arof culture.",
},
{
"persian": "نوش جان",
"finglish": "nush-e jân",
"english": "May it nourish your soul",
"context": "Said to someone eating — like 'bon appétit' or 'enjoy your meal'",
},
{
"persian": "چشمت روز بد نبینه",
"finglish": "cheshmet ruz-e bad nabine",
"english": "May your eyes never see a bad day",
"context": "A warm wish for someone's wellbeing",
},
{
"persian": "قدمت روی چشم",
"finglish": "ghadamet ru-ye cheshm",
"english": "Your step is on my eye",
"context": "Warm welcome — 'you're very welcome here'. Extremely hospitable expression.",
},
{
"persian": "ان‌شاءالله",
"finglish": "inshâ'allâh",
"english": "God willing",
"context": "Used when talking about future plans. Very common in daily speech.",
},
{
"persian": "ماشاءالله",
"finglish": "mâshâ'allâh",
"english": "What God has willed",
"context": "Expression of admiration or praise, also used to ward off the evil eye.",
},
{
"persian": "الهی شکر",
"finglish": "elâhi shokr",
"english": "Thank God",
"context": "Expression of gratitude, similar to 'thankfully'",
},
{
"persian": "به سلامتی",
"finglish": "be salâmati",
"english": "To your health / Cheers",
"context": "A toast or general well-wishing expression",
},
{
"persian": "عید مبارک",
"finglish": "eyd mobârak",
"english": "Happy holiday/celebration",
"context": "Used for any celebration, especially Nowruz",
},
{
"persian": "تسلیت می‌گم",
"finglish": "tasliyat migam",
"english": "I offer my condolences",
"context": "Expressing sympathy when someone has lost a loved one",
},
{
"persian": "خدا بیامرزه",
"finglish": "khodâ biâmorzesh",
"english": "May God forgive them (rest in peace)",
"context": "Said about someone who has passed away",
},
{
"persian": "زبونت رو گاز بگیر",
"finglish": "zaboonet ro gâz begir",
"english": "Bite your tongue",
"context": "Don't say such things! (similar to English 'touch wood')",
},
{
"persian": "دمت گرم",
"finglish": "damet garm",
"english": "May your breath be warm",
"context": "Well done! / Good for you! (informal, friendly praise)",
},
{
"persian": "چشم",
"finglish": "cheshm",
"english": "On my eye (I will do it)",
"context": "Respectful way of saying 'yes, I'll do it' — shows obedience/respect",
},
{
"persian": "بفرمایید",
"finglish": "befarmâyid",
"english": "Please (go ahead / help yourself / come in)",
"context": "Very versatile polite expression: offering food, inviting someone in, or giving way",
},
{
"persian": "ببخشید",
"finglish": "bebakhshid",
"english": "Excuse me / I'm sorry",
"context": "Used for both apologies and getting someone's attention",
},
{
"persian": "مخلصیم",
"finglish": "mokhlesim",
"english": "I'm your humble servant",
"context": "Polite/humble way of saying goodbye or responding to a compliment (ta'arof)",
},
{
"persian": "سرت سلامت باشه",
"finglish": "saret salâmat bâshe",
"english": "May your head be safe",
"context": "Expression of condolence — 'I'm sorry for your loss'",
},
{
"persian": "روی ما رو زمین ننداز",
"finglish": "ru-ye mâ ro zamin nandâz",
"english": "Don't throw our face on the ground",
"context": "Please don't refuse/embarrass us — said when insisting on a request",
},
{
"persian": "قربونت برم",
"finglish": "ghorboonet beram",
"english": "I'd sacrifice myself for you",
"context": "Term of endearment — very common among family and close friends",
},
{
"persian": "جون دل",
"finglish": "jun-e del",
"english": "Life of my heart",
"context": "Affectionate term used with loved ones",
},
]
def get_all_expressions():
"""Return all built-in expressions."""
return EXPRESSIONS
def get_random_expression():
"""Pick a random expression."""
import random
return random.choice(EXPRESSIONS)
def explain_expression(expression):
"""Use AI to generate a detailed explanation with usage examples."""
prompt = f"""Explain this Persian expression for an English-speaking student:
Persian: {expression['persian']}
Transliteration: {expression['finglish']}
Literal meaning: {expression['english']}
Context: {expression['context']}
Please provide:
1. A fuller explanation of when and how this is used
2. The cultural context (ta'arof, hospitality, etc.)
3. Two example dialogues showing it in use (in Persian with English translation)
4. Any variations or related expressions
Keep it concise and student-friendly."""
return ask(prompt, quality="fast")
def format_expression(expr, show_transliteration="off"):
"""Format an expression for display."""
parts = [
f'<div dir="rtl" style="font-size:1.8em; text-align:center">{expr["persian"]}</div>',
f'<div style="text-align:center; font-size:1.2em">{expr["english"]}</div>',
]
if show_transliteration != "off":
parts.append(f'<div style="text-align:center; color:#666; font-style:italic">{expr["finglish"]}</div>')
parts.append(f'<div style="text-align:center; color:#888; margin-top:0.5em">{expr["context"]}</div>')
return "\n".join(parts)


@@ -0,0 +1,65 @@
"""Conversational Persian lessons by GCSE theme."""
import time
import db
from ai import chat_ollama
TUTOR_SYSTEM_PROMPT = """You are a friendly Persian (Farsi) language tutor teaching English-speaking GCSE students.
Rules:
- Use a mix of English and Persian. Start mostly in English, gradually introducing more Persian.
- When you write Persian, also provide the Finglish transliteration in parentheses.
- Keep responses concise (2-4 sentences per turn).
- Ask the student to practice: translate phrases, answer questions in Persian, or fill in blanks.
- Correct mistakes gently and explain why.
- Stay on the current theme/topic.
- Use Iranian Persian (Farsi), not Dari or Tajik.
- Adapt to the student's level based on their responses."""
THEME_PROMPTS = {
"Identity and culture": "Let's practice talking about family, personality, daily routines, and Persian celebrations like Nowruz!",
"Local area and environment": "Let's practice talking about your home, neighbourhood, shopping, and the environment!",
"School and work": "Let's practice talking about school subjects, school life, jobs, and future plans!",
"Travel and tourism": "Let's practice talking about transport, directions, holidays, hotels, and restaurants!",
"International and global dimension": "Let's practice talking about health, global issues, technology, and social media!",
"Free conversation": "Let's have a free conversation in Persian! I'll help you along the way.",
}
def start_lesson(theme):
"""Generate the opening message for a new lesson.
Returns:
(assistant_message, messages_list, system_prompt)
"""
intro = THEME_PROMPTS.get(theme, THEME_PROMPTS["Free conversation"])
system = TUTOR_SYSTEM_PROMPT + f"\n\nCurrent topic: {theme}. {intro}"
messages = [{"role": "user", "content": f"I'd like to practice Persian. Today's theme is: {theme}"}]
response = chat_ollama(messages, system=system)
messages.append({"role": "assistant", "content": response})
return response, messages, system
def process_response(user_input, messages, system=None):
"""Add user input to conversation, get AI response.
Returns:
(assistant_response, updated_messages)
"""
if not user_input or not user_input.strip():
return "", messages
messages.append({"role": "user", "content": user_input.strip()})
response = chat_ollama(messages, system=system)
messages.append({"role": "assistant", "content": response})
return response, messages
def save_session(theme, messages, start_time):
"""Save the current tutor session to the database."""
duration = int(time.time() - start_time)
db.save_tutor_session(theme, messages, duration)
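The lesson functions above keep the whole conversation as a plain list of role/content dicts that grows by two entries per turn. A sketch of that pattern with a stub standing in for `chat_ollama`, so it runs without an Ollama server:

```python
# Conversation-history pattern used by the tutor module (stubbed backend).
def stub_chat(messages, system=None):
    # Stand-in for chat_ollama: a real backend would call the LLM here.
    return f"(tutor reply #{sum(1 for m in messages if m['role'] == 'user')})"

def process_turn(user_input, messages, backend=stub_chat, system=None):
    if not user_input or not user_input.strip():
        return "", messages
    messages.append({"role": "user", "content": user_input.strip()})
    response = backend(messages, system=system)
    messages.append({"role": "assistant", "content": response})
    return response, messages

history = []
reply1, history = process_turn("salâm!", history)
reply2, history = process_turn("How do I say 'thank you'?", history)
```

Keeping the history as plain dicts means the same list can be passed straight to the backend each turn and serialized as JSON when the session is saved.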


@@ -0,0 +1,152 @@
"""Vocabulary search, flashcard logic, and FSRS-driven review."""
import json
import random
from pathlib import Path
import fsrs
import db
VOCAB_PATH = Path(__file__).parent.parent / "data" / "vocabulary.json"
_vocab_data = None
def load_vocab():
"""Load vocabulary data from JSON (cached)."""
global _vocab_data
if _vocab_data is None:
with open(VOCAB_PATH, encoding="utf-8") as f:
_vocab_data = json.load(f)
return _vocab_data
def get_categories():
"""Return sorted list of unique categories."""
vocab = load_vocab()
return sorted({entry["category"] for entry in vocab})
def get_sections():
"""Return sorted list of unique sections."""
vocab = load_vocab()
return sorted({entry["section"] for entry in vocab})
def search(query, vocab_data=None):
"""Search vocabulary by English or Persian text. Returns matching entries."""
if not query or not query.strip():
return []
vocab = vocab_data or load_vocab()
query_lower = query.strip().lower()
results = []
for entry in vocab:
if (
query_lower in entry["english"].lower()
or query_lower in entry["persian"]
or (entry.get("finglish") and query_lower in entry["finglish"].lower())
):
results.append(entry)
return results
def get_random_word(vocab_data=None, category=None):
"""Pick a random vocabulary entry, optionally filtered by category."""
vocab = vocab_data or load_vocab()
if category and category != "All":
filtered = [e for e in vocab if e["category"] == category]
else:
filtered = vocab
if not filtered:
return None
return random.choice(filtered)
def get_flashcard_batch(count=10, category=None):
"""Get a batch of words for flashcard study.
Prioritizes due words (FSRS), then fills with new/random words.
"""
vocab = load_vocab()
if category and category != "All":
pool = [e for e in vocab if e["category"] == category]
else:
pool = vocab
# Get due words first
due_ids = db.get_due_words(limit=count)
due_entries = [e for e in pool if e["id"] in due_ids]
# Fill remaining with unseen or random words
remaining = count - len(due_entries)
if remaining > 0:
seen_ids = {e["id"] for e in due_entries}
# Prefer unseen words
unseen = [e for e in pool if e["id"] not in seen_ids and not db.get_word_progress(e["id"])]
if len(unseen) >= remaining:
fill = random.sample(unseen, remaining)
else:
# Use all unseen + random from rest
fill = unseen
still_needed = remaining - len(fill)
rest = [e for e in pool if e["id"] not in seen_ids and e not in fill]
if rest:
fill.extend(random.sample(rest, min(still_needed, len(rest))))
due_entries.extend(fill)
random.shuffle(due_entries)
return due_entries
def check_answer(word_id, user_answer, direction="en_to_fa"):
"""Check if user's answer matches the target word.
Args:
word_id: Vocabulary entry ID.
user_answer: What the user typed.
direction: "en_to_fa" (user writes Persian) or "fa_to_en" (user writes English).
Returns:
(is_correct, correct_answer, entry)
"""
vocab = load_vocab()
entry = next((e for e in vocab if e["id"] == word_id), None)
if not entry:
return False, "", None
user_answer = user_answer.strip()
if direction == "en_to_fa":
correct = entry["persian"].strip()
is_correct = user_answer == correct
else:
correct = entry["english"].strip().lower()
is_correct = user_answer.lower() == correct
return is_correct, correct, entry
def format_word_card(entry, show_transliteration="off"):
"""Format a vocabulary entry for display as RTL-safe markdown."""
parts = []
parts.append(f'<div dir="rtl" style="font-size:2em; text-align:center">{entry["persian"]}</div>')
parts.append(f'<div style="font-size:1.3em; text-align:center">{entry["english"]}</div>')
if show_transliteration != "off" and entry.get("finglish"):
parts.append(f'<div style="text-align:center; color:#666; font-style:italic">{entry["finglish"]}</div>')
parts.append(f'<div style="text-align:center; color:#999; font-size:0.9em">{entry.get("category", "")}</div>')
return "\n".join(parts)
def get_word_status(word_id):
"""Return status string for a word: new, learning, or mastered."""
progress = db.get_word_progress(word_id)
if not progress:
return "new"
if progress["stability"] and progress["stability"] > 10:
return "mastered"
return "learning"


@@ -0,0 +1,3 @@
gradio>=4.0
genanki
fsrs
ollama
numpy

File diff suppressed because it is too large


@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""One-time script to generate/update vocabulary.json with AI-assisted transliterations.
Usage:
python scripts/generate_vocab.py
This reads an existing vocabulary.json, finds entries missing finglish
transliterations, and uses Ollama to generate them.
"""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from ai import ask_ollama
VOCAB_PATH = Path(__file__).parent.parent / "data" / "vocabulary.json"
def generate_transliterations(vocab):
"""Fill in missing finglish transliterations using AI."""
missing = [e for e in vocab if not e.get("finglish")]
if not missing:
print("All entries already have finglish transliterations.")
return vocab
print(f"Generating transliterations for {len(missing)} entries...")
# Process in batches of 20
batch_size = 20
for i in range(0, len(missing), batch_size):
batch = missing[i : i + batch_size]
pairs = "\n".join(f"{e['persian']} = {e['english']}" for e in batch)
prompt = f"""For each Persian word below, provide the Finglish (romanized) transliteration.
Use these conventions: â for آ, kh for خ, sh for ش, zh for ژ, gh for ق/غ, ch for چ.
Reply with ONLY the transliterations, one per line, in the same order.
{pairs}"""
try:
response = ask_ollama(prompt, model="qwen2.5:7b")
lines = [l.strip() for l in response.strip().split("\n") if l.strip()]
for j, entry in enumerate(batch):
if j < len(lines):
# Clean up the response line: strip leading numbering ("1.", "2)", "3-")
# and any "persian =" prefix, but leave hyphens inside the
# transliteration itself (e.g. "nush-e jân") untouched
line = lines[j].strip()
line = line.lstrip("0123456789").lstrip(".):- ").strip()
for sep in ("=", ":"):
if sep in line:
line = line.split(sep, 1)[-1].strip()
entry["finglish"] = line
print(f" Processed {min(i + batch_size, len(missing))}/{len(missing)}")
except Exception as e:
print(f" Error processing batch: {e}")
return vocab
def main():
if not VOCAB_PATH.exists():
print(f"No vocabulary file found at {VOCAB_PATH}")
return
with open(VOCAB_PATH, encoding="utf-8") as f:
vocab = json.load(f)
print(f"Loaded {len(vocab)} entries")
vocab = generate_transliterations(vocab)
with open(VOCAB_PATH, "w", encoding="utf-8") as f:
json.dump(vocab, f, ensure_ascii=False, indent=2)
print(f"Saved {len(vocab)} entries to {VOCAB_PATH}")
if __name__ == "__main__":
main()
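The response clean-up is the fragile part of this script: model output often arrives as `1. raftan` or `رفتن = raftan`. A hedged sketch of a cleaner (illustrative, regex-based) that strips numbering and prefixes without damaging hyphenated transliterations like `nush-e jân`:

```python
import re

def clean_transliteration(line: str) -> str:
    """Strip leading list numbering and any 'persian =' prefix,
    leaving hyphens inside the transliteration intact."""
    line = re.sub(r"^\s*\d+\s*[\.\):]?\s*", "", line)  # "1. x" / "2) x"
    if "=" in line:
        line = line.split("=", 1)[-1]                  # "رفتن = raftan"
    return line.strip()

examples = ["1. raftan", "رفتن = raftan", "nush-e jân"]
cleaned = [clean_transliteration(x) for x in examples]
```

Splitting only on `=` (never on `-`) is the key design choice: hyphens are meaningful in Finglish ezafe constructions, so a naive multi-separator split would corrupt them.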


@@ -0,0 +1,65 @@
"""Persian speech-to-text wrapper using sttlib."""
import sys
import numpy as np
sys.path.insert(0, "/home/ys/family-repo/Code/python/tool-speechtotext")
from sttlib import load_whisper_model, transcribe, is_hallucination
_model = None
# Common Whisper hallucinations in Persian/silence
PERSIAN_HALLUCINATIONS = [
"ممنون", # "thank you" hallucination
"خداحافظ", # "goodbye" hallucination
"تماشا کنید", # "watch" hallucination
"لایک کنید", # "like" hallucination
]
def get_model(size="medium"):
"""Load Whisper model (cached singleton)."""
global _model
if _model is None:
_model = load_whisper_model(size)
return _model
def transcribe_persian(audio_tuple):
"""Transcribe Persian audio from Gradio audio component.
Args:
audio_tuple: (sample_rate, numpy_array) from gr.Audio component.
Returns:
Transcribed text string, or empty string on failure/hallucination.
"""
if audio_tuple is None:
return ""
sr, audio = audio_tuple
model = get_model()
# Convert to float32 normalized [-1, 1]
if audio.dtype == np.int16:
audio_float = audio.astype(np.float32) / 32768.0
elif np.issubdtype(audio.dtype, np.floating):
audio_float = audio.astype(np.float32)
else:
# Other integer PCM widths (np.iinfo would raise on float dtypes)
audio_float = audio.astype(np.float32) / np.iinfo(audio.dtype).max
# Mono conversion if stereo
if audio_float.ndim > 1:
audio_float = audio_float.mean(axis=1)
# Use sttlib transcribe
text = transcribe(model, audio_float)
# Filter hallucinations (English + Persian)
if is_hallucination(text):
return ""
if text.strip() in PERSIAN_HALLUCINATIONS:
return ""
return text
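The dtype handling above is the part most likely to bite, since Gradio can hand back int16, int32, or float arrays, mono or stereo. A standalone sketch of the same normalization (function name illustrative):

```python
import numpy as np

def to_float32_mono(audio: np.ndarray) -> np.ndarray:
    """Normalize a Gradio audio array to mono float32 in roughly [-1, 1]."""
    if np.issubdtype(audio.dtype, np.floating):
        out = audio.astype(np.float32)
    else:
        # Integer PCM: divide by the dtype's maximum magnitude.
        out = audio.astype(np.float32) / np.iinfo(audio.dtype).max
    if out.ndim > 1:
        out = out.mean(axis=1)  # stereo -> mono by channel averaging
    return out

stereo_int16 = np.array([[16384, 0], [32767, -32767]], dtype=np.int16)
mono = to_float32_mono(stereo_int16)
```

Whisper expects mono float32, so doing this once at the boundary keeps the rest of the pipeline dtype-agnostic.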


@@ -0,0 +1,89 @@
"""Tests for ai.py — dual AI backend."""
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock
sys.path.insert(0, str(Path(__file__).parent.parent))
import ai
def test_ask_ollama_calls_ollama_chat():
"""ask_ollama should call ollama.chat with correct messages."""
mock_response = MagicMock()
mock_response.message.content = "test response"
with patch("ai.ollama.chat", return_value=mock_response) as mock_chat:
result = ai.ask_ollama("Hello", system="Be helpful")
assert result == "test response"
call_args = mock_chat.call_args
messages = call_args.kwargs.get("messages") or call_args[1].get("messages")
assert len(messages) == 2
assert messages[0]["role"] == "system"
assert messages[1]["role"] == "user"
assert messages[1]["content"] == "Hello"
def test_ask_ollama_no_system():
"""ask_ollama without system prompt should only send user message."""
mock_response = MagicMock()
mock_response.message.content = "response"
with patch("ai.ollama.chat", return_value=mock_response) as mock_chat:
ai.ask_ollama("Hi")
call_args = mock_chat.call_args
messages = call_args.kwargs.get("messages") or call_args[1].get("messages")
assert len(messages) == 1
assert messages[0]["role"] == "user"
def test_ask_claude_calls_subprocess():
"""ask_claude should call claude CLI via subprocess."""
with patch("ai.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout="Claude says hi\n")
result = ai.ask_claude("Hello")
assert result == "Claude says hi"
mock_run.assert_called_once()
args = mock_run.call_args[0][0]
assert args[0] == "claude"
assert "-p" in args
def test_ask_fast_uses_ollama():
"""ask with quality='fast' should use Ollama."""
with patch("ai.ask_ollama", return_value="ollama response") as mock:
result = ai.ask("test", quality="fast")
assert result == "ollama response"
mock.assert_called_once()
def test_ask_smart_uses_claude():
"""ask with quality='smart' should use Claude."""
with patch("ai.ask_claude", return_value="claude response") as mock:
result = ai.ask("test", quality="smart")
assert result == "claude response"
mock.assert_called_once()
def test_chat_ollama():
"""chat_ollama should pass multi-turn messages."""
mock_response = MagicMock()
mock_response.message.content = "continuation"
with patch("ai.ollama.chat", return_value=mock_response) as mock_chat:
messages = [
{"role": "user", "content": "Hi"},
{"role": "assistant", "content": "Hello!"},
{"role": "user", "content": "How are you?"},
]
result = ai.chat_ollama(messages, system="Be helpful")
assert result == "continuation"
call_args = mock_chat.call_args
all_msgs = call_args.kwargs.get("messages") or call_args[1].get("messages")
# system + 3 conversation messages
assert len(all_msgs) == 4
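The `call_args.kwargs.get(...) or call_args[1].get(...)` pattern in these tests covers both ways a mock records keyword arguments; a standalone illustration:

```python
from unittest.mock import MagicMock

# A mock backend called with keyword arguments, as ollama.chat would be.
backend = MagicMock(return_value="ok")
backend(model="m", messages=[{"role": "user", "content": "hi"}])

call_args = backend.call_args
# .kwargs (Python 3.8+) and the [1] index both expose the kwargs dict.
msgs = call_args.kwargs.get("messages") or call_args[1].get("messages")
```

Either accessor works here; the `or` fallback simply keeps the tests tolerant of older mock-inspection styles.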


@@ -0,0 +1,86 @@
"""Tests for anki_export.py — Anki .apkg generation."""
import os
import sys
import zipfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from anki_export import export_deck
SAMPLE_VOCAB = [
{
"id": "verb_go",
"section": "High-frequency language",
"category": "Common verbs",
"english": "to go",
"persian": "رفتن",
"finglish": "raftan",
},
{
"id": "verb_eat",
"section": "High-frequency language",
"category": "Common verbs",
"english": "to eat",
"persian": "خوردن",
"finglish": "khordan",
},
{
"id": "colour_red",
"section": "High-frequency language",
"category": "Colours",
"english": "red",
"persian": "قرمز",
"finglish": "ghermez",
},
]
def test_export_deck_creates_file(tmp_path):
"""export_deck should create a valid .apkg file."""
output = str(tmp_path / "test.apkg")
result = export_deck(SAMPLE_VOCAB, output_path=output)
assert result == output
assert os.path.exists(output)
assert os.path.getsize(output) > 0
def test_export_deck_is_valid_zip(tmp_path):
"""An .apkg file is a zip archive containing an Anki SQLite database."""
output = str(tmp_path / "test.apkg")
export_deck(SAMPLE_VOCAB, output_path=output)
assert zipfile.is_zipfile(output)
def test_export_deck_with_category_filter(tmp_path):
"""export_deck with category filter should only include matching entries."""
output = str(tmp_path / "test.apkg")
export_deck(SAMPLE_VOCAB, categories=["Colours"], output_path=output)
# File should exist and be smaller than unfiltered
assert os.path.exists(output)
size_filtered = os.path.getsize(output)
output2 = str(tmp_path / "test_all.apkg")
export_deck(SAMPLE_VOCAB, output_path=output2)
size_all = os.path.getsize(output2)
# Filtered deck should be smaller (fewer cards)
assert size_filtered <= size_all
def test_export_deck_empty_vocab(tmp_path):
"""export_deck with empty vocabulary should still create a valid file."""
output = str(tmp_path / "test.apkg")
export_deck([], output_path=output)
assert os.path.exists(output)
def test_export_deck_no_category_match(tmp_path):
"""export_deck with non-matching category filter should create empty deck."""
output = str(tmp_path / "test.apkg")
export_deck(SAMPLE_VOCAB, categories=["Nonexistent"], output_path=output)
assert os.path.exists(output)


@@ -0,0 +1,151 @@
"""Tests for db.py — SQLite database layer with FSRS integration."""
import sys
from pathlib import Path
import pytest
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
import fsrs
@pytest.fixture(autouse=True)
def temp_db(tmp_path):
"""Use a temporary database for each test."""
import db as db_mod
db_mod._conn = None
db_mod.DB_PATH = tmp_path / "test.db"
db_mod.init_db()
yield db_mod
db_mod.close()
def test_init_db_creates_tables(temp_db):
"""init_db should create all required tables."""
conn = temp_db.get_connection()
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
table_names = {row["name"] for row in tables}
assert "word_progress" in table_names
assert "quiz_sessions" in table_names
assert "essays" in table_names
assert "tutor_sessions" in table_names
def test_get_word_progress_nonexistent(temp_db):
"""Should return None for a word that hasn't been reviewed."""
assert temp_db.get_word_progress("nonexistent") is None
def test_update_and_get_word_progress(temp_db):
"""update_word_progress should create and update progress."""
card = temp_db.update_word_progress("verb_go", fsrs.Rating.Good)
assert card is not None
assert card.stability is not None
progress = temp_db.get_word_progress("verb_go")
assert progress is not None
assert progress["word_id"] == "verb_go"
assert progress["reps"] == 1
assert progress["fsrs_state"] is not None
def test_update_word_progress_increments_reps(temp_db):
"""Reviewing the same word multiple times should increment reps."""
temp_db.update_word_progress("verb_go", fsrs.Rating.Good)
temp_db.update_word_progress("verb_go", fsrs.Rating.Easy)
progress = temp_db.get_word_progress("verb_go")
assert progress["reps"] == 2
def test_get_due_words(temp_db):
"""get_due_words should return words that are due for review."""
# A newly reviewed word with Rating.Again should be due soon
temp_db.update_word_progress("verb_go", fsrs.Rating.Again)
# An easy word should have a later due date
temp_db.update_word_progress("verb_eat", fsrs.Rating.Easy)
# Due words depend on timing; at minimum both should be in the system
all_progress = temp_db.get_connection().execute(
"SELECT word_id FROM word_progress"
).fetchall()
assert len(all_progress) == 2
def test_get_word_counts(temp_db):
"""get_word_counts should return correct counts."""
counts = temp_db.get_word_counts(total_vocab_size=100)
assert counts["total"] == 100
assert counts["seen"] == 0
assert counts["mastered"] == 0
assert counts["due"] == 0
temp_db.update_word_progress("verb_go", fsrs.Rating.Good)
counts = temp_db.get_word_counts(total_vocab_size=100)
assert counts["seen"] == 1
def test_record_quiz_session(temp_db):
"""record_quiz_session should insert a quiz record."""
temp_db.record_quiz_session("Common verbs", 10, 7, 120)
rows = temp_db.get_connection().execute(
"SELECT * FROM quiz_sessions"
).fetchall()
assert len(rows) == 1
assert rows[0]["correct"] == 7
assert rows[0]["total_questions"] == 10
def test_save_essay(temp_db):
"""save_essay should store the essay and feedback."""
temp_db.save_essay("متن آزمایشی", "B1", "Good effort!", "Identity and culture")
essays = temp_db.get_recent_essays()
assert len(essays) == 1
assert essays[0]["grade"] == "B1"
def test_save_tutor_session(temp_db):
"""save_tutor_session should store the conversation."""
messages = [
{"role": "user", "content": "سلام"},
{"role": "assistant", "content": "سلام! حالت چطوره؟"},
]
temp_db.save_tutor_session("Identity and culture", messages, 300)
rows = temp_db.get_connection().execute(
"SELECT * FROM tutor_sessions"
).fetchall()
assert len(rows) == 1
assert rows[0]["theme"] == "Identity and culture"
def test_get_stats(temp_db):
"""get_stats should return aggregated stats."""
stats = temp_db.get_stats()
assert stats["total_reviews"] == 0
assert stats["total_quizzes"] == 0
assert stats["streak"] == 0
assert isinstance(stats["recent_quizzes"], list)
def test_close_and_reopen(temp_db):
"""Closing and reopening should preserve data."""
temp_db.update_word_progress("verb_go", fsrs.Rating.Good)
db_path = temp_db.DB_PATH
temp_db.close()
# Reopen
temp_db._conn = None
temp_db.DB_PATH = db_path
temp_db.init_db()
progress = temp_db.get_word_progress("verb_go")
assert progress is not None
assert progress["reps"] == 1


@@ -0,0 +1,204 @@
"""Tests for modules/vocab.py — vocabulary search and flashcard logic."""
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
SAMPLE_VOCAB = [
{
"id": "verb_go",
"section": "High-frequency language",
"category": "Common verbs",
"english": "to go",
"persian": "رفتن",
"finglish": "raftan",
},
{
"id": "verb_eat",
"section": "High-frequency language",
"category": "Common verbs",
"english": "to eat",
"persian": "خوردن",
"finglish": "khordan",
},
{
"id": "adj_big",
"section": "High-frequency language",
"category": "Common adjectives",
"english": "big",
"persian": "بزرگ",
"finglish": "bozorg",
},
{
"id": "colour_red",
"section": "High-frequency language",
"category": "Colours",
"english": "red",
"persian": "قرمز",
"finglish": "ghermez",
},
]
@pytest.fixture(autouse=True)
def mock_vocab_and_db(tmp_path):
"""Mock vocabulary loading and use temp DB."""
import db as db_mod
import modules.vocab as vocab_mod
# Temp DB
db_mod._conn = None
db_mod.DB_PATH = tmp_path / "test.db"
db_mod.init_db()
# Mock vocab
vocab_mod._vocab_data = SAMPLE_VOCAB
yield vocab_mod
db_mod.close()
vocab_mod._vocab_data = None
def test_load_vocab(mock_vocab_and_db):
"""load_vocab should return the vocabulary data."""
data = mock_vocab_and_db.load_vocab()
assert len(data) == 4
def test_get_categories(mock_vocab_and_db):
"""get_categories should return unique sorted categories."""
cats = mock_vocab_and_db.get_categories()
assert "Colours" in cats
assert "Common verbs" in cats
assert "Common adjectives" in cats
def test_search_english(mock_vocab_and_db):
"""Search should find entries by English text."""
results = mock_vocab_and_db.search("go")
assert len(results) == 1
assert results[0]["id"] == "verb_go"
def test_search_persian(mock_vocab_and_db):
"""Search should find entries by Persian text."""
results = mock_vocab_and_db.search("رفتن")
assert len(results) == 1
assert results[0]["id"] == "verb_go"
def test_search_finglish(mock_vocab_and_db):
"""Search should find entries by Finglish text."""
results = mock_vocab_and_db.search("raftan")
assert len(results) == 1
assert results[0]["id"] == "verb_go"
def test_search_empty(mock_vocab_and_db):
"""Empty search should return empty list."""
assert mock_vocab_and_db.search("") == []
assert mock_vocab_and_db.search(None) == []
def test_search_no_match(mock_vocab_and_db):
"""Search with no match should return empty list."""
assert mock_vocab_and_db.search("zzzzz") == []
def test_get_random_word(mock_vocab_and_db):
"""get_random_word should return a valid entry."""
word = mock_vocab_and_db.get_random_word()
assert word is not None
assert "id" in word
assert "english" in word
assert "persian" in word
def test_get_random_word_with_category(mock_vocab_and_db):
"""get_random_word with category filter should only return matching entries."""
word = mock_vocab_and_db.get_random_word(category="Colours")
assert word is not None
assert word["category"] == "Colours"
def test_get_random_word_nonexistent_category(mock_vocab_and_db):
"""get_random_word with bad category should return None."""
word = mock_vocab_and_db.get_random_word(category="Nonexistent")
assert word is None
def test_check_answer_correct_en_to_fa(mock_vocab_and_db):
"""Correct Persian answer should be marked correct."""
correct, answer, entry = mock_vocab_and_db.check_answer(
"verb_go", "رفتن", direction="en_to_fa"
)
assert correct is True
def test_check_answer_incorrect_en_to_fa(mock_vocab_and_db):
"""Incorrect Persian answer should be marked incorrect with correct answer."""
correct, answer, entry = mock_vocab_and_db.check_answer(
"verb_go", "خوردن", direction="en_to_fa"
)
assert correct is False
assert answer == "رفتن"
def test_check_answer_fa_to_en(mock_vocab_and_db):
"""Correct English answer (case-insensitive) should be marked correct."""
correct, answer, entry = mock_vocab_and_db.check_answer(
"verb_go", "To Go", direction="fa_to_en"
)
assert correct is True
def test_check_answer_nonexistent_word(mock_vocab_and_db):
"""Checking answer for nonexistent word should return False."""
correct, answer, entry = mock_vocab_and_db.check_answer(
"nonexistent", "test", direction="en_to_fa"
)
assert correct is False
assert entry is None
def test_format_word_card(mock_vocab_and_db):
"""format_word_card should produce RTL HTML with correct content."""
entry = SAMPLE_VOCAB[0]
html = mock_vocab_and_db.format_word_card(entry, show_transliteration="Finglish")
assert "رفتن" in html
assert "to go" in html
assert "raftan" in html
def test_format_word_card_no_transliteration(mock_vocab_and_db):
"""format_word_card with transliteration off should not show finglish."""
entry = SAMPLE_VOCAB[0]
html = mock_vocab_and_db.format_word_card(entry, show_transliteration="off")
assert "raftan" not in html
def test_get_flashcard_batch(mock_vocab_and_db):
"""get_flashcard_batch should return a batch of entries."""
batch = mock_vocab_and_db.get_flashcard_batch(count=2)
assert len(batch) == 2
assert all("id" in e for e in batch)
def test_get_word_status_new(mock_vocab_and_db):
"""Unreviewed word should have status 'new'."""
assert mock_vocab_and_db.get_word_status("verb_go") == "new"
def test_get_word_status_learning(mock_vocab_and_db):
"""Recently reviewed word should have status 'learning'."""
import db
import fsrs
db.update_word_progress("verb_go", fsrs.Rating.Good)
assert mock_vocab_and_db.get_word_status("verb_go") == "learning"