From 492a4b191a0b7c92f5378a2070b70a9dd215bdf7 Mon Sep 17 00:00:00 2001 From: Jino Jose Date: Tue, 30 Jun 2026 09:09:43 +0530 Subject: [PATCH] Complete backend production integrations --- ansible/roles/cezen-backend/files/main.py | 405 ++++++++++++++++-- .../roles/cezen-backend/files/rag_ingest.py | 274 ++++++++++++ .../cezen-backend/files/requirements.txt | 7 + ansible/roles/cezen-backend/tasks/main.yml | 10 + 4 files changed, 667 insertions(+), 29 deletions(-) create mode 100644 ansible/roles/cezen-backend/files/rag_ingest.py diff --git a/ansible/roles/cezen-backend/files/main.py b/ansible/roles/cezen-backend/files/main.py index 3062551..f1a22ad 100644 --- a/ansible/roles/cezen-backend/files/main.py +++ b/ansible/roles/cezen-backend/files/main.py @@ -798,6 +798,19 @@ def audit(db, user_id, username, action, detail="", ip="", result="success"): (utcnow(), user_id, username, action, detail, ip, result) ) +def _audit(db, username, action, detail="", ip="", result="success", user_id=None): + """Audit helper for API sections that do not already hold a DB connection.""" + own_db = db is None + if own_db: + db = get_db() + try: + audit(db, user_id, username, action, detail, ip, result) + if own_db: + db.commit() + finally: + if own_db: + db.close() + def _setting_value(key: str, default: str = "") -> str: db = get_db() try: @@ -1023,8 +1036,8 @@ async def change_password(body: ChangePasswordRequest, request: Request, user: d # ── User management (admin) ─────────────────────────────────────────────────── -@app.get("/api/users/sessions") -async def active_sessions(admin: dict = Depends(admin_only)): +@app.get("/api/users/recent-logins") +async def recent_logins(admin: dict = Depends(admin_only)): """Return list of users who have logged in within the last 8 hours (token lifetime).""" db = get_db() # Use the same isoformat() style that utcnow() stores (includes +00:00 suffix) @@ -1960,7 +1973,7 @@ async def upload_document(cid: int, file: UploadFile = File(...), admin: dict = # Launch ingest subprocess runner = Path(__file__).parent / "rag_ingest.py" subprocess.Popen( - ["python3", str(runner), + [sys.executable, str(runner), "--doc-id", str(doc_id), "--db-path", str(DB_PATH), "--file", str(dest), @@ -2800,7 +2813,8 @@ AGENT_STEP_TYPES = {"prompt", "summarise", "extract", "classify", "rag", "format def _call_ollama_sync(model: str, prompt: str) -> str: import urllib.request as _ur - payload = json.dumps({"model": model, "prompt": prompt, "stream": False}).encode() + routed = _select_router_model(prompt, model, {"content_type": "text"}) + payload = json.dumps({"model": routed["model"], "prompt": prompt, "stream": False}).encode() req = _ur.Request( f"{OLLAMA_URL}/api/generate", data=payload, @@ -2839,12 +2853,13 @@ async def _execute_agent(run_id: int): try: run_row = db.execute("SELECT * FROM agent_runs WHERE id=?", (run_id,)).fetchone() if not run_row: - return + return {"status": "error", "output": "", "error": "Agent run not found"} agent_row = db.execute("SELECT * FROM agents WHERE id=?", (run_row["agent_id"],)).fetchone() if not agent_row: db.execute("UPDATE agent_runs SET status='error',error_msg='Agent not found',finished_at=? WHERE id=?", (utcnow(), run_id)) - db.commit(); db.close(); return + db.commit() + return {"status": "error", "output": "", "error": "Agent not found"} steps = json.loads(agent_row["steps"]) pipeline_input = run_row["input"] @@ -2903,18 +2918,21 @@ async def _execute_agent(run_id: int): "UPDATE agent_runs SET status='error',error_msg=?,steps_log=?,output=?,finished_at=? WHERE id=?", (str(e), json.dumps(steps_log), prev_output, utcnow(), run_id) ) - db.commit(); db.close(); return + db.commit() + return {"status": "error", "output": prev_output, "error": str(e)} db.execute( "UPDATE agent_runs SET status='done',output=?,steps_log=?,finished_at=? WHERE id=?", (prev_output, json.dumps(steps_log), utcnow(), run_id) ) db.commit() + return {"status": "done", "output": prev_output, "error": None} except Exception as e: db.execute("UPDATE agent_runs SET status='error',error_msg=?,finished_at=? WHERE id=?", (str(e), utcnow(), run_id)) db.commit() + return {"status": "error", "output": "", "error": str(e)} finally: db.close() @@ -3669,8 +3687,9 @@ Respond with ONLY valid JSON, no markdown, no explanation: def _call_model_sync(model: str, prompt_text: str) -> str: """Call an Ollama model and return its text response.""" import urllib.request, json as _json + routed = _select_router_model(prompt_text, model, {"content_type": "text"}) payload = { - "model": model, + "model": routed["model"], "messages": [{"role": "user", "content": prompt_text}], "stream": False, } @@ -3933,7 +3952,8 @@ def _run_scheduled_job_sync(job_id: int): try: if job["job_type"] == "prompt": # Build messages and call Ollama - model = job["model"] or "llama3" + routed = _select_router_model(job["prompt_text"], job["model"] or "llama3", {"content_type": "text"}) + model = routed["model"] payload = { "model": model, "messages": [{"role": "user", "content": job["prompt_text"]}], @@ -3954,15 +3974,20 @@ def _run_scheduled_job_sync(job_id: int): agent = db.execute("SELECT * FROM agents WHERE id=?", (job["agent_id"],)).fetchone() if not agent: raise ValueError("Agent not found") - steps = json.loads(agent["steps"]) input_text = job["prompt_text"] or f"Scheduled run of agent: {agent['name']}" - # Reuse the existing agent execution helper + agent_run_id = db.execute( + "INSERT INTO agent_runs (agent_id,agent_name,user_id,username,input,status,created_at) VALUES (?,?,?,?,?,'pending',?)", + (agent["id"], agent["name"], job["user_id"], job["username"], input_text, utcnow()) + ).lastrowid + db.commit() loop = _asyncio.new_event_loop() try: - result = loop.run_until_complete(_execute_agent(steps, input_text, agent)) + result = loop.run_until_complete(_execute_agent(agent_run_id)) finally: loop.close() output = result.get("output", "") + if result.get("status") == "error": + raise ValueError(result.get("error") or "Agent run failed") except Exception as e: error = str(e) @@ -4475,12 +4500,66 @@ def _connector_from_row(row) -> dict: "stats": _json_load(d["stats_json"], {"files": 0, "rowsRead": 0, "lastSync": None, "errors": 0}), } +def _estimate_tokens(text: str) -> int: + return max(1, round(len((text or "").split()) * 1.3)) + +def _select_router_model(prompt: str, default_model: str, context: Optional[dict] = None) -> dict: + """Apply enabled model-router rules to backend model calls.""" + ctx = context or {} + token_est = _estimate_tokens(prompt) + content_type = ctx.get("content_type") or "text" + user_role = ctx.get("user_role") or "" + db = get_db() + try: + rows = db.execute( + "SELECT * FROM router_rules WHERE enabled=1 ORDER BY priority, id" + ).fetchall() + finally: + db.close() + + for row in rows: + conditions = _json_load(row["conditions_json"], []) + matched = True + for cond in conditions: + ctype = (cond.get("type") or "").strip() + value = str(cond.get("value") or "").strip() + if ctype == "keyword": + keywords = [k.strip().lower() for k in value.split(",") if k.strip()] + matched = any(k in (prompt or "").lower() for k in keywords) + elif ctype == "content_type": + matched = value == content_type + elif ctype == "token_gt": + try: + matched = token_est > int(value or 0) + except ValueError: + matched = False + elif ctype == "token_lt": + try: + matched = token_est < int(value or 0) + except ValueError: + matched = False + elif ctype == "user_role": + matched = value == user_role + else: + matched = False + if not matched: + break + if matched and row["model"]: + return { + "model": row["model"], + "route": row["name"], + "max_tokens": row["max_tokens"], + "temperature": row["temperature"], + } + return {"model": default_model or "llama3", "route": "default", "max_tokens": None, "temperature": None} + def _call_ollama_generate(model: str, prompt: str, max_tokens: int = 512) -> str: + routed = _select_router_model(prompt, model, {"content_type": "text"}) payload = json.dumps({ - "model": model or "llama3", + "model": routed["model"], "prompt": prompt, "stream": False, - "options": {"num_predict": max_tokens} + "options": {"num_predict": int(routed.get("max_tokens") or max_tokens)} }).encode() req = urllib.request.Request(f"{OLLAMA_URL}/api/generate", data=payload, method="POST") req.add_header("Content-Type", "application/json") @@ -4500,6 +4579,185 @@ def _summarise_locally(text: str) -> str: preview = " ".join(words[:80]) return f"Summary generated from {len(words)} words. Key discussion: {preview}{'...' if len(words) > 80 else ''}" +def _transcribe_meeting_audio(raw: bytes, suffix: str) -> str: + try: + from faster_whisper import WhisperModel + except ImportError as exc: + raise HTTPException( + status_code=501, + detail="Audio transcription requires faster-whisper. Install backend requirements and ensure ffmpeg is available.", + ) from exc + model_size = os.environ.get("CEZEN_WHISPER_MODEL", "base") + device = os.environ.get("CEZEN_WHISPER_DEVICE", "cpu") + compute_type = os.environ.get("CEZEN_WHISPER_COMPUTE_TYPE", "int8") + cache_key = (model_size, device, compute_type) + if getattr(_transcribe_meeting_audio, "_cache_key", None) != cache_key: + _transcribe_meeting_audio._model = WhisperModel(model_size, device=device, compute_type=compute_type) + _transcribe_meeting_audio._cache_key = cache_key + with tempfile.NamedTemporaryFile(suffix=suffix or ".audio", delete=False) as tmp: + tmp.write(raw) + tmp_path = tmp.name + try: + segments, _info = _transcribe_meeting_audio._model.transcribe(tmp_path, vad_filter=True) + transcript = "\n".join(seg.text.strip() for seg in segments if seg.text.strip()) + finally: + try: + Path(tmp_path).unlink(missing_ok=True) + except Exception: + pass + if not transcript: + raise HTTPException(status_code=422, detail="No speech could be transcribed from this file") + return transcript + +class WorkflowStop(Exception): + def __init__(self, message: str): + super().__init__(message) + self.message = message + +def _workflow_condition_true(condition: str, values: dict) -> bool: + expr = _template(condition or "", values).strip() + if not expr: + return True + if "==" in expr: + left, right = expr.split("==", 1) + return left.strip().strip("'\"") == right.strip().strip("'\"") + if "!=" in expr: + left, right = expr.split("!=", 1) + return left.strip().strip("'\"") != right.strip().strip("'\"") + for op in (">=", "<=", ">", "<"): + if op in expr: + left, right = expr.split(op, 1) + try: + a = float(left.strip()) + b = float(right.strip()) + except ValueError: + return False + return {">=": a >= b, "<=": a <= b, ">": a > b, "<": a < b}[op] + return expr.lower() in ("true", "yes", "1", "continue") + +def _workflow_rag_search(cfg: dict, values: dict) -> str: + query = _template(cfg.get("query") or values.get("input") or "", values).strip() + collection = cfg.get("collection") or cfg.get("collection_id") + if not query: + raise ValueError("RAG search requires input or query") + db = get_db() + try: + if str(collection).isdigit(): + col = db.execute("SELECT * FROM kb_collections WHERE id=?", (int(collection),)).fetchone() + else: + col = db.execute("SELECT * FROM kb_collections WHERE name=? OR chroma_name=?", (str(collection), str(collection))).fetchone() + finally: + db.close() + if not col: + raise ValueError(f"Knowledge collection not found: {collection or '(blank)'}") + embed = _ollama_embed([query], model=col["embed_model"]) + result = _chroma_req("POST", f"/api/v1/collections/{col['chroma_name']}/query", body={ + "query_embeddings": embed, + "n_results": int(cfg.get("n_results") or 5), + "include": ["documents", "metadatas", "distances"], + }) + docs = result.get("documents", [[]])[0] + metas = result.get("metadatas", [[]])[0] + items = [] + for text, meta in zip(docs, metas): + source = meta.get("source") or "knowledge base" + page = f" p.{meta.get('page')}" if meta.get("page") else "" + items.append(f"[{source}{page}] {text}") + return "\n\n".join(items) if items else "No matching knowledge passages found." + +def _ensure_kb_collection(name: str) -> sqlite3.Row: + db = get_db() + try: + row = db.execute("SELECT * FROM kb_collections WHERE name=? OR chroma_name=?", (name, name)).fetchone() + if row: + return row + import re as _re + chroma_name = "cezen-" + _re.sub(r"[^a-z0-9-]", "-", name.lower())[:55] + try: + _chroma_req("POST", "/api/v1/collections", body={"name": chroma_name, "metadata": {"hnsw:space": "cosine"}}) + except HTTPException as e: + if e.status_code not in (409,): + raise + cur = db.execute( + "INSERT INTO kb_collections (name, description, chroma_name, embed_model, created_at) VALUES (?,?,?,?,?)", + (name, "Created by workflow automation", chroma_name, OLLAMA_EMBED, utcnow()) + ) + db.commit() + return db.execute("SELECT * FROM kb_collections WHERE id=?", (cur.lastrowid,)).fetchone() + finally: + db.close() + +def _workflow_save_kb(cfg: dict, values: dict) -> str: + collection_name = cfg.get("collection") or "Workflow Outputs" + text = _template(cfg.get("text") or cfg.get("body") or values.get("input") or values.get("step1.output") or "", values).strip() + if not text: + raise ValueError("Save to KB requires text/body/input") + col = _ensure_kb_collection(collection_name) + col_dir = KB_DOCS_DIR / str(col["id"]) + col_dir.mkdir(exist_ok=True) + dest = col_dir / f"workflow-{uuid.uuid4().hex}.txt" + dest.write_text(text, encoding="utf-8") + db = get_db() + try: + doc_id = db.execute( + "INSERT INTO kb_documents (collection_id, orig_name, file_path, size_bytes, status, uploaded_at) VALUES (?,?,?,?,?,?)", + (col["id"], dest.name, str(dest), dest.stat().st_size, "pending", utcnow()) + ).lastrowid + db.commit() + finally: + db.close() + runner = Path(__file__).parent / "rag_ingest.py" + subprocess.Popen( + [sys.executable, str(runner), "--doc-id", str(doc_id), "--db-path", str(DB_PATH), "--file", str(dest), + "--collection", col["chroma_name"], "--embed-model", col["embed_model"], "--chroma-url", CHROMA_URL, + "--ollama-url", OLLAMA_URL], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, + ) + return f"Saved document {doc_id} to knowledge collection {col['name']} and started ingestion." + +def _workflow_send_email(cfg: dict, values: dict) -> str: + import smtplib + from email.message import EmailMessage + host = os.environ.get("CEZEN_SMTP_HOST") + if not host: + raise RuntimeError("SMTP is not configured. Set CEZEN_SMTP_HOST and related CEZEN_SMTP_* variables.") + port = int(os.environ.get("CEZEN_SMTP_PORT", "587")) + sender = os.environ.get("CEZEN_SMTP_FROM") or os.environ.get("CEZEN_SMTP_USER") + recipient = _template(cfg.get("to") or "", values) + if not sender or not recipient: + raise RuntimeError("Email step requires sender and recipient") + msg = EmailMessage() + msg["From"] = sender + msg["To"] = recipient + msg["Subject"] = _template(cfg.get("subject") or "Nexus One AI workflow output", values) + msg.set_content(_template(cfg.get("body") or cfg.get("bodyTemplate") or values.get("input") or "", values)) + with smtplib.SMTP(host, port, timeout=30) as smtp: + if os.environ.get("CEZEN_SMTP_TLS", "true").lower() in ("1", "true", "yes"): + smtp.starttls() + if os.environ.get("CEZEN_SMTP_USER"): + smtp.login(os.environ["CEZEN_SMTP_USER"], os.environ.get("CEZEN_SMTP_PASSWORD", "")) + smtp.send_message(msg) + return f"Email sent to {recipient}." + +def _workflow_http_request(cfg: dict, values: dict) -> str: + import urllib.parse + url = _template(cfg.get("url") or "", values).replace("{{_API}}", "http://localhost:8080") + parsed = urllib.parse.urlparse(url) + if parsed.scheme not in ("http", "https"): + raise ValueError("HTTP step requires an http:// or https:// URL") + method = (cfg.get("method") or "POST").upper() + headers = cfg.get("headers") or {} + if isinstance(headers, str): + headers = _json_load(headers, {}) + body_text = _template(cfg.get("bodyTemplate") or cfg.get("body") or "", values) + data = body_text.encode() if method not in ("GET", "HEAD") and body_text else None + req = urllib.request.Request(url, data=data, headers=headers, method=method) + if data and "Content-Type" not in headers: + req.add_header("Content-Type", "application/json") + with urllib.request.urlopen(req, timeout=int(cfg.get("timeout") or 60)) as resp: + payload = resp.read(512 * 1024).decode(errors="replace") + return f"HTTP {method} {url} -> {resp.status}\n{payload}" + def _meeting_result_from_text(transcript: str, meta: dict, opts: dict) -> dict: prompt = ( "Analyse this meeting transcript and return concise meeting output with sections: " @@ -4568,15 +4826,19 @@ def _workflow_step_output(step: dict, values: dict) -> str: if stype == "summarise": return _summarise_locally(values.get("input", "")) if stype == "rag_search": - return f"Knowledge search prepared for collection {cfg.get('collection') or 'default'}." + return _workflow_rag_search(cfg, values) if stype == "save_kb": - return f"Queued save into knowledge collection {cfg.get('collection') or 'default'}." + return _workflow_save_kb(cfg, values) if stype == "filter": - return "Condition evaluated as true." + matched = _workflow_condition_true(cfg.get("condition") or "", values) + action = cfg.get("trueAction" if matched else "falseAction") or "continue" + if action == "stop": + raise WorkflowStop(f"Condition evaluated as {str(matched).lower()}; workflow stopped.") + return f"Condition evaluated as {str(matched).lower()}; continuing workflow." if stype == "email": - return f"Email queued for {cfg.get('to') or 'recipient'}." + return _workflow_send_email(cfg, values) if stype == "http": - return f"{cfg.get('method', 'POST')} request prepared for {cfg.get('url') or 'endpoint'}." + return _workflow_http_request(cfg, values) return "Step completed." @app.get("/api/models/list") @@ -4650,7 +4912,11 @@ async def run_workflow_api(workflow_id: str, body: dict, user: dict = Depends(cu outputs = {} try: for idx, step in enumerate(wf.get("steps") or [], start=1): - out = _workflow_step_output(step, values) + try: + out = _workflow_step_output(step, values) + except WorkflowStop as stop: + log.append({"step": step.get("name") or step.get("type") or f"Step {idx}", "status": "stopped", "detail": stop.message}) + break key = (step.get("config") or {}).get("outputVar") or f"step{idx}.output" values[key] = out values[f"step{idx}.output"] = out @@ -4711,6 +4977,86 @@ async def save_connectors(body: list[dict], user: dict = Depends(current_user)): db.commit(); db.close() return {"ok": True} +def _connector_db_stats(cfg: dict) -> tuple[int, int]: + db_type = (cfg.get("dbType") or cfg.get("engine") or cfg.get("type") or "sqlite").lower() + tables = [t.strip() for t in (cfg.get("tables") or "").split(",") if t.strip()] + if db_type == "sqlite": + path = cfg.get("path") or cfg.get("database") + if not path: + raise ValueError("SQLite connector requires path or database") + conn = sqlite3.connect(path) + try: + if not tables: + tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'").fetchall()] + rows = 0 + for table in tables: + if not table.replace("_", "").replace("-", "").isalnum(): + raise ValueError(f"Unsafe table name: {table}") + rows += int(conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]) + return len(tables), rows + finally: + conn.close() + if db_type in ("postgres", "postgresql"): + try: + import psycopg2 + except ImportError as exc: + raise RuntimeError("PostgreSQL connector requires psycopg2-binary") from exc + conn = psycopg2.connect( + host=cfg.get("host") or "localhost", + port=int(cfg.get("port") or 5432), + dbname=cfg.get("database"), + user=cfg.get("username") or cfg.get("user"), + password=cfg.get("password") or "", + connect_timeout=int(cfg.get("timeout") or 10), + ) + try: + with conn.cursor() as cur: + if not tables: + cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'") + tables = [r[0] for r in cur.fetchall()] + rows = 0 + for table in tables: + cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='public' AND table_name=%s", (table,)) + if cur.fetchone()[0] != 1: + raise ValueError(f"Table not found: {table}") + safe_table = table.replace('"', '""') + cur.execute(f'SELECT COUNT(*) FROM "{safe_table}"') + rows += int(cur.fetchone()[0]) + return len(tables), rows + finally: + conn.close() + if db_type in ("mysql", "mariadb"): + try: + import pymysql + except ImportError as exc: + raise RuntimeError("MySQL connector requires pymysql") from exc + conn = pymysql.connect( + host=cfg.get("host") or "localhost", + port=int(cfg.get("port") or 3306), + database=cfg.get("database"), + user=cfg.get("username") or cfg.get("user"), + password=cfg.get("password") or "", + connect_timeout=int(cfg.get("timeout") or 10), + read_timeout=int(cfg.get("timeout") or 10), + ) + try: + with conn.cursor() as cur: + if not tables: + cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'", (cfg.get("database"),)) + tables = [r[0] for r in cur.fetchall()] + rows = 0 + for table in tables: + cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=%s AND table_name=%s", (cfg.get("database"), table)) + if cur.fetchone()[0] != 1: + raise ValueError(f"Table not found: {table}") + safe_table = table.replace("`", "``") + cur.execute(f"SELECT COUNT(*) FROM `{safe_table}`") + rows += int(cur.fetchone()[0]) + return len(tables), rows + finally: + conn.close() + raise ValueError(f"Unsupported database connector type: {db_type}") + @app.post("/api/connectors/{connector_id}/sync") async def sync_connector(connector_id: str, user: dict = Depends(current_user)): db = get_db() @@ -4738,11 +5084,15 @@ async def sync_connector(connector_id: str, user: dict = Depends(current_user)): level = "warn"; status = "error"; errors = int(stats.get("errors") or 0) + 1 stats.update({"files": files, "lastSync": utcnow(), "errors": errors}) else: - tables = [t.strip() for t in (cfg.get("tables") or "").split(",") if t.strip()] - rows_read = max(int(stats.get("rowsRead") or 0), len(tables) * 100) - stats.update({"rowsRead": rows_read, "lastSync": utcnow(), "errors": 0}) - msg = f"[{c['name']}] Read-only schema check complete — {len(tables)} tables configured" - level = "ok"; status = "ok" + try: + table_count, rows_read = _connector_db_stats(cfg) + stats.update({"rowsRead": rows_read, "tables": table_count, "lastSync": utcnow(), "errors": 0}) + msg = f"[{c['name']}] Read-only sync complete — {table_count} tables, {rows_read} rows visible" + level = "ok"; status = "ok" + except Exception as e: + stats.update({"lastSync": utcnow(), "errors": int(stats.get("errors") or 0) + 1}) + msg = f"[{c['name']}] Database sync failed: {e}" + level = "error"; status = "error" db.execute("UPDATE connectors SET status=?, stats_json=?, updated_at=? WHERE id=?", (status, json.dumps(stats), utcnow(), connector_id)) db.execute("INSERT INTO connector_log (connector_id,level,msg,ts) VALUES (?,?,?,?)", @@ -4903,10 +5253,7 @@ async def process_meeting(file: UploadFile = File(...), meta: str = Form("{}"), if suffix in (".txt", ".md", ".vtt", ".srt"): text = raw.decode(errors="replace") if not text: - text = ( - f"Uploaded meeting file: {file.filename or 'audio'} ({round(len(raw)/1024/1024, 2)} MB). " - "Audio transcription service is not configured on this node yet, so this record was processed from file metadata." - ) + text = _transcribe_meeting_audio(raw, suffix) result = _meeting_result_from_text(text, meta_obj, meta_obj.get("opts") or {}) db = get_db() db.execute( diff --git a/ansible/roles/cezen-backend/files/rag_ingest.py b/ansible/roles/cezen-backend/files/rag_ingest.py new file mode 100644 index 0000000..5951ed7 --- /dev/null +++ b/ansible/roles/cezen-backend/files/rag_ingest.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +""" +Nexus One AI — RAG Document Ingest Worker +Launched as a subprocess by the FastAPI backend when a document is uploaded. + +Pipeline: + 1. Extract text from PDF / DOCX / TXT / MD / CSV + 2. Chunk into overlapping segments + 3. Embed each chunk via Ollama /api/embeddings + 4. Store chunks + embeddings in ChromaDB + 5. Update kb_documents status in SQLite + +Usage (called by main.py): + python3 rag_ingest.py \\ + --doc-id 1 --db-path /opt/cezen/data/cezen.db \\ + --file /opt/cezen/data/kb_docs/1/abc.pdf \\ + --collection cezen-myknowledgebase \\ + --embed-model nomic-embed-text \\ + --chroma-url http://localhost:8000 \\ + --ollama-url http://localhost:11434 +""" + +import argparse, json, os, sqlite3, sys, uuid +from datetime import datetime, timezone +from pathlib import Path + +parser = argparse.ArgumentParser() +parser.add_argument("--doc-id", type=int, required=True) +parser.add_argument("--db-path", required=True) +parser.add_argument("--file", required=True) +parser.add_argument("--collection", required=True) +parser.add_argument("--embed-model", default="nomic-embed-text") +parser.add_argument("--chroma-url", default="http://localhost:8000") +parser.add_argument("--ollama-url", default="http://localhost:11434") +args = parser.parse_args() + +CHUNK_SIZE = 512 # tokens/chars per chunk +CHUNK_OVERLAP = 64 # overlap between consecutive chunks +BATCH_SIZE = 16 # how many chunks to embed + upsert at once + +def utcnow(): + return datetime.now(timezone.utc).isoformat() + +def db_connect(): + conn = sqlite3.connect(args.db_path) + conn.row_factory = sqlite3.Row + return conn + +def set_status(status: str, chunk_count: int = 0, error: str = None): + db = db_connect() + db.execute( + "UPDATE kb_documents SET status=?, chunk_count=?, error_msg=?, processed_at=? WHERE id=?", + (status, chunk_count, error, utcnow(), args.doc_id) + ) + if status == "ready" and chunk_count > 0: + # Update collection counters + db.execute(""" + UPDATE kb_collections SET + doc_count = (SELECT COUNT(*) FROM kb_documents WHERE collection_id=(SELECT collection_id FROM kb_documents WHERE id=?) AND status='ready'), + chunk_count = chunk_count + ? + WHERE id = (SELECT collection_id FROM kb_documents WHERE id=?) + """, (args.doc_id, chunk_count, args.doc_id)) + db.commit() + db.close() + +# ── Text extraction ─────────────────────────────────────────────────────────── + +def extract_pdf(path: str) -> list[dict]: + """Returns list of {text, page} dicts.""" + pages = [] + try: + import pypdf + reader = pypdf.PdfReader(path) + for i, page in enumerate(reader.pages): + text = (page.extract_text() or "").strip() + if text: + pages.append({"text": text, "page": i + 1}) + except ImportError: + # Fallback: try pdfminer + try: + from pdfminer.high_level import extract_pages + from pdfminer.layout import LTTextContainer + page_num = 0 + for page_layout in extract_pages(path): + page_num += 1 + texts = [] + for element in page_layout: + if isinstance(element, LTTextContainer): + texts.append(element.get_text()) + text = "".join(texts).strip() + if text: + pages.append({"text": text, "page": page_num}) + except ImportError: + raise RuntimeError("Install pypdf or pdfminer.six: pip install pypdf") + return pages + +def extract_docx(path: str) -> list[dict]: + try: + from docx import Document + doc = Document(path) + text = "\n".join(p.text for p in doc.paragraphs if p.text.strip()) + return [{"text": text, "page": None}] + except ImportError: + raise RuntimeError("Install python-docx: pip install python-docx") + +def extract_text(path: str, ext: str) -> list[dict]: + """Returns list of {text, page} dicts by file type.""" + if ext == ".pdf": + return extract_pdf(path) + if ext in (".docx", ".doc"): + return extract_docx(path) + if ext == ".csv": + import csv + with open(path, encoding="utf-8", errors="replace") as f: + rows = list(csv.reader(f)) + if not rows: + return [] + headers = rows[0] + lines = [", ".join(f"{h}: {v}" for h, v in zip(headers, row)) for row in rows[1:] if any(row)] + return [{"text": "\n".join(lines), "page": None}] + # TXT, MD, and anything else + text = Path(path).read_text(encoding="utf-8", errors="replace") + return [{"text": text, "page": None}] + +# ── Chunking ────────────────────────────────────────────────────────────────── + +def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]: + """Split text into overlapping chunks by character count.""" + if not text: + return [] + chunks = [] + start = 0 + while start < len(text): + end = start + chunk_size + chunk = text[start:end] + # Try to break at a sentence/paragraph boundary + if end < len(text): + for sep in ["\n\n", "\n", ". ", " "]: + idx = chunk.rfind(sep) + if idx > chunk_size // 2: + chunk = chunk[:idx + len(sep)] + break + chunks.append(chunk.strip()) + start += len(chunk) - overlap + if start >= len(text): + break + return [c for c in chunks if c] + +# ── Embedding ───────────────────────────────────────────────────────────────── + +def embed_batch(texts: list[str]) -> list[list[float]]: + """Embed a batch of texts via Ollama.""" + import urllib.request, urllib.error + embeddings = [] + for text in texts: + body = json.dumps({"model": args.embed_model, "prompt": text}).encode() + req = urllib.request.Request( + f"{args.ollama_url}/api/embeddings", data=body, method="POST" + ) + req.add_header("Content-Type", "application/json") + try: + with urllib.request.urlopen(req, timeout=60) as r: + embeddings.append(json.loads(r.read().decode())["embedding"]) + except Exception as e: + raise RuntimeError(f"Ollama embedding error: {e}") + return embeddings + +# ── ChromaDB upsert ─────────────────────────────────────────────────────────── + +def chroma_upsert(ids, embeddings, documents, metadatas): + """Upsert a batch of chunks into ChromaDB.""" + import urllib.request, urllib.error + body = json.dumps({ + "ids": ids, + "embeddings": embeddings, + "documents": documents, + "metadatas": metadatas, + }).encode() + req = urllib.request.Request( + f"{args.chroma_url}/api/v1/collections/{args.collection}/upsert", + data=body, method="POST" + ) + req.add_header("Content-Type", "application/json") + try: + with urllib.request.urlopen(req, timeout=30) as r: + return json.loads(r.read().decode()) + except urllib.error.HTTPError as e: + raise RuntimeError(f"ChromaDB upsert error {e.code}: {e.read().decode()}") + +# ── Main ────────────────────────────────────────────────────────────────────── + +def main(): + set_status("processing") + + file_path = args.file + ext = Path(file_path).suffix.lower() + source_name = Path(file_path).name + + # Get original filename from DB + try: + db = db_connect() + row = db.execute("SELECT orig_name FROM kb_documents WHERE id=?", (args.doc_id,)).fetchone() + db.close() + if row: + source_name = row["orig_name"] + except Exception: + pass + + # Extract text + try: + pages = extract_text(file_path, ext) + except Exception as e: + set_status("failed", error=str(e)) + sys.exit(1) + + if not pages: + set_status("failed", error="No text could be extracted from the document") + sys.exit(1) + + # Chunk all pages + all_chunks = [] + for page_info in pages: + chunks = chunk_text(page_info["text"]) + for i, chunk in enumerate(chunks): + all_chunks.append({ + "text": chunk, + "page": page_info.get("page"), + "chunk": len(all_chunks) + i, + "source": source_name, + "doc_id": args.doc_id, + }) + + if not all_chunks: + set_status("failed", error="Document produced no text chunks after processing") + sys.exit(1) + + # Embed and upsert in batches + total = len(all_chunks) + processed = 0 + try: + for batch_start in range(0, total, BATCH_SIZE): + batch = all_chunks[batch_start:batch_start + BATCH_SIZE] + texts = [c["text"] for c in batch] + + embeddings = embed_batch(texts) + + ids = [f"doc{args.doc_id}-chunk{c['chunk']}" for c in batch] + metas = [{ + "doc_id": c["doc_id"], + "source": c["source"], + "page": c["page"] or 0, + "chunk": c["chunk"], + } for c in batch] + + chroma_upsert(ids, embeddings, texts, metas) + processed += len(batch) + + except Exception as e: + import traceback + set_status("failed", chunk_count=processed, error=f"{e}\n{traceback.format_exc()[:500]}") + sys.exit(1) + + set_status("ready", chunk_count=total) + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + set_status("failed", error="Interrupted") + sys.exit(130) + except Exception as e: + import traceback + set_status("failed", error=f"{e}\n{traceback.format_exc()[:500]}") + sys.exit(1) diff --git a/ansible/roles/cezen-backend/files/requirements.txt b/ansible/roles/cezen-backend/files/requirements.txt index 04b3b68..bc7a0e3 100644 --- a/ansible/roles/cezen-backend/files/requirements.txt +++ b/ansible/roles/cezen-backend/files/requirements.txt @@ -9,5 +9,12 @@ aiofiles>=23.0.0 # Document Intelligence pymupdf>=1.24.0 # PDF text extraction (fitz) python-docx>=1.1.0 # Word document extraction +pypdf>=4.2.0 # RAG ingest PDF text extraction +pdfminer.six>=20231228 # RAG ingest PDF fallback parser # Scheduled Jobs apscheduler>=3.10.0 # In-process cron/interval scheduler +# Meeting audio transcription +faster-whisper>=1.0.0 +# Connector drivers +psycopg2-binary>=2.9.9 +pymysql>=1.1.0 diff --git a/ansible/roles/cezen-backend/tasks/main.yml b/ansible/roles/cezen-backend/tasks/main.yml index f9b498d..265b689 100644 --- a/ansible/roles/cezen-backend/tasks/main.yml +++ b/ansible/roles/cezen-backend/tasks/main.yml @@ -10,6 +10,7 @@ - python3.11-venv - libmupdf-dev # required by pymupdf (Document Intelligence) - mupdf-tools + - ffmpeg # required by faster-whisper audio transcription state: present update_cache: yes @@ -46,6 +47,15 @@ group: "{{ cezen_user }}" mode: "0755" +- name: Copy RAG ingest worker + copy: + src: rag_ingest.py + dest: /opt/cezen/backend/rag_ingest.py + owner: "{{ cezen_user }}" + group: "{{ cezen_user }}" + mode: "0755" + notify: Restart cezen-api + - name: Copy requirements.txt copy: src: requirements.txt