Complete backend production integrations

This commit is contained in:
Jino Jose 2026-06-30 09:09:43 +05:30
parent 56668f7bdc
commit 492a4b191a
4 changed files with 667 additions and 29 deletions

View File

@ -798,6 +798,19 @@ def audit(db, user_id, username, action, detail="", ip="", result="success"):
(utcnow(), user_id, username, action, detail, ip, result)
)
def _audit(db, username, action, detail="", ip="", result="success", user_id=None):
"""Audit helper for API sections that do not already hold a DB connection."""
own_db = db is None
if own_db:
db = get_db()
try:
audit(db, user_id, username, action, detail, ip, result)
if own_db:
db.commit()
finally:
if own_db:
db.close()
def _setting_value(key: str, default: str = "") -> str:
db = get_db()
try:
@ -1023,8 +1036,8 @@ async def change_password(body: ChangePasswordRequest, request: Request, user: d
# ── User management (admin) ───────────────────────────────────────────────────
@app.get("/api/users/sessions")
async def active_sessions(admin: dict = Depends(admin_only)):
@app.get("/api/users/recent-logins")
async def recent_logins(admin: dict = Depends(admin_only)):
"""Return list of users who have logged in within the last 8 hours (token lifetime)."""
db = get_db()
# Use the same isoformat() style that utcnow() stores (includes +00:00 suffix)
@ -1960,7 +1973,7 @@ async def upload_document(cid: int, file: UploadFile = File(...), admin: dict =
# Launch ingest subprocess
runner = Path(__file__).parent / "rag_ingest.py"
subprocess.Popen(
["python3", str(runner),
[sys.executable, str(runner),
"--doc-id", str(doc_id),
"--db-path", str(DB_PATH),
"--file", str(dest),
@ -2800,7 +2813,8 @@ AGENT_STEP_TYPES = {"prompt", "summarise", "extract", "classify", "rag", "format
def _call_ollama_sync(model: str, prompt: str) -> str:
import urllib.request as _ur
payload = json.dumps({"model": model, "prompt": prompt, "stream": False}).encode()
routed = _select_router_model(prompt, model, {"content_type": "text"})
payload = json.dumps({"model": routed["model"], "prompt": prompt, "stream": False}).encode()
req = _ur.Request(
f"{OLLAMA_URL}/api/generate",
data=payload,
@ -2839,12 +2853,13 @@ async def _execute_agent(run_id: int):
try:
run_row = db.execute("SELECT * FROM agent_runs WHERE id=?", (run_id,)).fetchone()
if not run_row:
return
return {"status": "error", "output": "", "error": "Agent run not found"}
agent_row = db.execute("SELECT * FROM agents WHERE id=?", (run_row["agent_id"],)).fetchone()
if not agent_row:
db.execute("UPDATE agent_runs SET status='error',error_msg='Agent not found',finished_at=? WHERE id=?",
(utcnow(), run_id))
db.commit(); db.close(); return
db.commit()
return {"status": "error", "output": "", "error": "Agent not found"}
steps = json.loads(agent_row["steps"])
pipeline_input = run_row["input"]
@ -2903,18 +2918,21 @@ async def _execute_agent(run_id: int):
"UPDATE agent_runs SET status='error',error_msg=?,steps_log=?,output=?,finished_at=? WHERE id=?",
(str(e), json.dumps(steps_log), prev_output, utcnow(), run_id)
)
db.commit(); db.close(); return
db.commit()
return {"status": "error", "output": prev_output, "error": str(e)}
db.execute(
"UPDATE agent_runs SET status='done',output=?,steps_log=?,finished_at=? WHERE id=?",
(prev_output, json.dumps(steps_log), utcnow(), run_id)
)
db.commit()
return {"status": "done", "output": prev_output, "error": None}
except Exception as e:
db.execute("UPDATE agent_runs SET status='error',error_msg=?,finished_at=? WHERE id=?",
(str(e), utcnow(), run_id))
db.commit()
return {"status": "error", "output": "", "error": str(e)}
finally:
db.close()
@ -3669,8 +3687,9 @@ Respond with ONLY valid JSON, no markdown, no explanation:
def _call_model_sync(model: str, prompt_text: str) -> str:
"""Call an Ollama model and return its text response."""
import urllib.request, json as _json
routed = _select_router_model(prompt_text, model, {"content_type": "text"})
payload = {
"model": model,
"model": routed["model"],
"messages": [{"role": "user", "content": prompt_text}],
"stream": False,
}
@ -3933,7 +3952,8 @@ def _run_scheduled_job_sync(job_id: int):
try:
if job["job_type"] == "prompt":
# Build messages and call Ollama
model = job["model"] or "llama3"
routed = _select_router_model(job["prompt_text"], job["model"] or "llama3", {"content_type": "text"})
model = routed["model"]
payload = {
"model": model,
"messages": [{"role": "user", "content": job["prompt_text"]}],
@ -3954,15 +3974,20 @@ def _run_scheduled_job_sync(job_id: int):
agent = db.execute("SELECT * FROM agents WHERE id=?", (job["agent_id"],)).fetchone()
if not agent:
raise ValueError("Agent not found")
steps = json.loads(agent["steps"])
input_text = job["prompt_text"] or f"Scheduled run of agent: {agent['name']}"
# Reuse the existing agent execution helper
agent_run_id = db.execute(
"INSERT INTO agent_runs (agent_id,agent_name,user_id,username,input,status,created_at) VALUES (?,?,?,?,?,'pending',?)",
(agent["id"], agent["name"], job["user_id"], job["username"], input_text, utcnow())
).lastrowid
db.commit()
loop = _asyncio.new_event_loop()
try:
result = loop.run_until_complete(_execute_agent(steps, input_text, agent))
result = loop.run_until_complete(_execute_agent(agent_run_id))
finally:
loop.close()
output = result.get("output", "")
if result.get("status") == "error":
raise ValueError(result.get("error") or "Agent run failed")
except Exception as e:
error = str(e)
@ -4475,12 +4500,66 @@ def _connector_from_row(row) -> dict:
"stats": _json_load(d["stats_json"], {"files": 0, "rowsRead": 0, "lastSync": None, "errors": 0}),
}
def _estimate_tokens(text: str) -> int:
return max(1, round(len((text or "").split()) * 1.3))
def _select_router_model(prompt: str, default_model: str, context: Optional[dict] = None) -> dict:
"""Apply enabled model-router rules to backend model calls."""
ctx = context or {}
token_est = _estimate_tokens(prompt)
content_type = ctx.get("content_type") or "text"
user_role = ctx.get("user_role") or ""
db = get_db()
try:
rows = db.execute(
"SELECT * FROM router_rules WHERE enabled=1 ORDER BY priority, id"
).fetchall()
finally:
db.close()
for row in rows:
conditions = _json_load(row["conditions_json"], [])
matched = True
for cond in conditions:
ctype = (cond.get("type") or "").strip()
value = str(cond.get("value") or "").strip()
if ctype == "keyword":
keywords = [k.strip().lower() for k in value.split(",") if k.strip()]
matched = any(k in (prompt or "").lower() for k in keywords)
elif ctype == "content_type":
matched = value == content_type
elif ctype == "token_gt":
try:
matched = token_est > int(value or 0)
except ValueError:
matched = False
elif ctype == "token_lt":
try:
matched = token_est < int(value or 0)
except ValueError:
matched = False
elif ctype == "user_role":
matched = value == user_role
else:
matched = False
if not matched:
break
if matched and row["model"]:
return {
"model": row["model"],
"route": row["name"],
"max_tokens": row["max_tokens"],
"temperature": row["temperature"],
}
return {"model": default_model or "llama3", "route": "default", "max_tokens": None, "temperature": None}
def _call_ollama_generate(model: str, prompt: str, max_tokens: int = 512) -> str:
routed = _select_router_model(prompt, model, {"content_type": "text"})
payload = json.dumps({
"model": model or "llama3",
"model": routed["model"],
"prompt": prompt,
"stream": False,
"options": {"num_predict": max_tokens}
"options": {"num_predict": int(routed.get("max_tokens") or max_tokens)}
}).encode()
req = urllib.request.Request(f"{OLLAMA_URL}/api/generate", data=payload, method="POST")
req.add_header("Content-Type", "application/json")
@ -4500,6 +4579,185 @@ def _summarise_locally(text: str) -> str:
preview = " ".join(words[:80])
return f"Summary generated from {len(words)} words. Key discussion: {preview}{'...' if len(words) > 80 else ''}"
def _transcribe_meeting_audio(raw: bytes, suffix: str) -> str:
try:
from faster_whisper import WhisperModel
except ImportError as exc:
raise HTTPException(
status_code=501,
detail="Audio transcription requires faster-whisper. Install backend requirements and ensure ffmpeg is available.",
) from exc
model_size = os.environ.get("CEZEN_WHISPER_MODEL", "base")
device = os.environ.get("CEZEN_WHISPER_DEVICE", "cpu")
compute_type = os.environ.get("CEZEN_WHISPER_COMPUTE_TYPE", "int8")
cache_key = (model_size, device, compute_type)
if getattr(_transcribe_meeting_audio, "_cache_key", None) != cache_key:
_transcribe_meeting_audio._model = WhisperModel(model_size, device=device, compute_type=compute_type)
_transcribe_meeting_audio._cache_key = cache_key
with tempfile.NamedTemporaryFile(suffix=suffix or ".audio", delete=False) as tmp:
tmp.write(raw)
tmp_path = tmp.name
try:
segments, _info = _transcribe_meeting_audio._model.transcribe(tmp_path, vad_filter=True)
transcript = "\n".join(seg.text.strip() for seg in segments if seg.text.strip())
finally:
try:
Path(tmp_path).unlink(missing_ok=True)
except Exception:
pass
if not transcript:
raise HTTPException(status_code=422, detail="No speech could be transcribed from this file")
return transcript
class WorkflowStop(Exception):
def __init__(self, message: str):
super().__init__(message)
self.message = message
def _workflow_condition_true(condition: str, values: dict) -> bool:
expr = _template(condition or "", values).strip()
if not expr:
return True
if "==" in expr:
left, right = expr.split("==", 1)
return left.strip().strip("'\"") == right.strip().strip("'\"")
if "!=" in expr:
left, right = expr.split("!=", 1)
return left.strip().strip("'\"") != right.strip().strip("'\"")
for op in (">=", "<=", ">", "<"):
if op in expr:
left, right = expr.split(op, 1)
try:
a = float(left.strip())
b = float(right.strip())
except ValueError:
return False
return {">=": a >= b, "<=": a <= b, ">": a > b, "<": a < b}[op]
return expr.lower() in ("true", "yes", "1", "continue")
def _workflow_rag_search(cfg: dict, values: dict) -> str:
query = _template(cfg.get("query") or values.get("input") or "", values).strip()
collection = cfg.get("collection") or cfg.get("collection_id")
if not query:
raise ValueError("RAG search requires input or query")
db = get_db()
try:
if str(collection).isdigit():
col = db.execute("SELECT * FROM kb_collections WHERE id=?", (int(collection),)).fetchone()
else:
col = db.execute("SELECT * FROM kb_collections WHERE name=? OR chroma_name=?", (str(collection), str(collection))).fetchone()
finally:
db.close()
if not col:
raise ValueError(f"Knowledge collection not found: {collection or '(blank)'}")
embed = _ollama_embed([query], model=col["embed_model"])
result = _chroma_req("POST", f"/api/v1/collections/{col['chroma_name']}/query", body={
"query_embeddings": embed,
"n_results": int(cfg.get("n_results") or 5),
"include": ["documents", "metadatas", "distances"],
})
docs = result.get("documents", [[]])[0]
metas = result.get("metadatas", [[]])[0]
items = []
for text, meta in zip(docs, metas):
source = meta.get("source") or "knowledge base"
page = f" p.{meta.get('page')}" if meta.get("page") else ""
items.append(f"[{source}{page}] {text}")
return "\n\n".join(items) if items else "No matching knowledge passages found."
def _ensure_kb_collection(name: str) -> sqlite3.Row:
db = get_db()
try:
row = db.execute("SELECT * FROM kb_collections WHERE name=? OR chroma_name=?", (name, name)).fetchone()
if row:
return row
import re as _re
chroma_name = "cezen-" + _re.sub(r"[^a-z0-9-]", "-", name.lower())[:55]
try:
_chroma_req("POST", "/api/v1/collections", body={"name": chroma_name, "metadata": {"hnsw:space": "cosine"}})
except HTTPException as e:
if e.status_code not in (409,):
raise
cur = db.execute(
"INSERT INTO kb_collections (name, description, chroma_name, embed_model, created_at) VALUES (?,?,?,?,?)",
(name, "Created by workflow automation", chroma_name, OLLAMA_EMBED, utcnow())
)
db.commit()
return db.execute("SELECT * FROM kb_collections WHERE id=?", (cur.lastrowid,)).fetchone()
finally:
db.close()
def _workflow_save_kb(cfg: dict, values: dict) -> str:
collection_name = cfg.get("collection") or "Workflow Outputs"
text = _template(cfg.get("text") or cfg.get("body") or values.get("input") or values.get("step1.output") or "", values).strip()
if not text:
raise ValueError("Save to KB requires text/body/input")
col = _ensure_kb_collection(collection_name)
col_dir = KB_DOCS_DIR / str(col["id"])
col_dir.mkdir(exist_ok=True)
dest = col_dir / f"workflow-{uuid.uuid4().hex}.txt"
dest.write_text(text, encoding="utf-8")
db = get_db()
try:
doc_id = db.execute(
"INSERT INTO kb_documents (collection_id, orig_name, file_path, size_bytes, status, uploaded_at) VALUES (?,?,?,?,?,?)",
(col["id"], dest.name, str(dest), dest.stat().st_size, "pending", utcnow())
).lastrowid
db.commit()
finally:
db.close()
runner = Path(__file__).parent / "rag_ingest.py"
subprocess.Popen(
[sys.executable, str(runner), "--doc-id", str(doc_id), "--db-path", str(DB_PATH), "--file", str(dest),
"--collection", col["chroma_name"], "--embed-model", col["embed_model"], "--chroma-url", CHROMA_URL,
"--ollama-url", OLLAMA_URL],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True,
)
return f"Saved document {doc_id} to knowledge collection {col['name']} and started ingestion."
def _workflow_send_email(cfg: dict, values: dict) -> str:
import smtplib
from email.message import EmailMessage
host = os.environ.get("CEZEN_SMTP_HOST")
if not host:
raise RuntimeError("SMTP is not configured. Set CEZEN_SMTP_HOST and related CEZEN_SMTP_* variables.")
port = int(os.environ.get("CEZEN_SMTP_PORT", "587"))
sender = os.environ.get("CEZEN_SMTP_FROM") or os.environ.get("CEZEN_SMTP_USER")
recipient = _template(cfg.get("to") or "", values)
if not sender or not recipient:
raise RuntimeError("Email step requires sender and recipient")
msg = EmailMessage()
msg["From"] = sender
msg["To"] = recipient
msg["Subject"] = _template(cfg.get("subject") or "Nexus One AI workflow output", values)
msg.set_content(_template(cfg.get("body") or cfg.get("bodyTemplate") or values.get("input") or "", values))
with smtplib.SMTP(host, port, timeout=30) as smtp:
if os.environ.get("CEZEN_SMTP_TLS", "true").lower() in ("1", "true", "yes"):
smtp.starttls()
if os.environ.get("CEZEN_SMTP_USER"):
smtp.login(os.environ["CEZEN_SMTP_USER"], os.environ.get("CEZEN_SMTP_PASSWORD", ""))
smtp.send_message(msg)
return f"Email sent to {recipient}."
def _workflow_http_request(cfg: dict, values: dict) -> str:
import urllib.parse
url = _template(cfg.get("url") or "", values).replace("{{_API}}", "http://localhost:8080")
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError("HTTP step requires an http:// or https:// URL")
method = (cfg.get("method") or "POST").upper()
headers = cfg.get("headers") or {}
if isinstance(headers, str):
headers = _json_load(headers, {})
body_text = _template(cfg.get("bodyTemplate") or cfg.get("body") or "", values)
data = body_text.encode() if method not in ("GET", "HEAD") and body_text else None
req = urllib.request.Request(url, data=data, headers=headers, method=method)
if data and "Content-Type" not in headers:
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=int(cfg.get("timeout") or 60)) as resp:
payload = resp.read(512 * 1024).decode(errors="replace")
return f"HTTP {method} {url} -> {resp.status}\n{payload}"
def _meeting_result_from_text(transcript: str, meta: dict, opts: dict) -> dict:
prompt = (
"Analyse this meeting transcript and return concise meeting output with sections: "
@ -4568,15 +4826,19 @@ def _workflow_step_output(step: dict, values: dict) -> str:
if stype == "summarise":
return _summarise_locally(values.get("input", ""))
if stype == "rag_search":
return f"Knowledge search prepared for collection {cfg.get('collection') or 'default'}."
return _workflow_rag_search(cfg, values)
if stype == "save_kb":
return f"Queued save into knowledge collection {cfg.get('collection') or 'default'}."
return _workflow_save_kb(cfg, values)
if stype == "filter":
return "Condition evaluated as true."
matched = _workflow_condition_true(cfg.get("condition") or "", values)
action = cfg.get("trueAction" if matched else "falseAction") or "continue"
if action == "stop":
raise WorkflowStop(f"Condition evaluated as {str(matched).lower()}; workflow stopped.")
return f"Condition evaluated as {str(matched).lower()}; continuing workflow."
if stype == "email":
return f"Email queued for {cfg.get('to') or 'recipient'}."
return _workflow_send_email(cfg, values)
if stype == "http":
return f"{cfg.get('method', 'POST')} request prepared for {cfg.get('url') or 'endpoint'}."
return _workflow_http_request(cfg, values)
return "Step completed."
@app.get("/api/models/list")
@ -4650,7 +4912,11 @@ async def run_workflow_api(workflow_id: str, body: dict, user: dict = Depends(cu
outputs = {}
try:
for idx, step in enumerate(wf.get("steps") or [], start=1):
out = _workflow_step_output(step, values)
try:
out = _workflow_step_output(step, values)
except WorkflowStop as stop:
log.append({"step": step.get("name") or step.get("type") or f"Step {idx}", "status": "stopped", "detail": stop.message})
break
key = (step.get("config") or {}).get("outputVar") or f"step{idx}.output"
values[key] = out
values[f"step{idx}.output"] = out
@ -4711,6 +4977,86 @@ async def save_connectors(body: list[dict], user: dict = Depends(current_user)):
db.commit(); db.close()
return {"ok": True}
def _connector_db_stats(cfg: dict) -> tuple[int, int]:
db_type = (cfg.get("dbType") or cfg.get("engine") or cfg.get("type") or "sqlite").lower()
tables = [t.strip() for t in (cfg.get("tables") or "").split(",") if t.strip()]
if db_type == "sqlite":
path = cfg.get("path") or cfg.get("database")
if not path:
raise ValueError("SQLite connector requires path or database")
conn = sqlite3.connect(path)
try:
if not tables:
tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'").fetchall()]
rows = 0
for table in tables:
if not table.replace("_", "").replace("-", "").isalnum():
raise ValueError(f"Unsafe table name: {table}")
rows += int(conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0])
return len(tables), rows
finally:
conn.close()
if db_type in ("postgres", "postgresql"):
try:
import psycopg2
except ImportError as exc:
raise RuntimeError("PostgreSQL connector requires psycopg2-binary") from exc
conn = psycopg2.connect(
host=cfg.get("host") or "localhost",
port=int(cfg.get("port") or 5432),
dbname=cfg.get("database"),
user=cfg.get("username") or cfg.get("user"),
password=cfg.get("password") or "",
connect_timeout=int(cfg.get("timeout") or 10),
)
try:
with conn.cursor() as cur:
if not tables:
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE'")
tables = [r[0] for r in cur.fetchall()]
rows = 0
for table in tables:
cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='public' AND table_name=%s", (table,))
if cur.fetchone()[0] != 1:
raise ValueError(f"Table not found: {table}")
safe_table = table.replace('"', '""')
cur.execute(f'SELECT COUNT(*) FROM "{safe_table}"')
rows += int(cur.fetchone()[0])
return len(tables), rows
finally:
conn.close()
if db_type in ("mysql", "mariadb"):
try:
import pymysql
except ImportError as exc:
raise RuntimeError("MySQL connector requires pymysql") from exc
conn = pymysql.connect(
host=cfg.get("host") or "localhost",
port=int(cfg.get("port") or 3306),
database=cfg.get("database"),
user=cfg.get("username") or cfg.get("user"),
password=cfg.get("password") or "",
connect_timeout=int(cfg.get("timeout") or 10),
read_timeout=int(cfg.get("timeout") or 10),
)
try:
with conn.cursor() as cur:
if not tables:
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema=%s AND table_type='BASE TABLE'", (cfg.get("database"),))
tables = [r[0] for r in cur.fetchall()]
rows = 0
for table in tables:
cur.execute("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema=%s AND table_name=%s", (cfg.get("database"), table))
if cur.fetchone()[0] != 1:
raise ValueError(f"Table not found: {table}")
safe_table = table.replace("`", "``")
cur.execute(f"SELECT COUNT(*) FROM `{safe_table}`")
rows += int(cur.fetchone()[0])
return len(tables), rows
finally:
conn.close()
raise ValueError(f"Unsupported database connector type: {db_type}")
@app.post("/api/connectors/{connector_id}/sync")
async def sync_connector(connector_id: str, user: dict = Depends(current_user)):
db = get_db()
@ -4738,11 +5084,15 @@ async def sync_connector(connector_id: str, user: dict = Depends(current_user)):
level = "warn"; status = "error"; errors = int(stats.get("errors") or 0) + 1
stats.update({"files": files, "lastSync": utcnow(), "errors": errors})
else:
tables = [t.strip() for t in (cfg.get("tables") or "").split(",") if t.strip()]
rows_read = max(int(stats.get("rowsRead") or 0), len(tables) * 100)
stats.update({"rowsRead": rows_read, "lastSync": utcnow(), "errors": 0})
msg = f"[{c['name']}] Read-only schema check complete — {len(tables)} tables configured"
level = "ok"; status = "ok"
try:
table_count, rows_read = _connector_db_stats(cfg)
stats.update({"rowsRead": rows_read, "tables": table_count, "lastSync": utcnow(), "errors": 0})
msg = f"[{c['name']}] Read-only sync complete — {table_count} tables, {rows_read} rows visible"
level = "ok"; status = "ok"
except Exception as e:
stats.update({"lastSync": utcnow(), "errors": int(stats.get("errors") or 0) + 1})
msg = f"[{c['name']}] Database sync failed: {e}"
level = "error"; status = "error"
db.execute("UPDATE connectors SET status=?, stats_json=?, updated_at=? WHERE id=?",
(status, json.dumps(stats), utcnow(), connector_id))
db.execute("INSERT INTO connector_log (connector_id,level,msg,ts) VALUES (?,?,?,?)",
@ -4903,10 +5253,7 @@ async def process_meeting(file: UploadFile = File(...), meta: str = Form("{}"),
if suffix in (".txt", ".md", ".vtt", ".srt"):
text = raw.decode(errors="replace")
if not text:
text = (
f"Uploaded meeting file: {file.filename or 'audio'} ({round(len(raw)/1024/1024, 2)} MB). "
"Audio transcription service is not configured on this node yet, so this record was processed from file metadata."
)
text = _transcribe_meeting_audio(raw, suffix)
result = _meeting_result_from_text(text, meta_obj, meta_obj.get("opts") or {})
db = get_db()
db.execute(

View File

@ -0,0 +1,274 @@
#!/usr/bin/env python3
"""
Nexus One AI RAG Document Ingest Worker
Launched as a subprocess by the FastAPI backend when a document is uploaded.
Pipeline:
1. Extract text from PDF / DOCX / TXT / MD / CSV
2. Chunk into overlapping segments
3. Embed each chunk via Ollama /api/embeddings
4. Store chunks + embeddings in ChromaDB
5. Update kb_documents status in SQLite
Usage (called by main.py):
python3 rag_ingest.py \\
--doc-id 1 --db-path /opt/cezen/data/cezen.db \\
--file /opt/cezen/data/kb_docs/1/abc.pdf \\
--collection cezen-myknowledgebase \\
--embed-model nomic-embed-text \\
--chroma-url http://localhost:8000 \\
--ollama-url http://localhost:11434
"""
import argparse, json, os, sqlite3, sys, uuid
from datetime import datetime, timezone
from pathlib import Path
parser = argparse.ArgumentParser()
parser.add_argument("--doc-id", type=int, required=True)
parser.add_argument("--db-path", required=True)
parser.add_argument("--file", required=True)
parser.add_argument("--collection", required=True)
parser.add_argument("--embed-model", default="nomic-embed-text")
parser.add_argument("--chroma-url", default="http://localhost:8000")
parser.add_argument("--ollama-url", default="http://localhost:11434")
args = parser.parse_args()
CHUNK_SIZE = 512 # tokens/chars per chunk
CHUNK_OVERLAP = 64 # overlap between consecutive chunks
BATCH_SIZE = 16 # how many chunks to embed + upsert at once
def utcnow():
return datetime.now(timezone.utc).isoformat()
def db_connect():
conn = sqlite3.connect(args.db_path)
conn.row_factory = sqlite3.Row
return conn
def set_status(status: str, chunk_count: int = 0, error: str = None):
db = db_connect()
db.execute(
"UPDATE kb_documents SET status=?, chunk_count=?, error_msg=?, processed_at=? WHERE id=?",
(status, chunk_count, error, utcnow(), args.doc_id)
)
if status == "ready" and chunk_count > 0:
# Update collection counters
db.execute("""
UPDATE kb_collections SET
doc_count = (SELECT COUNT(*) FROM kb_documents WHERE collection_id=(SELECT collection_id FROM kb_documents WHERE id=?) AND status='ready'),
chunk_count = chunk_count + ?
WHERE id = (SELECT collection_id FROM kb_documents WHERE id=?)
""", (args.doc_id, chunk_count, args.doc_id))
db.commit()
db.close()
# ── Text extraction ───────────────────────────────────────────────────────────
def extract_pdf(path: str) -> list[dict]:
"""Returns list of {text, page} dicts."""
pages = []
try:
import pypdf
reader = pypdf.PdfReader(path)
for i, page in enumerate(reader.pages):
text = (page.extract_text() or "").strip()
if text:
pages.append({"text": text, "page": i + 1})
except ImportError:
# Fallback: try pdfminer
try:
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer
page_num = 0
for page_layout in extract_pages(path):
page_num += 1
texts = []
for element in page_layout:
if isinstance(element, LTTextContainer):
texts.append(element.get_text())
text = "".join(texts).strip()
if text:
pages.append({"text": text, "page": page_num})
except ImportError:
raise RuntimeError("Install pypdf or pdfminer.six: pip install pypdf")
return pages
def extract_docx(path: str) -> list[dict]:
try:
from docx import Document
doc = Document(path)
text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
return [{"text": text, "page": None}]
except ImportError:
raise RuntimeError("Install python-docx: pip install python-docx")
def extract_text(path: str, ext: str) -> list[dict]:
"""Returns list of {text, page} dicts by file type."""
if ext == ".pdf":
return extract_pdf(path)
if ext in (".docx", ".doc"):
return extract_docx(path)
if ext == ".csv":
import csv
with open(path, encoding="utf-8", errors="replace") as f:
rows = list(csv.reader(f))
if not rows:
return []
headers = rows[0]
lines = [", ".join(f"{h}: {v}" for h, v in zip(headers, row)) for row in rows[1:] if any(row)]
return [{"text": "\n".join(lines), "page": None}]
# TXT, MD, and anything else
text = Path(path).read_text(encoding="utf-8", errors="replace")
return [{"text": text, "page": None}]
# ── Chunking ──────────────────────────────────────────────────────────────────
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
"""Split text into overlapping chunks by character count."""
if not text:
return []
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
# Try to break at a sentence/paragraph boundary
if end < len(text):
for sep in ["\n\n", "\n", ". ", " "]:
idx = chunk.rfind(sep)
if idx > chunk_size // 2:
chunk = chunk[:idx + len(sep)]
break
chunks.append(chunk.strip())
start += len(chunk) - overlap
if start >= len(text):
break
return [c for c in chunks if c]
# ── Embedding ─────────────────────────────────────────────────────────────────
def embed_batch(texts: list[str]) -> list[list[float]]:
"""Embed a batch of texts via Ollama."""
import urllib.request, urllib.error
embeddings = []
for text in texts:
body = json.dumps({"model": args.embed_model, "prompt": text}).encode()
req = urllib.request.Request(
f"{args.ollama_url}/api/embeddings", data=body, method="POST"
)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=60) as r:
embeddings.append(json.loads(r.read().decode())["embedding"])
except Exception as e:
raise RuntimeError(f"Ollama embedding error: {e}")
return embeddings
# ── ChromaDB upsert ───────────────────────────────────────────────────────────
def chroma_upsert(ids, embeddings, documents, metadatas):
"""Upsert a batch of chunks into ChromaDB."""
import urllib.request, urllib.error
body = json.dumps({
"ids": ids,
"embeddings": embeddings,
"documents": documents,
"metadatas": metadatas,
}).encode()
req = urllib.request.Request(
f"{args.chroma_url}/api/v1/collections/{args.collection}/upsert",
data=body, method="POST"
)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
return json.loads(r.read().decode())
except urllib.error.HTTPError as e:
raise RuntimeError(f"ChromaDB upsert error {e.code}: {e.read().decode()}")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
set_status("processing")
file_path = args.file
ext = Path(file_path).suffix.lower()
source_name = Path(file_path).name
# Get original filename from DB
try:
db = db_connect()
row = db.execute("SELECT orig_name FROM kb_documents WHERE id=?", (args.doc_id,)).fetchone()
db.close()
if row:
source_name = row["orig_name"]
except Exception:
pass
# Extract text
try:
pages = extract_text(file_path, ext)
except Exception as e:
set_status("failed", error=str(e))
sys.exit(1)
if not pages:
set_status("failed", error="No text could be extracted from the document")
sys.exit(1)
# Chunk all pages
all_chunks = []
for page_info in pages:
chunks = chunk_text(page_info["text"])
for i, chunk in enumerate(chunks):
all_chunks.append({
"text": chunk,
"page": page_info.get("page"),
"chunk": len(all_chunks) + i,
"source": source_name,
"doc_id": args.doc_id,
})
if not all_chunks:
set_status("failed", error="Document produced no text chunks after processing")
sys.exit(1)
# Embed and upsert in batches
total = len(all_chunks)
processed = 0
try:
for batch_start in range(0, total, BATCH_SIZE):
batch = all_chunks[batch_start:batch_start + BATCH_SIZE]
texts = [c["text"] for c in batch]
embeddings = embed_batch(texts)
ids = [f"doc{args.doc_id}-chunk{c['chunk']}" for c in batch]
metas = [{
"doc_id": c["doc_id"],
"source": c["source"],
"page": c["page"] or 0,
"chunk": c["chunk"],
} for c in batch]
chroma_upsert(ids, embeddings, texts, metas)
processed += len(batch)
except Exception as e:
import traceback
set_status("failed", chunk_count=processed, error=f"{e}\n{traceback.format_exc()[:500]}")
sys.exit(1)
set_status("ready", chunk_count=total)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
set_status("failed", error="Interrupted")
sys.exit(130)
except Exception as e:
import traceback
set_status("failed", error=f"{e}\n{traceback.format_exc()[:500]}")
sys.exit(1)

View File

@ -9,5 +9,12 @@ aiofiles>=23.0.0
# Document Intelligence
pymupdf>=1.24.0 # PDF text extraction (fitz)
python-docx>=1.1.0 # Word document extraction
pypdf>=4.2.0 # RAG ingest PDF text extraction
pdfminer.six>=20231228 # RAG ingest PDF fallback parser
# Scheduled Jobs
apscheduler>=3.10.0 # In-process cron/interval scheduler
# Meeting audio transcription
faster-whisper>=1.0.0
# Connector drivers
psycopg2-binary>=2.9.9
pymysql>=1.1.0

View File

@ -10,6 +10,7 @@
- python3.11-venv
- libmupdf-dev # required by pymupdf (Document Intelligence)
- mupdf-tools
- ffmpeg # required by faster-whisper audio transcription
state: present
update_cache: yes
@ -46,6 +47,15 @@
group: "{{ cezen_user }}"
mode: "0755"
- name: Copy RAG ingest worker
copy:
src: rag_ingest.py
dest: /opt/cezen/backend/rag_ingest.py
owner: "{{ cezen_user }}"
group: "{{ cezen_user }}"
mode: "0755"
notify: Restart cezen-api
- name: Copy requirements.txt
copy:
src: requirements.txt