You ship an AI agent. It runs. At first, everything looks fine. Then, three weeks later, you check your API bill and it's 8x what you budgeted. Or a user reports a problem and you realize the agent has been silently hallucinating details for days. Or the pipeline is stalling somewhere and nobody noticed because there was no alert.
This is the unsexy side of AI agents that almost nobody talks about: monitoring. Building the agent is the exciting part. Keeping it working correctly, cheaply, and reliably once it's live is where most teams fail.
This guide covers the four pillars of AI agent monitoring, practical Python setups for each, a comparison of the main observability tools, and the exact metrics and alert thresholds we use at Paxrel for our own autonomous agents.
Traditional software fails loudly. An exception is thrown, a 500 error is returned, a process crashes. You know something is broken.
AI agents fail quietly. The LLM returns a plausible-sounding response that happens to be wrong. The agent completes its task but with lower quality than yesterday because a model was silently updated. A tool call returns an error that the agent cheerfully works around by hallucinating the result instead. No exception thrown. No alert fired. Just wrong output, shipped.
Three categories of failure are unique to AI agents: plausible-but-wrong output (silent hallucination), quality drift when a model is silently updated, and tool errors the agent papers over by fabricating a result instead of failing.
Monitoring is what separates an agent you can trust to run overnight from one you babysit. The goal is simple: know what your agent is doing, how much it costs, whether it's working correctly, and get alerted before problems compound.
For agents running autonomously 24/7 (like those described in our AI workflow automation guide), monitoring is not optional — it's the difference between a useful system and a liability.
Token costs are the most overlooked operational concern for AI agent builders. Every prompt, every completion, every tool call that triggers an LLM call has a price. Small inefficiencies compound fast.
What to track: Input tokens per run, output tokens per run, cost per run, cost per day/week/month, cost trend over time, most expensive operations, cost per task type.
Red flags: Any run that costs 3x the median. Steadily rising cost over the same workload. Unexplained spikes after prompt or model changes.
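As a sketch, the 3x-median rule is a few lines of standard-library Python; the function name and the example cost list are illustrative, not from any particular tool:

```python
import statistics

def flag_cost_outliers(run_costs: list[float], factor: float = 3.0) -> list[float]:
    """Return run costs exceeding `factor` times the median run cost."""
    if not run_costs:
        return []
    median = statistics.median(run_costs)
    return [c for c in run_costs if c > factor * median]

# A $0.90 run against a ~$0.11 median run cost gets flagged
outliers = flag_cost_outliers([0.09, 0.11, 0.10, 0.12, 0.90])
```

The median is deliberately used instead of the mean, so one runaway run does not drag the baseline up with it.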
Errors in AI agents come in two types: hard failures (API errors, timeouts, exceptions) and soft failures (the agent completes but produces wrong output, skips steps, or handles an edge case incorrectly). Hard failures are easy to catch. Soft failures require intentional instrumentation.
What to track: API error rates by provider, retry counts, tool call failure rates, steps completed vs. steps expected, fallback activations, timeout frequency.
Red flags: Error rate above 5% on any single tool. Retry rate climbing without a corresponding root cause. Steps skipped in a pipeline that should always run them all.
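A minimal sketch of the per-tool error-rate check, assuming each structured log event carries a `tool` name and a `status` of `"ok"` or `"error"` (both field names are illustrative):

```python
from collections import Counter

def tool_error_rates(events: list[dict]) -> dict[str, float]:
    """Compute the error rate per tool from structured log events."""
    totals, errors = Counter(), Counter()
    for e in events:
        totals[e["tool"]] += 1
        if e["status"] == "error":
            errors[e["tool"]] += 1
    return {tool: errors[tool] / totals[tool] for tool in totals}

def tools_over_threshold(events: list[dict], max_rate: float = 0.05) -> list[str]:
    """Names of tools whose error rate exceeds the 5% red-flag threshold."""
    return [t for t, r in tool_error_rates(events).items() if r > max_rate]
```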
Performance for an AI agent means both speed and throughput. How long does a run take? How many items does it process per unit time? Are some operations unexpectedly slow? Latency problems often surface before quality problems, so they're useful early warning signals.
What to track: End-to-end run time, per-step latency, LLM API latency (p50, p95, p99), items processed per run, queue depth (if applicable), time-to-first-token.
Red flags: Run time increasing over stable input size. LLM latency p95 exceeding 2x the p50 (indicates inconsistent model load). Any single step consistently taking more than 30% of total run time.
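The p95-vs-p50 check can be computed from a latency sample with nothing but the standard library; this sketch assumes latencies are collected in milliseconds:

```python
import statistics

def latency_percentiles(latencies_ms: list[float]) -> dict[str, float]:
    """p50/p95/p99 of a latency sample via statistics.quantiles."""
    q = statistics.quantiles(latencies_ms, n=100, method="inclusive")
    return {"p50": q[49], "p95": q[94], "p99": q[98]}

def latency_red_flag(latencies_ms: list[float]) -> bool:
    """True when p95 exceeds 2x p50, the inconsistent-load signal described above."""
    p = latency_percentiles(latencies_ms)
    return p["p95"] > 2 * p["p50"]
```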
This is the hardest pillar and the most important. A fast, cheap, error-free agent that produces bad output is worse than useless — it's actively misleading. Quality monitoring requires defining what "good" looks like and checking against it programmatically.
What to track: Relevance scores on scored outputs, format compliance (did the agent return the expected structure?), factual spot-checks on a sample of outputs, user feedback signals (clicks, opens, complaints), output length distribution (sudden changes often indicate quality shifts).
Red flags: Average quality score dropping more than 10% week-over-week. Sudden change in output length distribution. Format compliance below 95%.
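A sketch of the week-over-week score check, assuming you keep per-output quality scores for each week (the function name and 10% default are illustrative):

```python
def quality_drop_alert(last_week_scores: list[float],
                       this_week_scores: list[float],
                       max_drop: float = 0.10) -> bool:
    """True when the average quality score fell more than `max_drop`
    (10% by default) relative to last week's average."""
    prev = sum(last_week_scores) / len(last_week_scores)
    curr = sum(this_week_scores) / len(this_week_scores)
    return (prev - curr) / prev > max_drop
```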
The simplest cost tracking setup uses a decorator pattern that wraps every LLM call and logs token usage to a local SQLite database. No external service required.
```python
import sqlite3
import time
import functools
from datetime import datetime

# Token costs per million tokens (update as pricing changes)
COSTS = {
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
    "deepseek-v3": {"input": 0.27, "output": 1.10},
    "gpt-4o": {"input": 5.00, "output": 15.00},
}

def init_db(db_path="agent_costs.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS llm_calls (
            id INTEGER PRIMARY KEY,
            ts TEXT,
            model TEXT,
            operation TEXT,
            input_tokens INTEGER,
            output_tokens INTEGER,
            cost_usd REAL,
            latency_ms INTEGER
        )
    """)
    conn.commit()
    return conn

def track_cost(operation_name, model, db_path="agent_costs.db"):
    """Decorator that logs token usage and cost for any LLM call."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            conn = init_db(db_path)
            start = time.time()
            result = func(*args, **kwargs)
            latency_ms = int((time.time() - start) * 1000)
            # Extract token usage from the provider's response object
            usage = getattr(result, "usage", None)
            if usage:
                input_tok = usage.input_tokens
                output_tok = usage.output_tokens
                pricing = COSTS.get(model, {"input": 0, "output": 0})
                cost = (input_tok * pricing["input"] +
                        output_tok * pricing["output"]) / 1_000_000
                conn.execute("""
                    INSERT INTO llm_calls
                    (ts, model, operation, input_tokens, output_tokens, cost_usd, latency_ms)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (datetime.utcnow().isoformat(), model, operation_name,
                      input_tok, output_tok, cost, latency_ms))
                conn.commit()
            conn.close()
            return result
        return wrapper
    return decorator

# Usage
@track_cost("score_articles", "deepseek-v3")
def score_articles_batch(articles):
    # ... your LLM call here
    pass
```
Once you have this data, you can query it to get a daily cost summary:
```python
def daily_cost_report(db_path="agent_costs.db"):
    conn = sqlite3.connect(db_path)
    rows = conn.execute("""
        SELECT
            DATE(ts) AS day,
            operation,
            COUNT(*) AS calls,
            SUM(input_tokens) AS total_input,
            SUM(output_tokens) AS total_output,
            SUM(cost_usd) AS total_cost_usd,
            AVG(latency_ms) AS avg_latency_ms
        FROM llm_calls
        WHERE ts >= DATE('now', '-7 days')
        GROUP BY day, operation
        ORDER BY day DESC, total_cost_usd DESC
    """).fetchall()
    conn.close()
    for row in rows:
        print(f"{row[0]} | {row[1]:30s} | "
              f"{row[2]:4d} calls | ${row[5]:.4f} | {int(row[6])}ms avg")
    return rows
```
Structured error logging with run-level context is what lets you diagnose failures after the fact. The key is capturing enough context — what the agent was doing, what input it had, what the error was — without logging sensitive data.
```python
import json
import time
from datetime import datetime

class AgentRunLogger:
    def __init__(self, run_id: str, log_path="agent_runs.jsonl"):
        self.run_id = run_id
        self.log_path = log_path
        self.events = []
        self.start_time = time.time()

    def log_step(self, step: str, status: str, details: dict = None):
        event = {
            "run_id": self.run_id,
            "ts": datetime.utcnow().isoformat(),
            "step": step,
            "status": status,  # "ok" | "error" | "skipped" | "fallback"
            "elapsed_s": round(time.time() - self.start_time, 2),
            **(details or {})
        }
        self.events.append(event)

    def log_error(self, step: str, error: Exception, context: dict = None):
        self.log_step(step, "error", {
            "error_type": type(error).__name__,
            "error_msg": str(error)[:500],  # truncate, never log full secrets
            **(context or {})
        })

    def flush(self):
        with open(self.log_path, "a") as f:
            for event in self.events:
                f.write(json.dumps(event) + "\n")
        self.events = []

# Usage in a pipeline step
def scrape_sources(logger: AgentRunLogger, sources: list):
    results = []
    for source in sources:
        try:
            data = fetch_rss(source)
            logger.log_step("scrape", "ok", {"source": source, "items": len(data)})
            results.extend(data)
        except Exception as e:
            logger.log_error("scrape", e, {"source": source})
            # Don't re-raise — continue with the other sources
    return results
```
A lightweight timing wrapper gives you per-step latency without any external dependencies. Combine it with the cost tracker above for a full picture of each run.
```python
import time
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class RunMetrics:
    run_id: str
    start_time: float = field(default_factory=time.time)
    steps: Dict[str, float] = field(default_factory=dict)
    items_processed: int = 0

    def time_step(self, step_name: str):
        """Context manager for timing individual pipeline steps."""
        return StepTimer(self, step_name)

    def summary(self) -> dict:
        total = time.time() - self.start_time
        return {
            "run_id": self.run_id,
            "total_s": round(total, 2),
            "steps": {k: round(v, 2) for k, v in self.steps.items()},
            "items": self.items_processed,
            "throughput_per_min": round(self.items_processed / (total / 60), 1)
                if total > 0 else 0
        }

class StepTimer:
    def __init__(self, metrics: RunMetrics, step: str):
        self.metrics = metrics
        self.step = step

    def __enter__(self):
        self.t0 = time.time()
        return self

    def __exit__(self, *args):
        self.metrics.steps[self.step] = time.time() - self.t0

# Usage
metrics = RunMetrics(run_id="newsletter-2026-03-25")
with metrics.time_step("scrape"):
    articles = scrape_all_sources()
with metrics.time_step("score"):
    scored = score_relevance(articles)
metrics.items_processed = len(scored)
print(metrics.summary())
# {"run_id": "newsletter-2026-03-25", "total_s": 47.3, "steps":
#  {"scrape": 12.1, "score": 33.8}, "items": 88, "throughput_per_min": 111.6}
```
Quality checks should be automated where possible. For a newsletter pipeline, this means verifying minimum length, required sections, no placeholder text, and a minimum relevance score threshold before publication.
```python
def quality_gate(content: dict, thresholds: dict) -> tuple[bool, list]:
    """
    Run quality checks before publishing or forwarding output.
    Returns (passed: bool, failures: list[str]).
    """
    failures = []
    body = content.get("body", "")

    # Length check
    min_words = thresholds.get("min_words", 300)
    word_count = len(body.split())
    if word_count < min_words:
        failures.append(f"Too short: {word_count} words (min {min_words})")

    # Required sections present
    for section in thresholds.get("required_sections", []):
        if section.lower() not in body.lower():
            failures.append(f"Missing required section: {section}")

    # No placeholder text leaked through
    placeholders = ["[PLACEHOLDER]", "TODO:", "INSERT HERE", "{{", "}}"]
    for p in placeholders:
        if p in body:
            failures.append(f"Placeholder text found: {p}")

    # Relevance score above threshold
    min_score = thresholds.get("min_score", 0.6)
    score = content.get("relevance_score", 0)
    if score < min_score:
        failures.append(f"Low relevance score: {score:.2f} (min {min_score})")

    return len(failures) == 0, failures

# Usage
passed, issues = quality_gate(newsletter_draft, {
    "min_words": 400,
    "required_sections": ["summary", "why it matters"],
    "min_score": 0.65
})
if not passed:
    logger.log_step("quality_gate", "error", {"issues": issues})
    send_alert(f"Quality gate failed: {issues}")
```
Choosing a monitoring tool depends on your scale, budget, and how much infrastructure you want to manage. Here's an honest comparison of the main options in 2026:
| Tool | Type | Free Tier | Self-hostable | Best For | Weakness |
|---|---|---|---|---|---|
| Langfuse | LLM observability | Yes (generous) | Yes (Docker) | Full trace visibility, evals, prompt versioning | Setup takes ~1h; overkill for simple scripts |
| Helicone | LLM proxy + logging | Yes (10k req/mo) | Yes (open source) | Zero-code setup; just change the base URL | Limited eval/quality features; data leaves your infra |
| LangSmith | LLM observability | Yes (limited) | No | Deep LangChain integration; eval datasets | LangChain-first; proprietary; costs scale fast |
| Weights & Biases | ML experiment tracking | Yes (personal) | Enterprise only | Teams doing evals and model comparison at scale | Heavy; designed for ML training, not production agents |
| Custom logging (SQLite/JSONL) | DIY | Free | Yes (it's yours) | Full control; no vendor dependency; $0 | No UI; you build and maintain everything |
| Grafana + Prometheus | Infrastructure metrics | Yes (self-hosted) | Yes | Production dashboards; alert routing | Not LLM-specific; requires significant setup |
Our recommendation: Start with custom JSONL logging (zero dependencies, you own the data, fast to implement). Add Langfuse when you need traces, a dashboard, and eval comparison. Skip LangSmith unless you're already deep in the LangChain ecosystem.
We run an autonomous newsletter pipeline that processes news about AI agents 3x per week. Here's exactly what we track and what our numbers look like, for calibration:
| Metric | Typical Value | Alert Threshold |
|---|---|---|
| Articles scraped per run | 88–120 | < 40 (source failure likely) |
| Articles passing quality score (≥0.65) | 22–35 | < 10 (scorer may be broken) |
| Cost per newsletter run | $0.08–$0.14 | > $0.40 (prompt regression) |
| Total AI budget per month | ~$3 | > $8 (investigate immediately) |
| End-to-end pipeline duration | 45–75s | > 180s (API latency issue) |
| Scoring step latency (88 articles) | 30–45s | > 120s |
| Publication API success rate | 100% | < 100% (alert immediately) |
| Output word count (newsletter body) | 600–900 words | < 400 or > 1500 words |
The $3/month AI budget for a complete, autonomous newsletter pipeline running 3x/week is achievable because we use DeepSeek V3 for high-volume scoring tasks (88+ articles per run) and Claude only for the final writing step. Cost discipline starts at model selection, not just monitoring.
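To see why the math works out, here is an illustrative back-of-envelope estimate. The per-article and draft token counts below are assumptions for the sake of the calculation, not measurements; the prices are the per-million rates from the cost table earlier:

```python
# Illustrative estimate; token counts are assumptions, not measurements.
articles = 88
score_in  = articles * 800 * 0.27 / 1_000_000   # ~800 input tokens/article at DeepSeek V3 rates
score_out = articles * 60  * 1.10 / 1_000_000   # short structured score per article
write_in  = 10_000 * 3.00  / 1_000_000          # top articles fed to Claude Sonnet
write_out = 1_200  * 15.00 / 1_000_000          # ~900-word newsletter draft
total = score_in + score_out + write_in + write_out
print(f"~${total:.3f} per run")
# prints ~$0.073 per run
```

Under these assumptions the estimate lands around $0.07 per run, consistent with the observed $0.08–$0.14 once system prompts and retries are counted. Running the same scoring volume through Claude Sonnet instead of DeepSeek V3 would multiply the scoring cost by roughly 11x, which is the point about model selection.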
This kind of pipeline is described in detail in our AI agent use cases guide. The monitoring layer is what makes it trustworthy enough to run without human review of every output.
Alerts have one job: tell you something is wrong before it becomes expensive or embarrassing. The failure mode with alerts is usually having too many (alert fatigue, ignored notifications) rather than too few. Keep your alert list short and meaningful.
If you're running agents on a VPS, the simplest alert channel that's always with you is Telegram. A lightweight wrapper sends messages to a bot:
```python
import os
import requests

def send_alert(message: str, level: str = "warning"):
    """Send an alert to Telegram. Levels: info, warning, critical."""
    prefix = {"info": "INFO", "warning": "WARN", "critical": "CRITICAL"}
    token = os.environ["TELEGRAM_BOT_TOKEN"]
    chat_id = os.environ["TELEGRAM_CHAT_ID"]
    text = f"[{prefix.get(level, 'ALERT')}] {message}"
    requests.post(
        f"https://api.telegram.org/bot{token}/sendMessage",
        json={"chat_id": chat_id, "text": text},
        timeout=5
    )

# Usage
if daily_cost > alert_threshold:
    send_alert(
        f"Cost spike: ${daily_cost:.3f} today vs ${avg_7day:.3f} 7-day avg",
        level="critical"
    )
```
This is the most important and most overlooked alert pattern. A dead man's switch sends a "heartbeat" message at the start of each scheduled run. A separate monitoring job checks that heartbeats arrived on schedule. If they didn't, the monitor fires an alert — catching cases where the cron job, VPS, or network silently failed.
```python
# At the START of every scheduled pipeline run:
send_alert(f"Pipeline started: newsletter-{datetime.now().date()}", level="info")

# Separate cron (runs 30 min after the expected pipeline start):
# checks whether "Pipeline started: newsletter-{today}" was received.
# If not — send a CRITICAL alert.
```
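One dependency-free way to implement the monitor side is a heartbeat file: the pipeline touches it at startup, and a separate cron checks its age. This is a sketch of that variant; the file path and the 3-day window are assumptions you would adapt to your schedule:

```python
import os
import time

HEARTBEAT_FILE = "/tmp/newsletter_heartbeat"  # illustrative path

def write_heartbeat(path: str = HEARTBEAT_FILE) -> None:
    """Called at the start of each run: record the current timestamp."""
    with open(path, "w") as f:
        f.write(str(time.time()))

def heartbeat_is_stale(max_age_s: float, path: str = HEARTBEAT_FILE) -> bool:
    """Called by the monitor cron: True if no heartbeat within max_age_s."""
    if not os.path.exists(path):
        return True
    with open(path) as f:
        last = float(f.read())
    return time.time() - last > max_age_s

# Monitor cron body (sketch):
# if heartbeat_is_stale(max_age_s=3 * 24 * 3600):
#     send_alert("No pipeline heartbeat in 3 days", level="critical")
```

The key property is that the monitor runs on a separate schedule from the pipeline, so a dead cron job, VPS, or network link still produces an alert.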
A good monitoring dashboard for an AI agent pipeline shows the last 7 days at a glance. You should be able to answer these questions in under 30 seconds by looking at it: What did each run cost, and is the trend stable? Did any run fail or skip steps? Is latency holding steady? Are quality scores and output lengths within their normal ranges?
If you're self-hosting, a simple static HTML page that reads from your JSONL logs and renders charts with Chart.js is sufficient. No Grafana cluster needed for a single-agent setup.
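As a sketch of the data side, the JSONL run log can be aggregated into per-day series that a static page feeds to Chart.js; the `run_id`, `ts`, and `status` field names match the `AgentRunLogger` events above:

```python
import json
from collections import defaultdict

def dashboard_series(jsonl_path: str) -> dict:
    """Aggregate a JSONL run log into per-day chart series:
    distinct runs per day and error events per day."""
    runs, errors = defaultdict(set), defaultdict(int)
    with open(jsonl_path) as f:
        for line in f:
            event = json.loads(line)
            day = event["ts"][:10]  # "YYYY-MM-DD" prefix of the ISO timestamp
            runs[day].add(event["run_id"])
            if event.get("status") == "error":
                errors[day] += 1
    days = sorted(runs)
    return {
        "days": days,
        "runs_per_day": [len(runs[d]) for d in days],
        "errors_per_day": [errors[d] for d in days],
    }
```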
For the security implications of logging agent behavior — specifically what not to log (credentials, PII, sensitive tool outputs) — see our AI agent security checklist. The monitoring and security layers need to be designed together.
The right time to add monitoring is before the agent runs in production, not after the first incident. Here's the integration pattern that puts all four pillars in place from day one:
```python
from datetime import datetime

def run_pipeline():
    run_id = f"newsletter-{datetime.now().strftime('%Y-%m-%d-%H%M')}"
    logger = AgentRunLogger(run_id)
    metrics = RunMetrics(run_id)

    # Heartbeat — proves the run started
    send_alert(f"Pipeline started: {run_id}", level="info")

    try:
        with metrics.time_step("scrape"):
            articles = scrape_sources(logger, SOURCES)
        logger.log_step("scrape", "ok", {"count": len(articles)})

        with metrics.time_step("score"):
            scored = score_articles(articles)  # wrapped with @track_cost
        logger.log_step("score", "ok", {
            "scored": len(scored),
            "kept": sum(1 for a in scored if a["score"] >= 0.65)
        })

        with metrics.time_step("write"):
            draft = write_newsletter(scored[:10])  # wrapped with @track_cost
        logger.log_step("write", "ok", {"words": len(draft["body"].split())})

        passed, issues = quality_gate(draft, THRESHOLDS)
        if not passed:
            logger.log_step("quality_gate", "error", {"issues": issues})
            send_alert(f"Quality gate failed on {run_id}: {issues}", level="critical")
            return

        with metrics.time_step("publish"):
            publish_newsletter(draft)
        logger.log_step("publish", "ok")

        summary = metrics.summary()
        logger.log_step("pipeline", "ok", summary)
        send_alert(
            f"Pipeline done: {run_id} | {summary['total_s']}s | {summary['items']} items",
            level="info"
        )
    except Exception as e:
        logger.log_error("pipeline", e)
        send_alert(f"Pipeline FAILED: {run_id} — {e}", level="critical")
    finally:
        logger.flush()

if __name__ == "__main__":
    run_pipeline()
```
This short wrapper around your pipeline gives you run history, cost attribution, per-step timing, structured error logs, quality gates, and alert delivery — the full monitoring stack without any external service.
For agents that go further — making decisions autonomously about what actions to take — also read our guide to building AI agents for the underlying architecture that monitoring wraps around.
AI Agents Weekly covers monitoring, cost optimization, and production agent patterns. 3x/week, free.
**What is AI agent observability?** AI agent observability is the ability to understand what your agent is doing, why it's doing it, and whether it's working correctly — from the outside, without modifying the agent to inspect it. It includes logging every LLM call and tool invocation, tracking costs and latency, measuring output quality, and setting up alerts for anomalous behavior. Good observability means that when something goes wrong at 3am, you have enough data to diagnose the root cause without having been there. For context on what AI agents are and how they work, see What Are AI Agents?
**Which observability tool should I start with?** For most independent developers and small teams: start with custom JSONL logging to a local file (zero dependencies, zero cost, complete control). When you need traces, a UI, and eval comparison, add Langfuse — it's open source, self-hostable, and has the best balance of features to complexity. Helicone is the fastest to set up (change one URL, done) but has less depth. LangSmith is good if you're using LangChain specifically. Avoid over-engineering your monitoring stack early; a well-structured log file beats a half-configured observability platform.
**How do I track LLM API costs?** Wrap every LLM call with a decorator that reads the token usage from the API response and writes it to a SQLite database. Every LLM provider (Anthropic, OpenAI, DeepSeek) returns input and output token counts in its API responses. Multiply by the current per-million-token price to get cost. A daily query on that database gives you a cost report. The full implementation is in the Cost Tracking section above — it's about 40 lines of Python and requires no external service.
**What alerts should I set up first?** The minimum viable alert set: (1) API credit below 20% remaining, (2) two consecutive pipeline failures, (3) any run costing more than 3x the rolling average, (4) quality gate failure, (5) pipeline hasn't run in twice its scheduled interval (dead man's switch). Keep alerts to high-signal conditions only — alert fatigue from low-quality alerts is a real operational problem. Start with these five, then add more only when you identify a gap that existing alerts missed. Also see our AI agent security checklist for security-specific alert patterns.
**How do I catch hallucinations in production?** You can't catch all hallucinations programmatically — that's what makes them dangerous. What you can do: (1) run output quality checks that verify required facts against a known-good source where possible, (2) check that output length and structure match expected ranges (sudden brevity often indicates the model is confabulating rather than reasoning), (3) for pipelines with scores (like relevance scoring), track the score distribution over time and alert on anomalous shifts, (4) sample a percentage of outputs for manual review on a regular cadence. For high-stakes outputs, consider a secondary validation LLM call that checks the primary output for consistency. The use cases guide has specific patterns for different agent types.