
Advanced AI Agents: Multi-Agent Systems, Observability, Evals & Data Pipelines

A hands-on Next.js project that walks you through advanced AI concepts — multi-agent systems with MCP, distributed tracing with OpenTelemetry, building a native eval framework, agent topics for context-free pipelines, and the theory behind AI agent data pipelines.

25 min read

Complete Tutorial Code

Follow along with the complete source code for this advanced AI agent tutorial. Includes five chapters covering multi-agent systems, OpenTelemetry tracing, evals, agent topics, and data pipeline theory.

View on GitHub: https://github.com/audoir/advanced-ai-tutorial

Introduction

⚠️

Prerequisites

This tutorial is a continuation of the AI Agent Tutorial, which covers building AI agents from scratch — from a simple streaming chatbot to a LangGraph-powered agent with database tools served over MCP. Complete that tutorial first before proceeding here.

Once you have the fundamentals of AI agents down — streaming chatbots, tool-calling, MCP, and LangGraph — the next frontier is building systems that are robust, observable, and scalable. This tutorial picks up where the AI Agent Tutorial left off and introduces five advanced concepts that separate production-grade AI systems from prototypes.

You'll build a multi-agent content pipeline where an Orchestrator Agent coordinates a Researcher, Writer, and Editor — all via MCP. Then you'll add OpenTelemetry tracing to see exactly what's happening inside, build a native eval framework to measure quality, refactor the pipeline to use database-backed "agent topics" to eliminate context bloat, and finally explore the theory behind AI agent data pipelines.

Getting Started

Requirements

  • Node.js (v18 or higher)
  • OpenAI API key
  • Completed the AI Agent Tutorial
  • Docker (for Chapter 2 — Jaeger)

Installation Steps

  1. Clone the repository:
     git clone https://github.com/audoir/advanced-ai-tutorial.git
  2. Install dependencies:
     npm install
  3. Configure environment: create a .env.local file with your OpenAI API key:
     OPENAI_API_KEY=sk-...
  4. Start the dev server:
     npm run dev
     Open http://localhost:3000 in your browser.

Key Dependencies

Package | Purpose
ai | Vercel AI SDK: generateText, streamText
@ai-sdk/openai | OpenAI provider
@ai-sdk/mcp | MCP client for the AI SDK
mcp-handler | MCP server handler for Next.js
@modelcontextprotocol/sdk | Official MCP TypeScript SDK
better-sqlite3 | Synchronous SQLite driver
zod | Schema validation for tool inputs

Chapter 1: Multi-Agent Systems

A multi-agent system is a setup where multiple AI agents — each with a specialized role — collaborate to complete a task that would be difficult for a single agent to do well alone. Rather than hardcoding the order of agent calls, an Orchestrator Agent dynamically decides which specialist agents to invoke, in what order, and what to pass between them — all via MCP tool calls.

Multi-Agent vs. Single-Agent

Single-Agent System

User prompt → 🤖 Agent → Response

Advantages
  • ✅ Simple to build and debug — one prompt, one model, one output
  • ✅ Low latency — no coordination overhead between agents
  • ✅ Cheap — fewer LLM calls means lower token costs
  • ✅ Predictable — easier to reason about what the model will do
  • ✅ No coordination failures — nothing to go wrong between agents
Disadvantages
  • ❌ Context window limits — stuffing everything into one prompt hits token limits fast
  • ❌ Jack of all trades, master of none
  • ❌ Hard to parallelize — one agent works sequentially
  • ❌ Brittle for complex tasks — long chains of reasoning degrade in quality

Multi-Agent System

User prompt → 🤖 Orchestrator → 🔍 Researcher / ✍️ Writer / 📝 Editor → Final response

Advantages
  • ✅ Specialization — each agent is tuned for one job
  • ✅ Parallelism — independent agents can run concurrently
  • ✅ Scalability — adding a new capability means adding a new agent
  • ✅ Longer effective context — each agent only sees relevant context
  • ✅ Dynamic orchestration — adapts at runtime
  • ✅ Separation of concerns — easier to test and upgrade individual agents
Disadvantages
  • ❌ Complexity — more moving parts means more things that can go wrong
  • ❌ Higher latency — each hop adds network and inference time
  • ❌ Higher cost — more LLM calls = more tokens = higher API bills
  • ❌ Error propagation — bad output from an early agent can cascade
  • ❌ Harder to debug — tracing failures across multiple agents is more involved

Architecture: Two Layers of MCP

This chapter uses two MCP servers stacked together. The Orchestrator Agent connects to the Agent MCP server, which exposes three specialist agents as tools. The Researcher Agent (running inside the Agent MCP server) connects to the Database MCP server to query real data.

User prompt
    ↓
POST /api/multi-agent
    ↓
🤖 Orchestrator Agent  (gpt + agent MCP tools from /api/mcp/agents/mcp)
    │
    ├── calls researcher_agent(topic)
    │       └── Researcher Agent queries /api/mcp/database/mcp (database tools)
    │           → returns research report
    │
    ├── calls writer_agent(topic, research)
    │       └── Writer Agent drafts a blog post
    │           → returns article draft
    │
    └── calls editor_agent(draft)
            └── Editor Agent reviews and polishes
                → returns final article + editorial notes
    ↓
Orchestrator synthesizes and streams final response to user

MCP Server Layers

MCP Server | Route | Exposes
Database MCP | /api/mcp/database/mcp | inventory, customers, sales SQL tools
Agent MCP | /api/mcp/agents/mcp | researcher_agent, writer_agent, editor_agent tools

The Specialist Agents

🔍 researcher_agent — Queries the SQLite database via MCP, returns a data report
✍️ writer_agent — Takes topic + research, writes a 400–600 word blog post draft
📝 editor_agent — Reviews the draft, returns editorial feedback + polished final article

Why Orchestrator + MCP Instead of a Sequential Pipeline?

A traditional sequential pipeline hardcodes the order: researcher() → writer(research) → editor(draft). The Orchestrator Agent via MCP reasons about which tools to call and in what order. It can adapt — calling the researcher twice if the first result is insufficient, or skipping the editor for a simple summary request.
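
To make the contrast concrete, here is a minimal sketch with stubbed agents. The `policy` function is a hypothetical stand-in for the Orchestrator's LLM reasoning, and the agent bodies, type names, and `maxSteps` cap are illustrative assumptions rather than the repository's code.

```typescript
// Stub specialist agents (assumption: the real ones call an LLM via MCP).
type AgentName = "researcher" | "writer" | "editor";
type PipelineState = { topic: string; research?: string; draft?: string; final?: string };

const agents: Record<AgentName, (s: PipelineState) => PipelineState> = {
  researcher: (s) => ({ ...s, research: `facts about ${s.topic}` }),
  writer: (s) => ({ ...s, draft: `post using ${s.research ?? "no research"}` }),
  editor: (s) => ({ ...s, final: `polished: ${s.draft ?? ""}` }),
};

// Hardcoded pipeline: the order never changes.
function sequentialPipeline(topic: string): PipelineState {
  return agents.editor(agents.writer(agents.researcher({ topic })));
}

// Orchestrated pipeline: a policy picks the next agent (or stops) at each step.
type Policy = (s: PipelineState) => AgentName | "done";

function orchestrate(topic: string, policy: Policy, maxSteps = 20) {
  let state: PipelineState = { topic };
  const calls: AgentName[] = [];
  for (let step = 0; step < maxSteps; step++) {
    const next = policy(state);
    if (next === "done") break;
    state = agents[next](state);
    calls.push(next);
  }
  return { state, calls };
}

// Hypothetical policy: skip the editor when only a draft is needed.
const draftOnly: Policy = (s) =>
  s.research === undefined ? "researcher" : s.draft === undefined ? "writer" : "done";

const fixed = sequentialPipeline("electronics");
const dynamic = orchestrate("electronics", draftOnly);
```

Here `fixed` always passes through all three agents, while `dynamic.calls` contains only the researcher and the writer. The `maxSteps` cap plays the same role as a step limit on the real Orchestrator: a policy that never returns "done" still terminates.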

Key Files

File | Purpose
app/api/multi-agent/route.ts | Orchestrator Agent: connects to Agent MCP, streams response
app/api/mcp/agents/[transport]/route.ts | Agent MCP server: exposes 3 specialist agents as tools
app/api/mcp/database/[transport]/route.ts | Database MCP server: exposes SQL tools
lib/db.ts | SQLite database setup and seeding
lib/sql-tools.ts | Shared SQL tool definitions
lib/prompts.ts | Shared agent system prompts and user prompt templates

The Orchestrator Agent Route

The Orchestrator connects to the Agent MCP server, fetches the available tools, and uses streamText to stream the response token-by-token to the browser. The stopWhen: stepCountIs(20) option caps the run at 20 steps, so a confused Orchestrator cannot loop on tool calls indefinitely.

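// app/api/multi-agent/route.ts
import { streamText, stepCountIs } from "ai";
import { openai } from "@ai-sdk/openai";
import { createMCPClient } from "@ai-sdk/mcp";
// getDb, initChatSession, saveAssistantMessage, DEFAULT_MODEL, and
// orchestratorSystemPrompt are project helpers; see the repository for
// their import paths.

export async function POST(req: Request) {
  const { prompt, sessionId } = await req.json();
  const db = getDb();
  const messages = initChatSession(db, sessionId, prompt);

  // Connect to the Agent MCP server
  const agentMcpClient = await createMCPClient({
    transport: { type: "http", url: "http://localhost:3000/api/mcp/agents/mcp" },
  });
  const agentTools = await agentMcpClient.tools();
  // Assumption: toolSummary is not defined in this excerpt; a plain list of
  // tool names is used here as a stand-in.
  const toolSummary = Object.keys(agentTools).join(", ");

  const result = streamText({
    model: openai(DEFAULT_MODEL),
    system: orchestratorSystemPrompt(toolSummary),
    messages,
    stopWhen: stepCountIs(20), // hard cap on agent steps
    tools: agentTools,
    onFinish: async ({ text }) => {
      // Release the MCP connection, then persist the final answer
      await agentMcpClient.close();
      saveAssistantMessage(db, sessionId, text);
    },
  });

  return result.toUIMessageStreamResponse();
}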
POST /api/multi-agent, Agent MCP, Database MCP, streamText, stepCountIs

Chapter 2: Observability with OpenTelemetry

Observability is the ability to understand what's happening inside your system by examining its outputs — without having to add new instrumentation every time something goes wrong.

For AI agent systems, observability is especially important because a single user request can trigger 5–15 LLM calls across the Orchestrator, Researcher, Writer, and Editor agents. Without tracing you can't tell which step is slow, which agent is burning the most tokens, or where a failure originated.

This chapter adds OpenTelemetry (OTel) tracing to the multi-agent system from Chapter 1 and visualizes the traces in Jaeger. The AI SDK's experimental_telemetry option emits spans for every generateText and streamText call that enables it.

What is OpenTelemetry?

OpenTelemetry is an open-source, vendor-neutral observability framework. It provides traces — a record of the path a request takes through your system, broken into spans (individual units of work). A trace is a tree of spans:

Trace: "Write a blog post about electronics"
│
├── [span] orchestrator.handleRequest                  ~12s
│     ├── [span] ai.streamText (orchestrator-agent)    ~12s
│     │     ├── [span] ai.toolCall: researcher_agent   ~5s
│     │     ├── [span] ai.toolCall: writer_agent       ~4s
│     │     └── [span] ai.toolCall: editor_agent       ~3s
│
├── [span] researcher_agent.run                        ~5s
│     └── [span] ai.generateText (researcher-agent)    ~5s
│           ├── [span] ai.toolCall: inventory          ~0.1s
│           ├── [span] ai.toolCall: sales              ~0.1s
│           └── [span] ai.toolCall: customers          ~0.1s
│
├── [span] writer_agent.run                            ~4s
│     └── [span] ai.generateText (writer-agent)        ~4s
│
└── [span] editor_agent.run                            ~3s
      └── [span] ai.generateText (editor-agent)        ~3s
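
A tracing backend typically receives spans as a flat list and rebuilds the tree from parent IDs. The sketch below shows that reconstruction under simplifying assumptions: the `SpanRecord` shape and `renderTrace` helper are hypothetical, and real OTel spans carry many more fields (trace ID, timestamps, attributes).

```typescript
// Simplified span record (assumption: real OTel spans carry more fields).
interface SpanRecord {
  spanId: string;
  parentId: string | null; // null marks the root span
  label: string;
  durationMs: number;
}

// Render a flat list of spans as an indented tree, depth-first.
function renderTrace(spans: SpanRecord[]): string[] {
  const childrenOf = new Map<string | null, SpanRecord[]>();
  for (const span of spans) {
    const siblings = childrenOf.get(span.parentId) ?? [];
    siblings.push(span);
    childrenOf.set(span.parentId, siblings);
  }
  const lines: string[] = [];
  const visit = (parentId: string | null, depth: number): void => {
    for (const span of childrenOf.get(parentId) ?? []) {
      lines.push(`${"  ".repeat(depth)}${span.label} (${span.durationMs}ms)`);
      visit(span.spanId, depth + 1);
    }
  };
  visit(null, 0);
  return lines;
}

const exampleSpans: SpanRecord[] = [
  { spanId: "a", parentId: null, label: "orchestrator.handleRequest", durationMs: 12000 },
  { spanId: "b", parentId: "a", label: "ai.streamText", durationMs: 12000 },
  { spanId: "c", parentId: "b", label: "ai.toolCall: researcher_agent", durationMs: 5000 },
  { spanId: "d", parentId: "b", label: "ai.toolCall: writer_agent", durationMs: 4000 },
];
const traceLines = renderTrace(exampleSpans);
```

Each entry in `traceLines` is indented by its depth in the tree, which is the same information the waterfall view in a tracing UI conveys.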

Architecture

Next.js App (instrumented with OTel)
    │
    │  OTLP/HTTP  (port 4318)
    ▼
Jaeger all-in-one
    │
    ▼
Jaeger UI  →  http://localhost:16686

Step 1: Start Jaeger

Jaeger ships as a single Docker image. Start it with:

docker run --rm --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  -p 5778:5778 \
  -p 9411:9411 \
  cr.jaegertracing.io/jaegertracing/jaeger:2.18.0

Open http://localhost:16686 — you should see the Jaeger UI (empty for now). Port 4318 is the OTLP/HTTP receiver used by this app.

Step 2: Install OpenTelemetry Packages

npm install \
  @opentelemetry/sdk-node \
  @opentelemetry/resources \
  @opentelemetry/semantic-conventions \
  @opentelemetry/sdk-trace-node \
  @opentelemetry/exporter-trace-otlp-http \
  @opentelemetry/api

Step 3: Create the Instrumentation Files

Next.js automatically loads instrumentation.ts from the project root before any route handlers run. This is the entry point for OTel setup.

instrumentation.ts — Entry Point

// instrumentation.ts
export async function register() {
  if (process.env.NEXT_RUNTIME === "nodejs") {
    await import("./instrumentation.node");
  }
}

The NEXT_RUNTIME === "nodejs" guard ensures the OTel SDK is only loaded in the Node.js runtime (not the Edge runtime, which doesn't support Node.js APIs).

instrumentation.node.ts — OTel SDK Config

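// instrumentation.node.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { resourceFromAttributes } from "@opentelemetry/resources";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";

const sdk = new NodeSDK({
  // The service name is how this app is labeled in Jaeger
  resource: resourceFromAttributes({
    [ATTR_SERVICE_NAME]: "advanced-ai-tutorial",
  }),
  // Send each finished span to Jaeger's OTLP/HTTP receiver
  spanProcessor: new SimpleSpanProcessor(
    new OTLPTraceExporter({
      url: "http://localhost:4318/v1/traces",
    }),
  ),
});

sdk.start();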

SimpleSpanProcessor exports each span immediately — ideal for development. In production, use BatchSpanProcessor for efficiency.
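
The trade-off between the two processors comes down to how many export calls are made. The toy classes below are hypothetical and are not the OpenTelemetry API; they only illustrate the difference between exporting per span and exporting in batches. The real BatchSpanProcessor also flushes on a timer, which this sketch leaves out.

```typescript
// Toy exporter that counts how many export calls it receives.
class CountingExporter {
  calls = 0;
  exported: string[] = [];
  export(batch: string[]): void {
    this.calls += 1;
    this.exported.push(...batch);
  }
}

// Exports every span as soon as it ends: one export call per span.
class ImmediateProcessor {
  private exporter: CountingExporter;
  constructor(exporter: CountingExporter) {
    this.exporter = exporter;
  }
  onEnd(spanName: string): void {
    this.exporter.export([spanName]);
  }
}

// Buffers spans and exports them in groups of `maxBatchSize`.
class BufferingProcessor {
  private buffer: string[] = [];
  private exporter: CountingExporter;
  private maxBatchSize: number;
  constructor(exporter: CountingExporter, maxBatchSize: number) {
    this.exporter = exporter;
    this.maxBatchSize = maxBatchSize;
  }
  onEnd(spanName: string): void {
    this.buffer.push(spanName);
    if (this.buffer.length >= this.maxBatchSize) this.flush();
  }
  // Export whatever is buffered (call this on shutdown so nothing is lost).
  flush(): void {
    if (this.buffer.length === 0) return;
    this.exporter.export(this.buffer);
    this.buffer = [];
  }
}

const immediateExporter = new CountingExporter();
const immediate = new ImmediateProcessor(immediateExporter);
const batchExporter = new CountingExporter();
const batched = new BufferingProcessor(batchExporter, 2);

for (const spanName of ["s1", "s2", "s3", "s4", "s5"]) {
  immediate.onEnd(spanName);
  batched.onEnd(spanName);
}
batched.flush();
```

With five spans and a batch size of two, the immediate processor makes five export calls and the buffering one makes three. The cost of batching is that spans still in the buffer are lost if the process exits without a flush, which is why immediate export is convenient during development.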

Step 4: Enable Telemetry on AI SDK Calls

The AI SDK's telemetry is opt-in per call via the experimental_telemetry option:

const result = streamText({
  model: openai(DEFAULT_MODEL),
  experimental_telemetry: {
    isEnabled: true,
    functionId: "orchestrator-agent",   // shown as resource.name in Jaeger
    metadata: { sessionId },            // custom attributes on the span
  },
  // ... rest of options
});

Fixing Orphaned Spans with Context Propagation

The specialist agents are invoked via HTTP fetch calls through the MCP protocol. The OTel SDK's automatic HTTP instrumentation only patches Node's built-in http/https modules — it does not patch the Web fetch API. Without manual context propagation, each agent produces an orphaned trace with no parent. The fix is two-sided:

Orchestrator: Inject Context

import { context, propagation } from "@opentelemetry/api";

// Serialize OTel context into headers
const traceCarrier: Record<string, string> = {};
propagation.inject(context.active(), traceCarrier);

const agentMcpClient = await createMCPClient({
  transport: {
    type: "http",
    url: ".../api/mcp/agents-otel/mcp",
    headers: traceCarrier,  // ← inject traceparent
  },
});

Agent MCP: Extract Context

// Extract parent context from request headers
const carrier: Record<string, string> = {};
request.headers.forEach((v, k) => { carrier[k] = v; });
const parentContext = propagation.extract(
  context.active(), carrier
);

// Restore parent context for all child spans
return context.with(parentContext, () =>
  tracer.startActiveSpan("researcher_agent.run", ...)
);

After this change, all four agents produce one unified trace in Jaeger. The full waterfall — from the Orchestrator's handleRequest span down through every ai.generateText and ai.toolCall — is visible in a single trace view.
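
With the default W3C Trace Context propagator, what actually travels between the two sides is a `traceparent` header of the form `version-traceId-parentSpanId-flags`. The sketch below reimplements inject and extract by hand to show the wire format; the function and type names are hypothetical, and in the app itself the `propagation` API handles this for you.

```typescript
// W3C Trace Context header: version-traceId-parentSpanId-flags, lowercase hex.
interface TraceContext {
  traceId: string; // 32 hex characters
  spanId: string; // 16 hex characters
  sampled: boolean;
}

function injectTraceparent(ctx: TraceContext, headers: Record<string, string>): void {
  const flags = ctx.sampled ? "01" : "00";
  headers["traceparent"] = `00-${ctx.traceId}-${ctx.spanId}-${flags}`;
}

function extractTraceparent(headers: Record<string, string>): TraceContext | null {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(
    headers["traceparent"] ?? "",
  );
  if (!match) return null;
  const traceId = match[1] ?? "";
  const spanId = match[2] ?? "";
  const flags = match[3] ?? "00";
  // All-zero IDs are invalid per the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

// Orchestrator side: write the header. Agent side: read it back.
const outgoingHeaders: Record<string, string> = {};
injectTraceparent(
  { traceId: "4bf92f3577b34da6a3ce929d0e0e4736", spanId: "00f067aa0ba902b7", sampled: true },
  outgoingHeaders,
);
const restoredContext = extractTraceparent(outgoingHeaders);
```

Because the agent side recovers the same trace ID, any span it starts can be attached to the Orchestrator's trace instead of starting a new one.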

Key Files

File | Purpose
instrumentation.ts | Next.js entry point: loads OTel before any routes run
instrumentation.node.ts | Configures the OTel SDK to export to Jaeger via OTLP/HTTP
app/api/multi-agent-otel/route.ts | Orchestrator Agent with OTel spans + context propagation
app/api/mcp/agents-otel/[transport]/route.ts | Agent MCP server with OTel spans + context extraction
OpenTelemetry, Jaeger, OTLP/HTTP, experimental_telemetry, context propagation

Chapter 3: Evals for AI Agents

Evaluations (evals) are systematic tests that measure how well AI models and agents perform at specific tasks. Like traditional unit, integration, and end-to-end tests, evals ensure your code remains reliable and stable. However, they differ in one substantial way: the underlying system being tested is non-deterministic — outputs can vary slightly or significantly between runs.

This chapter builds a native eval framework — no third-party tools — for the multi-agent system from Chapters 1 and 2.

"For many AI apps, developers run a few examples and check if the outputs 'feel right' before shipping. It's all about the 'vibes.' Unfortunately, this vibe-based approach doesn't scale."

— Vercel, An Introduction to Evals

Third-Party Eval Platforms vs. Building Your Own

Third-Party Platforms (Braintrust, LangSmith, etc.)

Advantages
  • ✅ Faster time-to-value — upload a dataset and run in minutes
  • ✅ Rich UI out of the box — side-by-side diffs, score trends
  • ✅ Experiment tracking — compare models and prompts with a click
  • ✅ Collaboration — non-engineers can browse results and add test cases
  • ✅ Managed LLM-as-judge — pre-built judge prompts
Disadvantages
  • ❌ Vendor lock-in — datasets and history live on their platform
  • ❌ Cost — charges per eval run, per seat, or per stored trace
  • ❌ Data privacy — every input/output pair sent to a third-party server
  • ❌ Less control over scoring logic

Building Your Own (This Chapter)

Advantages
  • ✅ Full control — scorers, runners, datasets are plain TypeScript
  • ✅ No data leaves your system — critical for privacy-sensitive apps
  • ✅ No platform fees: code-based checks are free to run, and the only per-run cost is the LLM calls made by the agents (and the optional judge)
  • ✅ No vendor dependency — evolves with your codebase
  • ✅ Offline / CI-friendly — runs anywhere Node.js runs
Disadvantages
  • ❌ Upfront investment — you write the runner, scorer, dataset, and UI
  • ❌ No experiment history by default — need to add storage
  • ❌ No collaboration UI for non-engineers
  • ❌ LLM-as-judge requires more work — you write the judge prompt

The Three Components of an Eval System

Dataset  →  Runner  →  Scorer  →  Metrics

1. Dataset
lib/evals/dataset.ts

12 test cases across 3 suites. Each case has an input, description, and an array of composable checks (pure functions that return true/false).

2. Runner
lib/evals/runner.ts

Model-agnostic harness that feeds inputs to the agent and collects outputs. Two runners: one for the researcher agent directly, one for the full pipeline.

3. Scorer
lib/evals/scorer.ts

Code-based scoring (fast, deterministic, zero cost) plus opt-in LLM-as-judge scoring for qualitative dimensions like relevance, accuracy, coherence, and completeness.
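
The three components fit together in very little code. The following is a minimal sketch, not the repository's implementation: the type names, the stub agent, and the two test cases are invented for illustration, and the runner is synchronous here for brevity where a real one would await the agent.

```typescript
type Check = (output: string) => boolean;

interface EvalCase {
  id: string;
  input: string;
  checks: Check[];
}

interface EvalResult {
  id: string;
  passed: boolean;
  failedChecks: number;
}

// Runner + scorer: feed each input to the agent, then apply every check.
function runEvals(cases: EvalCase[], agent: (input: string) => string): EvalResult[] {
  return cases.map((testCase) => {
    const output = agent(testCase.input);
    const failedChecks = testCase.checks.filter((check) => !check(output)).length;
    return { id: testCase.id, passed: failedChecks === 0, failedChecks };
  });
}

// Metrics: fraction of cases where every check passed.
function passRate(results: EvalResult[]): number {
  if (results.length === 0) return 0;
  return results.filter((r) => r.passed).length / results.length;
}

// Stub agent and a two-case dataset (both invented for this sketch).
const stubAgent = (input: string): string =>
  input.includes("revenue") ? "Total revenue: $1,200.00" : "I cannot help with that.";

const sampleCases: EvalCase[] = [
  { id: "revenue", input: "Which products earn the most revenue?", checks: [(o) => o.includes("$")] },
  { id: "stock", input: "Which products are low on stock?", checks: [(o) => o.includes("stock")] },
];

const sampleResults = runEvals(sampleCases, stubAgent);
const samplePassRate = passRate(sampleResults);
```

Keeping checks as plain functions means the dataset stays readable and the runner never needs to know what a particular check looks for.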

Built-in Scorer Helper Functions

// Composable scorer helpers — each returns (output: string) => boolean
containsAll("electronics", "revenue")   // output must contain ALL keywords
containsAny("keyboard", "headphone")    // output must contain AT LEAST ONE
containsNone("table dropped")           // output must NOT contain any
matchesRegex(/\$[\d,]+\.\d{2}/)        // output must match the regex
lengthBetween(500, 8000)                // output length must be in range
hasMarkdownHeadings(2)                  // output must have ≥ 2 ## headings
containsDollarAmount()                  // output must contain $12.99-style amount
containsNumber()                        // output must contain a number
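
Each helper is a small factory that returns a predicate. As a rough sketch of how a few of them could be written (the bodies below are assumptions, including the choice to match keywords case-insensitively, and `allOf` is a hypothetical extra combinator):

```typescript
type Scorer = (output: string) => boolean;

// Keyword checks (assumption: matching is case-insensitive).
const containsAll = (...keywords: string[]): Scorer => (output) =>
  keywords.every((k) => output.toLowerCase().includes(k.toLowerCase()));

const containsAny = (...keywords: string[]): Scorer => (output) =>
  keywords.some((k) => output.toLowerCase().includes(k.toLowerCase()));

const containsNone = (...keywords: string[]): Scorer => (output) =>
  !containsAny(...keywords)(output);

// Inclusive length bounds, measured in characters.
const lengthBetween = (min: number, max: number): Scorer => (output) =>
  output.length >= min && output.length <= max;

// Counts lines that start with "## " (level-2 markdown headings).
const hasMarkdownHeadings = (minCount: number): Scorer => (output) =>
  output.split("\n").filter((line) => line.startsWith("## ")).length >= minCount;

// Combine several scorers into one that passes only if all of them pass.
const allOf = (...scorers: Scorer[]): Scorer => (output) =>
  scorers.every((s) => s(output));

const sampleOutput = "## Sales\nElectronics revenue was $12.99\n## Stock\nKeyboards are low";
const structureCheck = allOf(containsAll("electronics", "revenue"), hasMarkdownHeadings(2));
const structureOk = structureCheck(sampleOutput);
```

Because every helper has the same `(output: string) => boolean` shape, they compose freely, which is what lets a test case list its checks as a simple array.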

The Three Eval Suites

🔍 Researcher Suite (5 tests)

Tests the researcher agent's ability to query the database and return factual data. Fast — calls the researcher directly, no writer or editor.

• Best-selling electronics products
• Products generating the most revenue
• Most active customers
• Products low on stock
• Sales breakdown by product category

📝 Pipeline Suite (4 tests)

End-to-end tests of the full orchestrator pipeline (researcher → writer → editor). Verifies the complete content generation workflow including markdown structure and length.

🛡️ Safety Suite (3 tests)

Tests that the agent refuses or redirects harmful and off-topic requests, such as SQL injection attempts and off-topic queries, and that it avoids hallucinating answers.

LLM-as-Judge Scoring

For qualitative dimensions that are hard to express as code, an opt-in LLM-as-judge scorer evaluates outputs on four dimensions:

const LlmJudgeSchema = z.object({
  passed: z.boolean(),
  scores: z.object({
    relevance:    z.number().min(0).max(1),
    accuracy:     z.number().min(0).max(1),
    coherence:    z.number().min(0).max(1),
    completeness: z.number().min(0).max(1),
  }),
  reasoning: z.string(),
});
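
A judge model can return malformed or out-of-range output, so its reply is worth validating before it is trusted. The dependency-free sketch below mirrors the constraints of the schema above by hand and shows one possible way to aggregate the four scores; `parseVerdict` and `meanScore` are hypothetical names, and the unweighted mean is an assumption rather than the tutorial's scoring rule.

```typescript
const JUDGE_DIMENSIONS = ["relevance", "accuracy", "coherence", "completeness"] as const;
type JudgeDimension = (typeof JUDGE_DIMENSIONS)[number];

interface JudgeVerdict {
  passed: boolean;
  scores: Record<JudgeDimension, number>;
  reasoning: string;
}

// Parse the judge's raw JSON reply; return null if it violates the schema.
function parseVerdict(raw: string): JudgeVerdict | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== "object" || data === null) return null;
  const candidate = data as { passed?: unknown; scores?: unknown; reasoning?: unknown };
  if (typeof candidate.passed !== "boolean") return null;
  if (typeof candidate.reasoning !== "string") return null;
  if (typeof candidate.scores !== "object" || candidate.scores === null) return null;
  const rawScores = candidate.scores as Record<string, unknown>;
  const scores = {} as Record<JudgeDimension, number>;
  for (const dim of JUDGE_DIMENSIONS) {
    const value = rawScores[dim];
    // Every dimension must be a number in [0, 1].
    if (typeof value !== "number" || value < 0 || value > 1) return null;
    scores[dim] = value;
  }
  return { passed: candidate.passed, scores, reasoning: candidate.reasoning };
}

// Unweighted mean of the four dimension scores.
function meanScore(verdict: JudgeVerdict): number {
  const values = JUDGE_DIMENSIONS.map((dim) => verdict.scores[dim]);
  return values.reduce((sum, v) => sum + v, 0) / values.length;
}

const validVerdict = parseVerdict(
  '{"passed":true,"scores":{"relevance":1,"accuracy":0.5,"coherence":1,"completeness":0.5},"reasoning":"ok"}',
);
const invalidVerdict = parseVerdict(
  '{"passed":true,"scores":{"relevance":1,"accuracy":1.5,"coherence":1,"completeness":0.5},"reasoning":"ok"}',
);
```

Here `validVerdict` parses successfully, while `invalidVerdict` is null because its accuracy score falls outside the 0 to 1 range.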

Running Evals

# List all test cases (no LLM calls)
curl http://localhost:3000/api/evals

# Run all suites
curl -X POST http://localhost:3000/api/evals \
  -H "Content-Type: application/json" \
  -d '{}'

# Run a specific suite
curl -X POST http://localhost:3000/api/evals \
  -H "Content-Type: application/json" \
  -d '{"suite": "researcher"}'

# Run with LLM-as-judge
curl -X POST http://localhost:3000/api/evals \
  -H "Content-Type: application/json" \
  -d '{"suite": "pipeline", "llmJudge": true}'

Key Files

File | Purpose
lib/evals/dataset.ts | 12 test cases across 3 suites with composable code-based checker functions
lib/evals/runner.ts | Model-agnostic harness with two runners (researcher direct, full pipeline)
lib/evals/scorer.ts | Code-based scorer, LLM-as-judge scorer, aggregate metrics, regression detection
app/api/evals/route.ts | HTTP API for running evals programmatically or from CI
app/components/EvalsRunner.tsx | Browser UI with suite selector, results table, and metrics dashboard
Dataset, Runner, Scorer, LLM-as-judge, Regression detection

Chapter 4: Agent Topics

The Problem with Chapter 1

In Chapter 1, the Orchestrator Agent passes the output of one specialist agent directly as a string argument to the next agent's tool call. This works, but it has several problems:

Problem | Impact
Context window bloat | The full research report is copy-pasted into the Orchestrator's context before being passed to the writer. For long outputs, this burns tokens fast.
No persistence | Intermediate outputs exist only in the Orchestrator's in-flight context. If the pipeline fails mid-way, all intermediate work is lost.
Not inspectable | There's no record of what the researcher produced, what the writer drafted, or when each step completed.
Not resumable | If the writer fails, you have to re-run the researcher too, because there's no way to resume from the last successful step.

The Solution: Agent Topics

An agent topic is a named slot in the database where an agent writes its output. The next agent reads from that topic directly — it doesn't receive the data as a function argument. This pattern is directly inspired by pub/sub (publish-subscribe) messaging systems like Kafka, Redis Pub/Sub, or Google Pub/Sub.

Pub/Sub Analogy

Pub/Sub concept | Agent topics equivalent
Publisher | An agent that writes its output to a named topic
Subscriber | An agent that reads from a topic as its input
Topic | A named row in the agent_topics table, keyed by (run_id, topic_name)
Message broker | SQLite: the database acts as the shared message store
Message | The agent's full output (research report, draft, final article)

Chapter 1 vs. Chapter 4: Side by Side

Chapter 1 — Orchestrator passes full content

// Orchestrator context grows with each step:
[tool call: researcher_agent("electronics")]
[tool result: "## Research Report\n\n...(2,000 chars)..."]
[tool call: writer_agent("electronics",
  "## Research Report\n\n...(2,000 chars pasted again)...")]
[tool result: "# The Electronics Revolution\n\n...(3,000 chars)..."]
[tool call: editor_agent(
  "# The Electronics Revolution\n\n...(3,000 chars pasted again)...")]

Chapter 4 — Orchestrator passes runId + topic names

// Orchestrator context stays small:
[tool call: researcher_agent("electronics", "run_abc",
  writeTopic="research")]
[tool result: "Research complete. Written to topic
  research:run_abc (1842 chars)"]
[tool call: writer_agent("electronics", "run_abc",
  readTopic="research", writeTopic="draft")]
[tool result: "Draft complete. Written to topic
  draft:run_abc (2931 chars)"]
[tool call: editor_agent("run_abc",
  readTopic="draft", writeTopic="final")]
[tool result: "Editing complete. Written to topic
  final:run_abc (3204 chars)"]
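
A quick back-of-the-envelope calculation shows the size of the difference. It uses the character counts from the Chapter 1 example and the receipt strings from the Chapter 4 example, and it ignores prompts, tool-call overhead, and the editor's own output, so treat the figures as a rough lower bound rather than a measurement.

```typescript
// Sizes taken from the Chapter 1 side of the comparison (in characters).
const RESEARCH_CHARS = 2000;
const DRAFT_CHARS = 3000;

// Chapter 1: each output enters the context as a tool result,
// then again as the argument of the next tool call.
const directPassingChars = RESEARCH_CHARS * 2 + DRAFT_CHARS * 2;

// Chapter 4: the Orchestrator only sees short receipts.
const topicReceipts = [
  "Research complete. Written to topic research:run_abc (1842 chars)",
  "Draft complete. Written to topic draft:run_abc (2931 chars)",
  "Editing complete. Written to topic final:run_abc (3204 chars)",
];
const topicPassingChars = topicReceipts.reduce((sum, r) => sum + r.length, 0);

// How many times smaller the topic-based context contribution is.
const reductionFactor = directPassingChars / topicPassingChars;
```

Direct passing adds at least 10,000 characters of agent output to the Orchestrator's context, while the three receipts together come to under 200. The gap widens as outputs get longer, because the receipts stay roughly the same size.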

The agent_topics Table

CREATE TABLE agent_topics (
  id           TEXT PRIMARY KEY,          -- "{runId}:{topicName}"
  run_id       TEXT NOT NULL,             -- groups all topics for one pipeline run
  topic_name   TEXT NOT NULL,             -- "research" | "draft" | "final"
  content      TEXT NOT NULL,             -- the agent's output
  agent_name   TEXT NOT NULL,             -- which agent wrote this
  created_at   TEXT DEFAULT (datetime('now')),
  UNIQUE(run_id, topic_name)              -- one value per topic per run
)

Benefits of Agent Topics

No context bloat — only the topic ID (a short string) is passed between agents
Persistence — every intermediate output is stored in SQLite
Resumability — a failed pipeline can restart from the last successful topic write
Inspectability — any topic can be queried at any time with SQL
Fan-out / fan-in — multiple agents can read from the same topic, or one agent can read from multiple topics
Decoupling — agents only know which topic to read from and write to

Topic Helper Functions

function writeTopic(runId: string, topicName: string, content: string, agentName: string) {
  const id = `${runId}:${topicName}`;
  db.prepare(`
    INSERT INTO agent_topics (id, run_id, topic_name, content, agent_name)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(run_id, topic_name) DO UPDATE SET content = excluded.content
  `).run(id, runId, topicName, content, agentName);
}

function readTopic(runId: string, topicName: string): string | null {
  const row = db.prepare(
    "SELECT content FROM agent_topics WHERE run_id = ? AND topic_name = ?"
  ).get(runId, topicName) as { content: string } | undefined;
  return row?.content ?? null;
}

Extending the Pattern

Parallel agents (fan-out)

// Run researcher and fact-checker in parallel
await Promise.all([
  runResearcher(runId, prompt),   // writes to topic "research"
  runFactChecker(runId, prompt),  // writes to topic "facts"
]);
// Writer reads from both topics
const research = readTopic(runId, "research");
const facts = readTopic(runId, "facts");

Resumable pipelines

// Only run the researcher if the topic is empty
const existingResearch = readTopic(runId, "research");
if (!existingResearch) {
  await runResearcher(runId, prompt);
}
// Always run the writer (it reads from the topic)
await runWriter(runId, prompt);
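
The resume logic can be tried without a database. The sketch below swaps SQLite for an in-memory Map and uses stub steps, all of which are assumptions made for illustration; unlike the snippet above, it skips every step whose output topic already exists, including the writer.

```typescript
// In-memory stand-in for the agent_topics table (assumption: the real
// helpers read and write SQLite).
const topicStore = new Map<string, string>();
const topicKey = (runId: string, topicName: string): string => `${runId}:${topicName}`;

interface PipelineStep {
  writeTopic: string;
  run: (runId: string) => string; // returns the content to publish
}

// Run steps in order, skipping any whose output topic already exists.
function runResumable(runId: string, steps: PipelineStep[]): string[] {
  const executed: string[] = [];
  for (const step of steps) {
    const key = topicKey(runId, step.writeTopic);
    if (topicStore.has(key)) continue; // finished in an earlier attempt
    topicStore.set(key, step.run(runId));
    executed.push(step.writeTopic);
  }
  return executed;
}

// Stub steps: the writer fails on the first attempt only.
let writerShouldFail = true;
const demoSteps: PipelineStep[] = [
  { writeTopic: "research", run: () => "research report" },
  {
    writeTopic: "draft",
    run: (runId) => {
      if (writerShouldFail) throw new Error("writer failed");
      return `draft based on: ${topicStore.get(topicKey(runId, "research"))}`;
    },
  },
];

// First attempt: research is stored, then the writer throws.
let firstAttemptFailed = false;
try {
  runResumable("run_1", demoSteps);
} catch {
  firstAttemptFailed = true;
}

// Second attempt: only the missing "draft" step runs.
writerShouldFail = false;
const secondAttempt = runResumable("run_1", demoSteps);
```

On the second attempt `secondAttempt` lists only "draft", since the research topic written before the failure is still in the store.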

Key Files

File | Purpose
app/api/topic-pipeline/route.ts | Orchestrator Agent: connects to Topic Agent MCP, streams response
app/api/mcp/agents-topic/[transport]/route.ts | Topic Agent MCP server: exposes 3 topic-aware specialist agents as tools
app/components/TopicPipeline.tsx | UI: streams Orchestrator narration + polls DB for topic contents
agent_topics table, runId, readTopic / writeTopic, Pub/Sub pattern, Resumable pipelines

Chapter 5: Data Pipelines for AI Agents

A data pipeline is a system that moves data from one place to another — collecting it from sources, transforming it into a usable format, and delivering it to a destination. In traditional software, this usually means an ETL (Extract, Transform, Load) process. But when AI agents enter the picture, the concept of a data pipeline changes fundamentally.

Traditional Data Pipelines vs. AI Agent Data Pipelines

Dimension | Traditional (ETL) Pipeline | AI Agent Data Pipeline
Direction | Linear (source → warehouse) | Cyclical (agent → action → feedback → agent)
Timing | Batch (hourly, nightly) | Real-time or near-real-time
Data types | Structured (tables, rows, columns) | Unstructured (text, audio, images, embeddings)
Consumer | Human analysts, dashboards | AI models (LLMs, embedders, classifiers)
State | Stateless between runs | Stateful: agents maintain memory across steps
Feedback | None: data flows one way | Continuous: action results feed back into the pipeline

The key insight: traditional pipelines move data for humans to analyze. AI agent pipelines move data for machines to act on.

The Six Stages of an AI Agent Data Pipeline

┌─────────────────────────────────────────────────────────────────┐
│                    AI Agent Data Pipeline                       │
│                                                                 │
│  1. Ingest  →  2. Process  →  3. Store  →  4. Retrieve          │
│                                                ↓                │
│                          6. Act & Loop  ←  5. Reason            │
└─────────────────────────────────────────────────────────────────┘
1. Ingest: Continuously gather data from user inputs, external APIs, document repositories, and event streams. Unlike a traditional ETL job that runs on a schedule, an AI agent pipeline ingests data continuously and on-demand.
2. Process: Parse, clean, chunk, and embed (vectorize) data into a format the agent can reason about. Chunking breaks large documents into smaller sections. Embedding converts text into mathematical vectors that represent semantic meaning.
3. Store: Route processed data into short-term memory (Redis), long-term memory (vector DB), and operational state (SQLite / agent topics). The Agent Topics pattern from Chapter 4 is a direct implementation of the operational state tier.
4. Retrieve (RAG): Embed the current query, search the vector DB for semantically relevant knowledge, and construct a highly contextualized prompt. This is the step that separates a capable agent from a hallucinating one.
5. Reason: Send the contextualized prompt to the LLM; it decides whether to answer directly or call a tool. The output is either a direct response or a tool call.
6. Act & Loop: Execute the tool call; ingest the result back into the beginning of the pipeline; repeat until the task is complete. This cyclical, self-correcting behavior is what distinguishes an AI agent pipeline from a traditional one-way data flow.
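
The Process and Retrieve stages are easier to picture with a small example. The sketch below chunks text with overlap and ranks chunks by cosine similarity. The `toyEmbed` function is a deliberate simplification (word counts over a fixed vocabulary) standing in for a real embedding model, and all names here are hypothetical.

```typescript
// Split text into fixed-size chunks with overlap, so text near a boundary
// appears in both neighbouring chunks.
function chunkText(text: string, size: number, overlap: number): string[] {
  if (overlap >= size) throw new Error("overlap must be smaller than size");
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += size - overlap) {
    chunks.push(text.slice(start, start + size));
    if (start + size >= text.length) break; // last chunk reached the end
  }
  return chunks;
}

// Toy embedding: word counts over a fixed vocabulary. A real pipeline would
// call an embedding model instead.
function toyEmbed(text: string, vocabulary: string[]): number[] {
  const words = text.toLowerCase().split(/\W+/);
  return vocabulary.map((term) => words.filter((w) => w === term).length);
}

function cosineSimilarity(a: number[], b: number[]): number {
  const dot = a.reduce((sum, x, i) => sum + x * (b[i] ?? 0), 0);
  const norm = (v: number[]): number => Math.sqrt(v.reduce((sum, x) => sum + x * x, 0));
  const denominator = norm(a) * norm(b);
  return denominator === 0 ? 0 : dot / denominator;
}

// Retrieve: rank stored chunks by similarity to the query, keep the top k.
function retrieveTopK(query: string, chunks: string[], vocabulary: string[], k: number): string[] {
  const queryVector = toyEmbed(query, vocabulary);
  return chunks
    .map((chunk) => ({ chunk, score: cosineSimilarity(queryVector, toyEmbed(chunk, vocabulary)) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, k)
    .map((entry) => entry.chunk);
}

const knowledgeBase = [
  "Standard shipping takes 3-5 business days",
  "Refund requests need an order number",
  "Our office is closed on holidays",
];
const demoVocabulary = ["shipping", "refund", "order"];
const bestMatch = retrieveTopK("How long does shipping take?", knowledgeBase, demoVocabulary, 1);
```

The query shares the word "shipping" with only the first entry, so that entry ranks highest. A real embedding model would also match paraphrases that share no words, which is the main reason to use one.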

Memory Architecture: The Three-Tier Model

Short-Term (Working)

  • Current conversation
  • Active task state
  • Recent tool results

Storage: Redis, context window

Needs very low latency (ideally sub-millisecond), because it is accessed dozens of times per reasoning loop.

Long-Term (Persistent)

  • Episodic facts
  • Semantic knowledge
  • User preferences
  • Past actions

Storage: Vector DB, SQL

Needs semantic search capability — find relevant past knowledge by meaning.

Operational State

  • Task progress
  • Pipeline state
  • Intermediate results

Storage: SQLite, DB

Tracks what the pipeline is doing right now — exactly what Agent Topics implements.
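
One way to keep the three tiers distinct in code is to give each its own small interface. The classes below are an in-memory sketch with hypothetical names. In particular, the keyword search in `PersistentMemory` is only a placeholder for the semantic search a vector database would provide.

```typescript
// Short-term memory: keeps only the most recent entries (a bounded window).
class WorkingMemory {
  private entries: string[] = [];
  private capacity: number;
  constructor(capacity: number) {
    this.capacity = capacity;
  }
  remember(entry: string): void {
    this.entries.push(entry);
    if (this.entries.length > this.capacity) this.entries.shift(); // drop oldest
  }
  recall(): string[] {
    return [...this.entries];
  }
}

// Long-term memory: append-only facts, searched here by keyword.
// (Assumption: a real system would use semantic search over embeddings.)
class PersistentMemory {
  private facts: string[] = [];
  store(fact: string): void {
    this.facts.push(fact);
  }
  search(keyword: string): string[] {
    return this.facts.filter((f) => f.toLowerCase().includes(keyword.toLowerCase()));
  }
}

// Operational state: which steps each pipeline run has completed.
class RunState {
  private completed = new Map<string, string[]>();
  markDone(runId: string, stepName: string): void {
    const steps = this.completed.get(runId) ?? [];
    steps.push(stepName);
    this.completed.set(runId, steps);
  }
  progress(runId: string): string[] {
    return this.completed.get(runId) ?? [];
  }
}

const working = new WorkingMemory(2);
["hello", "where is my order?", "order #12345 shipped"].forEach((m) => working.remember(m));

const persistent = new PersistentMemory();
persistent.store("Customer prefers email updates");
persistent.store("Customer asked about Order #12345");

const runState = new RunState();
runState.markDone("run_abc", "research");
runState.markDone("run_abc", "draft");
```

After three messages, `working.recall()` holds only the last two, while the persistent and operational tiers keep everything they were given.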

How This Tutorial Maps to the Pipeline

Chapter 1 — Multi-Agent Systems: Stage 4 (Retrieval) + Stage 5 (Reasoning) + Stage 6 (Action) — Researcher Agent queries the SQLite database via SQL tool calls; Orchestrator coordinates specialist agents via MCP
Chapter 2 — Observability with OpenTelemetry: Cross-cutting — traces every stage of the pipeline for latency, token usage, and failure diagnosis
Chapter 3 — Evals: Quality gate between Stage 5 and Stage 6 — measures whether agent outputs meet correctness thresholds before acting
Chapter 4 — Agent Topics: Stage 3 (Storage) — persists intermediate outputs in named database slots, enabling resumable and inspectable pipelines

End-to-End Example: A Customer Support Agent

Scenario: "Where is my order #12345?"

Stage 1 — Ingest
  Email arrives → pipeline extracts text: "Where is my order #12345?"

Stage 2 — Process
  Parse email → extract order number "12345"
  Convert query to embedding: [0.31, -0.72, 0.58, ...]

Stage 3 — Store (already done in advance)
  Company shipping policies → chunked → embedded → stored in Vector DB
  Past customer interactions → stored in long-term memory

Stage 4 — Retrieve (RAG)
  Search Vector DB with query embedding
  → retrieves: "Standard shipping takes 3-5 business days"
  → queries Shopify API: Order #12345 shipped yesterday, tracking: 1Z999AA1

Stage 5 — Reason
  LLM receives contextualized prompt → drafts reply

Stage 6 — Act & Loop
  Agent sends reply email to customer
  Pipeline logs action → stored in long-term memory
  → Agent remembers helping this customer in future interactions
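
Stages 5 and 6 of this scenario reduce to a short loop: ask the model what to do, run the tool it requests, append the result, and ask again. The sketch below uses a stub in place of the LLM and a stub order-lookup tool. Both are invented for illustration, as are the type names and the `maxSteps` default.

```typescript
type ModelDecision =
  | { kind: "tool"; toolName: string; argument: string }
  | { kind: "answer"; reply: string };

type LoopMessage = { role: "user" | "tool"; content: string };

// Stub tool (assumption: a real agent would call an order-tracking API).
const supportTools: Record<string, (argument: string) => string> = {
  lookupOrder: (orderId) => `Order #${orderId} shipped yesterday`,
};

// Stub "LLM": requests the order lookup first, then answers from the result.
function stubModel(messages: LoopMessage[]): ModelDecision {
  const toolResult = messages.find((m) => m.role === "tool");
  if (!toolResult) return { kind: "tool", toolName: "lookupOrder", argument: "12345" };
  return { kind: "answer", reply: `Good news: ${toolResult.content}.` };
}

// Reason, act, feed the result back, and repeat until the model answers.
function runAgentLoop(question: string, maxSteps = 5): { reply: string; steps: number } {
  const messages: LoopMessage[] = [{ role: "user", content: question }];
  for (let step = 1; step <= maxSteps; step++) {
    const decision = stubModel(messages);
    if (decision.kind === "answer") return { reply: decision.reply, steps: step };
    const tool = supportTools[decision.toolName];
    const result = tool ? tool(decision.argument) : `Unknown tool: ${decision.toolName}`;
    messages.push({ role: "tool", content: result }); // result re-enters the loop
  }
  return { reply: "Step limit reached without an answer.", steps: maxSteps };
}

const supportOutcome = runAgentLoop("Where is my order #12345?");
```

The loop takes two steps here: one to call the tool and one to answer from its result. The step limit guards against a model that keeps requesting tools without ever answering.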

Further Reading

ETL vs. AI Pipelines, RAG, Vector Databases, Memory Architecture, Action/Feedback Loop

Conclusion

This tutorial demonstrates that building production-grade AI agent systems requires more than just wiring up an LLM with tools. You need observability to understand what's happening, evals to measure quality, and architectural patterns like agent topics to keep pipelines scalable and maintainable.

Each chapter in this tutorial implements a piece of the AI agent data pipeline — from retrieval and reasoning (Chapter 1) to storage and operational state (Chapter 4) — giving you a complete picture of what it takes to build AI agents that work reliably in production.

Learning Outcomes

By working through this tutorial, you will have gained practical experience with:

  • Building multi-agent systems with an Orchestrator and specialist agents via stacked MCP servers
  • Adding distributed tracing to AI agents with OpenTelemetry and Jaeger
  • Propagating OTel trace context across HTTP boundaries (fixing orphaned spans)
  • Building a native eval framework with datasets, runners, scorers, and LLM-as-judge
  • Implementing the agent topics pattern to eliminate context bloat in multi-agent pipelines
  • Understanding the six stages of an AI agent data pipeline and how they differ from traditional ETL
  • Designing resumable, inspectable, and decoupled agent pipelines

About the Author

Wayne Cheng is the founder and AI app developer at Audoir, LLC. Prior to founding Audoir, he worked as a hardware design engineer for Silicon Valley startups and an audio engineer for creative organizations. He holds an MSEE from UC Davis and a Music Technology degree from Foothill College.

Further Exploration

Explore the complete tutorial repository and experiment with extending the examples. Consider adding new specialist agents, connecting to external APIs, or implementing human-in-the-loop approval steps to deepen your understanding of advanced AI agent architectures.

New to AI agents? Start with the AI Agent Tutorial first, which covers building agents from scratch — from a simple streaming chatbot to a LangGraph-powered agent with MCP tools.

For more AI-powered development tools and tutorials, visit Audoir.