A single AI agent with a few tools can handle simple tasks. But real-world workflows — process a customer refund, generate a research report, onboard a new user — involve multiple steps, conditional logic, parallel execution, error recovery, and human approvals.
That's orchestration: the layer that coordinates what your agent does, in what order, and what happens when things go wrong. Without it, your agent is a talented freelancer with no project management. With it, your agent becomes a reliable workflow engine.
This guide covers three approaches to agent orchestration: LangGraph (graph-based), Temporal (durable workflows), and custom orchestrators — with trade-offs and code for each.
Why Orchestration Matters
Consider a simple task: "Process a refund for order #12345."
Without orchestration, your agent calls tools in whatever order the LLM decides. Sometimes it works. Sometimes it processes the refund before checking eligibility. Sometimes it retries a failed API call 47 times. Sometimes it forgets to send the confirmation email.
With orchestration:
```
verify_identity → check_eligibility → [approve if > $100] → process_refund → send_confirmation
      │                  │                     │                  │
      └─ retry 2x,       └─ if ineligible,     └─ timeout 5 min,  └─ retry 3x,
         then escalate      explain why           then escalate      then log error
```
Every step has defined behavior, error handling, and transitions. The workflow is testable, observable, and predictable.
Approach 1: LangGraph — Graph-Based Orchestration
LangGraph (from LangChain) models your agent as a state machine graph. Nodes are steps, edges are transitions, and state flows through the graph.
Core Concepts
- State — A typed dictionary that accumulates data through the graph
- Nodes — Functions that read state, do work, and return updated state
- Edges — Transitions between nodes (conditional or unconditional)
- Checkpointing — Save state at each step for recovery and debugging
Example: Customer Support Agent
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class SupportState(TypedDict):
    messages: list           # Conversation history
    intent: str              # Classified intent
    customer_id: str | None  # Authenticated customer
    order: dict | None       # Looked-up order
    resolution: str | None   # How we resolved the issue
    should_escalate: bool

# Node functions
async def classify_intent(state: SupportState) -> SupportState:
    intent = await llm.classify(state["messages"][-1])
    return {"intent": intent}

async def authenticate(state: SupportState) -> SupportState:
    customer = await lookup_customer(state["messages"])
    return {"customer_id": customer["id"] if customer else None}

async def lookup_order(state: SupportState) -> SupportState:
    order = await get_order(state["customer_id"], state["messages"])
    return {"order": order}

async def generate_response(state: SupportState) -> SupportState:
    response = await llm.respond(state)
    return {"messages": [response], "resolution": "resolved"}

async def escalate(state: SupportState) -> SupportState:
    ticket = await create_support_ticket(state)
    return {"resolution": f"Escalated: {ticket['id']}"}

# Conditional edges
def route_after_classify(state: SupportState) -> Literal["authenticate", "respond", "escalate"]:
    if state["intent"] in ["order_status", "refund"]:
        return "authenticate"
    if state["intent"] == "general_question":
        return "respond"
    return "escalate"

def route_after_auth(state: SupportState) -> Literal["lookup_order", "escalate"]:
    if state["customer_id"]:
        return "lookup_order"
    return "escalate"

# Build the graph
graph = StateGraph(SupportState)
graph.add_node("classify", classify_intent)
graph.add_node("authenticate", authenticate)
graph.add_node("lookup_order", lookup_order)
graph.add_node("respond", generate_response)
graph.add_node("escalate", escalate)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classify)
graph.add_conditional_edges("authenticate", route_after_auth)
graph.add_edge("lookup_order", "respond")
graph.add_edge("respond", END)
graph.add_edge("escalate", END)

# Compile with checkpointing
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = graph.compile(checkpointer=checkpointer)

# Run
result = await app.ainvoke(
    {"messages": ["I want a refund for order #12345"], "should_escalate": False},
    config={"configurable": {"thread_id": "conv-123"}}
)
```
Human-in-the-Loop with LangGraph
```python
from langgraph.errors import NodeInterrupt

# Add an interrupt point before processing refunds
graph.add_node("request_approval", request_human_approval)

# In the approval node:
async def request_human_approval(state: SupportState) -> SupportState:
    if state["order"]["amount"] > 10000:  # amount in cents, i.e. > $100
        # This pauses the graph until a human responds
        raise NodeInterrupt("Refund > $100 needs approval")
    return state

# Resume after human approves:
await app.ainvoke(
    None,  # No new input, just resume
    config={"configurable": {"thread_id": "conv-123"}}
)
```
LangGraph Trade-offs
| Pros | Cons |
|---|---|
| Visual graph structure | LangChain ecosystem lock-in |
| Built-in checkpointing | Learning curve for graph concepts |
| Human-in-the-loop native | Debugging complex graphs is hard |
| Streaming support | Overhead for simple workflows |
| LangGraph Studio for visualization | State management gets complex |
Approach 2: Temporal — Durable Workflow Orchestration
Temporal is an enterprise workflow engine originally designed for microservices. It's overkill for simple agents but perfect for long-running, mission-critical workflows that must never lose state.
Why Temporal for Agents?
- Durable execution — If your server crashes mid-workflow, Temporal resumes exactly where it left off
- Built-in retries — Configure retry policies per activity (exponential backoff, max attempts)
- Timeouts — Activity timeouts, workflow timeouts, heartbeat timeouts
- Versioning — Deploy new workflow versions without breaking running workflows
- Visibility — Built-in UI shows every workflow, its state, and history
Example: Research Report Agent
```python
import asyncio
from datetime import timedelta

from temporalio import workflow, activity
from temporalio.common import RetryPolicy

@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant URLs."""
    return await web_search_tool.search(query, top_k=10)

@activity.defn
async def scrape_page(url: str) -> str:
    """Scrape and extract content from a URL."""
    return await scraper.extract(url)

@activity.defn
async def analyze_content(content: str, question: str) -> dict:
    """Use LLM to analyze scraped content."""
    return await llm.analyze(content, question)

@activity.defn
async def write_report(findings: list[dict], topic: str) -> str:
    """Generate final research report."""
    return await llm.generate_report(findings, topic)

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Search for sources (with retry)
        urls = await workflow.execute_activity(
            search_web,
            topic,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Step 2: Scrape pages in parallel
        scrape_tasks = [
            workflow.execute_activity(
                scrape_page,
                url,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=RetryPolicy(maximum_attempts=2),
            )
            for url in urls[:5]  # Top 5 results
        ]
        contents = await asyncio.gather(*scrape_tasks, return_exceptions=True)
        contents = [c for c in contents if isinstance(c, str)]
        # Step 3: Analyze each source
        findings = []
        for content in contents:
            finding = await workflow.execute_activity(
                analyze_content,
                args=[content, topic],
                start_to_close_timeout=timedelta(seconds=120),
            )
            findings.append(finding)
        # Step 4: Write final report
        report = await workflow.execute_activity(
            write_report,
            args=[findings, topic],
            start_to_close_timeout=timedelta(seconds=180),
        )
        return report
```
Temporal Trade-offs
| Pros | Cons |
|---|---|
| Battle-tested durability (used by Uber, Netflix) | Heavy infrastructure (Temporal server + DB) |
| Survives crashes, deploys, outages | Steep learning curve |
| Built-in retry, timeout, versioning | Overkill for simple agents |
| Great visibility UI | Operational complexity |
| Language-agnostic (Python, Go, Java, TS) | Adds 50-100ms latency per activity |
Approach 3: Custom Orchestrator
Sometimes you don't need a framework. A custom orchestrator gives you full control with minimal dependencies.
```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Step:
    name: str
    fn: Callable
    depends_on: list[str] = field(default_factory=list)
    retry_count: int = 2
    timeout_seconds: int = 60
    condition: Callable | None = None  # Skip this step if it returns False
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None

class Orchestrator:
    def __init__(self):
        self.steps: dict[str, Step] = {}
        self.context: dict = {}

    def add_step(self, step: Step):
        self.steps[step.name] = step

    async def run(self) -> dict:
        while self._has_pending_steps():
            # Find steps ready to run (dependencies met)
            ready = [s for s in self.steps.values()
                     if s.status == StepStatus.PENDING
                     and self._dependencies_met(s)]
            if not ready:
                break  # Deadlock or all done
            # Run ready steps in parallel
            tasks = [self._execute_step(s) for s in ready]
            await asyncio.gather(*tasks)
        return self.context

    async def _execute_step(self, step: Step):
        # Check condition
        if step.condition and not step.condition(self.context):
            step.status = StepStatus.SKIPPED
            return
        step.status = StepStatus.RUNNING
        for attempt in range(step.retry_count + 1):
            try:
                result = await asyncio.wait_for(
                    step.fn(self.context),
                    timeout=step.timeout_seconds
                )
                step.result = result
                step.status = StepStatus.COMPLETED
                self.context[step.name] = result
                return
            except asyncio.TimeoutError:
                step.error = f"Timeout after {step.timeout_seconds}s"
            except Exception as e:
                step.error = str(e)
            if attempt < step.retry_count:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        step.status = StepStatus.FAILED

    def _dependencies_met(self, step: Step) -> bool:
        return all(
            self.steps[dep].status == StepStatus.COMPLETED
            for dep in step.depends_on
        )

    def _has_pending_steps(self) -> bool:
        return any(s.status == StepStatus.PENDING for s in self.steps.values())

# Usage
orch = Orchestrator()
orch.add_step(Step("classify", classify_intent))
orch.add_step(Step("authenticate", authenticate, depends_on=["classify"],
                   condition=lambda ctx: ctx["classify"]["requires_auth"]))
orch.add_step(Step("lookup", lookup_order, depends_on=["authenticate"]))
orch.add_step(Step("respond", generate_response, depends_on=["lookup"]))

result = await orch.run()
```
Custom Orchestrator Trade-offs
| Pros | Cons |
|---|---|
| Full control, zero dependencies | You build everything yourself |
| Minimal overhead | No built-in persistence/recovery |
| Easy to understand and debug | Parallel execution logic is tricky |
| Fits any pattern | Grows complex over time |
Orchestration Patterns
Pattern 1: Sequential Pipeline
Steps run one after another. Output of step N is input to step N+1.
Use when: Each step depends on the previous result. Example: classify → retrieve → generate → validate.
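The pattern needs no framework at all. Here's a minimal sketch: a loop that threads each step's output into the next. The step functions (`classify`, `retrieve`, `generate`) are illustrative placeholders, not a real agent.

```python
import asyncio

async def run_pipeline(steps, initial_input):
    """Run async steps in order; output of step N is input to step N+1."""
    value = initial_input
    for step in steps:
        value = await step(value)
    return value

# Placeholder steps standing in for real LLM/tool calls
async def classify(text: str) -> dict:
    return {"text": text, "intent": "refund"}

async def retrieve(state: dict) -> dict:
    return {**state, "docs": ["refund-policy.md"]}

async def generate(state: dict) -> dict:
    return {**state, "answer": f"Based on {state['docs'][0]}: refunds take 5 days."}

result = asyncio.run(run_pipeline([classify, retrieve, generate], "I want a refund"))
```

The trade-off is rigidity: there is no branching, so any conditional logic has to live inside a step.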
Pattern 2: Fan-Out / Fan-In
One step spawns multiple parallel tasks, then a final step aggregates results.
Use when: Independent subtasks can run simultaneously. Example: search 5 sources in parallel → merge findings.
```python
# Fan-out / Fan-in with LangGraph
from langgraph.graph import StateGraph

async def fan_out(state):
    """Create parallel research tasks."""
    tasks = [{"query": q} for q in state["queries"]]
    return {"parallel_tasks": tasks}

async def research(state):
    """Run research for one query."""
    return {"finding": await search_and_analyze(state["query"])}

async def fan_in(state):
    """Merge all findings into a report."""
    return {"report": await synthesize(state["findings"])}
```
Pattern 3: Conditional Branching
Different paths based on runtime conditions.
Use when: The workflow varies by input type. Example: refund requests go through approval, general questions go straight to response.
Pattern 4: Loop with Exit Condition
Repeat a sequence until a condition is met.
Use when: Iterative refinement is needed. Example: generate → evaluate → if score < threshold, regenerate.
```python
# Loop pattern in LangGraph
from typing import Literal

def should_continue(state) -> Literal["regenerate", "finalize"]:
    if state["quality_score"] >= 0.8:
        return "finalize"
    if state["attempts"] >= 3:
        return "finalize"  # Give up after 3 tries
    return "regenerate"

graph.add_conditional_edges("evaluate", should_continue)
```
Pattern 5: Saga (Compensating Transactions)
When a later step fails, undo earlier steps.
Use when: Multi-step operations that should be atomic. Example: reserve inventory → charge card → if shipping fails → refund card → release inventory.
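A bare-bones saga runner can be sketched in a few lines: execute each step, record its compensation, and on failure run the recorded compensations in reverse. All step names here are illustrative stand-ins for real inventory/payment/shipping calls.

```python
import asyncio

# Illustrative actions and their compensations (stand-ins for real APIs)
async def reserve_inventory(ctx): ctx["reserved"] = True
async def release_inventory(ctx): ctx["reserved"] = False
async def charge_card(ctx): ctx["charged"] = True
async def refund_card(ctx): ctx["charged"] = False
async def ship(ctx): raise RuntimeError("carrier API down")

async def run_saga(steps, ctx):
    """steps: list of (action, compensation-or-None) pairs."""
    done = []
    try:
        for action, compensate in steps:
            await action(ctx)
            if compensate:
                done.append(compensate)
    except Exception:
        for compensate in reversed(done):  # undo completed steps in reverse
            await compensate(ctx)
        ctx["status"] = "rolled_back"
    return ctx

ctx = asyncio.run(run_saga(
    [(reserve_inventory, release_inventory),
     (charge_card, refund_card),
     (ship, None)],
    {},
))
```

Because `ship` fails, the saga refunds the card and releases the inventory, leaving the system as if the order never happened. Temporal is a natural fit for this pattern in production, since compensations must also survive crashes.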
Choosing Your Orchestration Approach
| Scenario | Best Approach | Why |
|---|---|---|
| Simple agent (3-5 steps) | Custom or just ReAct | Frameworks add unnecessary complexity |
| Complex but short-lived (< 5 min) | LangGraph | Good graph model, checkpointing, HITL |
| Long-running (hours/days) | Temporal | Durable execution survives crashes |
| Human approval workflows | LangGraph or Temporal | Both have native interrupt/signal support |
| Mission-critical / financial | Temporal | Battle-tested, audit trail, exactly-once |
| Maximum flexibility | Custom | No framework constraints |
| Team already uses LangChain | LangGraph | Ecosystem integration |
Common Orchestration Mistakes
1. Over-Orchestrating Simple Agents
If your agent has 3 steps and no branching, you don't need LangGraph or Temporal. A simple while loop with tool calling is fine. Add orchestration when complexity justifies it.
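For reference, that loop is just this: call the model, execute any tool it asks for, feed the result back, stop when it answers. `call_llm` and `TOOLS` below are placeholders standing in for a real model client and tool registry.

```python
MAX_TURNS = 10  # hard cap instead of an unbounded `while True`

def call_llm(messages: list[dict]) -> dict:
    # Placeholder model: first turn requests a tool, second turn answers.
    if len(messages) == 1:
        return {"tool": "get_order", "args": {"order_id": "12345"}}
    return {"answer": "Your order has shipped."}

TOOLS = {"get_order": lambda order_id: {"order_id": order_id, "status": "shipped"}}

def run_agent(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]
    for _ in range(MAX_TURNS):
        reply = call_llm(messages)
        if "answer" in reply:          # model is done
            return reply["answer"]
        result = TOOLS[reply["tool"]](**reply["args"])  # execute the tool
        messages.append({"role": "tool", "content": str(result)})
    return "Giving up after too many turns."

answer = run_agent("Where is order #12345?")
```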
2. No Error Boundaries
A failure in step 4 shouldn't crash the entire workflow. Each step needs its own error handling: retry policy, fallback behavior, and graceful degradation.
3. Missing Timeouts
LLM calls can hang. Tool calls can hang. Without timeouts, your workflow hangs forever. Set timeouts on every async operation: 30s for LLM calls, 60s for tool calls, 5 minutes for the full workflow.
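In plain asyncio this is one wrapper. A sketch (with the timeout shortened to milliseconds so it demonstrably fires; `slow_llm_call` is a stand-in for a real model call):

```python
import asyncio

async def slow_llm_call() -> str:
    await asyncio.sleep(0.2)  # simulates a hung model call
    return "response"

async def call_with_timeout() -> str:
    try:
        # In production this would be e.g. timeout=30 for an LLM call
        return await asyncio.wait_for(slow_llm_call(), timeout=0.05)
    except asyncio.TimeoutError:
        return "fallback: LLM call timed out"

result = asyncio.run(call_with_timeout())
```

The same idea appears as `start_to_close_timeout` in Temporal and `timeout_seconds` in the custom orchestrator above.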
4. No Observability
If you can't see which step is running, which failed, and why, you can't debug production issues. Log every step transition with timing, input/output, and status.
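One lightweight way to get this, assuming your steps are async functions of state, is a decorator that logs start, completion with duration, and failure for every step:

```python
import asyncio
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("orchestrator")

def observed(step_name: str):
    """Log every step transition with timing and status."""
    def wrap(fn):
        @functools.wraps(fn)
        async def inner(state):
            start = time.monotonic()
            log.info("step=%s status=started", step_name)
            try:
                result = await fn(state)
                log.info("step=%s status=completed duration_ms=%.0f",
                         step_name, (time.monotonic() - start) * 1000)
                return result
            except Exception as e:
                log.error("step=%s status=failed error=%s", step_name, e)
                raise
        return inner
    return wrap

@observed("classify")
async def classify(state: dict) -> dict:
    return {**state, "intent": "refund"}

out = asyncio.run(classify({"text": "refund please"}))
```

In production you'd ship these structured log lines to a tracing backend instead of stdout, but the principle is the same: every transition is visible.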
5. Tight Coupling Between Steps
Steps should communicate through state, not direct calls. This makes them independently testable, replaceable, and reorderable.
Designing agent orchestration? AI Agents Weekly covers workflows, frameworks, and production deployment patterns 3x/week. Join free.
Conclusion
Orchestration is the boring infrastructure that makes agents reliable. It's the difference between a demo that works 80% of the time and a production system that handles edge cases, recovers from failures, and scales.
Start simple: if your agent has fewer than 5 steps, a custom orchestrator or plain ReAct loop is enough. Move to LangGraph when you need conditional branching, human-in-the-loop, or checkpointing. Graduate to Temporal when workflows run for hours, involve financial transactions, or must survive infrastructure failures.
The best orchestration is the one you don't notice — it just makes your agent work, every time.