275 lines
10 KiB
Python
275 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Nexus One AI — RAG Document Ingest Worker
|
|
Launched as a subprocess by the FastAPI backend when a document is uploaded.
|
|
|
|
Pipeline:
|
|
1. Extract text from PDF / DOCX / TXT / MD / CSV
|
|
2. Chunk into overlapping segments
|
|
3. Embed each chunk via Ollama /api/embeddings
|
|
4. Store chunks + embeddings in ChromaDB
|
|
5. Update kb_documents status in SQLite
|
|
|
|
Usage (called by main.py):
|
|
python3 rag_ingest.py \\
|
|
--doc-id 1 --db-path /opt/cezen/data/cezen.db \\
|
|
--file /opt/cezen/data/kb_docs/1/abc.pdf \\
|
|
--collection cezen-myknowledgebase \\
|
|
--embed-model nomic-embed-text \\
|
|
--chroma-url http://localhost:8000 \\
|
|
--ollama-url http://localhost:11434
|
|
"""
|
|
|
|
import argparse, json, os, sqlite3, sys, uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ingest worker.

    Required options identify the document (--doc-id), the SQLite database
    (--db-path), the uploaded file (--file) and the target collection
    (--collection). The embedding model and the two service URLs are optional
    and default to a local Ollama / ChromaDB setup.
    """
    cli = argparse.ArgumentParser()

    # Required: what to ingest and where to record progress.
    cli.add_argument("--doc-id", type=int, required=True)
    for required_flag in ("--db-path", "--file", "--collection"):
        cli.add_argument(required_flag, required=True)

    # Optional: embedding model and service endpoints.
    optional_defaults = (
        ("--embed-model", "nomic-embed-text"),
        ("--chroma-url", "http://localhost:8000"),
        ("--ollama-url", "http://localhost:11434"),
    )
    for optional_flag, fallback in optional_defaults:
        cli.add_argument(optional_flag, default=fallback)
    return cli


# Parsed once at import time; every function below reads its settings from `args`.
parser = _build_parser()
args = parser.parse_args()
|
|
|
|
# Maximum number of characters in one chunk.
CHUNK_SIZE = 512

# Number of characters shared between two consecutive chunks.
CHUNK_OVERLAP = 64

# Number of chunks embedded and upserted together in one batch.
BATCH_SIZE = 16
|
|
|
|
def utcnow() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|
|
|
|
def db_connect() -> sqlite3.Connection:
    """Open a new connection to the SQLite database given by --db-path.

    Rows are returned as sqlite3.Row so columns can be read by name.
    The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(args.db_path)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def set_status(status: str, chunk_count: int = 0, error: str = None):
    """Record the ingest state of the current document (--doc-id) in SQLite.

    Updates status, chunk_count, error_msg and processed_at on the
    kb_documents row. When the document becomes "ready" with at least one
    chunk, the owning kb_collections row is refreshed as well: doc_count is
    recomputed from the ready documents and chunk_count is incremented.

    Args:
        status: New status value (this script writes "processing", "failed"
            and "ready").
        chunk_count: Number of chunks stored for the document.
        error: Error text to store, or None to clear it.
    """
    db = db_connect()
    try:
        db.execute(
            "UPDATE kb_documents SET status=?, chunk_count=?, error_msg=?, processed_at=? WHERE id=?",
            (status, chunk_count, error, utcnow(), args.doc_id),
        )
        if status == "ready" and chunk_count > 0:
            # Update collection counters.
            # NOTE(review): chunk_count is added, not recomputed, so ingesting
            # the same document twice counts its chunks twice — confirm intended.
            db.execute(
                """
                UPDATE kb_collections SET
                    doc_count = (SELECT COUNT(*) FROM kb_documents WHERE collection_id=(SELECT collection_id FROM kb_documents WHERE id=?) AND status='ready'),
                    chunk_count = chunk_count + ?
                WHERE id = (SELECT collection_id FROM kb_documents WHERE id=?)
                """,
                (args.doc_id, chunk_count, args.doc_id),
            )
        db.commit()
    finally:
        # Always release the connection, even when a statement fails
        # (e.g. "database is locked"); uncommitted changes are discarded.
        db.close()
|
|
|
|
# ── Text extraction ───────────────────────────────────────────────────────────
|
|
|
|
def _pdf_pages_with_pypdf(pypdf, path: str) -> list[dict]:
    """Extract non-empty page texts with an already imported pypdf module."""
    pages = []
    reader = pypdf.PdfReader(path)
    for number, page in enumerate(reader.pages, start=1):
        text = (page.extract_text() or "").strip()
        if text:
            pages.append({"text": text, "page": number})
    return pages


def _pdf_pages_with_pdfminer(extract_pages, text_container_type, path: str) -> list[dict]:
    """Extract non-empty page texts with pdfminer's layout iterator."""
    pages = []
    for number, page_layout in enumerate(extract_pages(path), start=1):
        text = "".join(
            element.get_text()
            for element in page_layout
            if isinstance(element, text_container_type)
        ).strip()
        if text:
            pages.append({"text": text, "page": number})
    return pages


def extract_pdf(path: str) -> list[dict]:
    """Return a list of {text, page} dicts, one per PDF page that has text.

    pypdf is used when it can be imported; otherwise pdfminer.six is tried.
    Page numbers are 1-based and pages without extractable text are skipped.

    Only the imports are guarded: an error raised while parsing propagates
    as-is instead of silently switching backend with a half-filled result.

    Raises:
        RuntimeError: If neither pypdf nor pdfminer.six is installed.
    """
    try:
        import pypdf
    except ImportError:
        pypdf = None
    if pypdf is not None:
        return _pdf_pages_with_pypdf(pypdf, path)

    # Fallback: try pdfminer
    try:
        from pdfminer.high_level import extract_pages
        from pdfminer.layout import LTTextContainer
    except ImportError as exc:
        raise RuntimeError("Install pypdf or pdfminer.six: pip install pypdf") from exc
    return _pdf_pages_with_pdfminer(extract_pages, LTTextContainer, path)
|
|
|
|
def extract_docx(path: str) -> list[dict]:
    """Return the document's paragraph text as a single {text, page} entry.

    Paragraphs that are empty or whitespace-only are dropped; the remaining
    ones are joined with newlines. The page is always None.

    Raises:
        RuntimeError: If python-docx is not installed.
    """
    try:
        from docx import Document

        kept = []
        for paragraph in Document(path).paragraphs:
            if paragraph.text.strip():
                kept.append(paragraph.text)
        body = "\n".join(kept)
        return [{"text": body, "page": None}]
    except ImportError:
        raise RuntimeError("Install python-docx: pip install python-docx")
|
|
|
|
def extract_text(path: str, ext: str) -> list[dict]:
    """Return a list of {text, page} dicts for the file, chosen by extension.

    Args:
        path: Path of the file to read.
        ext: Lower-case file suffix including the dot (e.g. ".pdf").

    Returns:
        PDF: one entry per page (see extract_pdf). DOCX: one entry (see
        extract_docx). CSV: one entry whose text has one "header: value, ..."
        line per non-empty data row, or [] for a completely empty file.
        Anything else is read as text and returned as one entry.
    """
    if ext == ".pdf":
        return extract_pdf(path)
    # NOTE(review): ".doc" is routed to the python-docx reader as well; legacy
    # binary .doc files presumably fail there — confirm against real uploads.
    if ext in (".docx", ".doc"):
        return extract_docx(path)
    if ext == ".csv":
        import csv

        # newline="" lets the csv module handle line endings itself, so line
        # breaks inside quoted fields survive. utf-8-sig drops a leading BOM
        # that would otherwise end up in the first header name.
        with open(path, encoding="utf-8-sig", errors="replace", newline="") as f:
            reader = csv.reader(f)
            headers = next(reader, None)
            if headers is None:
                return []
            lines = [
                ", ".join(f"{h}: {v}" for h, v in zip(headers, row))
                for row in reader
                if any(row)
            ]
        return [{"text": "\n".join(lines), "page": None}]
    # TXT, MD, and anything else
    text = Path(path).read_text(encoding="utf-8-sig", errors="replace")
    return [{"text": text, "page": None}]
|
|
|
|
# ── Chunking ──────────────────────────────────────────────────────────────────
|
|
|
|
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split text into overlapping chunks by character count.

    Each chunk holds at most chunk_size characters. A chunk that is not the
    last one is cut back to the latest paragraph, line, sentence or word
    boundary found in its second half, when there is one. The next chunk
    starts `overlap` characters before the end of the previous one.

    Args:
        text: Text to split; an empty string yields [].
        chunk_size: Maximum characters per chunk; must be positive.
        overlap: Characters repeated at the start of the following chunk.

    Returns:
        The stripped, non-empty chunks in document order.

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if not text:
        return []
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of characters")

    text_len = len(text)
    chunks = []
    start = 0
    while start < text_len:
        end = start + chunk_size
        chunk = text[start:end]
        # Try to break at a sentence/paragraph boundary
        if end < text_len:
            for sep in ("\n\n", "\n", ". ", " "):
                idx = chunk.rfind(sep)
                if idx > chunk_size // 2:
                    chunk = chunk[:idx + len(sep)]
                    break
        chunks.append(chunk.strip())
        if end >= text_len:
            # The tail of the text has been emitted. Stepping back by `overlap`
            # here would re-read the same tail forever.
            break
        # Always move forward by at least one character, even when the overlap
        # is as large as the chunk that was just produced.
        start += max(1, len(chunk) - overlap)
    return [c for c in chunks if c]
|
|
|
|
# ── Embedding ─────────────────────────────────────────────────────────────────
|
|
|
|
def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed a batch of texts via Ollama, one request per text.

    Each text is POSTed to {--ollama-url}/api/embeddings with the model from
    --embed-model, and the "embedding" field of each response is collected.

    Args:
        texts: Texts to embed.

    Returns:
        One embedding vector per input text, in the same order.

    Raises:
        RuntimeError: If a request fails or a response carries no embedding.
    """
    import urllib.error
    import urllib.request

    endpoint = f"{args.ollama_url}/api/embeddings"
    embeddings = []
    for text in texts:
        body = json.dumps({"model": args.embed_model, "prompt": text}).encode()
        req = urllib.request.Request(endpoint, data=body, method="POST")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=60) as r:
                payload = json.loads(r.read().decode())
        except urllib.error.HTTPError as e:
            # Keep the response body: it carries the server's own explanation,
            # mirroring what chroma_upsert reports for its HTTP errors.
            detail = e.read().decode(errors="replace")
            raise RuntimeError(f"Ollama embedding error {e.code}: {detail}") from e
        except Exception as e:
            raise RuntimeError(f"Ollama embedding error: {e}") from e

        vector = payload.get("embedding") if isinstance(payload, dict) else None
        if not vector:
            # Without this check a missing key would surface as the bare
            # KeyError text "'embedding'".
            raise RuntimeError(
                f"Ollama embedding error: response contains no embedding: {str(payload)[:200]}"
            )
        embeddings.append(vector)
    return embeddings
|
|
|
|
# ── ChromaDB upsert ───────────────────────────────────────────────────────────
|
|
|
|
def chroma_upsert(ids, embeddings, documents, metadatas):
    """Upsert a batch of chunks into ChromaDB over its HTTP API.

    The four parallel lists are sent as one JSON body to the upsert endpoint
    of the collection given by --collection, and the decoded JSON reply is
    returned.

    Raises:
        RuntimeError: If ChromaDB answers with an HTTP error status; the
            message includes the status code and the response body.
    """
    import urllib.error
    import urllib.request

    # NOTE(review): --collection is placed in the URL path as-is; confirm the
    # backend passes the identifier this ChromaDB API version expects.
    payload = json.dumps({
        "ids": ids,
        "embeddings": embeddings,
        "documents": documents,
        "metadatas": metadatas,
    }).encode()
    endpoint = f"{args.chroma_url}/api/v1/collections/{args.collection}/upsert"
    request = urllib.request.Request(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            raw_reply = response.read()
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"ChromaDB upsert error {e.code}: {e.read().decode()}")
    return json.loads(raw_reply.decode())
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def _resolve_source_name(fallback: str) -> str:
    """Return the document's orig_name from kb_documents, or *fallback*."""
    try:
        db = db_connect()
        try:
            row = db.execute(
                "SELECT orig_name FROM kb_documents WHERE id=?", (args.doc_id,)
            ).fetchone()
        finally:
            db.close()
        if row and row["orig_name"]:
            return row["orig_name"]
    except Exception:
        # Best effort only: the stored file name is an acceptable label when
        # the lookup fails, so the ingest should not abort here.
        pass
    return fallback


def _build_chunks(pages: list[dict], source_name: str) -> list[dict]:
    """Chunk every page and attach page, running index, source and doc id."""
    all_chunks = []
    for page_info in pages:
        for chunk in chunk_text(page_info["text"]):
            all_chunks.append({
                "text": chunk,
                "page": page_info.get("page"),
                # Document-wide running index. The list length already counts
                # every chunk appended so far, so it must not be offset by the
                # per-page position as well (that produced colliding ids).
                "chunk": len(all_chunks),
                "source": source_name,
                "doc_id": args.doc_id,
            })
    return all_chunks


def main():
    """Run the ingest pipeline for the document described by the CLI options.

    Marks the document "processing", extracts and chunks its text, embeds and
    upserts the chunks in batches of BATCH_SIZE, then marks it "ready".
    On any failure the document is marked "failed" with the error text and
    the process exits with status 1.
    """
    set_status("processing")

    file_path = args.file
    ext = Path(file_path).suffix.lower()

    # Prefer the original upload name from the DB over the stored file name.
    source_name = _resolve_source_name(Path(file_path).name)

    # Extract text
    try:
        pages = extract_text(file_path, ext)
    except Exception as e:
        set_status("failed", error=str(e))
        sys.exit(1)

    if not pages:
        set_status("failed", error="No text could be extracted from the document")
        sys.exit(1)

    # Chunk all pages
    all_chunks = _build_chunks(pages, source_name)
    if not all_chunks:
        set_status("failed", error="Document produced no text chunks after processing")
        sys.exit(1)

    # Embed and upsert in batches
    total = len(all_chunks)
    processed = 0
    try:
        for batch_start in range(0, total, BATCH_SIZE):
            batch = all_chunks[batch_start:batch_start + BATCH_SIZE]
            texts = [c["text"] for c in batch]

            embeddings = embed_batch(texts)

            ids = [f"doc{args.doc_id}-chunk{c['chunk']}" for c in batch]
            metas = [{
                "doc_id": c["doc_id"],
                "source": c["source"],
                # Sources without page numbers are stored as page 0.
                "page": c["page"] or 0,
                "chunk": c["chunk"],
            } for c in batch]

            chroma_upsert(ids, embeddings, texts, metas)
            processed += len(batch)
    except Exception as e:
        import traceback

        # Record how many chunks were stored before the failure.
        set_status("failed", chunk_count=processed, error=f"{e}\n{traceback.format_exc()[:500]}")
        sys.exit(1)

    set_status("ready", chunk_count=total)
|
|
|
|
def _fail_and_exit(message: str, exit_code: int) -> None:
    """Mark the document as failed with *message*, then exit the process."""
    set_status("failed", error=message)
    sys.exit(exit_code)


# Script entry point: run the pipeline and record any uncaught failure in the
# database before exiting with a non-zero status.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # 130 is the exit status used here for an interrupted run.
        _fail_and_exit("Interrupted", 130)
    except Exception as e:
        import traceback

        _fail_and_exit(f"{e}\n{traceback.format_exc()[:500]}", 1)
|