In this tutorial, we build an ultra-advanced agentic AI workflow that behaves like a production-grade research and reasoning system rather than a single prompt call. We ingest real web sources asynchronously, split them into provenance-tracked chunks, and run hybrid retrieval using both TF-IDF (sparse) and OpenAI embeddings (dense), then fuse the results for higher recall and stability. We orchestrate multiple agents (planning, synthesis, and repair) while enforcing strict guardrails so every major claim is grounded in retrieved evidence, and we persist episodic memory so that the system improves its strategy over time. Check out the FULL CODES here.
!pip -q install openai openai-agents pydantic httpx beautifulsoup4 lxml scikit-learn numpy

import asyncio
import getpass
import hashlib
import json
import os
import re
import sqlite3
import time
from typing import Any, Dict, List, Optional, Tuple

import httpx
import numpy as np
from agents import Agent, Runner, SQLiteSession
from bs4 import BeautifulSoup
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Load the OpenAI API key from the environment, prompting for it (input is not
# echoed) when it is missing, and create the shared async client used below.
# Whitespace-only values are treated as missing so they fail here, not later.
_api_key = (os.environ.get("OPENAI_API_KEY") or "").strip()
if not _api_key:
    _api_key = getpass.getpass("Enter your OpenAI API key: ").strip()
if not _api_key:
    raise RuntimeError("OPENAI_API_KEY not provided.")
# Keep the env var populated as well; the agents Runner presumably reads it from there.
os.environ["OPENAI_API_KEY"] = _api_key
print("✅ OpenAI API key loaded securely.")
oa = AsyncOpenAI(api_key=_api_key)
def sha1(s: str) -> str:
    """Return the hex SHA-1 digest of *s* encoded as UTF-8.

    Characters that cannot be encoded (e.g. lone surrogates) are silently dropped.
    """
    digest = hashlib.sha1()
    digest.update(s.encode("utf-8", errors="ignore"))
    return digest.hexdigest()
def normalize_url(u: str) -> str:
    """Trim whitespace and wrapping punctuation that often clings to pasted URLs.

    ``None`` becomes ``""``. Leading ``<``, ``(``, ``[``, ``"`` and ``'`` are
    removed. Trailing ``.``, ``,``, ``"``, ``'`` and ``>`` are removed. Trailing
    ``)`` / ``]`` are removed only while they are unbalanced, so URLs such as
    ``https://en.wikipedia.org/wiki/Python_(programming_language)`` survive intact.
    """
    u = (u or "").strip().lstrip("<([\"'")
    while u:
        last = u[-1]
        if last in ".,\"'>":
            u = u[:-1]
        elif last == ")" and u.count(")") > u.count("("):
            u = u[:-1]
        elif last == "]" and u.count("]") > u.count("["):
            u = u[:-1]
        else:
            break
    return u
def clean_html_to_text(html: str) -> str:
    """Convert an HTML document to readable plain text.

    ``<script>``, ``<style>`` and ``<noscript>`` elements are removed. Text
    nodes are joined with newlines. Runs of non-newline whitespace collapse to
    one space, spaces around line breaks are trimmed, and runs of three or more
    newlines collapse to a single blank line. The result is stripped.
    """
    soup = BeautifulSoup(html, "lxml")
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    txt = soup.get_text("\n")
    # Collapse horizontal whitespace (spaces, tabs, \r, NBSP, ...) first, then trim
    # it around newlines, so whitespace-only lines count as blank in the next step.
    txt = re.sub(r"[^\S\n]+", " ", txt)
    txt = re.sub(r" *\n *", "\n", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt.strip()
def chunk_text(text: str, chunk_chars: int = 1600, overlap_chars: int = 320) -> List[str]:
    """Split *text* into overlapping fixed-width character windows.

    All whitespace runs are first collapsed to single spaces. Consecutive
    windows start ``chunk_chars - overlap_chars`` characters apart (at least 1).
    Splitting stops as soon as a window reaches the end of the text, so no
    trailing chunk is entirely contained in the previous one.

    Raises:
        ValueError: if ``chunk_chars`` is not positive.
    """
    if chunk_chars <= 0:
        raise ValueError("chunk_chars must be positive")
    if not text:
        return []
    text = re.sub(r"\s+", " ", text).strip()
    n = len(text)
    step = max(1, chunk_chars - overlap_chars)
    chunks: List[str] = []
    start = 0
    while start < n:
        end = start + chunk_chars
        chunks.append(text[start:end])
        if end >= n:
            break
        start += step
    return chunks
def canonical_chunk_id(s: str) -> str:
    """Normalize a model-emitted citation token to a bare chunk_id.

    Removes surrounding whitespace, quotes and brackets, trailing ``.,;:``, and
    a leading ``cite:`` label (case-insensitive). It repeats until the value
    stops changing, so nested wrappers like ``"(abc:1)",`` are fully removed.
    ``None`` maps to ``""``. Other non-strings are converted with ``str()``.
    """
    if s is None:
        return ""
    wrappers = "<>\"'()[]{}"
    trailing = ".,;:"
    out = str(s).strip()
    prev = None
    # Each pass can only shorten the string, so the loop terminates.
    while out != prev:
        prev = out
        out = out.strip().strip(wrappers).rstrip(trailing).strip()
        if out[:5].lower() == "cite:":
            out = out[5:]
    return out
def inject_exec_summary_citations(
    exec_summary: str,
    citations: List[str],
    allowed_chunk_ids: List[str],
    min_citations: int = 2,
) -> str:
    """Ensure the executive summary mentions at least ``min_citations`` chunk_ids.

    Candidates are the canonicalized entries of *citations* that appear in
    *allowed_chunk_ids*, followed by the remaining allowed ids in their given
    order. Ids already mentioned in the summary count toward the minimum.
    Only the shortfall is appended as ``(cite: <chunk_id>)`` markers, after
    terminating the summary with a period if it lacks end punctuation. If
    nothing needs (or can) be added, the summary is returned unchanged, with
    ``None`` becoming ``""``.
    """
    exec_summary = exec_summary or ""
    allowed_list = list(allowed_chunk_ids or [])
    allowed = set(allowed_list)

    # Cited ids first (in citation order), then every other allowed id.
    candidates: List[str] = []
    seen = set()
    for cid in [canonical_chunk_id(c) for c in citations or []] + allowed_list:
        if cid and cid in allowed and cid not in seen:
            seen.add(cid)
            candidates.append(cid)

    def mentioned(cid: str) -> bool:
        # A following digit means a different id ("x:1" must not match inside "x:12").
        return re.search(re.escape(cid) + r"(?!\d)", exec_summary) is not None

    missing = [cid for cid in candidates if not mentioned(cid)]
    shortfall = min_citations - (len(candidates) - len(missing))
    if shortfall <= 0 or not missing:
        return exec_summary

    summary = exec_summary.strip()
    if summary and summary[-1] not in ".!?":
        summary += "."
    markers = " ".join(f"(cite: {cid})" for cid in missing[:shortfall])
    return f"{summary} {markers}" if summary else markers


# We set up the environment, securely load the OpenAI API key, and initialize the
# core utilities that everything else depends on. We define hashing, URL
# normalization, HTML cleaning, and chunking so all downstream steps operate on
# clean, consistent text. We also add deterministic helpers to normalize and
# inject citations, ensuring the guardrails are always satisfied. Check out the
# FULL CODES here.
async def fetch_many(
    urls: List[str],
    timeout_s: float = 25.0,
    per_url_char_limit: int = 60000,
    max_concurrency: int = 8,
) -> Dict[str, str]:
    """Fetch several web pages concurrently and return their cleaned text.

    URLs are normalized with ``normalize_url``. Entries not starting with
    ``http`` are dropped, and duplicates are removed (first occurrence wins).
    The returned dict follows that input order and maps each URL either to at
    most ``per_url_char_limit`` characters of text, or to a
    ``"__FETCH_ERROR__ <ExcType>: <message>"`` marker when the request, the
    content-type check, or HTML parsing fails. At most ``max_concurrency``
    requests are in flight at once.
    """
    headers = {"User-Agent": "Mozilla/5.0 (AgenticAI/4.2)"}
    cleaned = [normalize_url(u) for u in urls]
    unique_urls = list(dict.fromkeys(u for u in cleaned if u.startswith("http")))
    sem = asyncio.Semaphore(max(1, max_concurrency))

    async with httpx.AsyncClient(timeout=timeout_s, follow_redirects=True, headers=headers) as client:

        async def _one(url: str) -> str:
            try:
                async with sem:
                    resp = await client.get(url)
                resp.raise_for_status()
                ctype = resp.headers.get("content-type", "").lower()
                # Binary payloads (PDFs, images, ...) would become garbage chunks.
                if ctype and not any(t in ctype for t in ("text/", "html", "xml")):
                    raise ValueError(f"unsupported content-type {ctype!r}")
                return clean_html_to_text(resp.text)[:per_url_char_limit]
            except Exception as e:  # best-effort: record the failure, keep the other fetches
                return f"__FETCH_ERROR__ {type(e).__name__}: {e}"

        # gather() preserves submission order, so the result order is deterministic.
        texts = await asyncio.gather(*(_one(u) for u in unique_urls))
    return dict(zip(unique_urls, texts))
def dedupe_texts(sources: Dict[str, str]) -> Dict[str, str]:
    """Drop fetch failures, blank pages, and duplicate pages.

    An entry is skipped when its value is not a string, starts with
    ``__FETCH_ERROR__``, or is whitespace-only. Two pages are duplicates when
    the SHA-1 of their first 25,000 characters matches. The first one in
    iteration order is kept.
    """
    seen_hashes = set()
    unique: Dict[str, str] = {}
    for url, txt in sources.items():
        if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__") or not txt.strip():
            continue
        digest = sha1(txt[:25000])
        if digest in seen_hashes:
            continue
        seen_hashes.add(digest)
        unique[url] = txt
    return unique
class ChunkRecord(BaseModel):
    """One fixed-width text chunk cut from a fetched source page."""

    chunk_id: str  # built by build_index as "<sha1(url)>:<chunk_index>"
    url: str  # page the chunk was cut from
    chunk_index: int  # position of the chunk within that page
    text: str
class RetrievalHit(BaseModel):
    """A chunk returned for one retrieval query, with its per-retriever scores."""

    chunk_id: str
    url: str
    chunk_index: int
    score_sparse: float = 0.0  # TF-IDF cosine similarity; 0.0 if absent from the sparse results
    score_dense: float = 0.0  # embedding cosine similarity; 0.0 if absent from the dense results
    score_fused: float = 0.0  # reciprocal-rank-fusion score used for ordering
    text: str
class EvidencePack(BaseModel):
    """The fused retrieval hits gathered for a single retrieval query."""

    question: str  # the retrieval query these hits were fetched for
    hits: List[RetrievalHit]


# We asynchronously fetch multiple web sources in parallel and aggressively
# deduplicate content to avoid redundant evidence. We convert raw pages into
# structured text and define the core data models that represent chunks and
# retrieval hits. We ensure every piece of text is traceable back to a specific
# source and chunk index. Check out the FULL CODES here.
EPISODE_DB = "agentic_episode_memory.db"


def episode_db_init() -> None:
    """Create the ``episodes`` table in ``EPISODE_DB`` if it does not exist yet.

    Each row holds a Unix timestamp, the question text, and JSON-encoded lists
    of URLs, retrieval queries, and useful sources. The column is named
    ``question`` to match the INSERT/SELECT statements that use this table.
    """
    con = sqlite3.connect(EPISODE_DB)
    try:
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts INTEGER NOT NULL,
                question TEXT NOT NULL,
                urls_json TEXT NOT NULL,
                retrieval_queries_json TEXT NOT NULL,
                useful_sources_json TEXT NOT NULL
            )
            """
        )
        con.commit()
    finally:
        con.close()
def episode_store(query: str, urls: List[str], retrieval_queries: List[str], useful_sources: List[str]) -> None:
    """Append one episode (current timestamp, question, JSON lists) to ``EPISODE_DB``.

    The connection is closed even if the insert fails.
    """
    con = sqlite3.connect(EPISODE_DB)
    try:
        con.execute(
            "INSERT INTO episodes(ts, question, urls_json, retrieval_queries_json, useful_sources_json) VALUES(?,?,?,?,?)",
            (
                int(time.time()),
                query or "",  # the column is NOT NULL
                json.dumps(urls),
                json.dumps(retrieval_queries),
                json.dumps(useful_sources),
            ),
        )
        con.commit()
    finally:
        con.close()
def episode_recall(query: str, top_k: int = 2) -> List[Dict[str, Any]]:
    """Return up to *top_k* past episodes whose question overlaps *query*.

    Only the 200 most recent episodes are considered. Each is scored by the
    fraction of *query*'s words (ASCII letters only, 3+ long, lower-cased) that
    also occur in its stored question. Episodes with no overlap are skipped.
    Results are ordered by score, and ties keep the most recent first. Rows
    whose JSON columns cannot be decoded are ignored.
    """
    con = sqlite3.connect(EPISODE_DB)
    try:
        rows = con.execute(
            "SELECT ts, question, urls_json, retrieval_queries_json, useful_sources_json "
            "FROM episodes ORDER BY ts DESC, id DESC LIMIT 200"
        ).fetchall()
    finally:
        con.close()

    def tokens(s: Optional[str]) -> set:
        return set(re.findall(r"[A-Za-z]{3,}", (s or "").lower()))

    q_tokens = tokens(query)
    if not q_tokens or top_k <= 0:
        return []

    scored = []
    for ts, past_question, urls_json, rq_json, us_json in rows:
        overlap = len(q_tokens & tokens(past_question))
        if not overlap:
            continue
        try:
            episode = {
                "ts": ts,
                "question": past_question,
                "urls": json.loads(urls_json),
                "retrieval_queries": json.loads(rq_json),
                "useful_sources": json.loads(us_json),
            }
        except (TypeError, ValueError):  # corrupt row: skip rather than break recall
            continue
        scored.append((overlap / len(q_tokens), episode))

    scored.sort(key=lambda pair: pair[0], reverse=True)  # stable: ties stay newest-first
    return [episode for _, episode in scored[:top_k]]
# Create the episodic-memory table up front so later runs can read and write it.
episode_db_init()


# We introduce episodic memory backed by SQLite so the system can recall what
# worked in earlier runs. We store questions, retrieval strategies, and useful
# sources to guide future planning. We also implement lightweight
# similarity-based recall to bias the system toward historically effective
# patterns. Check out the FULL CODES here.
class HybridIndex:
    """In-memory retrieval index over ``ChunkRecord`` texts.

    Holds a TF-IDF matrix for sparse (lexical) search and an embedding matrix
    for dense (semantic) search. Row ``i`` of either matrix corresponds to
    ``records[i]``.
    """

    def __init__(self):
        self.records: List[ChunkRecord] = []
        self.tfidf: Optional[TfidfVectorizer] = None
        self.tfidf_mat = None
        self.emb_mat: Optional[np.ndarray] = None

    def build_sparse(self) -> None:
        """Fit TF-IDF (unigrams + bigrams, English stop words removed) on the record texts.

        Leaves the sparse index empty, instead of raising, when there are no
        records or when no term survives preprocessing (e.g. only stop words).
        """
        self.tfidf = None
        self.tfidf_mat = None
        corpus = [r.text for r in self.records]
        if not corpus:
            return
        vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=80000)
        try:
            mat = vectorizer.fit_transform(corpus)
        except ValueError:
            # scikit-learn raises ValueError ("empty vocabulary") when no token remains.
            return
        self.tfidf, self.tfidf_mat = vectorizer, mat

    def search_sparse(self, query: str, k: int) -> List[Tuple[int, float]]:
        """Return up to *k* ``(row, cosine)`` pairs with positive TF-IDF similarity, best first."""
        if not self.records or self.tfidf is None or self.tfidf_mat is None or k <= 0:
            return []
        sims = cosine_similarity(self.tfidf.transform([query]), self.tfidf_mat).ravel()
        order = np.argsort(-sims, kind="stable")[:k]
        # Zero-similarity rows share no terms with the query; ranking them would
        # only inject noise into reciprocal-rank fusion.
        return [(int(i), float(sims[i])) for i in order if sims[i] > 0]

    def set_dense(self, mat: np.ndarray) -> None:
        """Store a copy of the chunk embedding matrix (one row per record) as float32."""
        self.emb_mat = np.array(mat, dtype=np.float32)

    def search_dense(self, q_emb: np.ndarray, k: int) -> List[Tuple[int, float]]:
        """Return up to *k* ``(row, cosine)`` pairs ranked by embedding similarity.

        An empty query vector yields ``[]``.

        Raises:
            ValueError: if the query dimension differs from the index dimension.
        """
        if self.emb_mat is None or not self.records or k <= 0:
            return []
        q = np.asarray(q_emb, dtype=np.float32).ravel()
        if q.size == 0:
            return []
        mat = self.emb_mat
        if mat.ndim != 2 or mat.shape[1] != q.size:
            raise ValueError(f"query embedding has dim {q.size}, index has shape {mat.shape}")
        unit = mat / (np.linalg.norm(mat, axis=1, keepdims=True) + 1e-9)
        sims = unit @ (q / (np.linalg.norm(q) + 1e-9))
        order = np.argsort(-sims, kind="stable")[:k]
        return [(int(i), float(sims[i])) for i in order]
def rrf_fuse(rankings: List[List[int]], k: int = 60) -> Dict[int, float]:
    """Combine several best-first rankings with Reciprocal Rank Fusion.

    Each item scores ``sum(1 / (k + rank))`` over the rankings that contain it,
    using 1-based ranks. Duplicates within one ranking collapse to their first
    occurrence before ranking. Returns a mapping ``item -> fused score``.
    """
    scores: Dict[int, float] = {}
    for ranking in rankings:
        for rank, idx in enumerate(dict.fromkeys(ranking), start=1):
            scores[idx] = scores.get(idx, 0.0) + 1.0 / (k + rank)
    return scores
# Module-level retrieval state, (re)populated by build_index() and read by the
# evidence-gathering and validation helpers.
HYBRID = HybridIndex()
ALLOWED_URLS: List[str] = []  # URLs that produced indexable text in the current run
EMBED_MODEL = "text-embedding-3-small"  # OpenAI embedding model for chunks and queries
async def embed_batch(texts: List[str]) -> np.ndarray:
    """Embed *texts* with one API call and return a ``(len(texts), dim)`` float32 array.

    An empty input (or an empty response) yields a ``(0, 0)`` array. For empty
    input the API is not called at all.
    """
    if not texts:
        return np.zeros((0, 0), dtype=np.float32)
    resp = await oa.embeddings.create(model=EMBED_MODEL, input=texts, encoding_format="float")
    # Order rows by the returned index so row i always corresponds to texts[i].
    items = sorted(resp.data, key=lambda item: item.index)
    if not items:
        return np.zeros((0, 0), dtype=np.float32)
    return np.vstack([np.asarray(item.embedding, dtype=np.float32) for item in items])
async def embed_texts(texts: List[str], batch_size: int = 96, max_concurrency: int = 3) -> np.ndarray:
    """Embed many texts in concurrent batches, preserving input order.

    Each text is truncated to 7,000 characters. Empty strings are sent as a
    single space because the embeddings API rejects empty input. Batches of
    ``batch_size`` (at least 1) are sent with at most ``max_concurrency``
    requests in flight.

    Returns:
        A float32 array with one row per input text (``(0, 0)`` for no input).

    Raises:
        RuntimeError: if the number of returned rows does not match the input.
    """
    if not texts:
        return np.zeros((0, 0), dtype=np.float32)
    batch_size = max(1, batch_size)
    sem = asyncio.Semaphore(max(1, max_concurrency))

    async def _embed(start: int) -> np.ndarray:
        batch = [t[:7000] or " " for t in texts[start:start + batch_size]]
        async with sem:
            return await embed_batch(batch)

    # gather() returns results in submission order, so no re-sorting is needed.
    mats = await asyncio.gather(*(_embed(s) for s in range(0, len(texts), batch_size)))
    emb = np.vstack(mats)
    if emb.shape[0] != len(texts):
        raise RuntimeError(f"Embedding rows mismatch: got {emb.shape[0]} expected {len(texts)}")
    return emb
async def embed_query(query: str) -> np.ndarray:
    """Embed one query (truncated to 7,000 chars) and return it as a 1-D vector.

    Returns an empty vector if the API returned no embedding.
    """
    # The embeddings API rejects empty strings, so send a single space instead.
    mat = await embed_batch([(query or "")[:7000] or " "])
    return mat[0] if mat.shape[0] else np.zeros((0,), dtype=np.float32)
async def build_index(urls: List[str], max_chunks_per_url: int = 60) -> None:
    """Fetch *urls*, chunk the pages, and rebuild the global hybrid index.

    On success, ``ALLOWED_URLS`` becomes the URLs that produced at least one
    chunk, and ``HYBRID`` gets fresh records, a TF-IDF matrix and embeddings.
    Chunk ids have the form ``"<sha1(url)>:<chunk_index>"``. Global state is
    only replaced after embedding succeeds, so a failed rebuild leaves the
    previous index intact.

    Raises:
        RuntimeError: if no URL yields any text. The message includes the raw
            per-URL fetch results (truncated) to aid debugging.
    """
    global ALLOWED_URLS
    raw = await fetch_many(urls)
    fetched = dedupe_texts(raw)
    records: List[ChunkRecord] = []
    allowed: List[str] = []
    for url, txt in fetched.items():
        if not isinstance(txt, str) or txt.startswith("__FETCH_ERROR__"):
            continue
        chunks = chunk_text(txt)[:max_chunks_per_url]
        if not chunks:
            continue  # a URL with no chunks has nothing to cite
        allowed.append(url)
        url_key = sha1(url)
        records.extend(
            ChunkRecord(chunk_id=f"{url_key}:{i}", url=url, chunk_index=i, text=ch)
            for i, ch in enumerate(chunks)
        )
    if not records:
        # Use the raw results: dedupe_texts() has already dropped the failures.
        err_view = {normalize_url(u): raw.get(normalize_url(u), "") for u in urls}
        raise RuntimeError("No sources fetched successfully.\n" + json.dumps(err_view, indent=2)[:4000])

    emb = await embed_texts([r.text for r in records], batch_size=96, max_concurrency=3)
    ALLOWED_URLS = allowed
    HYBRID.records = records
    HYBRID.build_sparse()
    HYBRID.set_dense(emb)


# We build a hybrid retrieval index that combines sparse TF-IDF search with dense
# OpenAI embeddings. We enable reciprocal rank fusion so that sparse and dense
# signals complement each other rather than compete. We construct the index once
# per run and reuse it across all retrieval queries for efficiency. Check out the
# FULL CODES here.
def build_evidence_pack(
    question: str,
    sparse: List[Tuple[int, float]],
    dense: List[Tuple[int, float]],
    k: int = 10,
    rrf_k: int = 60,
) -> EvidencePack:
    """Fuse sparse and dense hits for one query into an ``EvidencePack``.

    *sparse* and *dense* are best-first ``(row, score)`` lists over
    ``HYBRID.records``. Rows are ranked by reciprocal-rank fusion with constant
    *rrf_k*. The top *k* become ``RetrievalHit`` objects carrying their raw
    sparse/dense scores (0.0 when absent) and the fused score.
    """
    sparse_scores = dict(sparse)
    dense_scores = dict(dense)
    # An empty ranking contributes nothing to RRF, so an empty dense list needs no
    # special case.
    fused = rrf_fuse([[i for i, _ in sparse], [i for i, _ in dense]], rrf_k)
    top = sorted(fused, key=fused.__getitem__, reverse=True)[:k]
    hits: List[RetrievalHit] = []
    for idx in top:
        rec = HYBRID.records[idx]
        hits.append(
            RetrievalHit(
                chunk_id=rec.chunk_id,
                url=rec.url,
                chunk_index=rec.chunk_index,
                score_sparse=float(sparse_scores.get(idx, 0.0)),
                score_dense=float(dense_scores.get(idx, 0.0)),
                score_fused=float(fused[idx]),
                text=rec.text,
            )
        )
    return EvidencePack(question=question, hits=hits)
async def gather_evidence(
    queries: List[str], per_query_k: int = 10, sparse_k: int = 60, dense_k: int = 60
) -> Tuple[List[EvidencePack], List[str], List[str]]:
    """Run hybrid retrieval for every query and collect the results.

    Returns:
        ``(evidence, useful_sources, chunk_ids)``:

        * ``evidence`` holds one ``EvidencePack`` per query.
        * ``useful_sources`` is up to 8 URLs, ordered by how often they appear
          among each pack's top-6 hits.
        * ``chunk_ids`` is every retrieved chunk_id, de-duplicated in first-seen
          order (query order, then fused rank), so truncating it keeps the
          most relevant ids.
    """
    # Embed all queries concurrently instead of one round-trip per query.
    q_embs = await asyncio.gather(*(embed_query(q) for q in queries))
    evidence: List[EvidencePack] = []
    source_counts: Dict[str, int] = {}
    chunk_ids: Dict[str, None] = {}  # insertion-ordered set
    for q, q_emb in zip(queries, q_embs):
        sparse = HYBRID.search_sparse(q, sparse_k)
        dense = HYBRID.search_dense(q_emb, dense_k)
        pack = build_evidence_pack(q, sparse, dense, per_query_k)
        evidence.append(pack)
        for h in pack.hits[:6]:
            source_counts[h.url] = source_counts.get(h.url, 0) + 1
        for h in pack.hits:
            chunk_ids.setdefault(h.chunk_id, None)
    useful_sources = sorted(source_counts, key=source_counts.__getitem__, reverse=True)[:8]
    return evidence, useful_sources, list(chunk_ids)
class Plan(BaseModel):
    """Planner output: goal, subtasks, retrieval queries, and acceptance checks."""

    goal: str
    subtasks: List[str]
    retrieval_queries: List[str]  # run_ultra_agentic uses at most the first 16
    acceptance_checks: List[str]
class UltraAnswer(BaseModel):
    """Final structured answer produced by the synthesizer/fixer agents."""

    title: str
    executive_summary: str  # validate_ultra requires >= 2 cited chunk_ids mentioned here
    structure: List[str]  # architecture section (printed under "ARCHITECTURE")
    retrieval_strategy: List[str]
    agent_graph: List[str]
    implementation_notes: List[str]
    risks_and_limits: List[str]
    citations: List[str]  # chunk_ids taken from the retrieved evidence
    sources: List[str]  # validate_ultra requires these to be in ALLOWED_URLS
def normalize_answer(ans: UltraAnswer, allowed_chunk_ids: List[str]) -> UltraAnswer:
    """Deterministically repair the citations of *ans* before validation.

    1. Canonicalizes citation tokens, keeps only retrieved (allowed) chunk_ids,
       and removes duplicates while preserving order.
    2. Makes the executive summary mention at least two chunk_ids via
       ``inject_exec_summary_citations``.
    3. Adds every allowed chunk_id that the summary mentions to ``citations``,
       so ids injected into the summary also count as citations.
    """
    data = ans.model_dump()
    allowed = set(allowed_chunk_ids)
    citations: List[str] = []
    seen = set()
    for raw in data.get("citations") or []:
        cid = canonical_chunk_id(raw)
        if cid in allowed and cid not in seen:
            seen.add(cid)
            citations.append(cid)

    summary = inject_exec_summary_citations(data.get("executive_summary") or "", citations, allowed_chunk_ids)
    # validate_ultra only counts summary mentions of ids listed in `citations`,
    # so list every retrieved id the summary mentions (including injected ones).
    for cid in allowed_chunk_ids:
        if cid not in seen and re.search(re.escape(cid) + r"(?!\d)", summary):
            seen.add(cid)
            citations.append(cid)

    data["citations"] = citations
    data["executive_summary"] = summary
    return UltraAnswer(**data)
def validate_ultra(
    ans: UltraAnswer,
    allowed_chunk_ids: List[str],
    min_citations: int = 6,
    min_summary_citations: int = 2,
) -> None:
    """Enforce the grounding guardrails on a (normalized) answer.

    Raises:
        ValueError: if any source URL is not in ``ALLOWED_URLS``; if any
            citation is not a retrieved chunk_id; if there are fewer than
            *min_citations* distinct citations; or if the executive summary
            mentions fewer than *min_summary_citations* of the cited ids.
    """
    allowed_urls = set(ALLOWED_URLS)
    extras = [u for u in ans.sources if u not in allowed_urls]
    if extras:
        raise ValueError(f"Non-allowed sources in output: {extras}")

    allowed_ids = set(allowed_chunk_ids)
    cset = set(ans.citations or [])
    missing = sorted(cid for cid in cset if cid not in allowed_ids)  # sorted: stable message
    if missing:
        raise ValueError(f"Citations reference unknown chunk_ids (not retrieved): {missing}")
    if len(cset) < min_citations:
        raise ValueError(f"Need at least {min_citations} distinct chunk_id citations in ultra mode.")

    summary = ans.executive_summary or ""
    # A match followed by another digit is a different id ("x:1" inside "x:12").
    in_summary = sum(1 for cid in cset if re.search(re.escape(cid) + r"(?!\d)", summary))
    if in_summary < min_summary_citations:
        raise ValueError(
            f"Executive summary must include at least {min_summary_citations} chunk_id citations verbatim."
        )
# Planner agent: turns the question, allowed URLs and recalled episodes into a Plan.
PLANNER = Agent(
    name="Planner",
    model="gpt-4o-mini",
    instructions=(
        "Return a technical Plan schema.\n"
        "Make 10-16 retrieval_queries.\n"
        "Acceptance must include: at least 6 citations and exec_summary contains at least 2 citations verbatim."
    ),
    output_type=Plan,
)
# Synthesizer agent: drafts the UltraAnswer from the question and evidence packs.
# The "(cite: <chunk_id>)" format matches what inject_exec_summary_citations writes.
SYNTHESIZER = Agent(
    name="Synthesizer",
    model="gpt-4o-mini",
    instructions=(
        "Return UltraAnswer schema.\n"
        "Hard constraints:\n"
        "- executive_summary MUST include at least TWO citations verbatim as: (cite: <chunk_id>).\n"
        "- citations must be chosen ONLY from ALLOWED_CHUNK_IDS list.\n"
        "- citations list must include at least 6 unique chunk_ids.\n"
        "- sources must be subset of allowed URLs.\n"
    ),
    output_type=UltraAnswer,
)
# Fixer agent: rewrites a draft that failed validate_ultra, given the guardrail error.
FIXER = Agent(
    name="Fixer",
    model="gpt-4o-mini",
    instructions=(
        "Repair to satisfy guardrails.\n"
        "Ensure executive_summary includes at least TWO citations verbatim.\n"
        "Choose citations ONLY from ALLOWED_CHUNK_IDS list.\n"
        "Return UltraAnswer schema."
    ),
    output_type=UltraAnswer,
)
# Conversation session shared by all three agents, persisted to a local SQLite file.
# NOTE(review): the session id is fixed, so history presumably accumulates across
# runs and is replayed into later agent calls; consider a per-run id if prompts grow.
session = SQLiteSession("ultra_agentic_user", "ultra_agentic_session.db")


# We gather evidence by running multiple targeted queries, fusing sparse and
# dense results, and assembling evidence packs with scores and provenance. We
# define strict schemas for plans and final answers, then normalize and validate
# citations against retrieved chunk IDs. We enforce hard guardrails so every
# answer stays grounded and auditable. Check out the FULL CODES here.
async def run_ultra_agentic(question: str, urls: List[str], max_repairs: int = 2) -> UltraAnswer:
    """Run the full plan -> retrieve -> synthesize -> validate/repair pipeline.

    Args:
        question: The user's question.
        urls: Candidate source URLs to fetch and index.
        max_repairs: Number of Fixer passes allowed after failed validations.

    Returns:
        An ``UltraAnswer`` that passed ``validate_ultra``. The episode (question,
        allowed URLs, retrieval queries used, useful sources) is stored for recall.

    Raises:
        RuntimeError: from ``build_index`` if no source could be fetched.
        ValueError: the last guardrail violation if no draft passes after
            ``max_repairs`` repair attempts.
    """
    max_repairs = max(0, max_repairs)
    await build_index(urls)
    allowed_urls_json = json.dumps(ALLOWED_URLS, indent=2)
    recall_hint = json.dumps(episode_recall(question, top_k=2), indent=2)[:2000]

    plan_res = await Runner.run(
        PLANNER,
        f"Question:\n{question}\n\nAllowed URLs:\n{allowed_urls_json}\n\nRecall:\n{recall_hint}\n",
        session=session,
    )
    plan: Plan = plan_res.final_output
    # Fall back to the question itself so retrieval never runs with zero queries.
    queries = (plan.retrieval_queries or [question])[:16]

    evidence_packs, useful_sources, allowed_chunk_ids = await gather_evidence(queries)
    evidence_json = json.dumps([p.model_dump() for p in evidence_packs], indent=2)[:16000]
    allowed_chunk_ids_json = json.dumps(allowed_chunk_ids[:200], indent=2)

    draft_res = await Runner.run(
        SYNTHESIZER,
        f"Question:\n{question}\n\nAllowed URLs:\n{allowed_urls_json}\n\n"
        f"ALLOWED_CHUNK_IDS:\n{allowed_chunk_ids_json}\n\n"
        f"Evidence packs:\n{evidence_json}\n\n"
        "Return UltraAnswer.",
        session=session,
    )
    draft = normalize_answer(draft_res.final_output, allowed_chunk_ids)

    for attempt in range(max_repairs + 1):
        try:
            validate_ultra(draft, allowed_chunk_ids)
        except ValueError as err:
            if attempt >= max_repairs:
                raise  # out of repair attempts: surface the last guardrail violation
            fixer_res = await Runner.run(
                FIXER,
                f"Question:\n{question}\n\nAllowed URLs:\n{allowed_urls_json}\n\n"
                f"ALLOWED_CHUNK_IDS:\n{allowed_chunk_ids_json}\n\n"
                f"Guardrail error:\n{err}\n\n"
                f"Draft:\n{json.dumps(draft.model_dump(), indent=2)[:12000]}\n\n"
                f"Evidence packs:\n{evidence_json}\n\n"
                "Return corrected UltraAnswer that passes guardrails.",
                session=session,
            )
            draft = normalize_answer(fixer_res.final_output, allowed_chunk_ids)
        else:
            # Outside the try: a storage error must not be mistaken for a guardrail failure.
            episode_store(question, ALLOWED_URLS, queries, useful_sources)
            return draft
    raise RuntimeError("Unexpected failure: repair loop exited without a result")
query = (
    "Design a production-lean but advanced agentic AI workflow in Python with hybrid retrieval, "
    "provenance-first citations, critique-and-repair loops, and episodic memory. "
    "Explain why each layer matters, failure modes, and evaluation."
)
# NOTE(review): the four source URLs were lost from this copy of the tutorial (only
# their opening quotes survived). Fill in the pages the answer should be grounded on;
# each must be an http(s) URL that returns HTML or text.
urls = [
    # "https://...",
]
ans = await run_ultra_agentic(query, urls, max_repairs=2)
print("nTITLE:n", ans.title)
print("nEXECUTIVE SUMMARY:n", ans.executive_summary)
print("nARCHITECTURE:")
for x in ans.structure:
print("-", x)
print("nRETRIEVAL STRATEGY:")
for x in ans.retrieval_strategy:
print("-", x)
print("nAGENT GRAPH:")
for x in ans.agent_graph:
print("-", x)
print("nIMPLEMENTATION NOTES:")
for x in ans.implementation_notes:
print("-", x)
print("nRISKS & LIMITS:")
for x in ans.risks_and_limits:
print("-", x)
print("nCITATIONS (chunk_ids):")
for c in ans.citations:
print("-", c)
print("nSOURCES:")
for s in ans.sources:
print("-", s)We orchestrate the total agentic loop by chaining planning, synthesis, validation, and restore in an async-safe pipeline. We robotically retry and repair outputs till they cross all constraints with out human intervention. We end by working a full instance and printing a completely grounded, production-ready agentic response.
In conclusion, we developed a comprehensive agentic pipeline that is robust to common failure modes: unstable embedding shapes, citation drift, and missing grounding in executive summaries. We validated outputs against allowlisted sources and retrieved chunk IDs, automatically normalized citations, and injected deterministic citations when needed to guarantee compliance without sacrificing correctness. By combining hybrid retrieval, critique-and-repair loops, and episodic memory, we created a reusable foundation that we can extend with stronger evaluations (claim-to-evidence coverage scoring, adversarial red-teaming, and regression tests) to continuously harden the system as it scales to new domains and larger corpora.
Check out the FULL CODES here. Also, feel free to follow us on Twitter, and don't forget to join our 100k+ ML SubReddit and subscribe to our Newsletter. Wait! Are you on Telegram? Now you can join us on Telegram as well.



