AI Agent Orchestration: LangGraph, Temporal & Custom Workflows (2026 Guide)

Mar 27, 2026 • 14 min read • By Paxrel

A single AI agent with a few tools can handle simple tasks. But real-world workflows — process a customer refund, generate a research report, onboard a new user — involve multiple steps, conditional logic, parallel execution, error recovery, and human approvals.

That's orchestration: the layer that coordinates what your agent does, in what order, and what happens when things go wrong. Without it, your agent is a talented freelancer with no project management. With it, your agent becomes a reliable workflow engine.

This guide covers three approaches to agent orchestration: LangGraph (graph-based), Temporal (durable workflows), and custom orchestrators — with trade-offs and code for each.

Why Orchestration Matters

Consider a simple task: "Process a refund for order #12345."

Without orchestration, your agent calls tools in whatever order the LLM decides. Sometimes it works. Sometimes it processes the refund before checking eligibility. Sometimes it retries a failed API call 47 times. Sometimes it forgets to send the confirmation email.

With orchestration:

verify_identity → check_eligibility → [approve if > $100] → process_refund → send_confirmation
      │                    │                    │                   │
      └── retry 2x         └── if ineligible    └── timeout 5min   └── retry 3x
          then escalate        → explain why        → escalate         then log error

Every step has defined behavior, error handling, and transitions. The workflow is testable, observable, and predictable.

Approach 1: LangGraph — Graph-Based Orchestration

LangGraph (from LangChain) models your agent as a state machine graph. Nodes are steps, edges are transitions, and state flows through the graph.

Core Concepts

Example: Customer Support Agent

from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class SupportState(TypedDict):
    messages: list          # Conversation history
    intent: str             # Classified intent
    customer_id: str | None # Authenticated customer
    order: dict | None      # Looked-up order
    resolution: str | None  # How we resolved the issue
    should_escalate: bool

# Node functions
async def classify_intent(state: SupportState) -> SupportState:
    intent = await llm.classify(state["messages"][-1])
    return {"intent": intent}

async def authenticate(state: SupportState) -> SupportState:
    customer = await lookup_customer(state["messages"])
    return {"customer_id": customer["id"] if customer else None}

async def lookup_order(state: SupportState) -> SupportState:
    order = await get_order(state["customer_id"], state["messages"])
    return {"order": order}

async def generate_response(state: SupportState) -> SupportState:
    response = await llm.respond(state)
    return {"messages": [response], "resolution": "resolved"}

async def escalate(state: SupportState) -> SupportState:
    ticket = await create_support_ticket(state)
    return {"resolution": f"Escalated: {ticket['id']}"}

# Conditional edges
def route_after_classify(state: SupportState) -> Literal["authenticate", "respond", "escalate"]:
    if state["intent"] in ["order_status", "refund"]:
        return "authenticate"
    if state["intent"] == "general_question":
        return "respond"
    return "escalate"

def route_after_auth(state: SupportState) -> Literal["lookup_order", "escalate"]:
    if state["customer_id"]:
        return "lookup_order"
    return "escalate"

# Build the graph
graph = StateGraph(SupportState)

graph.add_node("classify", classify_intent)
graph.add_node("authenticate", authenticate)
graph.add_node("lookup_order", lookup_order)
graph.add_node("respond", generate_response)
graph.add_node("escalate", escalate)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_after_classify)
graph.add_conditional_edges("authenticate", route_after_auth)
graph.add_edge("lookup_order", "respond")
graph.add_edge("respond", END)
graph.add_edge("escalate", END)

# Compile with checkpointing
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
app = graph.compile(checkpointer=checkpointer)

# Run
result = await app.ainvoke(
    {"messages": ["I want a refund for order #12345"], "should_escalate": False},
    config={"configurable": {"thread_id": "conv-123"}}
)
Tip: LangGraph's checkpointing is its killer feature. Every state transition is saved, so you can resume interrupted workflows, replay for debugging, and implement human-in-the-loop by pausing at any node.

Human-in-the-Loop with LangGraph

# Add an interrupt point before processing refunds
graph.add_node("request_approval", request_human_approval)

# In the approval node:
async def request_human_approval(state: SupportState) -> SupportState:
    if state["order"]["amount"] > 10000:  # > $100
        # This pauses the graph until a human responds
        raise NodeInterrupt("Refund > $100 needs approval")
    return state

# Resume after human approves:
await app.ainvoke(
    None,  # No new input, just resume
    config={"configurable": {"thread_id": "conv-123"}}
)

LangGraph Trade-offs

ProsCons
Visual graph structureLangChain ecosystem lock-in
Built-in checkpointingLearning curve for graph concepts
Human-in-the-loop nativeDebugging complex graphs is hard
Streaming supportOverhead for simple workflows
LangGraph Studio for visualizationState management gets complex

Approach 2: Temporal — Durable Workflow Orchestration

Temporal is an enterprise workflow engine originally designed for microservices. It's overkill for simple agents but perfect for long-running, mission-critical workflows that must never lose state.

Why Temporal for Agents?

Example: Research Report Agent

from temporalio import workflow, activity
from datetime import timedelta

@activity.defn
async def search_web(query: str) -> list[str]:
    """Search the web and return relevant URLs."""
    return await web_search_tool.search(query, top_k=10)

@activity.defn
async def scrape_page(url: str) -> str:
    """Scrape and extract content from a URL."""
    return await scraper.extract(url)

@activity.defn
async def analyze_content(content: str, question: str) -> dict:
    """Use LLM to analyze scraped content."""
    return await llm.analyze(content, question)

@activity.defn
async def write_report(findings: list[dict], topic: str) -> str:
    """Generate final research report."""
    return await llm.generate_report(findings, topic)

@workflow.defn
class ResearchWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        # Step 1: Search for sources (with retry)
        urls = await workflow.execute_activity(
            search_web,
            topic,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        # Step 2: Scrape pages in parallel
        scrape_tasks = [
            workflow.execute_activity(
                scrape_page,
                url,
                start_to_close_timeout=timedelta(seconds=60),
                retry_policy=RetryPolicy(maximum_attempts=2)
            )
            for url in urls[:5]  # Top 5 results
        ]
        contents = await asyncio.gather(*scrape_tasks, return_exceptions=True)
        contents = [c for c in contents if isinstance(c, str)]

        # Step 3: Analyze each source
        findings = []
        for content in contents:
            finding = await workflow.execute_activity(
                analyze_content,
                args=[content, topic],
                start_to_close_timeout=timedelta(seconds=120),
            )
            findings.append(finding)

        # Step 4: Write final report
        report = await workflow.execute_activity(
            write_report,
            args=[findings, topic],
            start_to_close_timeout=timedelta(seconds=180),
        )

        return report

Temporal Trade-offs

ProsCons
Battle-tested durability (used by Uber, Netflix)Heavy infrastructure (Temporal server + DB)
Survives crashes, deploys, outagesSteep learning curve
Built-in retry, timeout, versioningOverkill for simple agents
Great visibility UIOperational complexity
Language-agnostic (Python, Go, Java, TS)Adds 50-100ms latency per activity

Approach 3: Custom Orchestrator

Sometimes you don't need a framework. A custom orchestrator gives you full control with minimal dependencies.

import asyncio
from dataclasses import dataclass, field
from enum import Enum

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class Step:
    name: str
    fn: callable
    depends_on: list[str] = field(default_factory=list)
    retry_count: int = 2
    timeout_seconds: int = 60
    condition: callable = None  # Skip if returns False
    status: StepStatus = StepStatus.PENDING
    result: any = None
    error: str = None

class Orchestrator:
    def __init__(self):
        self.steps: dict[str, Step] = {}
        self.context: dict = {}

    def add_step(self, step: Step):
        self.steps[step.name] = step

    async def run(self) -> dict:
        while self._has_pending_steps():
            # Find steps ready to run (dependencies met)
            ready = [s for s in self.steps.values()
                    if s.status == StepStatus.PENDING
                    and self._dependencies_met(s)]

            if not ready:
                break  # Deadlock or all done

            # Run ready steps in parallel
            tasks = [self._execute_step(s) for s in ready]
            await asyncio.gather(*tasks)

        return self.context

    async def _execute_step(self, step: Step):
        # Check condition
        if step.condition and not step.condition(self.context):
            step.status = StepStatus.SKIPPED
            return

        step.status = StepStatus.RUNNING

        for attempt in range(step.retry_count + 1):
            try:
                result = await asyncio.wait_for(
                    step.fn(self.context),
                    timeout=step.timeout_seconds
                )
                step.result = result
                step.status = StepStatus.COMPLETED
                self.context[step.name] = result
                return
            except asyncio.TimeoutError:
                step.error = f"Timeout after {step.timeout_seconds}s"
            except Exception as e:
                step.error = str(e)

            if attempt < step.retry_count:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        step.status = StepStatus.FAILED

    def _dependencies_met(self, step: Step) -> bool:
        return all(
            self.steps[dep].status == StepStatus.COMPLETED
            for dep in step.depends_on
        )

    def _has_pending_steps(self) -> bool:
        return any(s.status == StepStatus.PENDING for s in self.steps.values())

# Usage
orch = Orchestrator()
orch.add_step(Step("classify", classify_intent))
orch.add_step(Step("authenticate", authenticate, depends_on=["classify"],
                    condition=lambda ctx: ctx["classify"]["requires_auth"]))
orch.add_step(Step("lookup", lookup_order, depends_on=["authenticate"]))
orch.add_step(Step("respond", generate_response, depends_on=["lookup"]))

result = await orch.run()

Custom Orchestrator Trade-offs

ProsCons
Full control, zero dependenciesYou build everything yourself
Minimal overheadNo built-in persistence/recovery
Easy to understand and debugParallel execution logic is tricky
Fits any patternGrows complex over time

Orchestration Patterns

Pattern 1: Sequential Pipeline

Steps run one after another. Output of step N is input to step N+1.

Use when: Each step depends on the previous result. Example: classify → retrieve → generate → validate.

Pattern 2: Fan-Out / Fan-In

One step spawns multiple parallel tasks, then a final step aggregates results.

Use when: Independent subtasks can run simultaneously. Example: search 5 sources in parallel → merge findings.

# Fan-out / Fan-in with LangGraph
from langgraph.graph import StateGraph

async def fan_out(state):
    """Create parallel research tasks."""
    tasks = [{"query": q} for q in state["queries"]]
    return {"parallel_tasks": tasks}

async def research(state):
    """Run research for one query."""
    return {"finding": await search_and_analyze(state["query"])}

async def fan_in(state):
    """Merge all findings into a report."""
    return {"report": await synthesize(state["findings"])}

Pattern 3: Conditional Branching

Different paths based on runtime conditions.

Use when: The workflow varies by input type. Example: refund requests go through approval, general questions go straight to response.

Pattern 4: Loop with Exit Condition

Repeat a sequence until a condition is met.

Use when: Iterative refinement is needed. Example: generate → evaluate → if score < threshold, regenerate.

# Loop pattern in LangGraph
def should_continue(state) -> Literal["regenerate", "finalize"]:
    if state["quality_score"] >= 0.8:
        return "finalize"
    if state["attempts"] >= 3:
        return "finalize"  # Give up after 3 tries
    return "regenerate"

graph.add_conditional_edges("evaluate", should_continue)

Pattern 5: Saga (Compensating Transactions)

When a later step fails, undo earlier steps.

Use when: Multi-step operations that should be atomic. Example: reserve inventory → charge card → if shipping fails → refund card → release inventory.

Choosing Your Orchestration Approach

ScenarioBest ApproachWhy
Simple agent (3-5 steps)Custom or just ReActFrameworks add unnecessary complexity
Complex but short-lived (< 5 min)LangGraphGood graph model, checkpointing, HITL
Long-running (hours/days)TemporalDurable execution survives crashes
Human approval workflowsLangGraph or TemporalBoth have native interrupt/signal support
Mission-critical / financialTemporalBattle-tested, audit trail, exactly-once
Maximum flexibilityCustomNo framework constraints
Team already uses LangChainLangGraphEcosystem integration

Common Orchestration Mistakes

1. Over-Orchestrating Simple Agents

If your agent has 3 steps and no branching, you don't need LangGraph or Temporal. A simple while loop with tool calling is fine. Add orchestration when complexity justifies it.

2. No Error Boundaries

A failure in step 4 shouldn't crash the entire workflow. Each step needs its own error handling: retry policy, fallback behavior, and graceful degradation.

3. Missing Timeouts

LLM calls can hang. Tool calls can hang. Without timeouts, your workflow hangs forever. Set timeouts on every async operation: 30s for LLM calls, 60s for tool calls, 5 minutes for the full workflow.

4. No Observability

If you can't see which step is running, which failed, and why, you can't debug production issues. Log every step transition with timing, input/output, and status.

5. Tight Coupling Between Steps

Steps should communicate through state, not direct calls. This makes them independently testable, replaceable, and reorderable.

Designing agent orchestration? AI Agents Weekly covers workflows, frameworks, and production deployment patterns 3x/week. Join free.

Conclusion

Orchestration is the boring infrastructure that makes agents reliable. It's the difference between a demo that works 80% of the time and a production system that handles edge cases, recovers from failures, and scales.

Start simple: if your agent has fewer than 5 steps, a custom orchestrator or plain ReAct loop is enough. Move to LangGraph when you need conditional branching, human-in-the-loop, or checkpointing. Graduate to Temporal when workflows run for hours, involve financial transactions, or must survive infrastructure failures.

The best orchestration is the one you don't notice — it just makes your agent work, every time.