#!/usr/bin/env python3
"""
Nexus One AI — RAG Document Ingest Worker
Launched as a subprocess by the FastAPI backend when a document is uploaded.

Pipeline:
  1. Extract text from PDF / DOCX / TXT / MD / CSV
  2. Chunk into overlapping segments
  3. Embed each chunk via Ollama /api/embeddings
  4. Store chunks + embeddings in ChromaDB
  5. Update kb_documents status in SQLite

Usage (called by main.py):
  python3 rag_ingest.py \\
    --doc-id 1 --db-path /opt/cezen/data/cezen.db \\
    --file /opt/cezen/data/kb_docs/1/abc.pdf \\
    --collection cezen-myknowledgebase \\
    --embed-model nomic-embed-text \\
    --chroma-url http://localhost:8000 \\
    --ollama-url http://localhost:11434
"""

import argparse
import json
import os
import sqlite3
import sys
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

parser = argparse.ArgumentParser()
parser.add_argument("--doc-id", type=int, required=True)
parser.add_argument("--db-path", required=True)
parser.add_argument("--file", required=True)
parser.add_argument("--collection", required=True)
parser.add_argument("--embed-model", default="nomic-embed-text")
parser.add_argument("--chroma-url", default="http://localhost:8000")
parser.add_argument("--ollama-url", default="http://localhost:11434")

# Parsed command-line arguments. Populated by the ``__main__`` guard (or
# lazily by ``main()``) instead of at import time, so that importing this
# module does not consume sys.argv or exit the interpreter.
args: Optional[argparse.Namespace] = None

CHUNK_SIZE = 512     # characters per chunk (chunk_text splits by character count)
CHUNK_OVERLAP = 64   # characters shared between consecutive chunks
BATCH_SIZE = 16      # how many chunks to embed + upsert at once


def utcnow() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def db_connect() -> sqlite3.Connection:
    """Open the SQLite database at ``args.db_path`` with name-addressable rows."""
    conn = sqlite3.connect(args.db_path)
    conn.row_factory = sqlite3.Row
    return conn


def set_status(status: str, chunk_count: int = 0, error: Optional[str] = None) -> None:
    """Record the processing state of document ``args.doc_id`` in kb_documents.

    Writes ``status``, ``chunk_count``, ``error_msg`` and ``processed_at``.
    When the document becomes ``"ready"`` with at least one chunk, the
    owning kb_collections row is refreshed as well: ``doc_count`` is
    recomputed from the ready documents and ``chunk_count`` is incremented.
    """
    # closing() guarantees the connection is released even if a statement
    # or the commit raises.
    with closing(db_connect()) as db:
        db.execute(
            "UPDATE kb_documents SET status=?, chunk_count=?, error_msg=?, processed_at=? "
            "WHERE id=?",
            (status, chunk_count, error, utcnow(), args.doc_id),
        )
        if status == "ready" and chunk_count > 0:
            # Update collection counters
            db.execute("""
                UPDATE kb_collections SET
                    doc_count = (SELECT COUNT(*) FROM kb_documents
                                 WHERE collection_id=(SELECT collection_id FROM kb_documents WHERE id=?)
                                   AND status='ready'),
                    chunk_count = chunk_count + ?
                WHERE id = (SELECT collection_id FROM kb_documents WHERE id=?)
            """, (args.doc_id, chunk_count, args.doc_id))
        db.commit()


# ── Text extraction ───────────────────────────────────────────────────────────

def extract_pdf(path: str) -> list[dict]:
    """Return a list of ``{text, page}`` dicts, one per non-empty PDF page.

    Uses pypdf when it is installed and falls back to pdfminer otherwise.
    Page numbers are 1-based.

    Raises:
        RuntimeError: if neither pypdf nor pdfminer can be imported.
    """
    pages = []

    # Only the import is guarded, so an error raised while parsing the PDF
    # surfaces as-is instead of silently triggering the fallback.
    try:
        import pypdf
    except ImportError:
        pypdf = None

    if pypdf is not None:
        reader = pypdf.PdfReader(path)
        for i, page in enumerate(reader.pages):
            text = (page.extract_text() or "").strip()
            if text:
                pages.append({"text": text, "page": i + 1})
        return pages

    # Fallback: try pdfminer
    try:
        from pdfminer.high_level import extract_pages
        from pdfminer.layout import LTTextContainer
    except ImportError as e:
        raise RuntimeError("Install pypdf or pdfminer.six: pip install pypdf") from e

    for page_num, page_layout in enumerate(extract_pages(path), start=1):
        texts = [
            element.get_text()
            for element in page_layout
            if isinstance(element, LTTextContainer)
        ]
        text = "".join(texts).strip()
        if text:
            pages.append({"text": text, "page": page_num})
    return pages


def extract_docx(path: str) -> list[dict]:
    """Return the non-blank paragraphs of a Word document as one ``{text, page}`` dict.

    ``page`` is always ``None``.

    Raises:
        RuntimeError: if python-docx is not installed.
    """
    try:
        from docx import Document
    except ImportError as e:
        raise RuntimeError("Install python-docx: pip install python-docx") from e

    doc = Document(path)
    text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
    return [{"text": text, "page": None}]


def extract_text(path: str, ext: str) -> list[dict]:
    """Return a list of ``{text, page}`` dicts, dispatching on file extension.

    ``ext`` is expected to be the lower-cased suffix including the dot.
    CSV rows are rendered as ``"header: value, ..."`` lines; any other
    extension is read as UTF-8 text with undecodable bytes replaced.
    """
    if ext == ".pdf":
        return extract_pdf(path)
    # NOTE(review): ".doc" is routed to python-docx as well; presumably legacy
    # binary .doc files fail there and end up as a "failed" document — confirm.
    if ext in (".docx", ".doc"):
        return extract_docx(path)
    if ext == ".csv":
        import csv
        # newline="" lets the csv module handle line endings itself, which
        # is required for quoted fields containing embedded newlines.
        with open(path, encoding="utf-8", errors="replace", newline="") as f:
            rows = list(csv.reader(f))
        if not rows:
            return []
        headers = rows[0]
        lines = [
            ", ".join(f"{h}: {v}" for h, v in zip(headers, row))
            for row in rows[1:]
            if any(row)
        ]
        return [{"text": "\n".join(lines), "page": None}]
    # TXT, MD, and anything else
    text = Path(path).read_text(encoding="utf-8", errors="replace")
    return [{"text": text, "page": None}]


# ── Chunking ──────────────────────────────────────────────────────────────────

def chunk_text(text: str, chunk_size: int = CHUNK_SIZE,
               overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split text into overlapping chunks by character count.

    Each chunk holds at most ``chunk_size`` characters. A chunk that is not
    the last one is cut back to the latest paragraph, line, sentence or
    word boundary found in its second half. Consecutive chunks share up to
    ``overlap`` characters. Chunks are stripped and empty ones dropped.

    Raises:
        ValueError: if ``chunk_size`` is not positive or ``overlap`` is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0:
        raise ValueError("overlap must not be negative")
    if not text:
        return []

    total = len(text)
    chunks = []
    start = 0
    while start < total:
        end = min(start + chunk_size, total)
        chunk = text[start:end]
        # Try to break at a sentence/paragraph boundary
        if end < total:
            for sep in ["\n\n", "\n", ". ", " "]:
                idx = chunk.rfind(sep)
                if idx > chunk_size // 2:
                    chunk = chunk[:idx + len(sep)]
                    break
        chunks.append(chunk.strip())

        consumed = start + len(chunk)
        if consumed >= total:
            # The tail of the text has been emitted. Stepping back by
            # `overlap` here would re-emit the same tail forever.
            break
        # Step back by `overlap`, but always advance by at least one
        # character so that overlap >= len(chunk) cannot stall the loop.
        start = max(consumed - overlap, start + 1)
    return [c for c in chunks if c]


# ── Embedding ─────────────────────────────────────────────────────────────────

def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts via Ollama, one request per text.

    Returns the embeddings in the same order as ``texts``.

    Raises:
        RuntimeError: if a request fails or the response has no "embedding".
    """
    import urllib.request, urllib.error
    embeddings = []
    for text in texts:
        body = json.dumps({"model": args.embed_model, "prompt": text}).encode()
        req = urllib.request.Request(
            f"{args.ollama_url}/api/embeddings",
            data=body, method="POST"
        )
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=60) as r:
                embeddings.append(json.loads(r.read().decode())["embedding"])
        except Exception as e:
            raise RuntimeError(f"Ollama embedding error: {e}") from e
    return embeddings


# ── ChromaDB upsert ───────────────────────────────────────────────────────────

def chroma_upsert(ids, embeddings, documents, metadatas):
    """Upsert a batch of chunks into ChromaDB and return the decoded JSON reply.

    The four arguments are parallel lists, one entry per chunk.

    Raises:
        RuntimeError: if ChromaDB answers with an HTTP error status.
    """
    import urllib.request, urllib.error
    body = json.dumps({
        "ids": ids,
        "embeddings": embeddings,
        "documents": documents,
        "metadatas": metadatas,
    }).encode()
    # NOTE(review): the path segment is whatever --collection carries; verify
    # that the caller passes the identifier this ChromaDB endpoint expects.
    req = urllib.request.Request(
        f"{args.chroma_url}/api/v1/collections/{args.collection}/upsert",
        data=body, method="POST"
    )
    req.add_header("Content-Type", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return json.loads(r.read().decode())
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"ChromaDB upsert error {e.code}: {e.read().decode()}") from e


# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    """Run the ingest pipeline for the document described by ``args``.

    Marks the document "processing", extracts and chunks its text, embeds
    and upserts the chunks in batches of ``BATCH_SIZE``, then marks it
    "ready". On any failure the document is marked "failed" with an error
    message and the process exits with status 1.
    """
    global args
    if args is None:
        # Keeps `import rag_ingest; rag_ingest.main()` working.
        args = parser.parse_args()

    set_status("processing")

    file_path = args.file
    ext = Path(file_path).suffix.lower()
    source_name = Path(file_path).name

    # Get original filename from DB (best effort: fall back to the on-disk
    # name if the lookup fails or the stored name is empty).
    try:
        with closing(db_connect()) as db:
            row = db.execute(
                "SELECT orig_name FROM kb_documents WHERE id=?", (args.doc_id,)
            ).fetchone()
        if row and row["orig_name"]:
            source_name = row["orig_name"]
    except sqlite3.Error:
        pass

    # Extract text
    try:
        pages = extract_text(file_path, ext)
    except Exception as e:
        set_status("failed", error=str(e))
        sys.exit(1)

    if not pages:
        set_status("failed", error="No text could be extracted from the document")
        sys.exit(1)

    # Chunk all pages
    all_chunks = []
    for page_info in pages:
        for chunk in chunk_text(page_info["text"]):
            all_chunks.append({
                "text": chunk,
                "page": page_info.get("page"),
                # Document-wide running index. The list grows as we append,
                # so its current length is already the next free index;
                # adding the per-page position would skip numbers and make
                # IDs collide across pages.
                "chunk": len(all_chunks),
                "source": source_name,
                "doc_id": args.doc_id,
            })

    if not all_chunks:
        set_status("failed", error="Document produced no text chunks after processing")
        sys.exit(1)

    # Embed and upsert in batches
    total = len(all_chunks)
    processed = 0

    try:
        for batch_start in range(0, total, BATCH_SIZE):
            batch = all_chunks[batch_start:batch_start + BATCH_SIZE]
            texts = [c["text"] for c in batch]

            embeddings = embed_batch(texts)

            ids = [f"doc{args.doc_id}-chunk{c['chunk']}" for c in batch]
            metas = [{
                "doc_id": c["doc_id"],
                "source": c["source"],
                "page": c["page"] or 0,   # page is None for non-paginated formats
                "chunk": c["chunk"],
            } for c in batch]

            chroma_upsert(ids, embeddings, texts, metas)
            processed += len(batch)

    except Exception as e:
        import traceback
        set_status("failed", chunk_count=processed,
                   error=f"{e}\n{traceback.format_exc()[:500]}")
        sys.exit(1)

    set_status("ready", chunk_count=total)


if __name__ == "__main__":
    args = parser.parse_args()
    try:
        main()
    except KeyboardInterrupt:
        set_status("failed", error="Interrupted")
        sys.exit(130)
    except Exception as e:
        import traceback
        set_status("failed", error=f"{e}\n{traceback.format_exc()[:500]}")
        sys.exit(1)