AI Agent for Utilities: Automate Grid Management, Water Treatment & Customer Operations
The US electric grid experiences $150 billion per year in outage costs. Water utilities lose 20–30% of treated water to leaks before it reaches customers. Customer call centers handle 2–5 million calls per year per large utility, with 40% being simple billing inquiries. AI agents that predict equipment failure, optimize water treatment chemistry, detect leaks acoustically, and handle customer requests autonomously are reshaping the $2.1 trillion global utilities sector.
This guide covers building autonomous agents for electric, gas, and water utilities. It includes production-ready Python code, SCADA/OT integration patterns, and hard ROI numbers from real deployments.
Table of Contents
1. Smart Grid Optimization Agent
Modern grids must balance variable renewable generation (solar, wind), distributed energy resources (rooftop solar, batteries, EVs), and demand patterns that shift hourly. The agent processes real-time telemetry from thousands of sensors to optimize voltage regulation, reactive power, and load balancing across the distribution network.
import numpy as np
from datetime import datetime, timedelta
class SmartGridAgent:
    """Optimizes electric grid operations in real time.

    All collaborators are injected and duck-typed. The methods below call
    ``scada_feed.get_feeder_measurements`` / ``get_capacitors`` /
    ``get_regulators``, ``weather_api.get_hourly_forecast`` and
    ``der_registry.get_assets``; ``market_api`` is stored but not used by
    any method defined in this class.
    """

    # Allowed [min, max] voltage for each node ``voltage_class``.
    VOLTAGE_LIMITS = {
        "distribution_120v": {"min": 114, "max": 126},  # ANSI C84.1 Range A
        "distribution_240v": {"min": 228, "max": 252},
        "primary_4kv": {"min": 3800, "max": 4200},
        "primary_13kv": {"min": 12350, "max": 13650},
    }

    def __init__(self, scada_feed, weather_api, der_registry, market_api):
        self.scada = scada_feed
        self.weather = weather_api
        self.der = der_registry  # Distributed Energy Resources
        self.market = market_api

    def optimize_voltage(self, feeder_id):
        """Volt-VAR optimization for a distribution feeder.

        Flags every node whose voltage lies outside the band for its
        ``voltage_class`` and proposes corrective switching:

        * any undervoltage: close the first open capacitor bank and raise
          every regulator that still has tap headroom by one step;
        * any overvoltage: open the first closed capacitor bank (regulator
          taps are NOT lowered by this implementation).

        Proposals are only returned; nothing is sent to SCADA here. A feeder
        with both kinds of violation receives both a "close" and an "open"
        proposal. Nodes whose ``voltage_class`` is not in ``VOLTAGE_LIMITS``
        cannot be checked and are counted as compliant.

        Args:
            feeder_id: Identifier passed through to the SCADA feed.

        Returns:
            dict with ``feeder_id``, ``timestamp`` (naive UTC, ISO-8601),
            ``violations``, ``actions``, ``total_nodes`` and
            ``compliant_pct`` (0.0 when the feeder reported no nodes).
        """
        # Function-scope import: the module header only brings in
        # ``datetime`` and ``timedelta``.
        from datetime import timezone

        measurements = self.scada.get_feeder_measurements(feeder_id)
        capacitor_banks = self.scada.get_capacitors(feeder_id)
        voltage_regulators = self.scada.get_regulators(feeder_id)

        violations = []
        for node in measurements:
            limits = self.VOLTAGE_LIMITS.get(node["voltage_class"])
            if limits is None:
                # No band to compare against, so no violation can be raised.
                continue
            voltage = node["voltage"]
            if voltage < limits["min"]:
                violations.append({
                    "node": node["id"],
                    "type": "undervoltage",
                    "value": voltage,
                    "limit": limits["min"],
                    "deviation_pct": round(
                        (limits["min"] - voltage) / limits["min"] * 100, 2
                    ),
                })
            elif voltage > limits["max"]:
                violations.append({
                    "node": node["id"],
                    "type": "overvoltage",
                    "value": voltage,
                    "limit": limits["max"],
                    "deviation_pct": round(
                        (voltage - limits["max"]) / limits["max"] * 100, 2
                    ),
                })

        # Determine corrections
        actions = []
        has_low = any(item["type"] == "undervoltage" for item in violations)
        has_high = any(item["type"] == "overvoltage" for item in violations)

        if has_low:
            # Switch on ONE capacitor bank to boost voltage.
            open_cap = next(
                (cap for cap in capacitor_banks if cap["status"] == "open"),
                None,
            )
            if open_cap is not None:
                actions.append({
                    "device": open_cap["id"],
                    "action": "close",
                    "type": "capacitor_bank",
                    "expected_voltage_boost_pct": 1.5,
                })
            # Raise every regulator that is not already at its top tap.
            for reg in voltage_regulators:
                if reg["tap_position"] < reg["max_tap"]:
                    actions.append({
                        "device": reg["id"],
                        "action": "raise_tap",
                        "type": "voltage_regulator",
                        "current_tap": reg["tap_position"],
                        "new_tap": reg["tap_position"] + 1,
                    })

        if has_high:
            # Opposite direction: switch ONE capacitor bank out.
            closed_cap = next(
                (cap for cap in capacitor_banks if cap["status"] == "closed"),
                None,
            )
            if closed_cap is not None:
                actions.append({
                    "device": closed_cap["id"],
                    "action": "open",
                    "type": "capacitor_bank",
                })

        total_nodes = len(measurements)
        # An empty telemetry read used to raise ZeroDivisionError; report 0.0
        # (the same zero fallback the file uses for other ratios) and let
        # callers inspect ``total_nodes``.
        if total_nodes:
            compliant_pct = round(
                (total_nodes - len(violations)) / total_nodes * 100, 1
            )
        else:
            compliant_pct = 0.0

        return {
            "feeder_id": feeder_id,
            # Same naive-UTC string that datetime.utcnow().isoformat()
            # produced, without calling the deprecated utcnow().
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "violations": violations,
            "actions": actions,
            "total_nodes": total_nodes,
            "compliant_pct": compliant_pct,
        }

    def forecast_renewable_output(self, feeder_id, hours_ahead=24):
        """Predict solar/wind generation on a feeder.

        Args:
            feeder_id: Feeder whose DER assets and weather are looked up.
            hours_ahead: Maximum number of hourly rows to produce.

        Returns:
            One dict per forecast hour with ``hour_offset``, ``solar_kw``,
            ``wind_kw``, ``total_der_kw``, ``cloud_cover`` and ``wind_speed``.
            If the weather service returns fewer than ``hours_ahead`` entries
            the result is truncated to what is available instead of raising
            IndexError.
        """
        der_assets = self.der.get_assets(feeder_id)
        solar = [d for d in der_assets if d["type"] == "solar"]
        wind = [d for d in der_assets if d["type"] == "wind"]

        weather = self.weather.get_hourly_forecast(feeder_id, hours_ahead)
        available_hours = min(hours_ahead, len(weather))

        forecast = []
        for hour_offset in range(available_hours):
            wx = weather[hour_offset]
            solar_output = sum(
                s["capacity_kw"] * self._solar_capacity_factor(
                    wx["cloud_cover_pct"], wx["solar_elevation_deg"]
                )
                for s in solar
            )
            wind_output = sum(
                w["capacity_kw"] * self._wind_capacity_factor(
                    wx["wind_speed_ms"], w["hub_height_m"]
                )
                for w in wind
            )
            forecast.append({
                "hour_offset": hour_offset,
                "solar_kw": round(solar_output, 1),
                "wind_kw": round(wind_output, 1),
                "total_der_kw": round(solar_output + wind_output, 1),
                "cloud_cover": wx["cloud_cover_pct"],
                "wind_speed": wx["wind_speed_ms"],
            })
        return forecast

    def _solar_capacity_factor(self, cloud_cover_pct, elevation_deg):
        """Return a 0..1 solar output fraction.

        Zero when the sun is at or below the horizon; otherwise a clear-sky
        term that ramps linearly to 1.0 at 60 degrees elevation, reduced by
        up to 75% at full cloud cover.
        """
        if elevation_deg <= 0:
            return 0
        clear_sky = min(1.0, elevation_deg / 60)  # Simplified
        cloud_factor = 1 - (cloud_cover_pct / 100 * 0.75)
        return clear_sky * cloud_factor

    def _wind_capacity_factor(self, wind_speed_ms, hub_height):
        """Return a 0..1 wind output fraction from a simplified power curve.

        Cut-in 3 m/s, rated 12 m/s, cut-out 25 m/s, cubic ramp in between.
        ``hub_height`` is accepted for interface compatibility but is not
        used: no height correction is applied to ``wind_speed_ms``.
        """
        if wind_speed_ms < 3 or wind_speed_ms > 25:
            return 0
        if wind_speed_ms >= 12:
            return 1.0
        return ((wind_speed_ms - 3) / 9) ** 3
2. Outage Prediction & Response Agent
Power outages cost the US economy $150B per year. Most are caused by vegetation contact, equipment failure, and weather. The agent combines weather forecasts, vegetation growth models, equipment age data, and historical outage patterns to predict and prevent outages before they happen.
class OutagePredictionAgent:
    """Predicts and manages power outages.

    NOTE: ``optimize_crew_dispatch`` calls ``self._estimate_travel_time``,
    which is not defined in this class; a subclass or mixin has to supply it.
    """

    def __init__(self, oms_client, weather_api, asset_db, vegetation_model):
        self.oms = oms_client  # Outage Management System
        self.weather = weather_api
        self.assets = asset_db
        self.veg = vegetation_model

    @staticmethod
    def _weather_risk(weather):
        """Return the additive weather component of the risk score."""
        risk = 0
        if weather.get("wind_gust_mph", 0) > 40:
            risk += 0.3
        if weather.get("ice_accumulation_in", 0) > 0.25:
            risk += 0.5  # Ice storms are devastating
        if weather.get("lightning_probability", 0) > 0.6:
            risk += 0.15
        return risk

    @staticmethod
    def _risk_level(score):
        """Map a composite 0..1 score to its label."""
        if score > 0.7:
            return "critical"
        if score > 0.4:
            return "high"
        if score > 0.2:
            return "medium"
        return "low"

    def predict_outage_risk(self, region, hours_ahead=48):
        """Predict outage probability by circuit.

        The composite score is the sum, capped at 1.0, of four components:
        weather (threshold-based, identical for every circuit because a
        single regional forecast is used), equipment age (capped at 0.3),
        vegetation encroachment (score * 0.25) and outage history over the
        last 3 years (capped at 0.2).

        Args:
            region: Region passed to the asset database and weather service.
            hours_ahead: Forecast horizon passed to the weather service.

        Returns:
            List of per-circuit dicts sorted by descending ``risk_score``.
            ``primary_driver`` names the largest component; ties resolve in
            the order weather, equipment, vegetation, history.
        """
        circuits = self.assets.get_circuits(region)
        weather = self.weather.get_severe_forecast(region, hours_ahead)

        # The forecast is regional, so this is loop-invariant: score it once
        # instead of once per circuit (and not at all when there are none).
        weather_risk = self._weather_risk(weather) if circuits else 0

        risk_scores = []
        for circuit in circuits:
            avg_pole_age = circuit.get("avg_pole_age_years", 30)
            equipment_risk = min(0.3, avg_pole_age / 100)

            veg_score = self.veg.get_encroachment_score(circuit["id"])
            veg_risk = veg_score * 0.25

            historical = self.oms.get_outage_frequency(circuit["id"], years=3)
            history_risk = min(0.2, historical / 10)

            composite = min(
                1.0, weather_risk + equipment_risk + veg_risk + history_risk
            )
            # Insertion order doubles as the tie-break order for max().
            components = {
                "weather": weather_risk,
                "equipment": equipment_risk,
                "vegetation": veg_risk,
                "history": history_risk,
            }
            risk_scores.append({
                "circuit_id": circuit["id"],
                "circuit_name": circuit["name"],
                "customers_served": circuit["customer_count"],
                "risk_score": round(composite, 3),
                "risk_level": self._risk_level(composite),
                "primary_driver": max(components, key=components.get),
                "components": {
                    name: round(value, 3) for name, value in components.items()
                },
            })

        return sorted(risk_scores, key=lambda r: r["risk_score"], reverse=True)

    def optimize_crew_dispatch(self, active_outages, available_crews):
        """Assign restoration crews to outages by priority.

        Priority is a weighted SUM: one point per affected customer, 500 per
        critical facility, plus ``100 / max(estimated_restore_hours, 0.5)``
        so that quicker fixes rank slightly higher. Outages are served in
        priority order; each takes the nearest still-unassigned crew whose
        ``skill_level`` meets the outage's ``min_skill_required`` (default 1).

        Args:
            active_outages: Iterable of outage dicts (``id``,
                ``customers_affected``, ``estimated_restore_hours``,
                ``location``; optional ``critical_facilities`` and
                ``min_skill_required``).
            available_crews: Iterable of crew dicts (``id``, ``skill_level``,
                ``location``).

        Returns:
            List of assignment dicts in priority order. Outages for which no
            eligible crew remains are omitted from the result.
        """
        scored_outages = []
        for outage in active_outages:
            critical_count = outage.get("critical_facilities", 0)  # hospitals, etc
            customer_impact = outage["customers_affected"]
            est_restore_hrs = outage["estimated_restore_hours"]
            priority = (
                customer_impact * 1.0
                + critical_count * 500
                + (1 / max(est_restore_hrs, 0.5)) * 100  # Faster fixes first
            )
            scored_outages.append({**outage, "priority_score": priority})

        scored_outages.sort(key=lambda o: o["priority_score"], reverse=True)

        assignments = []
        assigned_crews = set()
        for outage in scored_outages:
            required_skill = outage.get("min_skill_required", 1)
            best_crew = None
            best_travel = float("inf")
            for crew in available_crews:
                if crew["id"] in assigned_crews:
                    continue
                if crew["skill_level"] < required_skill:
                    continue
                travel = self._estimate_travel_time(
                    crew["location"], outage["location"]
                )
                # Strict '<' keeps the first crew on ties and never selects
                # a crew whose travel time is infinite.
                if travel < best_travel:
                    best_travel = travel
                    best_crew = crew

            if best_crew:
                assignments.append({
                    "outage_id": outage["id"],
                    "crew_id": best_crew["id"],
                    "travel_time_min": round(best_travel),
                    "customers_affected": outage["customers_affected"],
                    "priority_score": round(outage["priority_score"]),
                })
                assigned_crews.add(best_crew["id"])

        return assignments
3. Water Treatment Optimization Agent
Water treatment plants consume 2–4% of national electricity and use $3–8 billion in chemicals annually. Raw water quality varies daily based on weather, seasonal runoff, and upstream events. The agent adjusts chemical dosing, filtration rates, and disinfection in real time to maintain EPA compliance while minimizing chemical and energy costs.
class WaterTreatmentAgent:
    """Optimizes water treatment plant operations.

    NOTE: ``monitor_disinfection`` calls ``self._get_required_ct``, which is
    not defined in this class; a subclass or mixin has to supply it.
    """

    EPA_LIMITS = {
        "turbidity_ntu": 0.3,  # 95th percentile
        "chlorine_residual_mg_l": {"min": 0.2, "max": 4.0},
        "ph": {"min": 6.5, "max": 8.5},
        "lead_ppb": 15,
        "thm_ppb": 80,  # Trihalomethanes
        "haa5_ppb": 60,  # Haloacetic acids
    }

    # Fraction of the THM limit above which chlorine reduction is suggested
    # (0.75 * 80 ppb = the 60 ppb threshold previously hard-coded).
    THM_ACTION_FRACTION = 0.75

    # (exclusive upper bound, value) rows; the first row whose bound exceeds
    # the reading wins, otherwise the fallback passed to _step_lookup applies.
    _BASE_DOSE_STEPS = ((5, 15), (20, 25), (100, 40))  # NTU -> mg/L alum
    _TEMP_FACTOR_STEPS = ((5, 1.3), (10, 1.15))  # deg C -> dose multiplier

    def __init__(self, scada_feed, lab_results, chemical_inventory, weather_api):
        self.scada = scada_feed
        self.lab = lab_results
        self.chemicals = chemical_inventory
        self.weather = weather_api

    @staticmethod
    def _step_lookup(value, steps, fallback):
        """Return the value of the first step whose bound exceeds ``value``."""
        for upper_bound, result in steps:
            if value < upper_bound:
                return result
        return fallback

    def optimize_coagulant_dose(self, plant_id):
        """Adjust coagulant dosing based on raw water quality.

        The recommended dose is a turbidity-based base dose scaled by a
        cold-water factor and a total-organic-carbon factor (+10% per mg/L
        of TOC above 2.0).

        Args:
            plant_id: Plant identifier passed through to SCADA.

        Returns:
            dict echoing the raw-water readings plus
            ``recommended_coagulant_mg_l``, ``current_dose_mg_l``,
            ``ph_adjustment_needed`` (6.8 minus raw pH) and
            ``alkalinity_sufficient`` (alkalinity > 0.45 * recommended dose).
        """
        raw = self.scada.get_current(plant_id, "raw_water")
        turbidity = raw["turbidity_ntu"]
        ph = raw["ph"]
        temperature = raw["temp_c"]
        toc = raw.get("toc_mg_l", 3.0)  # Total organic carbon
        alkalinity = raw.get("alkalinity_mg_l", 100)

        # Jar test correlation model: base dose from turbidity (empirical).
        base_dose = self._step_lookup(turbidity, self._BASE_DOSE_STEPS, 60)
        # Cold water needs more coagulant.
        temp_factor = self._step_lookup(temperature, self._TEMP_FACTOR_STEPS, 1.0)
        # More organics = more coagulant.
        toc_factor = 1.0 + max(0, (toc - 2.0)) * 0.1

        optimal_ph = 6.8  # Optimal for alum coagulation
        ph_adjustment = optimal_ph - ph

        recommended_dose = round(base_dose * temp_factor * toc_factor, 1)

        return {
            "plant_id": plant_id,
            "raw_turbidity_ntu": turbidity,
            "raw_ph": ph,
            "raw_temp_c": temperature,
            "raw_toc_mg_l": toc,
            "recommended_coagulant_mg_l": recommended_dose,
            "current_dose_mg_l": self.scada.get_current_dose(plant_id, "coagulant"),
            "ph_adjustment_needed": round(ph_adjustment, 2),
            "alkalinity_sufficient": alkalinity > recommended_dose * 0.45,
        }

    def monitor_disinfection(self, plant_id):
        """Ensure adequate disinfection while minimizing DBP formation.

        Compares achieved CT (chlorine concentration * contact time) with
        the CT required for 3-log Giardia inactivation, and estimates THM
        formation with a simplified linear model.

        Args:
            plant_id: Plant identifier passed through to SCADA.

        Returns:
            dict with ``actual_ct``, ``required_ct``, ``ct_ratio`` (``None``
            when the required CT is not positive, which previously raised
            ZeroDivisionError), ``compliant``, ``chlorine_residual``,
            ``estimated_thm_ppb``, ``thm_limit_ppb``, ``thm_margin_pct`` and
            ``recommendation`` (``reduce_chlorine`` / ``increase_chlorine`` /
            ``maintain``).
        """
        clearwell = self.scada.get_current(plant_id, "clearwell")
        chlorine = clearwell["chlorine_mg_l"]
        ct = chlorine * clearwell["contact_time_min"]

        # CT requirement depends on temperature and pH
        required_ct = self._get_required_ct(
            clearwell["temp_c"], clearwell["ph"],
            target_log_removal=3.0,  # 3-log Giardia
        )

        # DBP formation potential (simplified THM formation model)
        toc = clearwell.get("toc_mg_l", 2.0)
        temp = clearwell["temp_c"]
        estimated_thm = toc * chlorine * (temp / 20) * 8  # ppb estimate

        thm_limit = self.EPA_LIMITS["thm_ppb"]
        thm_action_level = self.THM_ACTION_FRACTION * thm_limit

        # Only cut chlorine when THMs are near the limit AND there is ample
        # CT headroom; otherwise protect disinfection first.
        if estimated_thm > thm_action_level and ct > required_ct * 1.5:
            recommendation = "reduce_chlorine"
        elif ct < required_ct * 1.1:
            recommendation = "increase_chlorine"
        else:
            recommendation = "maintain"

        ct_ratio = round(ct / required_ct, 2) if required_ct > 0 else None

        return {
            "plant_id": plant_id,
            "actual_ct": round(ct, 1),
            "required_ct": round(required_ct, 1),
            "ct_ratio": ct_ratio,
            "compliant": ct >= required_ct,
            "chlorine_residual": chlorine,
            "estimated_thm_ppb": round(estimated_thm, 1),
            "thm_limit_ppb": thm_limit,
            "thm_margin_pct": round((1 - estimated_thm / thm_limit) * 100, 1),
            "recommendation": recommendation,
        }
4. Leak Detection Agent
Water utilities lose 20–30% of treated water to leaks — called non-revenue water (NRW). In developing countries, NRW can exceed 50%. The agent uses acoustic sensors, flow balance analysis, and pressure transient monitoring to detect and locate leaks in real time.
class LeakDetectionAgent:
    """Detects and locates water network leaks.

    NOTE: ``acoustic_leak_locate`` calls ``self._cross_correlate`` and
    ``self._get_sound_speed``, neither of which is defined in this class; a
    subclass or mixin has to supply them.
    """

    def __init__(self, sensor_network, hydraulic_model, gis_data):
        self.sensors = sensor_network
        self.model = hydraulic_model
        self.gis = gis_data

    def district_metered_area_analysis(self, dma_id):
        """Flow balance analysis for a DMA to detect leaks.

        Combines three indicators: the inflow/consumption balance
        (non-revenue water share), the minimum night flow (MNF) in excess of
        estimated legitimate night use, and the linear trend of the last 30
        days of MNF readings.

        Args:
            dma_id: District metered area identifier.

        Returns:
            dict of rounded flow figures plus ``alert`` (a plain ``bool``)
            and ``severity``. ``mnf_trend_m3h_per_day`` is 0.0 when fewer
            than two MNF readings exist, because a slope cannot be fitted.
        """
        inflow = self.sensors.get_flow(dma_id, "inlet")
        outflows = self.sensors.get_flow(dma_id, "outlets")
        consumption = self.sensors.get_metered_consumption(dma_id)

        total_inflow = sum(f["flow_m3h"] for f in inflow)
        total_outflow = sum(f["flow_m3h"] for f in outflows)
        total_consumption = consumption["total_m3h"]

        net_inflow = total_inflow - total_outflow
        unaccounted = net_inflow - total_consumption
        nrw_pct = (unaccounted / net_inflow * 100) if net_inflow > 0 else 0

        # Minimum Night Flow (MNF) analysis — best leak indicator
        mnf = self.sensors.get_minimum_night_flow(dma_id)
        legitimate_night_use = consumption.get("night_use_estimate_m3h", 0.5)
        leak_estimate = max(0, mnf - legitimate_night_use)

        # Trend analysis: slope of a degree-1 least-squares fit. polyfit
        # cannot fit a line through fewer than two points, so treat a short
        # history as "no trend". float() keeps numpy scalars out of the
        # result so it stays JSON-serializable.
        mnf_history = self.sensors.get_mnf_history(dma_id, days=30)
        if len(mnf_history) >= 2:
            mnf_trend = float(
                np.polyfit(range(len(mnf_history)), mnf_history, 1)[0]
            )
        else:
            mnf_trend = 0.0

        if nrw_pct > 40 or leak_estimate > 10:
            severity = "critical"
        elif nrw_pct > 25:
            severity = "high"
        elif nrw_pct > 15:
            severity = "medium"
        else:
            severity = "low"

        return {
            "dma_id": dma_id,
            "net_inflow_m3h": round(net_inflow, 2),
            "consumption_m3h": round(total_consumption, 2),
            "unaccounted_m3h": round(unaccounted, 2),
            "nrw_pct": round(nrw_pct, 1),
            "mnf_m3h": round(mnf, 2),
            "estimated_leakage_m3h": round(leak_estimate, 2),
            "mnf_trend_m3h_per_day": round(mnf_trend, 4),
            "alert": bool(nrw_pct > 25 or mnf_trend > 0.05),
            "severity": severity,
        }

    def acoustic_leak_locate(self, pipe_segment_id):
        """Use acoustic sensor correlation to pinpoint leak location.

        The position is computed as ``L/2 + (delay * sound_speed) / 2`` from
        the first sensor, where ``delay`` is the cross-correlation peak in
        milliseconds. NOTE(review): this assumes a positive delay means the
        leak is farther from sensor 1; confirm against the sign convention
        of ``_cross_correlate``. The result is not clamped to the pipe
        length.

        Args:
            pipe_segment_id: Pipe segment bracketed by an acoustic pair.

        Returns:
            dict with the estimated position and pipe metadata, or
            ``{"error": ...}`` when fewer than two sensors are available.
        """
        sensors = self.sensors.get_acoustic_pair(pipe_segment_id)
        if len(sensors) < 2:
            return {"error": "Need 2 acoustic sensors for correlation"}

        first, second = sensors[0], sensors[1]
        pipe = self.gis.get_pipe(pipe_segment_id)

        # Cross-correlation of acoustic signals
        correlation = self._cross_correlate(
            first["signal"], second["signal"], first["sample_rate"]
        )
        delay_ms = correlation["peak_delay_ms"]

        # Calculate leak position
        pipe_length = pipe["length_m"]
        sound_speed = self._get_sound_speed(pipe["material"], pipe["diameter_mm"])
        distance_from_s1 = pipe_length / 2 + (delay_ms / 1000 * sound_speed) / 2

        return {
            "pipe_segment": pipe_segment_id,
            "distance_from_sensor_1_m": round(distance_from_s1, 1),
            "confidence": correlation["confidence"],
            "pipe_material": pipe["material"],
            "pipe_diameter_mm": pipe["diameter_mm"],
            "gps_location": self.gis.interpolate_position(
                pipe_segment_id, distance_from_s1
            ),
        }
5. Demand Response Agent
Peak electricity demand drives 10–25% of total grid investment despite occurring less than 100 hours per year. Demand response programs that reduce peak load by 5–10% can defer billions in infrastructure spending. The agent coordinates load curtailment across commercial, industrial, and residential customers.
class DemandResponseAgent:
    """Manages demand response events and distributed resources."""

    def __init__(self, customer_db, der_registry, market_api, weather_api):
        self.customers = customer_db
        self.der = der_registry
        self.market = market_api
        self.weather = weather_api

    def trigger_demand_response(self, target_reduction_mw, duration_hours,
                                over_dispatch_factor=1.1):
        """Orchestrate a demand response event.

        Enrolled resources are stacked cheapest-incentive-first until their
        reliability-weighted ("effective") curtailment reaches the target
        times ``over_dispatch_factor``, or the enrolled pool is exhausted.

        Args:
            target_reduction_mw: Load reduction to achieve, in MW.
            duration_hours: Event length in hours.
            over_dispatch_factor: Safety margin on the target; the default
                1.1 reproduces the previously hard-coded 10% over-dispatch.

        Returns:
            dict summarising the event. Incentive cost is charged on each
            dispatched resource's full ``max_curtailment_kw``.
            ``avoided_market_cost`` is based on the target when it was met
            and on the effective dispatched MW when the pool fell short, so
            ``net_savings`` is no longer overstated for an under-delivered
            event. Presumably ``market.get_peak_price()`` is a price per
            MWh — confirm against the market client.
        """
        # Build the resource stack from all enrolled customers.
        resource_stack = []
        for enrolled in self.customers.get_dr_enrolled():
            curtailable_kw = enrolled["max_curtailment_kw"]
            cost_per_kwh = enrolled.get("incentive_rate", 0.25)
            reliability = enrolled.get("historical_performance", 0.85)
            resource_stack.append({
                "customer_id": enrolled["id"],
                "name": enrolled["name"],
                "type": enrolled["customer_type"],  # commercial, industrial, residential
                "curtailable_kw": curtailable_kw,
                "effective_kw": curtailable_kw * reliability,
                "cost_per_kwh": cost_per_kwh,
                "total_event_cost": curtailable_kw * cost_per_kwh * duration_hours,
            })

        # Cheapest incentive first (stable: ties keep enrollment order).
        resource_stack.sort(key=lambda res: res["cost_per_kwh"])

        # Stack resources until the over-dispatched target is met.
        target_kw = target_reduction_mw * 1000
        dispatched = []
        total_dispatched_kw = 0
        for resource in resource_stack:
            if total_dispatched_kw >= target_kw * over_dispatch_factor:
                break
            dispatched.append(resource)
            total_dispatched_kw += resource["effective_kw"]

        total_cost = sum(res["total_event_cost"] for res in dispatched)

        # Only credit the reduction that was actually lined up.
        if total_dispatched_kw >= target_kw:
            avoided_mw = target_reduction_mw
        else:
            avoided_mw = total_dispatched_kw / 1000
        avoided_market_cost = avoided_mw * self.market.get_peak_price() * duration_hours

        return {
            "target_mw": target_reduction_mw,
            "dispatched_mw": round(total_dispatched_kw / 1000, 2),
            "resources_dispatched": len(dispatched),
            "duration_hours": duration_hours,
            "total_incentive_cost": round(total_cost),
            "avoided_market_cost": round(avoided_market_cost),
            "net_savings": round(avoided_market_cost - total_cost),
            "dispatched_resources": dispatched,
        }
6. Customer Operations Agent
Utility customer centers handle 2–5 million calls per year: roughly 40% are billing inquiries, 25% are outage reports, and 15% are service requests. AI agents can handle 70–85% of these interactions autonomously, reducing call center costs by $15–25M per year.
class CustomerOpsAgent:
    """Handles utility customer inquiries autonomously.

    NOTE: ``handle_inquiry`` dispatches to ``_classify_intent``,
    ``_handle_billing``, ``_handle_outage``, ``_handle_payment_plan`` and
    ``_handle_service_request``, none of which is defined in this class; a
    subclass or mixin has to supply them. Only ``_handle_high_bill`` is
    implemented here.
    """

    def __init__(self, billing_db, oms_client, crm_client, llm):
        self.billing = billing_db
        self.oms = oms_client
        self.crm = crm_client
        self.llm = llm

    def handle_inquiry(self, customer_id, inquiry_text):
        """Route and resolve customer inquiries.

        Args:
            customer_id: Identifier used to load the CRM record.
            inquiry_text: Free-text inquiry; used to classify the intent and
                forwarded to the handlers that need it.

        Returns:
            The matching handler's result, or an ``escalate_to_agent`` dict
            when the intent is not recognised.
        """
        intent = self._classify_intent(inquiry_text)
        customer = self.crm.get_customer(customer_id)

        if intent == "billing_inquiry":
            return self._handle_billing(customer, inquiry_text)
        if intent == "outage_report":
            return self._handle_outage(customer)
        if intent == "high_bill":
            return self._handle_high_bill(customer)
        if intent == "payment_arrangement":
            return self._handle_payment_plan(customer)
        if intent == "service_request":
            return self._handle_service_request(customer, inquiry_text)
        return {"action": "escalate_to_agent", "reason": "unclassified_intent"}

    def _handle_high_bill(self, customer):
        """Analyze and explain high bill complaints.

        Compares the current bill with the 12-month history and, when usage
        rose more than 15% above average, attributes it to heating demand,
        cooling demand, or a plain usage increase (in that order of
        preference). A rate change is reported independently.

        Args:
            customer: CRM record; only ``customer["id"]`` is read.

        Returns:
            An ``auto_respond`` dict with bill figures, ``explanations`` and
            fixed ``suggestions``; or an ``escalate_to_agent`` dict when
            there is no billing history or its average usage is zero, since
            no baseline exists to compare against (these cases previously
            raised IndexError / produced inf and NaN).
        """
        current = self.billing.get_current_bill(customer["id"])
        previous = self.billing.get_bill_history(customer["id"], months=12)
        if not previous:
            return {
                "action": "escalate_to_agent",
                "reason": "insufficient_billing_history",
            }

        # float() keeps numpy scalars out of the response payload.
        avg_bill = float(np.mean([b["amount"] for b in previous]))

        # Usage analysis
        usage_current = current["usage_kwh"]
        usage_avg = float(np.mean([b["usage_kwh"] for b in previous]))
        if usage_avg <= 0:
            return {
                "action": "escalate_to_agent",
                "reason": "insufficient_billing_history",
            }
        usage_increase_pct = (usage_current - usage_avg) / usage_avg * 100

        # Weather impact
        hdd = current.get("heating_degree_days", 0)
        cdd = current.get("cooling_degree_days", 0)
        avg_hdd = float(np.mean([b.get("heating_degree_days", 0) for b in previous]))
        avg_cdd = float(np.mean([b.get("cooling_degree_days", 0) for b in previous]))

        # Rate change check.
        # NOTE(review): assumes the history is ordered oldest -> newest so
        # that previous[-1] is the most recent prior bill — confirm.
        rate_changed = current["rate_per_kwh"] != previous[-1]["rate_per_kwh"]

        explanations = []
        if usage_increase_pct > 15:
            # The avg > 0 guards prevent a division by zero when the history
            # carries no degree-day data; such cases fall through to the
            # generic usage explanation.
            if avg_hdd > 0 and hdd > avg_hdd * 1.2:
                explanations.append(
                    f"Heating demand was {round((hdd/avg_hdd - 1)*100)}% above average due to colder weather"
                )
            elif avg_cdd > 0 and cdd > avg_cdd * 1.2:
                explanations.append(
                    f"Cooling demand was {round((cdd/avg_cdd - 1)*100)}% above average due to hotter weather"
                )
            else:
                explanations.append(
                    f"Usage increased {round(usage_increase_pct)}% vs your 12-month average"
                )
        if rate_changed:
            explanations.append("A rate adjustment took effect this billing period")

        return {
            "action": "auto_respond",
            "current_bill": current["amount"],
            "average_bill": round(avg_bill, 2),
            "usage_change_pct": round(usage_increase_pct, 1),
            "explanations": explanations,
            "suggestions": [
                "Enroll in budget billing to spread costs evenly",
                "Schedule a free energy audit",
                "Review thermostat settings",
            ],
        }
7. ROI Analysis
| Agent | Annual Savings | Implementation | Payback |
|---|---|---|---|
| Smart Grid (VVO) | $20–50M (loss reduction) | $5–10M | 3–6 months |
| Outage Prediction | $15–40M (avoided outages) | $3–6M | 3–5 months |
| Water Treatment | $5–12M (chemicals + energy) | $2–4M | 4–8 months |
| Leak Detection | $10–30M (water savings) | $4–8M | 4–8 months |
| Demand Response | $30–80M (deferred capex) | $5–10M | 2–4 months |
| Customer Operations | $15–25M (call center) | $2–4M | 2–3 months |
Total portfolio: $95–237M in annual savings against $21–42M in implementation costs. The biggest impact comes from demand response (deferred infrastructure) and smart grid optimization (distribution loss reduction).
Build Your Own AI Agent
Get the complete blueprint for building autonomous AI agents — includes templates, security checklists, and deployment guides.
Get The AI Agent Playbook — $19