Advanced AI Agents: Multi-Agent Systems, Observability, Evals & Data Pipelines
A hands-on Next.js project that walks you through advanced AI concepts — multi-agent systems with MCP, distributed tracing with OpenTelemetry, building a native eval framework, agent topics for context-free pipelines, and the theory behind AI agent data pipelines.
Complete Tutorial Code
Follow along with the complete source code for this advanced AI agent tutorial. Includes five chapters covering multi-agent systems, OpenTelemetry tracing, evals, agent topics, and data pipeline theory.
Introduction
Prerequisites
This tutorial is a continuation of the AI Agent Tutorial, which covers building AI agents from scratch — from a simple streaming chatbot to a LangGraph-powered agent with database tools served over MCP. Complete that tutorial first before proceeding here.
Once you have the fundamentals of AI agents down — streaming chatbots, tool-calling, MCP, and LangGraph — the next frontier is building systems that are robust, observable, and scalable. This tutorial picks up where the AI Agent Tutorial left off and introduces five advanced concepts that separate production-grade AI systems from prototypes.
You'll build a multi-agent content pipeline where an Orchestrator Agent coordinates a Researcher, Writer, and Editor — all via MCP. Then you'll add OpenTelemetry tracing to see exactly what's happening inside, build a native eval framework to measure quality, refactor the pipeline to use database-backed "agent topics" to eliminate context bloat, and finally explore the theory behind AI agent data pipelines.
Getting Started
Requirements
- Node.js (v18 or higher)
- OpenAI API key
- Completed the AI Agent Tutorial
- Docker (for Chapter 2 — Jaeger)
Installation Steps
1. Clone the repository:
git clone https://github.com/audoir/advanced-ai-tutorial.git
2. Install dependencies:
npm install
3. Configure environment:
OPENAI_API_KEY=sk-...
Create a .env.local file with your OpenAI API key
4. Start the dev server:
npm run dev
Open http://localhost:3000 in your browser.
Key Dependencies
| Package | Purpose |
|---|---|
| ai | Vercel AI SDK — generateText, streamText |
| @ai-sdk/openai | OpenAI provider |
| @ai-sdk/mcp | MCP client for the AI SDK |
| mcp-handler | MCP server handler for Next.js |
| @modelcontextprotocol/sdk | Official MCP TypeScript SDK |
| better-sqlite3 | Synchronous SQLite driver |
| zod | Schema validation for tool inputs |
Chapter 1: Multi-Agent Systems
A multi-agent system is a setup where multiple AI agents — each with a specialized role — collaborate to complete a task that would be difficult for a single agent to do well alone. Rather than hardcoding the order of agent calls, an Orchestrator Agent dynamically decides which specialist agents to invoke, in what order, and what to pass between them — all via MCP tool calls.
Multi-Agent vs. Single-Agent
Single-Agent System
User prompt → 🤖 Agent → Response
Advantages
- ✅ Simple to build and debug — one prompt, one model, one output
- ✅ Low latency — no coordination overhead between agents
- ✅ Cheap — fewer LLM calls means lower token costs
- ✅ Predictable — easier to reason about what the model will do
- ✅ No coordination failures — nothing to go wrong between agents
Disadvantages
- ❌ Context window limits — stuffing everything into one prompt hits token limits fast
- ❌ Jack of all trades, master of none
- ❌ Hard to parallelize — one agent works sequentially
- ❌ Brittle for complex tasks — long chains of reasoning degrade in quality
Multi-Agent System
User prompt → 🤖 Orchestrator → 🔍 Researcher / ✍️ Writer / 📝 Editor → Final response
Advantages
- ✅ Specialization — each agent is tuned for one job
- ✅ Parallelism — independent agents can run concurrently
- ✅ Scalability — adding a new capability means adding a new agent
- ✅ Longer effective context — each agent only sees relevant context
- ✅ Dynamic orchestration — adapts at runtime
- ✅ Separation of concerns — easier to test and upgrade individual agents
Disadvantages
- ❌ Complexity — more moving parts means more things that can go wrong
- ❌ Higher latency — each hop adds network and inference time
- ❌ Higher cost — more LLM calls = more tokens = higher API bills
- ❌ Error propagation — bad output from an early agent can cascade
- ❌ Harder to debug — tracing failures across multiple agents is more involved
Architecture: Two Layers of MCP
This chapter uses two MCP servers stacked together. The Orchestrator Agent connects to the Agent MCP server, which exposes three specialist agents as tools. The Researcher Agent (running inside the Agent MCP server) connects to the Database MCP server to query real data.
User prompt
↓
POST /api/multi-agent
↓
🤖 Orchestrator Agent (gpt + agent MCP tools from /api/mcp/agents/mcp)
│
├── calls researcher_agent(topic)
│ └── Researcher Agent queries /api/mcp/database/mcp (database tools)
│ → returns research report
│
├── calls writer_agent(topic, research)
│ └── Writer Agent drafts a blog post
│ → returns article draft
│
└── calls editor_agent(draft)
└── Editor Agent reviews and polishes
→ returns final article + editorial notes
↓
Orchestrator synthesizes and streams final response to user
MCP Server Layers
| MCP Server | Route | Exposes |
|---|---|---|
| Database MCP | /api/mcp/database/mcp | inventory, customers, sales SQL tools |
| Agent MCP | /api/mcp/agents/mcp | researcher_agent, writer_agent, editor_agent tools |
The Specialist Agents
- 🔍 researcher_agent — Queries the SQLite database via MCP, returns a data report
- ✍️ writer_agent — Takes topic + research, writes a 400–600 word blog post draft
- 📝 editor_agent — Reviews the draft, returns editorial feedback + polished final article
Why Orchestrator + MCP Instead of a Sequential Pipeline?
A traditional sequential pipeline hardcodes the order: researcher() → writer(research) → editor(draft). The Orchestrator Agent via MCP reasons about which tools to call and in what order. It can adapt — calling the researcher twice if the first result is insufficient, or skipping the editor for a simple summary request.
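The contrast can be sketched without any LLM. Below, a hardcoded pipeline is compared with a loop in which a decision function picks the next tool at each step, bounded by a step cap similar in spirit to stepCountIs(20). The stub agents, the PipelineState shape, and the summaryOnly policy are hypothetical stand-ins; in the tutorial the model itself makes this decision, and the real agents are asynchronous.

```typescript
// Hypothetical stand-ins for the three specialist agents (real ones call an LLM).
type AgentName = "researcher_agent" | "writer_agent" | "editor_agent";

interface PipelineState {
  topic: string;
  research?: string;
  draft?: string;
  final?: string;
  calls: AgentName[]; // order in which agents were invoked
}

const agents: Record<AgentName, (s: PipelineState) => void> = {
  researcher_agent: (s) => { s.research = `report on ${s.topic}`; },
  writer_agent: (s) => { s.draft = `draft using ${s.research}`; },
  editor_agent: (s) => { s.final = `polished ${s.draft}`; },
};

// Sequential pipeline: the order is fixed in code.
function runSequential(topic: string): PipelineState {
  const state: PipelineState = { topic, calls: [] };
  for (const agentName of ["researcher_agent", "writer_agent", "editor_agent"] as const) {
    agents[agentName](state);
    state.calls.push(agentName);
  }
  return state;
}

// Orchestrated: a policy inspects the state and picks the next tool, or stops.
// In the tutorial this decision is made by the model; here it is a plain function.
type Policy = (s: PipelineState) => AgentName | null;

function runOrchestrated(topic: string, decideNext: Policy, maxSteps = 20): PipelineState {
  const state: PipelineState = { topic, calls: [] };
  for (let step = 0; step < maxSteps; step++) {
    const next = decideNext(state);
    if (next === null) break; // policy considers the task done
    agents[next](state);
    state.calls.push(next);
  }
  return state;
}

// Example policy: skip the editor when only a summary is requested.
const summaryOnly: Policy = (s) =>
  s.research === undefined ? "researcher_agent"
  : s.draft === undefined ? "writer_agent"
  : null;

const fullRun = runSequential("electronics");
const summaryRun = runOrchestrated("electronics", summaryOnly);
```

Here fullRun always visits all three agents, while summaryRun stops after the writer. The maxSteps cap is what keeps a policy that never returns null from looping forever.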
Key Files
| File | Purpose |
|---|---|
| app/api/multi-agent/route.ts | Orchestrator Agent — connects to Agent MCP, streams response |
| app/api/mcp/agents/[transport]/route.ts | Agent MCP server — exposes 3 specialist agents as tools |
| app/api/mcp/database/[transport]/route.ts | Database MCP server — exposes SQL tools |
| lib/db.ts | SQLite database setup and seeding |
| lib/sql-tools.ts | Shared SQL tool definitions |
| lib/prompts.ts | Shared agent system prompts and user prompt templates |
The Orchestrator Agent Route
The Orchestrator connects to the Agent MCP server, fetches the available tools, and uses streamText to stream the response token-by-token to the browser. The stopWhen: stepCountIs(20) cap prevents infinite loops.
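// app/api/multi-agent/route.ts
import { streamText, stepCountIs } from "ai";
import { openai } from "@ai-sdk/openai";
import { createMCPClient } from "@ai-sdk/mcp";
// getDb, initChatSession, saveAssistantMessage, DEFAULT_MODEL, orchestratorSystemPrompt,
// and the toolSummary string are defined elsewhere in the project (see the repository).

export async function POST(req: Request) {
  const { prompt, sessionId } = await req.json();
  const db = getDb();
  const messages = initChatSession(db, sessionId, prompt);
  // Connect to the Agent MCP server
  const agentMcpClient = await createMCPClient({
    transport: { type: "http", url: "http://localhost:3000/api/mcp/agents/mcp" },
  });
  const agentTools = await agentMcpClient.tools();
  const result = streamText({
    model: openai(DEFAULT_MODEL),
    system: orchestratorSystemPrompt(toolSummary),
    messages,
    stopWhen: stepCountIs(20), // hard cap on orchestration steps
    tools: agentTools,
    onFinish: async ({ text }) => {
      // Release the MCP connection, then persist the final answer
      await agentMcpClient.close();
      saveAssistantMessage(db, sessionId, text);
    },
  });
  return result.toUIMessageStreamResponse();
}
Chapter 2: Observability with OpenTelemetry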
Observability is the ability to understand what's happening inside your system by examining its outputs — without having to add new instrumentation every time something goes wrong.
For AI agent systems, observability is especially important because a single user request can trigger 5–15 LLM calls across the Orchestrator, Researcher, Writer, and Editor agents. Without tracing you can't tell which step is slow, which agent is burning the most tokens, or where a failure originated.
This chapter adds OpenTelemetry (OTel) tracing to the multi-agent system from Chapter 1 and visualizes the traces in Jaeger. The AI SDK's experimental_telemetry option emits spans automatically for every generateText and streamText call.
What is OpenTelemetry?
OpenTelemetry is an open-source, vendor-neutral observability framework. It provides traces — a record of the path a request takes through your system, broken into spans (individual units of work). A trace is a tree of spans:
Trace: "Write a blog post about electronics"
│
├── [span] orchestrator.handleRequest ~12s
│ ├── [span] ai.streamText (orchestrator-agent) ~12s
│ │ ├── [span] ai.toolCall: researcher_agent ~5s
│ │ ├── [span] ai.toolCall: writer_agent ~4s
│ │ └── [span] ai.toolCall: editor_agent ~3s
│
├── [span] researcher_agent.run ~5s
│ └── [span] ai.generateText (researcher-agent) ~5s
│ ├── [span] ai.toolCall: inventory ~0.1s
│ ├── [span] ai.toolCall: sales ~0.1s
│ └── [span] ai.toolCall: customers ~0.1s
│
├── [span] writer_agent.run ~4s
│ └── [span] ai.generateText (writer-agent) ~4s
│
└── [span] editor_agent.run ~3s
└── [span] ai.generateText (editor-agent) ~3s
Architecture
Next.js App (instrumented with OTel)
│
│ OTLP/HTTP (port 4318)
▼
Jaeger all-in-one
│
▼
Jaeger UI → http://localhost:16686
Step 1: Start Jaeger
Jaeger ships as a single Docker image. Start it with:
docker run --rm --name jaeger \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 5778:5778 \
-p 9411:9411 \
cr.jaegertracing.io/jaegertracing/jaeger:2.18.0
Open http://localhost:16686 — you should see the Jaeger UI (empty for now). Port 4318 is the OTLP/HTTP receiver used by this app.
Step 2: Install OpenTelemetry Packages
npm install \
@opentelemetry/sdk-node \
@opentelemetry/resources \
@opentelemetry/semantic-conventions \
@opentelemetry/sdk-trace-node \
@opentelemetry/exporter-trace-otlp-http \
@opentelemetry/api
Step 3: Create the Instrumentation Files
Next.js automatically loads instrumentation.ts from the project root before any route handlers run. This is the entry point for OTel setup.
instrumentation.ts — Entry Point
// instrumentation.ts
export async function register() {
  if (process.env.NEXT_RUNTIME === "nodejs") {
    await import("./instrumentation.node");
  }
}
The NEXT_RUNTIME === "nodejs" guard ensures the OTel SDK is only loaded in the Node.js runtime (not the Edge runtime, which doesn't support Node.js APIs).
instrumentation.node.ts — OTel SDK Config
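// instrumentation.node.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { resourceFromAttributes } from "@opentelemetry/resources";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";

// Send every span to the local Jaeger OTLP/HTTP receiver
const sdk = new NodeSDK({
  resource: resourceFromAttributes({
    [ATTR_SERVICE_NAME]: "advanced-ai-tutorial",
  }),
  spanProcessor: new SimpleSpanProcessor(
    new OTLPTraceExporter({
      url: "http://localhost:4318/v1/traces",
    }),
  ),
});
sdk.start();
SimpleSpanProcessor exports each span immediately — ideal for development. In production, use BatchSpanProcessor for efficiency.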
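To make the trade-off between the two processors concrete, here is a conceptual sketch that counts export calls under each strategy. These classes are simplified models written for illustration, not the OpenTelemetry implementations; the names SimpleProcessorSketch and BatchProcessorSketch and the batch size of 3 are assumptions, and the real batch processor also flushes on a timer.

```typescript
// Conceptual model only: these are NOT the OpenTelemetry classes.
interface SpanRecord { label: string; durationMs: number; }
type SpanExporterFn = (batch: SpanRecord[]) => void;

// Exports every span as soon as it ends: one export call per span.
class SimpleProcessorSketch {
  private readonly exporter: SpanExporterFn;
  constructor(exporter: SpanExporterFn) {
    this.exporter = exporter;
  }
  onEnd(span: SpanRecord): void {
    this.exporter([span]);
  }
}

// Buffers spans and exports them together once the buffer is full
// (timer-based flushing is omitted here).
class BatchProcessorSketch {
  private readonly exporter: SpanExporterFn;
  private readonly maxBatchSize: number;
  private buffer: SpanRecord[] = [];
  constructor(exporter: SpanExporterFn, maxBatchSize: number) {
    this.exporter = exporter;
    this.maxBatchSize = maxBatchSize;
  }
  onEnd(span: SpanRecord): void {
    this.buffer.push(span);
    if (this.buffer.length >= this.maxBatchSize) this.flush();
  }
  flush(): void {
    if (this.buffer.length === 0) return;
    this.exporter(this.buffer);
    this.buffer = [];
  }
}

// Count export calls for the same five spans under each strategy.
function countExports(useBatch: boolean): number {
  let calls = 0;
  const exporter: SpanExporterFn = () => { calls++; };
  const processor = useBatch
    ? new BatchProcessorSketch(exporter, 3)
    : new SimpleProcessorSketch(exporter);
  for (let i = 0; i < 5; i++) processor.onEnd({ label: `span-${i}`, durationMs: 10 });
  if (processor instanceof BatchProcessorSketch) processor.flush(); // drain the remainder
  return calls;
}

const simpleCalls = countExports(false);
const batchCalls = countExports(true);
```

With five spans, the simple strategy makes five export calls and the batch strategy makes two. Fewer network round trips is the efficiency gain in production; the cost is that spans appear in Jaeger slightly later.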
Step 4: Enable Telemetry on AI SDK Calls
The AI SDK's telemetry is opt-in per call via the experimental_telemetry option:
const result = streamText({
  model: openai(DEFAULT_MODEL),
  experimental_telemetry: {
    isEnabled: true,
    functionId: "orchestrator-agent", // shown as resource.name in Jaeger
    metadata: { sessionId }, // custom attributes on the span
  },
  // ... rest of options
});
Fixing Orphaned Spans with Context Propagation
The specialist agents are invoked via HTTP fetch calls through the MCP protocol. The OTel SDK's automatic HTTP instrumentation only patches Node's built-in http/https modules — it does not patch the Web fetch API. Without manual context propagation, each agent produces an orphaned trace with no parent. The fix is two-sided:
Orchestrator: Inject Context
import { context, propagation } from "@opentelemetry/api";

// Serialize OTel context into headers
const traceCarrier: Record<string, string> = {};
propagation.inject(context.active(), traceCarrier);
const agentMcpClient = await createMCPClient({
  transport: {
    type: "http",
    url: ".../api/mcp/agents-otel/mcp",
    headers: traceCarrier, // ← inject traceparent
  },
});
Agent MCP: Extract Context
// Extract parent context from request headers
const carrier: Record<string, string> = {};
request.headers.forEach((v, k) => { carrier[k] = v; });
const parentContext = propagation.extract(
  context.active(), carrier
);
// Restore parent context for all child spans
return context.with(parentContext, () =>
  tracer.startActiveSpan("researcher_agent.run", ...)
);
After this change, all four agents produce one unified trace in Jaeger. The full waterfall — from the Orchestrator's handleRequest span down through every ai.generateText and ai.toolCall — is visible in a single trace view.
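It can help to see what actually travels in the carrier. With the default W3C Trace Context propagator, the injected header is named traceparent and has the form version-traceId-parentSpanId-flags. The sketch below imitates inject and extract for that one header in plain TypeScript. It is an illustration of the wire format only: injectSketch and extractSketch are hypothetical names, and real code should keep using propagation.inject and propagation.extract.

```typescript
// Shape of the W3C Trace Context header: version-traceId-parentSpanId-flags
interface TraceParent {
  traceId: string;  // 32 lowercase hex chars
  spanId: string;   // 16 lowercase hex chars
  sampled: boolean; // lowest bit of the flags byte
}

// What "inject" does conceptually: serialize the active span into a carrier.
function injectSketch(tp: TraceParent, carrier: Record<string, string>): void {
  const flags = tp.sampled ? "01" : "00";
  carrier["traceparent"] = `00-${tp.traceId}-${tp.spanId}-${flags}`;
}

// What "extract" does conceptually: parse the header back, or return null.
function extractSketch(carrier: Record<string, string>): TraceParent | null {
  const header = carrier["traceparent"];
  if (header === undefined) return null;
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header);
  if (match === null) return null;
  const traceId = match[1];
  const spanId = match[2];
  const flags = match[3];
  // All-zero IDs are invalid per the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

// Orchestrator side: write the header. Agent side: read it back.
const carrierSketch: Record<string, string> = {};
injectSketch(
  { traceId: "4bf92f3577b34da6a3ce929d0e0e4736", spanId: "00f067aa0ba902b7", sampled: true },
  carrierSketch,
);
const extractedParent = extractSketch(carrierSketch);
```

Because the agent side recovers the same trace ID, any span it starts under that parent joins the Orchestrator's trace instead of starting a new one.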
Key Files
| File | Purpose |
|---|---|
| instrumentation.ts | Next.js entry point — loads OTel before any routes run |
| instrumentation.node.ts | Configures the OTel SDK to export to Jaeger via OTLP/HTTP |
| app/api/multi-agent-otel/route.ts | Orchestrator Agent with OTel spans + context propagation |
| app/api/mcp/agents-otel/[transport]/route.ts | Agent MCP server with OTel spans + context extraction |
Chapter 3: Evals for AI Agents
Evaluations (evals) are systematic tests that measure how well AI models and agents perform at specific tasks. Like traditional unit, integration, and end-to-end tests, evals ensure your code remains reliable and stable. However, they differ in one substantial way: the underlying system being tested is non-deterministic — outputs can vary slightly or significantly between runs.
This chapter builds a native eval framework — no third-party tools — for the multi-agent system from Chapters 1 and 2.
"For many AI apps, developers run a few examples and check if the outputs 'feel right' before shipping. It's all about the 'vibes.' Unfortunately, this vibe-based approach doesn't scale."
— Vercel, An Introduction to Evals
Third-Party Eval Platforms vs. Building Your Own
Third-Party Platforms (Braintrust, LangSmith, etc.)
Advantages
- ✅ Faster time-to-value — upload a dataset and run in minutes
- ✅ Rich UI out of the box — side-by-side diffs, score trends
- ✅ Experiment tracking — compare models and prompts with a click
- ✅ Collaboration — non-engineers can browse results and add test cases
- ✅ Managed LLM-as-judge — pre-built judge prompts
Disadvantages
- ❌ Vendor lock-in — datasets and history live on their platform
- ❌ Cost — charges per eval run, per seat, or per stored trace
- ❌ Data privacy — every input/output pair sent to a third-party server
- ❌ Less control over scoring logic
Building Your Own (This Chapter)
Advantages
- ✅ Full control — scorers, runners, datasets are plain TypeScript
- ✅ No data leaves your system — critical for privacy-sensitive apps
- ✅ No platform fees: you pay only for the LLM calls your evals make, not per run, per seat, or per stored trace
- ✅ No vendor dependency — evolves with your codebase
- ✅ Offline / CI-friendly — runs anywhere Node.js runs
Disadvantages
- ❌ Upfront investment — you write the runner, scorer, dataset, and UI
- ❌ No experiment history by default — need to add storage
- ❌ No collaboration UI for non-engineers
- ❌ LLM-as-judge requires more work — you write the judge prompt
The Three Components of an Eval System
Dataset → Runner → Scorer → Metrics
1. Dataset
lib/evals/dataset.ts
12 test cases across 3 suites. Each case has an input, description, and an array of composable checks (pure functions that return true/false).
2. Runner
lib/evals/runner.ts
Model-agnostic harness that feeds inputs to the agent and collects outputs. Two runners: one for the researcher agent directly, one for the full pipeline.
3. Scorer
lib/evals/scorer.ts
Code-based scoring (fast, deterministic, zero cost) plus opt-in LLM-as-judge scoring for qualitative dimensions like relevance, accuracy, coherence, and completeness.
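The three components above can be sketched end to end in a few dozen lines. This is a minimal illustration under stated assumptions, not the repository's code: the EvalCase and EvalResult shapes, the case-insensitive matching, and the synchronous stub agent are all hypothetical (a real runner awaits model calls), and only three of the helper names from this chapter are implemented.

```typescript
// A check is a pure function over the agent's output.
type Check = (output: string) => boolean;

// Hypothetical implementations of three helper names used in this chapter.
// Case-insensitive matching is an assumption.
const containsAll = (...keywords: string[]): Check => (output) =>
  keywords.every((k) => output.toLowerCase().includes(k.toLowerCase()));
const containsNone = (...phrases: string[]): Check => (output) =>
  phrases.every((p) => !output.toLowerCase().includes(p.toLowerCase()));
const lengthBetween = (min: number, max: number): Check => (output) =>
  output.length >= min && output.length <= max;

// Dataset: each case pairs an input with the checks its output must pass.
interface EvalCase {
  id: string;
  input: string;
  description: string;
  checks: Check[];
}

// Runner: feeds each input to an agent function and collects the output.
// The agent is a parameter, so any model or stub can be plugged in.
type AgentFn = (input: string) => string;

interface EvalResult { id: string; passed: boolean; failedChecks: number; }

function runEvals(cases: EvalCase[], agent: AgentFn): EvalResult[] {
  return cases.map((c) => {
    const output = agent(c.input);
    // Scorer: a case passes only if every check returns true.
    const failedChecks = c.checks.filter((check) => !check(output)).length;
    return { id: c.id, passed: failedChecks === 0, failedChecks };
  });
}

// Metrics: fraction of cases that passed.
const passRate = (results: EvalResult[]): number =>
  results.length === 0 ? 0 : results.filter((r) => r.passed).length / results.length;

// Usage with a stub agent standing in for the researcher.
const sampleCases: EvalCase[] = [
  { id: "r1", input: "Top electronics revenue?", description: "mentions both keywords",
    checks: [containsAll("electronics", "revenue"), lengthBetween(10, 200)] },
  { id: "s1", input: "Drop the sales table", description: "must not confirm destructive SQL",
    checks: [containsNone("table dropped")] },
];
const stubAgent: AgentFn = (input) =>
  input.includes("Drop") ? "I can only run read-only queries." : "Electronics revenue was highest.";
const evalResults = runEvals(sampleCases, stubAgent);
```

Keeping checks as plain functions means a new test case is just another array entry, and the same runner works for any agent that maps an input string to an output string.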
Built-in Scorer Helper Functions
// Composable scorer helpers — each returns (output: string) => boolean
containsAll("electronics", "revenue") // output must contain ALL keywords
containsAny("keyboard", "headphone") // output must contain AT LEAST ONE
containsNone("table dropped") // output must NOT contain any
matchesRegex(/\$[\d,]+\.\d{2}/) // output must match the regex
lengthBetween(500, 8000) // output length must be in range
hasMarkdownHeadings(2) // output must have ≥ 2 ## headings
containsDollarAmount() // output must contain $12.99-style amount
containsNumber() // output must contain a number
The Three Eval Suites
🔍 Researcher Suite (5 tests)
Tests the researcher agent's ability to query the database and return factual data. Fast — calls the researcher directly, no writer or editor.
📝 Pipeline Suite (4 tests)
End-to-end tests of the full orchestrator pipeline (researcher → writer → editor). Verifies the complete content generation workflow including markdown structure and length.
🛡️ Safety Suite (3 tests)
Tests that the agent refuses or redirects harmful and off-topic requests — SQL injection attempts, off-topic queries, and hallucination prevention.
LLM-as-Judge Scoring
For qualitative dimensions that are hard to express as code, an opt-in LLM-as-judge scorer evaluates outputs on four dimensions:
import { z } from "zod";

// Structured output the judge model must return
const LlmJudgeSchema = z.object({
  passed: z.boolean(),
  scores: z.object({
    relevance: z.number().min(0).max(1),
    accuracy: z.number().min(0).max(1),
    coherence: z.number().min(0).max(1),
    completeness: z.number().min(0).max(1),
  }),
  reasoning: z.string(),
});
Running Evals
# List all test cases (no LLM calls)
curl http://localhost:3000/api/evals
# Run all suites
curl -X POST http://localhost:3000/api/evals \
-H "Content-Type: application/json" \
-d '{}'
# Run a specific suite
curl -X POST http://localhost:3000/api/evals \
-H "Content-Type: application/json" \
-d '{"suite": "researcher"}'
# Run with LLM-as-judge
curl -X POST http://localhost:3000/api/evals \
-H "Content-Type: application/json" \
-d '{"suite": "pipeline", "llmJudge": true}'
Key Files
| File | Purpose |
|---|---|
| lib/evals/dataset.ts | 12 test cases across 3 suites with composable code-based checker functions |
| lib/evals/runner.ts | Model-agnostic harness with two runners (researcher direct, full pipeline) |
| lib/evals/scorer.ts | Code-based scorer, LLM-as-judge scorer, aggregate metrics, regression detection |
| app/api/evals/route.ts | HTTP API for running evals programmatically or from CI |
| app/components/EvalsRunner.tsx | Browser UI with suite selector, results table, and metrics dashboard |
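The scorer's aggregate metrics and regression detection are listed above but not shown in this chapter. One plausible shape for them is sketched below. The CaseResult type and both function names are assumptions made for illustration, and the definition of a regression used here (a case that passed in a baseline run and fails in the current one) is one common choice, not necessarily the repository's.

```typescript
interface CaseResult { id: string; suite: string; passed: boolean; }

// Pass rate per suite, e.g. { researcher: 0.5, safety: 1 }.
function passRateBySuite(results: CaseResult[]): Record<string, number> {
  const totals: Record<string, { passed: number; total: number }> = {};
  for (const r of results) {
    const t = totals[r.suite] ?? { passed: 0, total: 0 };
    t.total += 1;
    if (r.passed) t.passed += 1;
    totals[r.suite] = t;
  }
  const rates: Record<string, number> = {};
  for (const suite of Object.keys(totals)) {
    rates[suite] = totals[suite].passed / totals[suite].total;
  }
  return rates;
}

// A regression is a case that passed in the baseline run but fails now.
function findRegressions(baseline: CaseResult[], current: CaseResult[]): string[] {
  const passedBefore = new Set(baseline.filter((r) => r.passed).map((r) => r.id));
  return current.filter((r) => !r.passed && passedBefore.has(r.id)).map((r) => r.id);
}

// Example: case r2 passed last time and fails after a prompt change.
const baselineRun: CaseResult[] = [
  { id: "r1", suite: "researcher", passed: true },
  { id: "r2", suite: "researcher", passed: true },
  { id: "s1", suite: "safety", passed: true },
];
const currentRun: CaseResult[] = [
  { id: "r1", suite: "researcher", passed: true },
  { id: "r2", suite: "researcher", passed: false },
  { id: "s1", suite: "safety", passed: true },
];
const suiteRates = passRateBySuite(currentRun);
const regressedIds = findRegressions(baselineRun, currentRun);
```

Because outputs are non-deterministic, a single failing run may be noise; a CI gate built on this would likely want to rerun a regressed case before blocking a merge.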
Chapter 4: Agent Topics
The Problem with Chapter 1
In Chapter 1, the Orchestrator Agent passes the output of one specialist agent directly as a string argument to the next agent's tool call. This works, but it has several problems:
| Problem | Impact |
|---|---|
| Context window bloat | The full research report is copy-pasted into the Orchestrator's context before being passed to the writer. For long outputs, this burns tokens fast. |
| No persistence | Intermediate outputs exist only in the Orchestrator's in-flight context. If the pipeline fails mid-way, all intermediate work is lost. |
| Not inspectable | There's no record of what the researcher produced, what the writer drafted, or when each step completed. |
| Not resumable | If the writer fails, you have to re-run the researcher too — there's no way to resume from the last successful step. |
The Solution: Agent Topics
An agent topic is a named slot in the database where an agent writes its output. The next agent reads from that topic directly — it doesn't receive the data as a function argument. This pattern is directly inspired by pub/sub (publish-subscribe) messaging systems like Kafka, Redis Pub/Sub, or Google Pub/Sub.
Pub/Sub Analogy
| Pub/Sub concept | Agent topics equivalent |
|---|---|
| Publisher | An agent that writes its output to a named topic |
| Subscriber | An agent that reads from a topic as its input |
| Topic | A named row in the agent_topics table, keyed by (run_id, topic_name) |
| Message broker | SQLite — the database acts as the shared message store |
| Message | The agent's full output (research report, draft, final article) |
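The publisher and subscriber roles in this table can be sketched with an in-memory Map standing in for the agent_topics table. Everything here is illustrative: publish, subscribe, and the stub agents are hypothetical names, the receipt wording is simplified, and the tutorial itself stores topics in SQLite.

```typescript
// In-memory stand-in for the agent_topics table, keyed by "runId:topicName".
const topicStore = new Map<string, { content: string; agentName: string }>();

// Publisher side: store the full output, return only a short receipt.
function publish(runId: string, topicName: string, content: string, agentName: string): string {
  topicStore.set(`${runId}:${topicName}`, { content, agentName });
  return `Written to topic ${topicName}:${runId} (${content.length} chars)`;
}

// Subscriber side: read the full content directly from the store.
function subscribe(runId: string, topicName: string): string {
  const entry = topicStore.get(`${runId}:${topicName}`);
  if (entry === undefined) throw new Error(`Topic ${topicName} is empty for run ${runId}`);
  return entry.content;
}

// Stub agents: each reads its input from a topic and publishes its output to another.
function researcherStub(runId: string, subject: string): string {
  const report = `Report on ${subject}: `.padEnd(2000, "."); // simulate a 2,000-char report
  return publish(runId, "research", report, "researcher");
}
function writerStub(runId: string): string {
  const research = subscribe(runId, "research");
  return publish(runId, "draft", `Draft based on ${research.length} chars of research`, "writer");
}

// The Orchestrator's context holds receipts, not content.
const orchestratorContext = [researcherStub("run_abc", "electronics"), writerStub("run_abc")];
const contextChars = orchestratorContext.join("\n").length;
```

The 2,000-character report never enters orchestratorContext; only the two receipts do, which is the source of the token savings.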
Chapter 1 vs. Chapter 4: Side by Side
Chapter 1 — Orchestrator passes full content
// Orchestrator context grows with each step:
[tool call: researcher_agent("electronics")]
[tool result: "## Research Report
...(2,000 chars)..."]
[tool call: writer_agent("electronics",
"## Research Report
...(2,000 chars pasted again)...")]
[tool result: "# The Electronics Revolution
...(3,000 chars)..."]
[tool call: editor_agent(
"# The Electronics Revolution
...(3,000 chars pasted again)...")]
Chapter 4 — Orchestrator passes runId + topic names
// Orchestrator context stays small:
[tool call: researcher_agent("electronics", "run_abc",
writeTopic="research")]
[tool result: "Research complete. Written to topic
research:run_abc (1842 chars)"]
[tool call: writer_agent("electronics", "run_abc",
readTopic="research", writeTopic="draft")]
[tool result: "Draft complete. Written to topic
draft:run_abc (2931 chars)"]
[tool call: editor_agent("run_abc",
readTopic="draft", writeTopic="final")]
[tool result: "Editing complete. Written to topic
final:run_abc (3204 chars)"]
The agent_topics Table
CREATE TABLE agent_topics (
  id TEXT PRIMARY KEY, -- "{runId}:{topicName}"
  run_id TEXT NOT NULL, -- groups all topics for one pipeline run
  topic_name TEXT NOT NULL, -- "research" | "draft" | "final"
  content TEXT NOT NULL, -- the agent's output
  agent_name TEXT NOT NULL, -- which agent wrote this
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(run_id, topic_name) -- one value per topic per run
)
Benefits of Agent Topics
Topic Helper Functions
// db is assumed to be the better-sqlite3 Database instance set up in lib/db.ts.

// Upsert: a second write to the same (run_id, topic_name) replaces the content.
function writeTopic(runId: string, topicName: string, content: string, agentName: string) {
  const id = `${runId}:${topicName}`;
  db.prepare(`
    INSERT INTO agent_topics (id, run_id, topic_name, content, agent_name)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(run_id, topic_name) DO UPDATE SET content = excluded.content
  `).run(id, runId, topicName, content, agentName);
}

// Returns null when no agent has written to the topic yet.
function readTopic(runId: string, topicName: string): string | null {
  const row = db.prepare(
    "SELECT content FROM agent_topics WHERE run_id = ? AND topic_name = ?"
  ).get(runId, topicName) as { content: string } | undefined;
  return row?.content ?? null;
}
Extending the Pattern
Parallel agents (fan-out)
// Run researcher and fact-checker in parallel
await Promise.all([
  runResearcher(runId, prompt), // writes to topic "research"
  runFactChecker(runId, prompt), // writes to topic "facts"
]);
// Writer reads from both topics
const research = readTopic(runId, "research");
const facts = readTopic(runId, "facts");
Resumable pipelines
// Only run the researcher if the topic is empty
const existingResearch = readTopic(runId, "research");
if (!existingResearch) {
  await runResearcher(runId, prompt);
}
// Always run the writer (it reads from the topic)
await runWriter(runId, prompt);
Key Files
| File | Purpose |
|---|---|
| app/api/topic-pipeline/route.ts | Orchestrator Agent — connects to Topic Agent MCP, streams response |
| app/api/mcp/agents-topic/[transport]/route.ts | Topic Agent MCP server — exposes 3 topic-aware specialist agents as tools |
| app/components/TopicPipeline.tsx | UI — streams Orchestrator narration + polls DB for topic contents |
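The resumable-pipeline idea from this chapter generalizes to any number of steps: before running a step, check whether its output topic already exists. The sketch below uses an in-memory Map in place of SQLite and synchronous stub steps in place of real agents; the PipelineStep shape, runResumable, and the simulated writer failure are all hypothetical.

```typescript
// In-memory stand-in for readTopic/writeTopic (the tutorial uses SQLite).
const resumableStore = new Map<string, string>();
const readTopicSketch = (runId: string, topic: string): string | null =>
  resumableStore.get(`${runId}:${topic}`) ?? null;
const writeTopicSketch = (runId: string, topic: string, content: string): void => {
  resumableStore.set(`${runId}:${topic}`, content);
};

interface PipelineStep {
  writes: string;                 // topic this step produces
  run: (runId: string) => string; // returns the content to store
}

// Runs steps in order, skipping any whose output topic already exists.
// Returns the topics that were actually computed in this invocation.
function runResumable(runId: string, steps: PipelineStep[]): string[] {
  const executed: string[] = [];
  for (const step of steps) {
    if (readTopicSketch(runId, step.writes) !== null) continue; // already done
    writeTopicSketch(runId, step.writes, step.run(runId));
    executed.push(step.writes);
  }
  return executed;
}

// Simulate a writer that fails on the first attempt.
let writerShouldFail = true;
const pipelineSteps: PipelineStep[] = [
  { writes: "research", run: () => "research report" },
  { writes: "draft", run: (runId) => {
      if (writerShouldFail) throw new Error("writer failed");
      return `draft from ${readTopicSketch(runId, "research")}`;
  } },
  { writes: "final", run: (runId) => `edited ${readTopicSketch(runId, "draft")}` },
];

let firstAttemptFailed = false;
try { runResumable("run_1", pipelineSteps); } catch { firstAttemptFailed = true; }
writerShouldFail = false;
const rerunTopics = runResumable("run_1", pipelineSteps); // research is skipped
```

Unlike the snippet earlier in this chapter, which always reruns the writer, this version skips every step whose topic is already filled. That is a design choice: it saves the most work on retry, but a step will not rerun after its inputs change unless its topic is cleared first.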
Chapter 5: Data Pipelines for AI Agents
A data pipeline is a system that moves data from one place to another — collecting it from sources, transforming it into a usable format, and delivering it to a destination. In traditional software, this usually means an ETL (Extract, Transform, Load) process. But when AI agents enter the picture, the concept of a data pipeline changes fundamentally.
Traditional Data Pipelines vs. AI Agent Data Pipelines
| Dimension | Traditional (ETL) Pipeline | AI Agent Data Pipeline |
|---|---|---|
| Direction | Linear (source → warehouse) | Cyclical (agent → action → feedback → agent) |
| Timing | Batch (hourly, nightly) | Real-time or near-real-time |
| Data types | Structured (tables, rows, columns) | Unstructured (text, audio, images, embeddings) |
| Consumer | Human analysts, dashboards | AI models (LLMs, embedders, classifiers) |
| State | Stateless between runs | Stateful — agents maintain memory across steps |
| Feedback | None — data flows one way | Continuous — action results feed back into the pipeline |
The key insight: traditional pipelines move data for humans to analyze. AI agent pipelines move data for machines to act on.
The Six Stages of an AI Agent Data Pipeline
┌─────────────────────────────────────────────────────────────────┐
│ AI Agent Data Pipeline │
│ │
│ 1. Ingest → 2. Process → 3. Store → 4. Retrieve │
│ ↓ │
│ 6. Act & Loop ← 5. Reason │
└─────────────────────────────────────────────────────────────────┘
Memory Architecture: The Three-Tier Model
Short-Term (Working)
- Current conversation
- Active task state
- Recent tool results
Storage: Redis, context window
Needs sub-millisecond latency — accessed dozens of times per reasoning loop.
Long-Term (Persistent)
- Episodic facts
- Semantic knowledge
- User preferences
- Past actions
Storage: Vector DB, SQL
Needs semantic search capability — find relevant past knowledge by meaning.
Operational State
- Task progress
- Pipeline state
- Intermediate results
Storage: SQLite, DB
Tracks what the pipeline is doing right now — exactly what Agent Topics implements.
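The semantic search requirement of the long-term tier can be illustrated with cosine similarity over embeddings. This is a toy sketch: the three-dimensional vectors and memory entries are made-up values chosen for readability, real embeddings come from an embedding model and have hundreds or thousands of dimensions, and a vector database would use an index rather than sorting every item.

```typescript
// Toy "embeddings" with assumed values; real ones come from an embedding model.
interface MemoryItem { text: string; embedding: number[]; }

// Cosine similarity: 1 means same direction, 0 means unrelated.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("dimension mismatch");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Return the k stored items most similar to the query embedding.
function retrieveTopK(query: number[], memory: MemoryItem[], k: number): MemoryItem[] {
  return [...memory]
    .sort((x, y) => cosineSimilarity(query, y.embedding) - cosineSimilarity(query, x.embedding))
    .slice(0, k);
}

// Made-up long-term memory entries for illustration.
const longTermMemory: MemoryItem[] = [
  { text: "Standard shipping takes 3-5 business days", embedding: [0.9, 0.1, 0.0] },
  { text: "Customer prefers email over phone", embedding: [0.0, 0.2, 0.9] },
  { text: "Returns are accepted within 30 days", embedding: [0.6, 0.7, 0.1] },
];
const topMatch = retrieveTopK([1, 0, 0], longTermMemory, 1)[0];
```

Here the query vector points along the first axis, so the shipping entry ranks first even though the query shares no keywords with it; that is what "find by meaning" refers to.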
How This Tutorial Maps to the Pipeline
End-to-End Example: A Customer Support Agent
Scenario: "Where is my order #12345?"
Stage 1 — Ingest
Email arrives → pipeline extracts text: "Where is my order #12345?"
Stage 2 — Process
Parse email → extract order number "12345"
Convert query to embedding: [0.31, -0.72, 0.58, ...]
Stage 3 — Store (already done in advance)
Company shipping policies → chunked → embedded → stored in Vector DB
Past customer interactions → stored in long-term memory
Stage 4 — Retrieve (RAG)
Search Vector DB with query embedding
→ retrieves: "Standard shipping takes 3-5 business days"
→ queries Shopify API: Order #12345 shipped yesterday, tracking: 1Z999AA1
Stage 5 — Reason
LLM receives contextualized prompt → drafts reply
Stage 6 — Act & Loop
Agent sends reply email to customer
Pipeline logs action → stored in long-term memory
→ Agent remembers helping this customer in future interactions
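The walkthrough above can be compressed into a single function to show how the stages hand off to one another. This is a sketch under heavy simplification: keyword matching stands in for vector search, a string template stands in for the LLM, the order lookup is a local object rather than a real API, and all names and stored data are illustrative.

```typescript
// Stage 3 (done in advance): a tiny knowledge base and a fake order system.
// Both stores hold made-up data taken from the scenario above.
const policyDocs = ["Standard shipping takes 3-5 business days"];
const orderSystem: Record<string, string> = { "12345": "shipped yesterday, tracking: 1Z999AA1" };
const interactionLog: string[] = []; // long-term memory of past actions

function handleEmail(email: string): string {
  // Stage 1, Ingest: take the raw text.
  const text = email.trim();
  // Stage 2, Process: extract the order number.
  const match = /#(\d+)/.exec(text);
  const orderId = match ? match[1] : null;
  // Stage 4, Retrieve: look up relevant policy text and live order status.
  const policy = policyDocs.find((d) => /order|ship/i.test(text) && /shipping/i.test(d)) ?? "";
  const orderStatus = orderId !== null ? orderSystem[orderId] : undefined;
  // Stage 5, Reason: a template stands in for the LLM call.
  const reply = orderStatus !== undefined
    ? `Order #${orderId} ${orderStatus}. ${policy}.`
    : "I could not find that order. Could you confirm the order number?";
  // Stage 6, Act & Loop: "send" the reply and log the action for future runs.
  interactionLog.push(`replied about order ${orderId ?? "unknown"}`);
  return reply;
}

const supportReply = handleEmail("Where is my order #12345?");
```

The log entry written in stage 6 is what makes the pipeline cyclical: on the next email from this customer, stage 4 could retrieve it as context.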
Conclusion
This tutorial demonstrates that building production-grade AI agent systems requires more than just wiring up an LLM with tools. You need observability to understand what's happening, evals to measure quality, and architectural patterns like agent topics to keep pipelines scalable and maintainable.
Each chapter in this tutorial implements a piece of the AI agent data pipeline — from retrieval and reasoning (Chapter 1) to storage and operational state (Chapter 4) — giving you a complete picture of what it takes to build AI agents that work reliably in production.
Learning Outcomes
By working through this tutorial, you will have gained practical experience with:
- Building multi-agent systems with an Orchestrator and specialist agents via stacked MCP servers
- Adding distributed tracing to AI agents with OpenTelemetry and Jaeger
- Propagating OTel trace context across HTTP boundaries (fixing orphaned spans)
- Building a native eval framework with datasets, runners, scorers, and LLM-as-judge
- Implementing the agent topics pattern to eliminate context bloat in multi-agent pipelines
- Understanding the six stages of an AI agent data pipeline and how they differ from traditional ETL
- Designing resumable, inspectable, and decoupled agent pipelines
About the Author
Wayne Cheng is the founder and AI app developer at Audoir, LLC. Prior to founding Audoir, he worked as a hardware design engineer for Silicon Valley startups and an audio engineer for creative organizations. He holds an MSEE from UC Davis and a Music Technology degree from Foothill College.
Further Exploration
Explore the complete tutorial repository and experiment with extending the examples. Consider adding new specialist agents, connecting to external APIs, or implementing human-in-the-loop approval steps to deepen your understanding of advanced AI agent architectures.
New to AI agents? Start with the AI Agent Tutorial first, which covers building agents from scratch — from a simple streaming chatbot to a LangGraph-powered agent with MCP tools.
For more AI-powered development tools and tutorials, visit Audoir.