# -*- coding: utf-8 -*-
"""
Remote Indexer — Async (FAISS) for code repos.
API:
- POST /index -> {job_id}
- GET /status/{job_id} -> JobState
- POST /search -> {"results":[{path,text,score,...}, ...]} (file-level aggregation by default)
- GET /artifacts/{project_id}/dataset -> tgz
- GET /artifacts/{project_id}/faiss -> tgz
Key improvements (v3.1.0):
- Code-aware chunking by lines + code boundaries (instead of character tokenization)
- Semantic chunk header (FILE/LANG/KIND/LINES) to improve embedding signal
- store_text is respected (chunks are always embedded; storing their full text is configurable)
- Search groups by file (unique paths) with aggregation; still backward compatible with {path,text,score}
"""
from __future__ import annotations
import os
import io
import re
import json
import time
import tarfile
import hashlib
import logging
from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import faiss
import gradio as gr
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
# =============================================================================
# LOGGING
# =============================================================================
LOG = logging.getLogger("remote_indexer")
DBG = logging.getLogger("remote_indexer.dbg")
if not LOG.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
LOG.addHandler(_h)
LOG.setLevel(os.getenv("LOG_LEVEL", "DEBUG").upper())
if not DBG.handlers:
_h2 = logging.StreamHandler()
_h2.setFormatter(logging.Formatter("[DEBUG] %(asctime)s - %(message)s"))
DBG.addHandler(_h2)
DBG.setLevel(os.getenv("DBG_LEVEL", "DEBUG").upper())
# =============================================================================
# CONFIG / ENV
# =============================================================================
PORT = int(os.getenv("PORT", "7860"))
DATA_ROOT = os.getenv("DATA_ROOT", "/data").rstrip("/") or "/data"
os.makedirs(DATA_ROOT, exist_ok=True)
EMB_PROVIDER = os.getenv("EMB_PROVIDER", "dummy").strip().lower()
EMB_MODEL = os.getenv("EMB_MODEL", "sentence-transformers/all-mpnet-base-v2").strip()
EMB_BATCH = int(os.getenv("EMB_BATCH", "32"))
EMB_DIM = int(os.getenv("EMB_DIM", "128")) # dummy only
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "1"))
# =============================================================================
# CACHE DIRECTORIES (avoid PermissionError on HF)
# =============================================================================
def _setup_cache_dirs() -> Dict[str, str]:
os.environ.setdefault("HOME", "/home/user")
cache_root = os.getenv("CACHE_ROOT", "/tmp/.cache").rstrip("/")
paths = {
"root": cache_root,
"hf_home": f"{cache_root}/huggingface",
"hf_hub": f"{cache_root}/huggingface/hub",
"hf_tf": f"{cache_root}/huggingface/transformers",
"torch": f"{cache_root}/torch",
"st": f"{cache_root}/sentence-transformers",
"mpl": f"{cache_root}/matplotlib",
}
for p in paths.values():
try:
os.makedirs(p, exist_ok=True)
except Exception as e:
LOG.warning("Impossible de créer %s : %s", p, e)
os.environ["HF_HOME"] = paths["hf_home"]
os.environ["HF_HUB_CACHE"] = paths["hf_hub"]
os.environ["TRANSFORMERS_CACHE"] = paths["hf_tf"]
os.environ["TORCH_HOME"] = paths["torch"]
os.environ["SENTENCE_TRANSFORMERS_HOME"] = paths["st"]
os.environ["MPLCONFIGDIR"] = paths["mpl"]
os.environ.setdefault("HF_HUB_DISABLE_SYMLINKS_WARNING", "1")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
LOG.info("Caches configurés: %s", json.dumps(paths, indent=2))
return paths
CACHE_PATHS = _setup_cache_dirs()
_ST_MODEL = None
_HF_TOKENIZER = None
_HF_MODEL = None
# =============================================================================
# JOB STATE
# =============================================================================
class JobState(BaseModel):
job_id: str
project_id: str
stage: str = "pending" # pending -> queued -> chunking -> embedding -> indexing -> done/failed
total_files: int = 0
total_chunks: int = 0
embedded: int = 0
indexed: int = 0
errors: List[str] = []
messages: List[str] = []
    started_at: float = Field(default_factory=time.time)
finished_at: Optional[float] = None
JOBS: Dict[str, JobState] = {}
def _now() -> str:
return time.strftime("%H:%M:%S")
def _proj_dirs(project_id: str) -> Tuple[str, str, str]:
base = os.path.join(DATA_ROOT, project_id)
ds_dir = os.path.join(base, "dataset")
fx_dir = os.path.join(base, "faiss")
os.makedirs(ds_dir, exist_ok=True)
os.makedirs(fx_dir, exist_ok=True)
return base, ds_dir, fx_dir
def _add_msg(st: JobState, msg: str):
st.messages.append(f"[{_now()}] {msg}")
LOG.info("[%s] %s", st.job_id, msg)
DBG.debug("[%s] %s", st.job_id, msg)
def _set_stage(st: JobState, stage: str):
st.stage = stage
_add_msg(st, f"stage={stage}")
# =============================================================================
# CHUNKING (code-aware)
# =============================================================================
_RE_CODE_BOUNDARY = re.compile(
r"""^\s*(export\s+)?(default\s+)?(async\s+)?(function|class|const|let|var)\b|^\s*interface\b|^\s*type\b""",
re.IGNORECASE,
)
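# A "boundary" is any line that looks like the start of a top-level definition, e.g. (illustrative):
#   export default async function fetchAlbums() {
#   const AlbumCard = () => {
#   interface AlbumProps {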
def _infer_kind(path: str) -> str:
p = (path or "").lower().replace("\\", "/")
if "/hooks/" in p or p.endswith(("hook.ts", "hook.tsx")) or os.path.basename(p).startswith("use"):
return "hook"
if "/components/" in p:
return "component"
if "/pages/" in p or "/routes/" in p:
return "page"
if "/api/" in p:
return "api"
if p.endswith((".test.ts", ".test.tsx", ".spec.ts", ".spec.tsx", ".test.js", ".spec.js")):
return "test"
return "file"
def _make_chunk_header(path: str, kind: str, start_line: int, end_line: int) -> str:
ext = os.path.splitext(path)[1].lstrip(".").lower()
return (
f"FILE: {path}\n"
f"LANG: {ext or 'text'}\n"
f"KIND: {kind}\n"
f"LINES: {start_line}-{end_line}\n"
)
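# Example header for a chunk of a React component (illustrative values):
#   FILE: src/components/AlbumCard.tsx
#   LANG: tsx
#   KIND: component
#   LINES: 1-42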
def _chunk_text_codeaware(text: str, size: int = 1200, overlap: int = 150) -> List[Dict[str, Any]]:
"""
    Line-based chunking that respects code boundaries.
    size/overlap are approximate character counts.
    Returns: [{start_line, end_line, text}, ...]
"""
text = (text or "").replace("\r\n", "\n").replace("\r", "\n")
lines = text.split("\n")
out: List[Dict[str, Any]] = []
buf: List[str] = []
buf_len = 0
start_line = 1
def flush(end_line: int):
nonlocal buf, buf_len, start_line
chunk = "\n".join(buf).strip()
if chunk:
out.append({"start_line": start_line, "end_line": end_line, "text": chunk})
buf = []
buf_len = 0
start_line = end_line + 1
for i, line in enumerate(lines, start=1):
if buf and buf_len >= size and _RE_CODE_BOUNDARY.match(line):
flush(i - 1)
buf.append(line)
buf_len += len(line) + 1
if buf_len >= size:
flush(i)
if buf:
flush(len(lines))
# overlap (approx): prepend tail of previous chunk
if overlap > 0 and len(out) >= 2:
new_out = [out[0]]
for prev, cur in zip(out, out[1:]):
tail = prev["text"][-overlap:]
cur2 = dict(cur)
cur2["text"] = (tail + "\n" + cur["text"]).strip()
cur2["start_line"] = max(1, cur["start_line"] - 1)
new_out.append(cur2)
out = new_out
return out
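# Minimal sketch of the return shape (with a tiny size, each line becomes its own chunk):
#   _chunk_text_codeaware("const a = 1\nconst b = 2", size=10, overlap=0)
#   -> [{"start_line": 1, "end_line": 1, "text": "const a = 1"},
#       {"start_line": 2, "end_line": 2, "text": "const b = 2"}]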
# =============================================================================
# EMBEDDINGS
# =============================================================================
def _emb_dummy(texts: List[str], dim: int = EMB_DIM) -> np.ndarray:
vecs = np.zeros((len(texts), dim), dtype="float32")
for i, t in enumerate(texts):
h = hashlib.sha1((t or "").encode("utf-8")).digest()
rng = np.random.default_rng(int.from_bytes(h[:8], "little", signed=False))
v = rng.standard_normal(dim).astype("float32")
vecs[i] = v / (np.linalg.norm(v) + 1e-9)
return vecs
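# Note: the dummy provider seeds a NumPy RNG from the SHA-1 of each text, so identical
# texts always map to the same unit-norm vector; handy for wiring tests without model downloads.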
def _get_st_model():
global _ST_MODEL
if _ST_MODEL is None:
from sentence_transformers import SentenceTransformer
_ST_MODEL = SentenceTransformer(EMB_MODEL, cache_folder=CACHE_PATHS["st"])
LOG.info("[st] modèle chargé: %s (cache=%s)", EMB_MODEL, CACHE_PATHS["st"])
return _ST_MODEL
def _emb_st(texts: List[str], batch_size: int) -> np.ndarray:
model = _get_st_model()
vecs = model.encode(
texts,
batch_size=max(1, int(batch_size)),
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=False,
).astype("float32")
return vecs
def _get_hf_model():
global _HF_TOKENIZER, _HF_MODEL
if _HF_MODEL is None or _HF_TOKENIZER is None:
from transformers import AutoTokenizer, AutoModel
_HF_TOKENIZER = AutoTokenizer.from_pretrained(EMB_MODEL, cache_dir=CACHE_PATHS["hf_tf"])
_HF_MODEL = AutoModel.from_pretrained(EMB_MODEL, cache_dir=CACHE_PATHS["hf_tf"])
_HF_MODEL.eval()
LOG.info("[hf] modèle chargé: %s (cache=%s)", EMB_MODEL, CACHE_PATHS["hf_tf"])
return _HF_TOKENIZER, _HF_MODEL
def _mean_pool(last_hidden_state: "np.ndarray", attention_mask: "np.ndarray") -> "np.ndarray":
mask = attention_mask[..., None]
s = (last_hidden_state * mask).sum(axis=1)
d = mask.sum(axis=1) + 1e-9
return s / d
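# Mean pooling: sum the token embeddings weighted by the attention mask, then divide
# by the number of real (non-padding) tokens per sequence.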
def _l2_normalize(x: np.ndarray) -> np.ndarray:
n = np.linalg.norm(x, axis=1, keepdims=True) + 1e-12
return x / n
def _emb_hf(texts: List[str], batch_size: int) -> np.ndarray:
tok, model = _get_hf_model()
# Torch is required only here
import torch
outs: List[np.ndarray] = []
bs = max(1, int(batch_size))
with torch.no_grad():
for i in range(0, len(texts), bs):
batch = texts[i:i+bs]
enc = tok(batch, padding=True, truncation=True, return_tensors="pt")
out = model(**enc)
last = out.last_hidden_state.detach().cpu().numpy()
mask = enc["attention_mask"].detach().cpu().numpy()
pooled = _mean_pool(last, mask)
outs.append(pooled.astype("float32"))
xb = np.vstack(outs) if outs else np.zeros((0, 1), dtype="float32")
xb = _l2_normalize(xb)
return xb.astype("float32")
# =============================================================================
# DATASET + FAISS IO
# =============================================================================
def _save_dataset(ds_dir: str, rows: List[Dict[str, Any]]) -> None:
os.makedirs(ds_dir, exist_ok=True)
data_path = os.path.join(ds_dir, "data.jsonl")
with open(data_path, "w", encoding="utf-8") as f:
for r in rows:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
def _load_dataset(ds_dir: str) -> List[Dict[str, Any]]:
data_path = os.path.join(ds_dir, "data.jsonl")
if not os.path.isfile(data_path):
return []
out: List[Dict[str, Any]] = []
with open(data_path, "r", encoding="utf-8") as f:
for line in f:
try:
out.append(json.loads(line))
except Exception:
continue
return out
def _save_faiss(fx_dir: str, xb: np.ndarray, meta: Dict[str, Any]) -> None:
os.makedirs(fx_dir, exist_ok=True)
idx_path = os.path.join(fx_dir, "emb.faiss")
index = faiss.IndexFlatIP(xb.shape[1]) # cosine ~ inner product if normalized
index.add(xb)
faiss.write_index(index, idx_path)
with open(os.path.join(fx_dir, "meta.json"), "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
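# Example meta.json written next to emb.faiss (illustrative values; fields mirror
# faiss_meta built in _do_index_job below):
#   {"dim": 768, "count": 1234, "provider": "st",
#    "model": "sentence-transformers/all-mpnet-base-v2",
#    "chunking": "codeaware_lines_v1", "store_text": true}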
def _load_faiss(fx_dir: str) -> faiss.Index:
idx_path = os.path.join(fx_dir, "emb.faiss")
if not os.path.isfile(idx_path):
        raise FileNotFoundError(f"FAISS index not found: {idx_path}")
return faiss.read_index(idx_path)
def _tar_dir_to_bytes(dir_path: str) -> bytes:
bio = io.BytesIO()
with tarfile.open(fileobj=bio, mode="w:gz") as tar:
tar.add(dir_path, arcname=os.path.basename(dir_path))
bio.seek(0)
return bio.read()
# =============================================================================
# WORKER POOL (async)
# =============================================================================
EXECUTOR = ThreadPoolExecutor(max_workers=max(1, MAX_WORKERS))
LOG.info("ThreadPoolExecutor initialisé : max_workers=%s", MAX_WORKERS)
def _do_index_job(
st: JobState,
files: List[Dict[str, str]],
chunk_size: int,
overlap: int,
batch_size: int,
store_text: bool,
) -> None:
"""
    Heavy work that runs in a worker thread; updates st as the pipeline progresses.
"""
try:
_, ds_dir, fx_dir = _proj_dirs(st.project_id)
# 1) Chunking
_set_stage(st, "chunking")
rows: List[Dict[str, Any]] = []
embed_texts: List[str] = []
st.total_files = len(files)
for it in files:
path = (it.get("path") or "unknown").strip()
txt = it.get("text") or ""
kind = _infer_kind(path)
chunks = _chunk_text_codeaware(txt, size=int(chunk_size), overlap=int(overlap))
_add_msg(st, f"{path}: len(text)={len(txt)} chunks={len(chunks)}")
for ci, ck in enumerate(chunks):
header = _make_chunk_header(path, kind, int(ck["start_line"]), int(ck["end_line"]))
merged = header + "\n" + (ck["text"] or "")
# Always embed merged (full signal)
embed_texts.append(merged)
# Store text depending on store_text (but keep at least header)
stored_text = merged if store_text else header
rows.append({
"path": path,
"text": stored_text,
"chunk_id": int(ci),
"kind": kind,
"start_line": int(ck["start_line"]),
"end_line": int(ck["end_line"]),
})
st.total_chunks = len(rows)
_add_msg(st, f"Total chunks = {st.total_chunks}")
# 2) Embedding
_set_stage(st, "embedding")
if EMB_PROVIDER == "dummy":
xb = _emb_dummy(embed_texts, dim=EMB_DIM)
dim = xb.shape[1]
elif EMB_PROVIDER == "st":
xb = _emb_st(embed_texts, batch_size=batch_size or EMB_BATCH)
dim = xb.shape[1]
else:
xb = _emb_hf(embed_texts, batch_size=batch_size or EMB_BATCH)
dim = xb.shape[1]
st.embedded = int(xb.shape[0])
_add_msg(st, f"Embeddings {st.embedded}/{st.total_chunks}")
_add_msg(st, f"Embeddings dim={dim}")
# 3) Save dataset (jsonl)
_save_dataset(ds_dir, rows)
_add_msg(st, f"Dataset sauvegardé dans {ds_dir} (store_text={store_text})")
# 4) FAISS
_set_stage(st, "indexing")
faiss_meta = {
"dim": int(dim),
"count": int(xb.shape[0]),
"provider": EMB_PROVIDER,
"model": EMB_MODEL if EMB_PROVIDER != "dummy" else None,
"chunking": "codeaware_lines_v1",
"store_text": bool(store_text),
}
_save_faiss(fx_dir, xb, meta=faiss_meta)
st.indexed = int(xb.shape[0])
_add_msg(st, f"FAISS écrit sur {os.path.join(fx_dir, 'emb.faiss')}")
_add_msg(st, f"OK — dataset+index prêts (projet={st.project_id})")
_set_stage(st, "done")
st.finished_at = time.time()
except Exception as e:
LOG.exception("Job %s failed", st.job_id)
st.errors.append(str(e))
_add_msg(st, f"❌ Exception: {e}")
st.stage = "failed"
st.finished_at = time.time()
def _submit_job(
project_id: str,
files: List[Dict[str, str]],
chunk_size: int,
overlap: int,
batch_size: int,
store_text: bool
) -> str:
job_id = hashlib.sha1(f"{project_id}{time.time()}".encode()).hexdigest()[:12]
st = JobState(job_id=job_id, project_id=project_id, stage="pending", messages=[])
JOBS[job_id] = st
_add_msg(st, f"Job {job_id} créé pour project {project_id}")
_add_msg(st, f"Index start project={project_id} files={len(files)} chunk_size={chunk_size} overlap={overlap} batch_size={batch_size} store_text={store_text} provider={EMB_PROVIDER} model={EMB_MODEL if EMB_PROVIDER!='dummy' else '-'}")
EXECUTOR.submit(_do_index_job, st, files, chunk_size, overlap, batch_size, store_text)
_set_stage(st, "queued")
return job_id
# =============================================================================
# FASTAPI
# =============================================================================
fastapi_app = FastAPI(title="remote-indexer-async", version="3.1.0")
fastapi_app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],
)
class FileItem(BaseModel):
path: str
text: str
class IndexRequest(BaseModel):
project_id: str
files: List[FileItem]
chunk_size: int = 512
overlap: int = 50
batch_size: int = 32
store_text: bool = True
class SearchRequest(BaseModel):
project_id: str
query: str
k: int = 10
# new: fetch more chunks then aggregate by file
group_by_file: bool = True
fetch_k: int = 60 # number of chunks retrieved before aggregation
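# Example /search body (illustrative): {"project_id": "DEEPWEB", "query": "albums page",
#                                       "k": 5, "group_by_file": true, "fetch_k": 60}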
@fastapi_app.get("/health")
def health():
info = {
"ok": True,
"service": "remote-indexer-async",
"provider": EMB_PROVIDER,
"model": EMB_MODEL if EMB_PROVIDER != "dummy" else None,
"cache_root": os.getenv("CACHE_ROOT", "/tmp/.cache"),
"workers": MAX_WORKERS,
"data_root": DATA_ROOT,
"version": "3.1.0",
}
return info
@fastapi_app.get("/")
def root_redirect():
return {"ok": True, "service": "remote-indexer-async", "ui": "/ui"}
@fastapi_app.post("/index")
def index(req: IndexRequest):
"""
    Asynchronous: returns a job_id immediately; a worker thread does the heavy lifting.
"""
try:
files = [fi.model_dump() for fi in req.files]
job_id = _submit_job(
project_id=req.project_id,
files=files,
chunk_size=int(req.chunk_size),
overlap=int(req.overlap),
batch_size=int(req.batch_size),
store_text=bool(req.store_text),
)
return {"job_id": job_id}
except Exception as e:
LOG.exception("index failed (submit)")
raise HTTPException(status_code=500, detail=str(e))
@fastapi_app.get("/status/{job_id}")
def status(job_id: str):
st = JOBS.get(job_id)
if not st:
        raise HTTPException(status_code=404, detail="unknown job")
return JSONResponse(st.model_dump())
@fastapi_app.post("/search")
def search(req: SearchRequest):
_, ds_dir, fx_dir = _proj_dirs(req.project_id)
idx_path = os.path.join(fx_dir, "emb.faiss")
ds_path = os.path.join(ds_dir, "data.jsonl")
if not (os.path.isfile(idx_path) and os.path.isfile(ds_path)):
        raise HTTPException(status_code=409, detail="Index not ready (try again later)")
rows = _load_dataset(ds_dir)
if not rows:
        raise HTTPException(status_code=404, detail="dataset not found")
# Query embedding with the SAME provider
qtxt = (req.query or "").strip()
if not qtxt:
return {"results": []}
if EMB_PROVIDER == "dummy":
q = _emb_dummy([qtxt], dim=EMB_DIM)[0:1, :]
elif EMB_PROVIDER == "st":
q = _emb_st([qtxt], batch_size=1)[0:1, :]
else:
q = _emb_hf([qtxt], batch_size=1)[0:1, :]
index = _load_faiss(fx_dir)
if index.d != q.shape[1]:
        raise HTTPException(status_code=500, detail=f"incompatible dimensions: index.d={index.d} vs query={q.shape[1]}")
k = int(max(1, req.k))
fetch_k = int(max(k, req.fetch_k or k))
scores, ids = index.search(q, fetch_k)
ids = ids[0].tolist()
scores = scores[0].tolist()
raw_hits: List[Dict[str, Any]] = []
for idx, sc in zip(ids, scores):
if idx < 0 or idx >= len(rows):
continue
r = rows[idx]
raw_hits.append({
"path": r.get("path") or "",
"text": r.get("text") or "",
"score": float(sc),
"chunk_id": r.get("chunk_id"),
"kind": r.get("kind"),
"start_line": r.get("start_line"),
"end_line": r.get("end_line"),
})
# Legacy mode: return chunk-level as-is
if not bool(req.group_by_file):
return {"results": raw_hits[:k]}
# File-level aggregation (unique path)
from collections import defaultdict
by_path: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for h in raw_hits:
p = h.get("path") or ""
if p:
by_path[p].append(h)
file_results: List[Dict[str, Any]] = []
for path, hits in by_path.items():
hits.sort(key=lambda x: x["score"], reverse=True)
best = hits[0]
top = hits[:3]
avg_top = sum(h["score"] for h in top) / max(1, len(top))
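        # Heuristic file score: the best chunk score plus a small bonus (0.15x the mean of the
        # top-3 chunk scores), so files with several strong chunks outrank single-hit files.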
file_score = float(best["score"] + 0.15 * avg_top)
# Backward compat: keep {path,text,score} at top-level (text = best chunk stored)
file_results.append({
"path": path,
"text": best.get("text") or "",
"score": file_score,
# Extra diagnostics (optional for client)
"chunk_id": best.get("chunk_id"),
"kind": best.get("kind"),
"start_line": best.get("start_line"),
"end_line": best.get("end_line"),
"hits": int(len(hits)),
"top_chunks": [
{
"chunk_id": h.get("chunk_id"),
"score": float(h.get("score") or 0.0),
"start_line": h.get("start_line"),
"end_line": h.get("end_line"),
}
for h in top
],
})
file_results.sort(key=lambda x: x["score"], reverse=True)
return {"results": file_results[:k]}
# ----------- ARTIFACTS EXPORT -----------
@fastapi_app.get("/artifacts/{project_id}/dataset")
def download_dataset(project_id: str):
_, ds_dir, _ = _proj_dirs(project_id)
if not os.path.isdir(ds_dir):
        raise HTTPException(status_code=404, detail="Dataset not found")
buf = _tar_dir_to_bytes(ds_dir)
headers = {"Content-Disposition": f'attachment; filename="{project_id}_dataset.tgz"'}
return StreamingResponse(io.BytesIO(buf), media_type="application/gzip", headers=headers)
@fastapi_app.get("/artifacts/{project_id}/faiss")
def download_faiss(project_id: str):
_, _, fx_dir = _proj_dirs(project_id)
if not os.path.isdir(fx_dir):
        raise HTTPException(status_code=404, detail="FAISS index not found")
buf = _tar_dir_to_bytes(fx_dir)
headers = {"Content-Disposition": f'attachment; filename="{project_id}_faiss.tgz"'}
return StreamingResponse(io.BytesIO(buf), media_type="application/gzip", headers=headers)
# =============================================================================
# GRADIO UI (optional)
# =============================================================================
def _ui_index(project_id: str, sample_text: str):
files = [{"path": "sample.tsx", "text": sample_text}]
try:
req = IndexRequest(project_id=project_id, files=[FileItem(**f) for f in files])
res = index(req)
return f"Job lancé: {res['job_id']}"
except Exception as e:
return f"Erreur index: {e}"
def _ui_search(project_id: str, query: str, k: int):
try:
res = search(SearchRequest(project_id=project_id, query=query, k=int(k)))
return json.dumps(res, ensure_ascii=False, indent=2)
except Exception as e:
return f"Erreur search: {e}"
with gr.Blocks(title="Remote Indexer (Async FAISS)", analytics_enabled=False) as ui:
gr.Markdown("## Remote Indexer — **Async** (API: `/index`, `/status/{job}`, `/search`, `/artifacts/.`).")
gr.Markdown(
f"**Provider**: `{EMB_PROVIDER}` — **Model**: `{EMB_MODEL if EMB_PROVIDER!='dummy' else '-'}` — "
f"**Cache**: `{os.getenv('CACHE_ROOT', '/tmp/.cache')}` — **Workers**: `{MAX_WORKERS}`"
)
with gr.Tab("Index"):
pid = gr.Textbox(label="Project ID", value="DEEPWEB")
        sample = gr.Textbox(
            label="Sample text",
            value="export const AlbumCard = () => { return <div>Albums</div> }",
            lines=4
        )
        btn = gr.Button("Run index (sample)")
        out = gr.Textbox(label="Result")
btn.click(_ui_index, inputs=[pid, sample], outputs=[out])
with gr.Tab("Search"):
pid2 = gr.Textbox(label="Project ID", value="DEEPWEB")
q = gr.Textbox(label="Query", value="améliorer la page albums")
k = gr.Slider(1, 20, value=10, step=1, label="k")
btn2 = gr.Button("Rechercher")
out2 = gr.Code(label="Résultats")
btn2.click(_ui_search, inputs=[pid2, q, k], outputs=[out2])
fastapi_app = gr.mount_gradio_app(fastapi_app, ui, path="/ui")
# =============================================================================
# MAIN
# =============================================================================
if __name__ == "__main__":
import uvicorn
LOG.info("Démarrage Uvicorn sur 0.0.0.0:%s (UI_PATH=/ui) — async index", PORT)
uvicorn.run(fastapi_app, host="0.0.0.0", port=PORT)