AI Agent for Printing & Packaging: Automate Prepress, Quality Control & Production Planning

March 28, 2026 · 15 min read · Printing & Packaging

The global printing and packaging industry generates over $900 billion in annual revenue, yet most commercial printers still rely on manual preflight checks, subjective color approvals, and spreadsheet-based production scheduling. A single prepress error that reaches the press floor can waste $5,000-15,000 in substrate, ink, and machine time. Color complaints account for 35% of all customer reprints, and makeready waste on offset presses averages 3-7% of total substrate consumption.

AI agents built for print production go far beyond basic automation rules. They can analyze PDF files for dozens of technical issues simultaneously, match colors across substrates using spectral data, detect print defects at press speed through inline cameras, and optimize job scheduling across multiple presses to minimize setup time and maximize throughput. These agents reason about the interdependencies between prepress decisions, press capabilities, and finishing requirements.

This guide covers six core areas where AI agents can transform printing and packaging operations, with illustrative Python code for each. The patterns apply whether you run a single sheetfed press or a multi-site operation with 20 presses.

1. Prepress Automation

Prepress is where most print defects originate. A file with 200 DPI images destined for a 175 LPI offset press, missing bleed on a die-cut package, embedded RGB images in a CMYK workflow, or fonts that are not embedded properly — any of these will cause problems downstream. Traditional preflight tools flag issues but still require a human operator to evaluate each warning, decide on corrections, and manually fix files. An AI agent can perform the entire preflight, evaluate severity, apply corrections automatically for known patterns, and escalate only genuinely ambiguous cases to a human.
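
The 200 DPI versus 175 LPI mismatch follows from a common halftone rule of thumb: image resolution should be roughly 1.5 to 2 times the screen ruling. The sketch below assumes that rule; the function names and the default quality factor are illustrative and not taken from any preflight tool.

```python
def required_dpi(lpi: int, quality_factor: float = 2.0) -> float:
    """Rule-of-thumb image resolution for a given halftone screen ruling."""
    return lpi * quality_factor


def resolution_shortfall(image_dpi: float, lpi: int,
                         quality_factor: float = 2.0) -> float:
    """Fraction by which an image misses the target (0.0 when it is sufficient)."""
    needed = required_dpi(lpi, quality_factor)
    return max(0.0, 1 - image_dpi / needed)


print(required_dpi(175))                         # 350.0
print(required_dpi(175, 1.5))                    # 262.5
print(round(resolution_shortfall(200, 175), 2))  # 0.43
```

Under this assumption a 200 DPI image is well short of the target for a 175 LPI screen even at the lenient 1.5 factor, which is why a preflight check would flag it.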

Beyond individual file checking, the agent handles imposition optimization — deciding how to arrange multiple jobs on a press sheet to maximize substrate utilization. Gang-run planning for digital and sheetfed offset presses can improve sheet utilization from a typical 65-75% up to 85-92% by intelligently grouping jobs with compatible substrates, color requirements, and delivery dates. The agent also manages color separation, verifying ICC profile assignments, optimizing spot-to-process conversions, and ensuring that total ink coverage stays within press limits.
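
Sheet utilization comes down to simple arithmetic once the printable area is known. As a rough sketch, the helper below counts how many copies of one item fit in a grid, trying both orientations. The function names, the 3 mm bleed default, and the 1008 x 670 mm printable area are assumptions chosen for illustration.

```python
def ups_per_sheet(item_w: float, item_h: float, area_w: float, area_h: float,
                  bleed: float = 3.0) -> int:
    """Best grid count of one item in the printable area, trying both orientations."""
    w, h = item_w + 2 * bleed, item_h + 2 * bleed
    upright = int(area_w // w) * int(area_h // h)
    rotated = int(area_w // h) * int(area_h // w)
    return max(upright, rotated)


def utilization_pct(item_w: float, item_h: float, ups: int,
                    area_w: float, area_h: float) -> float:
    """Trimmed item area as a percentage of the printable area."""
    return round(item_w * item_h * ups / (area_w * area_h) * 100, 1)


# A4 flyers (210 x 297 mm) on a hypothetical 1008 x 670 mm printable area
n = ups_per_sheet(210, 297, 1008, 670)
print(n, utilization_pct(210, 297, n, 1008, 670))  # 9 83.1
```

Rotating the item lifts the count from 8 to 9 in this case, which is the kind of gain a gang-run planner looks for before it starts mixing different jobs on one sheet.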

PDF correction automation is where the agent saves the most operator time. Instead of opening each file in Acrobat or PitStop, the agent programmatically converts RGB to CMYK using the correct profile, adds bleed by mirroring edge content, embeds or outlines non-embedded fonts, and flattens transparency for RIP compatibility — all in seconds, with a complete audit trail.
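
To make the "mirroring edge content" step concrete, here is a minimal sketch on a raster grid stored as a list of pixel rows. Real correction tools work on PDF page content, so treat this only as an illustration of the reflection idea; `add_mirror_bleed` and `mm_to_px` are hypothetical helper names.

```python
from typing import List

Row = List[int]


def mm_to_px(mm: float, dpi: int) -> int:
    """Convert a physical length to whole pixels at the given resolution."""
    return round(mm / 25.4 * dpi)


def add_mirror_bleed(pixels: List[Row], bleed_px: int) -> List[Row]:
    """Extend an image on all four sides by reflecting its edge content outward."""
    if bleed_px < 1 or bleed_px > min(len(pixels), len(pixels[0])):
        raise ValueError("bleed must be at least 1 px and no larger than the image")
    # Reflect the left and right edges of every row
    wide = [row[:bleed_px][::-1] + row + row[-bleed_px:][::-1] for row in pixels]
    # Reflect the top and bottom rows (copied so rows are not shared)
    top = [row[:] for row in wide[:bleed_px][::-1]]
    bottom = [row[:] for row in wide[-bleed_px:][::-1]]
    return top + wide + bottom


image = [[1, 2, 3],
         [4, 5, 6]]
for row in add_mirror_bleed(image, 1):
    print(row)
print(mm_to_px(3.0, 300))  # 35
```

Mirroring works well for photographic backgrounds but can duplicate text or logos that sit close to the trim edge, which is one reason such fixes are logged for review.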

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from enum import Enum
import math

class Severity(Enum):
    CRITICAL = "critical"    # will cause print failure
    WARNING = "warning"      # may cause quality issues
    INFO = "info"            # suboptimal but printable

@dataclass
class PreflightIssue:
    category: str
    description: str
    severity: Severity
    auto_fixable: bool
    fix_applied: str = ""

@dataclass
class PDFPageSpec:
    width_mm: float
    height_mm: float
    bleed_mm: Dict[str, float]     # {"top": 3, "bottom": 3, ...}
    images: List[Dict]              # [{"dpi": 300, "colorspace": "CMYK", ...}]
    fonts: List[Dict]               # [{"name": "Helvetica", "embedded": True}]
    color_spaces: List[str]         # ["CMYK", "Spot:Pantone 185 C"]
    total_ink_coverage: float       # max TAC percentage
    has_transparency: bool
    trim_box_mm: Tuple[float, float, float, float]

@dataclass
class JobSheet:
    sheet_width_mm: float
    sheet_height_mm: float
    gripper_mm: float = 12.0
    tail_margin_mm: float = 8.0
    side_margin_mm: float = 6.0
    color_bar_mm: float = 10.0

class PrepressAutomationAgent:
    """AI agent for PDF preflight, imposition, and automated correction."""

    MIN_DPI_OFFSET = 300
    MIN_DPI_DIGITAL = 200
    MIN_BLEED_MM = 3.0
    MAX_TAC_OFFSET = 320         # sheetfed coated
    MAX_TAC_FLEXO = 280
    MAX_TAC_DIGITAL = 300

    def __init__(self, press_type: str = "offset", lpi: int = 175):
        self.press_type = press_type
        self.lpi = lpi
        self.min_dpi = self.MIN_DPI_OFFSET if press_type == "offset" else self.MIN_DPI_DIGITAL

    def preflight_page(self, page: PDFPageSpec) -> Dict:
        """Run full preflight check on a PDF page."""
        issues = []

        # Resolution check
        for img in page.images:
            if img["dpi"] < self.min_dpi:
                issues.append(PreflightIssue(
                    category="resolution",
                    description=f"Image at {img['dpi']} DPI, minimum {self.min_dpi} required",
                    severity=Severity.CRITICAL if img["dpi"] < 150 else Severity.WARNING,
                    auto_fixable=False
                ))

        # Bleed check
        for side, value in page.bleed_mm.items():
            if value < self.MIN_BLEED_MM:
                issues.append(PreflightIssue(
                    category="bleed",
                    description=f"{side} bleed {value}mm, minimum {self.MIN_BLEED_MM}mm required",
                    severity=Severity.CRITICAL,
                    auto_fixable=True,
                    fix_applied=f"Mirror-extended {side} bleed to {self.MIN_BLEED_MM}mm"
                ))

        # Font embedding
        for font in page.fonts:
            if not font["embedded"]:
                issues.append(PreflightIssue(
                    category="fonts",
                    description=f"Font '{font['name']}' not embedded",
                    severity=Severity.CRITICAL,
                    auto_fixable=True,
                    fix_applied=f"Embedded font '{font['name']}' as subset"
                ))

        # Color space validation
        for cs in page.color_spaces:
            if cs == "RGB":
                issues.append(PreflightIssue(
                    category="colorspace",
                    description="RGB color space detected in CMYK workflow",
                    severity=Severity.WARNING,
                    auto_fixable=True,
                    fix_applied="Converted RGB to CMYK using ISO Coated v2 profile"
                ))

        # Total ink coverage
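        # Pick the TAC limit for the press type (flexo has its own, lower limit)
        tac_limits = {"offset": self.MAX_TAC_OFFSET, "flexo": self.MAX_TAC_FLEXO}
        max_tac = tac_limits.get(self.press_type, self.MAX_TAC_DIGITAL)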
        if page.total_ink_coverage > max_tac:
            issues.append(PreflightIssue(
                category="ink_coverage",
                description=f"TAC {page.total_ink_coverage}% exceeds {max_tac}% limit",
                severity=Severity.WARNING,
                auto_fixable=True,
                fix_applied=f"Reduced TAC to {max_tac}% using GCR adjustment"
            ))

        # Transparency
        if page.has_transparency and self.press_type == "offset":
            issues.append(PreflightIssue(
                category="transparency",
                description="Live transparency may cause RIP issues",
                severity=Severity.INFO,
                auto_fixable=True,
                fix_applied="Flattened transparency at 1200 DPI"
            ))

        auto_fixed = [i for i in issues if i.auto_fixable and i.fix_applied]
        critical = [i for i in issues if i.severity == Severity.CRITICAL and not i.auto_fixable]

        return {
            "total_issues": len(issues),
            "critical_unfixed": len(critical),
            "auto_fixed": len(auto_fixed),
            "pass": len(critical) == 0,
            "issues": [{"category": i.category, "description": i.description,
                        "severity": i.severity.value, "fix": i.fix_applied} for i in issues]
        }

    def optimize_imposition(self, jobs: List[Dict], sheet: JobSheet) -> Dict:
        """Calculate optimal imposition layout for gang-run printing."""
        printable_w = sheet.sheet_width_mm - 2 * sheet.side_margin_mm
        printable_h = (sheet.sheet_height_mm - sheet.gripper_mm
                       - sheet.tail_margin_mm - sheet.color_bar_mm)

        placed_jobs = []
        total_print_area = 0
        sheet_area = printable_w * printable_h

        # Sort jobs by area (largest first) for better packing
        sorted_jobs = sorted(jobs, key=lambda j: j["width"] * j["height"], reverse=True)

        # Simple shelf-based bin packing
        shelves = []  # [(y_start, height, remaining_width, jobs)]

        for job in sorted_jobs:
            jw = job["width"] + self.MIN_BLEED_MM * 2
            jh = job["height"] + self.MIN_BLEED_MM * 2
            ups = job.get("quantity_up", 1)
            placed = False

            for orientation in [(jw, jh), (jh, jw)]:
                w, h = orientation
                for shelf in shelves:
                    if w <= shelf["remaining"] and h <= shelf["height"]:
                        shelf["remaining"] -= w
                        shelf["jobs"].append({"job_id": job["id"], "ups": ups})
                        # One footprint is placed, so count its area once;
                        # "ups" is recorded as metadata only
                        total_print_area += w * h
                        placed = True
                        break
                if placed:
                    break

            if not placed:
                # Start new shelf
                current_y = sum(s["height"] for s in shelves)
                if current_y + jh <= printable_h and jw <= printable_w:
                    shelves.append({
                        "y": current_y, "height": jh,
                        "remaining": printable_w - jw,
                        "jobs": [{"job_id": job["id"], "ups": ups}]
                    })
                    total_print_area += jw * jh
                    placed = True

            if placed:
                placed_jobs.append(job["id"])

        utilization = (total_print_area / sheet_area * 100) if sheet_area > 0 else 0

        return {
            "sheet_size": f"{sheet.sheet_width_mm}x{sheet.sheet_height_mm}mm",
            "jobs_placed": len(placed_jobs),
            "jobs_total": len(jobs),
            "sheet_utilization_pct": round(min(utilization, 100), 1),
            "shelves_used": len(shelves),
            "waste_area_pct": round(100 - min(utilization, 100), 1),
            "layout": [{"shelf": i, "jobs": s["jobs"]} for i, s in enumerate(shelves)]
        }
Key insight: Automated preflight with auto-correction handles 70-80% of file issues without operator intervention. The remaining 20-30% are genuinely ambiguous cases (e.g., intentionally low-resolution artistic effects, spot colors used decoratively) that benefit from human judgment. This shifts the prepress operator's role from repetitive checking to exception handling.

2. Color Management & Consistency

Color consistency is the single largest source of customer complaints in commercial printing. A brand owner expects their Pantone 186 C red to look identical whether it is printed on coated stock via offset, on corrugated board via flexo, or on a poly-wrapped package via gravure. The human eye can detect color differences as small as Delta E 2.0 under controlled lighting, and trained brand managers often reject prints at Delta E 3.0 or above. Achieving this consistency across substrates, inks, and press conditions requires spectral measurement, not visual judgment.
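
For readers new to Delta E, the simplest variant (CIE76) is just the straight-line distance between two colors in Lab space. The sketch below uses that older formula to show the idea; CIEDE2000 adds perceptual weighting on top of it. The Lab values are made up for illustration and are not official Pantone data.

```python
import math


def delta_e_76(lab1, lab2) -> float:
    """CIE76 color difference: Euclidean distance between two (L, a, b) tuples."""
    return math.dist(lab1, lab2)


target = (48.0, 68.0, 40.0)      # hypothetical brand-red target
measured = (49.0, 66.0, 41.5)    # hypothetical press-side reading
print(round(delta_e_76(target, measured), 2))  # 2.69
```

A reading like this sits between the two thresholds quoted above: noticeable under controlled lighting, but below the Delta E 3.0 level at which brand managers often reject a print.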

An AI color management agent integrates spectrophotometer readings from the press floor, maintains press-specific ICC profiles, and continuously adjusts ink formulations to hit target Lab values. For Pantone-to-CMYK simulation, the agent goes beyond standard lookup tables by factoring in the actual substrate's whiteness, surface finish, and ink absorption characteristics. It also manages G7 calibration, the industry-standard method for gray balance and tonality, by automatically calculating NPDC (Neutral Print Density Curve) corrections and generating press curves.

The agent tracks color compliance across every job and substrate combination, building a historical database of achievable Delta E values per press, ink set, and paper. When a new job comes in with strict brand color specifications, the agent can predict whether the target is achievable on a given press and recommend the best press/substrate combination to minimize color risk. This prevents jobs from being assigned to presses that physically cannot hit the required color targets.
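
One simple way to turn that history into a press recommendation is to look at a high percentile of past Delta E values per press and substrate. The sketch below assumes a flat list of `(press_id, substrate, delta_e)` records and a nearest-rank percentile; both the record layout and the 90th-percentile choice are assumptions, not a prescribed method.

```python
import math
from collections import defaultdict


def nearest_rank(values, pct):
    """Nearest-rank percentile of a non-empty list."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def rank_presses(history, substrate, max_delta_e, pct=90):
    """Return (press_id, typical worst-case Delta E) pairs that can meet the target."""
    by_press = defaultdict(list)
    for press_id, sub, de in history:
        if sub == substrate:
            by_press[press_id].append(de)
    scored = {press: nearest_rank(vals, pct) for press, vals in by_press.items()}
    capable = [(press, de) for press, de in scored.items() if de <= max_delta_e]
    return sorted(capable, key=lambda item: item[1])


history = [
    ("P1", "coated", 1.2), ("P1", "coated", 1.8), ("P1", "coated", 2.1),
    ("P2", "coated", 2.5), ("P2", "coated", 3.4), ("P2", "coated", 2.9),
    ("P1", "board", 4.0),
]
print(rank_presses(history, "coated", max_delta_e=3.0))  # [('P1', 2.1)]
```

Using a high percentile rather than the average keeps the recommendation conservative: a press that usually hits the target but occasionally drifts well past it is excluded.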

from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import math

@dataclass
class LabColor:
    L: float    # lightness 0-100
    a: float    # green-red axis
    b: float    # blue-yellow axis

@dataclass
class SpectralReading:
    press_id: str
    job_id: str
    patch_name: str
    target: LabColor
    measured: LabColor
    substrate: str
    timestamp: str

@dataclass
class InkFormulation:
    ink_name: str
    components: Dict[str, float]   # {"cyan": 0.45, "magenta": 0.92, ...}
    target_lab: LabColor
    substrate: str

@dataclass
class PressProfile:
    press_id: str
    dot_gain_curves: Dict[str, List[float]]   # {"C": [0,5,12,18,...], "M": [...]}
    density_targets: Dict[str, float]          # {"C": 1.40, "M": 1.50, ...}
    max_tac: float
    substrate_type: str

class ColorManagementAgent:
    """AI agent for spectral color matching, ink optimization, and G7 calibration."""

    DELTA_E_EXCELLENT = 1.0
    DELTA_E_ACCEPTABLE = 3.0
    DELTA_E_REJECT = 5.0

    def __init__(self):
        self.readings_history = []
        self.press_profiles = {}

    def delta_e_2000(self, lab1: LabColor, lab2: LabColor) -> float:
        """Calculate CIEDE2000 color difference."""
        avg_L = (lab1.L + lab2.L) / 2
        C1 = math.sqrt(lab1.a**2 + lab1.b**2)
        C2 = math.sqrt(lab2.a**2 + lab2.b**2)
        avg_C = (C1 + C2) / 2

        G = 0.5 * (1 - math.sqrt(avg_C**7 / (avg_C**7 + 25**7)))
        a1p = lab1.a * (1 + G)
        a2p = lab2.a * (1 + G)

        C1p = math.sqrt(a1p**2 + lab1.b**2)
        C2p = math.sqrt(a2p**2 + lab2.b**2)
        avg_Cp = (C1p + C2p) / 2

        h1p = math.degrees(math.atan2(lab1.b, a1p)) % 360
        h2p = math.degrees(math.atan2(lab2.b, a2p)) % 360

        dLp = lab2.L - lab1.L
        dCp = C2p - C1p

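        if C1p * C2p == 0:
            dhp = 0.0    # hue is undefined when either chroma is zero
        elif abs(h2p - h1p) <= 180:
            dhp = h2p - h1p
        elif h2p - h1p > 180:
            dhp = h2p - h1p - 360
        else:
            dhp = h2p - h1p + 360

        dHp = 2 * math.sqrt(C1p * C2p) * math.sin(math.radians(dhp / 2))

        # Mean hue: wrap around 360 degrees when the hues are more than 180 apart
        if C1p * C2p == 0:
            avg_Hp = h1p + h2p
        elif abs(h1p - h2p) <= 180:
            avg_Hp = (h1p + h2p) / 2
        elif h1p + h2p < 360:
            avg_Hp = (h1p + h2p + 360) / 2
        else:
            avg_Hp = (h1p + h2p - 360) / 2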
        T = (1 - 0.17 * math.cos(math.radians(avg_Hp - 30))
             + 0.24 * math.cos(math.radians(2 * avg_Hp))
             + 0.32 * math.cos(math.radians(3 * avg_Hp + 6))
             - 0.20 * math.cos(math.radians(4 * avg_Hp - 63)))

        SL = 1 + 0.015 * (avg_L - 50)**2 / math.sqrt(20 + (avg_L - 50)**2)
        SC = 1 + 0.045 * avg_Cp
        SH = 1 + 0.015 * avg_Cp * T

        RT_angle = 30 * math.exp(-((avg_Hp - 275) / 25)**2)
        RC = 2 * math.sqrt(avg_Cp**7 / (avg_Cp**7 + 25**7))
        RT = -math.sin(math.radians(2 * RT_angle)) * RC

        dE = math.sqrt(
            (dLp / SL)**2 + (dCp / SC)**2 + (dHp / SH)**2
            + RT * (dCp / SC) * (dHp / SH)
        )
        return round(dE, 2)

    def evaluate_color_match(self, readings: List[SpectralReading]) -> Dict:
        """Score color accuracy across all patches in a press run."""
        results = []
        for r in readings:
            de = self.delta_e_2000(r.target, r.measured)
            status = ("excellent" if de <= self.DELTA_E_EXCELLENT
                      else "acceptable" if de <= self.DELTA_E_ACCEPTABLE
                      else "out_of_spec")
            results.append({
                "patch": r.patch_name,
                "target_lab": (r.target.L, r.target.a, r.target.b),
                "measured_lab": (r.measured.L, r.measured.a, r.measured.b),
                "delta_e": de,
                "status": status
            })
            self.readings_history.append(r)

        avg_de = sum(r["delta_e"] for r in results) / max(len(results), 1)
        max_de = max(r["delta_e"] for r in results) if results else 0
        out_of_spec = sum(1 for r in results if r["status"] == "out_of_spec")

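        return {
            "total_patches": len(results),
            "avg_delta_e": round(avg_de, 2),
            "max_delta_e": round(max_de, 2),
            "out_of_spec_count": out_of_spec,
            "pass": out_of_spec == 0 and avg_de <= self.DELTA_E_ACCEPTABLE,
            "patches": results,
            "recommendation": self._color_recommendation(avg_de, max_de, out_of_spec)
        }

    def _color_recommendation(self, avg_de: float, max_de: float,
                              out_of_spec: int) -> str:
        """One-line summary of the run (illustrative wording; thresholds are assumptions)."""
        if out_of_spec > 0 or max_de > self.DELTA_E_REJECT:
            return f"Hold run: {out_of_spec} patch(es) out of spec, max Delta E {max_de}"
        if avg_de > self.DELTA_E_EXCELLENT:
            return "Within tolerance: monitor for drift"
        return "Excellent match: no action needed"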

    def optimize_ink_formulation(self, target: LabColor, substrate: str,
                                  current_formula: InkFormulation,
                                  measured: LabColor) -> Dict:
        """Adjust CMYK ink percentages to minimize Delta E to target."""
        de_current = self.delta_e_2000(target, measured)
        adjustments = {}

        # Calculate directional corrections based on Lab error
        dL = target.L - measured.L    # + = need lighter
        da = target.a - measured.a    # + = need more red/less green
        db = target.b - measured.b    # + = need more yellow/less blue

        components = dict(current_formula.components)

        # Lightness: adjust black (K) component
        if abs(dL) > 1.0:
            k_adjust = -dL * 0.008
            components["black"] = max(0, min(1, components.get("black", 0) + k_adjust))
            adjustments["black"] = round(k_adjust * 100, 1)

        # Red-green axis: adjust magenta and cyan
        if abs(da) > 1.0:
            m_adjust = da * 0.006
            c_adjust = -da * 0.004
            components["magenta"] = max(0, min(1, components.get("magenta", 0) + m_adjust))
            components["cyan"] = max(0, min(1, components.get("cyan", 0) + c_adjust))
            adjustments["magenta"] = round(m_adjust * 100, 1)
            adjustments["cyan"] = round(c_adjust * 100, 1)

        # Blue-yellow axis: adjust yellow
        if abs(db) > 1.0:
            y_adjust = db * 0.007
            components["yellow"] = max(0, min(1, components.get("yellow", 0) + y_adjust))
            adjustments["yellow"] = round(y_adjust * 100, 1)

        return {
            "current_delta_e": de_current,
            "target_lab": (target.L, target.a, target.b),
            "adjustments_pct": adjustments,
            "new_formula": {k: round(v * 100, 1) for k, v in components.items()},
            "substrate": substrate,
            "expected_improvement": round(de_current * 0.4, 2)
        }

    def g7_calibration(self, profile: PressProfile,
                        measured_curves: Dict[str, List[float]]) -> Dict:
        """Compare measured tone curves with profile targets (a simplified stand-in for G7 NPDC fitting)."""
        corrections = {}
        for channel in ["C", "M", "Y", "K"]:
            target = profile.dot_gain_curves.get(channel, [])
            measured = measured_curves.get(channel, [])
            if len(target) != len(measured):
                continue
            channel_corrections = []
            for t, m in zip(target, measured):
                diff = t - m
                channel_corrections.append(round(diff, 1))
            corrections[channel] = channel_corrections

        # default=0 covers the case where no channel produced any correction values
        max_correction = max(
            (abs(v) for vals in corrections.values() for v in vals), default=0)

        return {
            "press_id": profile.press_id,
            "substrate": profile.substrate_type,
            "corrections": corrections,
            "max_correction_pct": round(max_correction, 1),
            "calibration_needed": max_correction > 3.0,
            "status": "pass" if max_correction <= 3.0 else "needs_adjustment"
        }
Key insight: Most printers check color at the start of a run and assume it stays consistent. In reality, ink temperature, fountain solution pH, and blanket condition cause drift throughout the run. An AI agent that processes inline spectrophotometer data every 500 sheets catches drift before it produces out-of-spec product, reducing color-related waste by 40-60%.
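
Drift of this kind can be caught with nothing more than a straight-line fit through recent readings. The sketch below fits a least-squares line to `(sheet_number, delta_e)` pairs and projects the sheet at which the tolerance would be crossed. The function name, the input layout, and the 3.0 tolerance are assumptions for illustration.

```python
def drift_forecast(readings, tolerance=3.0):
    """Fit a line through (sheet_number, delta_e) pairs, oldest first.

    Returns (Delta E change per 1000 sheets, projected crossing sheet or None).
    """
    n = len(readings)
    if n < 3:
        return 0.0, None
    mean_x = sum(x for x, _ in readings) / n
    mean_y = sum(y for _, y in readings) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in readings)
    if sxx == 0:
        return 0.0, None
    slope = sum((x - mean_x) * (y - mean_y) for x, y in readings) / sxx
    if slope <= 0:
        return slope * 1000, None        # stable or improving: no crossing ahead
    intercept = mean_y - slope * mean_x
    return slope * 1000, round((tolerance - intercept) / slope)


readings = [(500, 1.0), (1000, 1.2), (1500, 1.4), (2000, 1.6)]
per_thousand, crossing = drift_forecast(readings)
print(round(per_thousand, 2), crossing)  # 0.4 5500
```

Every reading here is still comfortably in tolerance, yet the trend says the job would go out of spec around sheet 5,500, which leaves time to correct ink or fountain settings first.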

3. Print Quality Inspection

Inline print inspection has moved from expensive niche technology to an essential component of modern print production. High-speed cameras mounted above the web or sheet capture every impression at resolutions of 200-400 DPI, generating terabytes of image data per shift. The challenge is not capturing images — it is analyzing them fast enough to catch defects before hundreds or thousands of defective sheets pile up. Traditional vision systems use fixed thresholds that generate excessive false positives or miss subtle defects.

An AI inspection agent uses trained classification models to distinguish between different defect types — registration errors, hickeys (ink specks from dried particles), streaks, banding, ghosting, scumming, and substrate defects like wrinkles or holes. Each defect type has different severity implications and different corrective actions. A hickey may only require cleaning the blanket during the next pile change, while banding indicates a more serious issue with the ink train or roller settings that requires immediate attention.

For packaging, the agent also verifies barcodes and QR codes against ISO 15416 (linear) and ISO 15415 (2D) grading standards, measures die-cut registration accuracy, checks label placement on converted products, and classifies substrate defects that originated from the paper mill. This end-to-end inspection chain ensures that defective product never reaches the customer while maintaining detailed quality records for SPC (Statistical Process Control) analysis.
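
Barcode grades on the A to F scale are derived from numeric scores between 0.0 and 4.0. As a rough sketch of how linear-barcode grading is commonly described, each scan line is graded by its worst parameter and the symbol grade is the average across scans. The letter cut-offs below follow the widely used ANSI convention; the parameter names and data layout are assumptions, and the standard itself remains the authority for real verification.

```python
def letter_grade(score: float) -> str:
    """Map a 0.0-4.0 numeric grade onto the A-F letter scale."""
    for letter, floor in (("A", 3.5), ("B", 2.5), ("C", 1.5), ("D", 0.5)):
        if score >= floor:
            return letter
    return "F"


def overall_symbol_grade(scans):
    """scans: one dict of per-parameter grades (0-4) per scan line."""
    scan_grades = [min(scan.values()) for scan in scans]   # worst parameter wins
    score = round(sum(scan_grades) / len(scan_grades), 1)  # mean across scan lines
    return score, letter_grade(score)


scans = [
    {"edge_contrast": 4, "modulation": 3, "decodability": 4},
    {"edge_contrast": 4, "modulation": 2, "decodability": 3},
    {"edge_contrast": 4, "modulation": 3, "decodability": 3},
]
print(overall_symbol_grade(scans))  # (2.7, 'B')
```

Because a single weak parameter caps the grade of a whole scan line, modulation problems caused by ink spread can pull a symbol down even when contrast is excellent.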

from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
from datetime import datetime
import math

class DefectType(Enum):
    REGISTRATION = "registration"
    HICKEY = "hickey"
    STREAK = "streak"
    BANDING = "banding"
    GHOSTING = "ghosting"
    SCUMMING = "scumming"
    SUBSTRATE = "substrate"
    BARCODE_FAIL = "barcode_fail"
    DIE_CUT_ERROR = "die_cut_error"
    LABEL_MISPLACE = "label_misplacement"

class DefectSeverity(Enum):
    MINOR = "minor"           # cosmetic only
    MAJOR = "major"           # functional impact
    CRITICAL = "critical"     # stop press immediately

@dataclass
class InspectionFrame:
    frame_id: int
    sheet_number: int
    timestamp: datetime
    defects: List[Dict]       # [{"type": DefectType, "x": px, "y": px, "size_mm": float}]
    barcode_grades: List[Dict] # [{"code": "EAN13", "grade": "A", "symbol": "..."}]
    registration_mm: Dict[str, float]  # {"C": 0.05, "M": -0.03, "Y": 0.02, "K": 0.0}
    die_cut_offset_mm: Optional[float]

@dataclass
class BarcodeGrade:
    symbology: str
    data: str
    overall_grade: str      # A, B, C, D, F per ISO 15416/15415
    edge_contrast: float
    modulation: float
    decodability: float
    quiet_zone_ok: bool

class PrintQualityInspectionAgent:
    """AI agent for inline defect detection, barcode verification, and SPC."""

    REG_TOLERANCE_MM = 0.15       # registration tolerance
    HICKEY_SIZE_THRESHOLD_MM = 0.3
    BARCODE_MIN_GRADE = "C"       # minimum acceptable grade
    DIE_CUT_TOLERANCE_MM = 0.5
    DEFECT_RATE_ALARM = 0.02      # 2% defect rate triggers alarm

    def __init__(self, job_spec: Dict):
        self.job_spec = job_spec
        self.inspection_log = []
        self.defect_counts = {}
        self.total_inspected = 0

    def inspect_sheet(self, frame: InspectionFrame) -> Dict:
        """Analyze a single captured sheet for all defect categories."""
        alerts = []
        defects_found = []
        self.total_inspected += 1

        # Registration analysis
        for channel, offset in frame.registration_mm.items():
            if abs(offset) > self.REG_TOLERANCE_MM:
                severity = (DefectSeverity.CRITICAL if abs(offset) > 0.3
                           else DefectSeverity.MAJOR)
                defects_found.append({
                    "type": DefectType.REGISTRATION.value,
                    "channel": channel,
                    "offset_mm": offset,
                    "tolerance_mm": self.REG_TOLERANCE_MM,
                    "severity": severity.value
                })
                alerts.append(f"Registration {channel}: {offset:.2f}mm "
                            f"(limit {self.REG_TOLERANCE_MM}mm)")

        # Defect classification
        for defect in frame.defects:
            dtype = defect["type"]
            if isinstance(dtype, DefectType):
                dtype = dtype.value    # accept either the enum or its string value
            size = defect.get("size_mm", 0)
            severity = self._classify_severity(dtype, size)

            self.defect_counts[dtype] = self.defect_counts.get(dtype, 0) + 1
            defects_found.append({
                "type": dtype,
                "position": (defect["x"], defect["y"]),
                "size_mm": size,
                "severity": severity.value,
                "action": self._corrective_action(dtype, severity)
            })

        # Barcode verification (ISO 15416/15415)
        barcode_results = []
        for bc in frame.barcode_grades:
            grade_pass = self._grade_acceptable(bc["grade"])
            barcode_results.append({
                "symbology": bc["code"],
                "grade": bc["grade"],
                "pass": grade_pass,
                "edge_contrast": bc.get("edge_contrast", 0),
                "quiet_zone": bc.get("quiet_zone_ok", True)
            })
            if not grade_pass:
                defects_found.append({
                    "type": DefectType.BARCODE_FAIL.value,
                    "grade": bc["grade"],
                    "severity": DefectSeverity.CRITICAL.value,
                    "action": "Stop press — barcode unreadable at retail"
                })

        # Die-cut registration
        if frame.die_cut_offset_mm is not None:
            if abs(frame.die_cut_offset_mm) > self.DIE_CUT_TOLERANCE_MM:
                defects_found.append({
                    "type": DefectType.DIE_CUT_ERROR.value,
                    "offset_mm": frame.die_cut_offset_mm,
                    "tolerance_mm": self.DIE_CUT_TOLERANCE_MM,
                    "severity": DefectSeverity.MAJOR.value
                })

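        # SPC trend analysis: cumulative defects per inspected sheet, including this one
        # (a running counter would avoid re-summing the log on long runs)
        prior_defects = sum(r["defect_count"] for r in self.inspection_log)
        defect_rate = (prior_defects + len(defects_found)) / max(self.total_inspected, 1)
        trending = self._detect_trend()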

        result = {
            "sheet_number": frame.sheet_number,
            "defects": defects_found,
            "defect_count": len(defects_found),
            "barcode_results": barcode_results,
            "defect_rate": round(defect_rate, 4),
            "trend": trending,
            "stop_press": any(d["severity"] == "critical" for d in defects_found),
            "alerts": alerts
        }

        self.inspection_log.append(result)
        return result

    def generate_spc_report(self, window_sheets: int = 1000) -> Dict:
        """Generate Statistical Process Control report."""
        recent = self.inspection_log[-window_sheets:]
        if not recent:
            return {"error": "No inspection data"}

        defect_rates = [float(r["defect_count"]) for r in recent]  # defects per sheet
        avg_rate = sum(defect_rates) / len(defect_rates)
        std_rate = (sum((r - avg_rate)**2 for r in defect_rates) / len(defect_rates))**0.5

        ucl = avg_rate + 3 * std_rate  # upper control limit
        lcl = max(0, avg_rate - 3 * std_rate)
        out_of_control = sum(1 for r in defect_rates if r > ucl)

        # Defect Pareto
        pareto = sorted(self.defect_counts.items(), key=lambda x: -x[1])

        return {
            "sheets_analyzed": len(recent),
            "avg_defect_rate": round(avg_rate, 4),
            "std_deviation": round(std_rate, 4),
            "ucl": round(ucl, 4),
            "lcl": round(lcl, 4),
            "out_of_control_pct": round(out_of_control / len(recent) * 100, 1),
            "defect_pareto": [{"type": t, "count": c} for t, c in pareto[:5]],
            "process_capability": "capable" if avg_rate < self.DEFECT_RATE_ALARM else "investigate"
        }

    def _classify_severity(self, dtype: str, size_mm: float) -> DefectSeverity:
        if dtype in ["banding", "scumming"]:
            return DefectSeverity.CRITICAL
        if dtype == "hickey" and size_mm > 1.0:
            return DefectSeverity.MAJOR
        if dtype == "hickey" and size_mm <= self.HICKEY_SIZE_THRESHOLD_MM:
            return DefectSeverity.MINOR
        if dtype == "streak":
            return DefectSeverity.MAJOR
        if dtype == "substrate":
            return DefectSeverity.MAJOR if size_mm > 2.0 else DefectSeverity.MINOR
        return DefectSeverity.MINOR

    def _corrective_action(self, dtype: str, severity: DefectSeverity) -> str:
        actions = {
            "hickey": "Clean blanket at next pile change",
            "streak": "Check ink roller settings and ductor blade",
            "banding": "STOP: Check gear train and ink oscillation",
            "scumming": "STOP: Check fountain solution pH and plate",
            "ghosting": "Adjust ink form roller pressure",
            "substrate": "Flag for paper mill complaint, log defect"
        }
        return actions.get(dtype, "Investigate and log")

    def _grade_acceptable(self, grade: str) -> bool:
        grade_order = {"A": 4, "B": 3, "C": 2, "D": 1, "F": 0}
        min_order = grade_order.get(self.BARCODE_MIN_GRADE, 2)
        return grade_order.get(grade, 0) >= min_order

    def _detect_trend(self) -> str:
        if len(self.inspection_log) < 20:
            return "insufficient_data"
        recent_10 = [r["defect_count"] for r in self.inspection_log[-10:]]
        prior_10 = [r["defect_count"] for r in self.inspection_log[-20:-10]]
        avg_recent = sum(recent_10) / 10
        avg_prior = sum(prior_10) / 10
        if avg_recent > avg_prior * 1.5:
            return "degrading"
        elif avg_recent < avg_prior * 0.7:
            return "improving"
        return "stable"
Key insight: The real value of AI inspection is not just catching defects — it is correlating defect patterns with root causes. When the agent detects that hickey frequency increases every 3,000 impressions, it knows the blanket washing cycle needs adjustment. When banding appears only on the gear side, it points to a specific gear in the train. This predictive capability reduces unplanned downtime by 25-35%.
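
A recurring pattern such as "hickeys every 3,000 impressions" can be spotted by looking at the gaps between occurrences. The sketch below takes the median gap and accepts it only if most gaps sit close to it. The function name, the 15% tolerance, and the 80% consistency threshold are arbitrary assumptions for illustration.

```python
from statistics import median


def recurring_interval(event_sheets, tolerance=0.15, min_events=4):
    """event_sheets: sorted sheet numbers where one defect type was seen.

    Returns the typical gap when the gaps are consistent, otherwise None.
    """
    if len(event_sheets) < min_events:
        return None
    gaps = [b - a for a, b in zip(event_sheets, event_sheets[1:])]
    typical = median(gaps)
    if typical <= 0:
        return None
    consistent = sum(1 for g in gaps if abs(g - typical) <= tolerance * typical)
    return typical if consistent / len(gaps) >= 0.8 else None


print(recurring_interval([2950, 6010, 9020, 11980, 15050]))  # 3035.0
print(recurring_interval([100, 150, 4000, 4100, 9000]))      # None
```

A stable interval suggests a cyclic cause such as a wash schedule, while irregular gaps point toward random contamination.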

4. Production Scheduling & Planning

Production scheduling in a print shop is a constraint satisfaction problem of surprising complexity. Each job has specific press requirements (sheet size, color count, substrate type, coating capability), setup times that vary based on the previous job (a changeover from a 6-color UV job to a 2-color conventional job takes much longer than running two similar 4-color jobs back-to-back), finishing requirements (folding, binding, die-cutting, lamination), and delivery deadlines with penalties for lateness.
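
Sequence-dependent setup is easiest to see with a small cost function. The sketch below charges extra minutes for substrate, color-count, and coating changes, then orders jobs greedily by cheapest next changeover. The `Job` fields, the minute values, and the use of color-count difference as a stand-in for wash-ups are all simplifying assumptions, and greedy ordering ignores deadlines.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Job:
    job_id: str
    substrate: str
    colors: int
    coating: Optional[str] = None


def changeover_minutes(prev: Optional[Job], nxt: Job, base: int = 15,
                       substrate_change: int = 45, washup_per_station: int = 15,
                       coating_change: int = 30) -> int:
    """Setup time for nxt given the job that ran before it."""
    if prev is None:
        return base
    minutes = base
    if prev.substrate != nxt.substrate:
        minutes += substrate_change
    minutes += abs(prev.colors - nxt.colors) * washup_per_station
    if prev.coating != nxt.coating:
        minutes += coating_change
    return minutes


def greedy_sequence(jobs: List[Job]) -> Tuple[List[str], int]:
    """Repeatedly pick the job with the cheapest changeover from the current one."""
    remaining, order, current, total = list(jobs), [], None, 0
    while remaining:
        best = min(remaining, key=lambda j: changeover_minutes(current, j))
        total += changeover_minutes(current, best)
        order.append(best.job_id)
        remaining.remove(best)
        current = best
    return order, total


jobs = [Job("A", "130gsm_coated", 4), Job("B", "300gsm_board", 6, "UV"),
        Job("C", "130gsm_coated", 4), Job("D", "300gsm_board", 6, "UV")]
print(greedy_sequence(jobs))  # (['A', 'C', 'B', 'D'], 165)
```

Run in the original A, B, C, D order, the same four jobs would need 375 minutes of setup under these assumptions, so grouping similar jobs saves more than three hours.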

An AI scheduling agent optimizes across multiple dimensions simultaneously. It groups jobs by substrate to minimize paper changes, sequences colors to reduce wash-up time, balances press utilization across all available presses, and accounts for finishing bottlenecks that constrain upstream press scheduling. The agent also handles rush orders by evaluating the impact of inserting a new job on all existing commitments and recommending the insertion point that minimizes total disruption.
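
The rush-order evaluation can be sketched as a brute-force search: try the new job at every position in the queue and keep the position with the least total lateness. The tuple layout `(job_id, run_hours, due_hour)` and the lateness metric are assumptions, and setup times are left out to keep the example short.

```python
def total_lateness(sequence):
    """sequence: list of (job_id, run_hours, due_hour). Returns summed hours late."""
    clock, late = 0.0, 0.0
    for _, run_hours, due in sequence:
        clock += run_hours
        late += max(0.0, clock - due)
    return late


def best_insertion(queue, rush_job):
    """Try the rush job at every position; return (position, total lateness)."""
    options = []
    for i in range(len(queue) + 1):
        candidate = queue[:i] + [rush_job] + queue[i:]
        options.append((total_lateness(candidate), i))
    lateness, position = min(options)
    return position, lateness


queue = [("J1", 4, 6), ("J2", 3, 8), ("J3", 5, 20)]
rush = ("R", 2, 5)
print(best_insertion(queue, rush))  # (0, 1.0)
```

Here the least disruptive choice is to run the rush job first, which makes one existing job an hour late; every other position causes more total delay.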

Material requirements planning is tightly coupled with scheduling. The agent tracks substrate inventory by grade, size, and grain direction, forecasts ink consumption by color based on the job queue, and generates purchase orders when stock falls below minimum levels. It also predicts when plate-making capacity will become a bottleneck and adjusts the prepress release schedule accordingly.
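
The reorder logic reduces to a projected-stock calculation per substrate. The sketch below sums demand from the job queue, adds a spoilage allowance, and suggests an order whenever projected stock falls below the minimum. The 5% spoilage figure, the dictionary layouts, and the function name are assumptions for illustration.

```python
from collections import defaultdict


def purchase_suggestions(job_queue, inventory_kg, min_stock_kg, spoilage=0.05):
    """job_queue: list of (substrate, required_kg). Returns {substrate: kg to order}."""
    demand = defaultdict(float)
    for substrate, kg in job_queue:
        demand[substrate] += kg * (1 + spoilage)   # allow for makeready waste

    orders = {}
    for substrate, needed in demand.items():
        projected = inventory_kg.get(substrate, 0.0) - needed
        floor = min_stock_kg.get(substrate, 0.0)
        if projected < floor:
            orders[substrate] = round(floor - projected, 1)
    return orders


queue = [("130gsm_coated", 400), ("130gsm_coated", 600), ("300gsm_board", 200)]
inventory = {"130gsm_coated": 1200, "300gsm_board": 1000}
minimums = {"130gsm_coated": 500, "300gsm_board": 300}
print(purchase_suggestions(queue, inventory, minimums))  # {'130gsm_coated': 350.0}
```

A fuller version would also split demand by sheet size and grain direction, since stock of the right grade in the wrong format cannot cover a job.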

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta

@dataclass
class PrintJob:
    job_id: str
    customer: str
    quantity: int
    colors: int                    # number of ink stations
    sheet_size: str                # "B1", "B2", "SRA3"
    substrate: str                 # "130gsm_coated", "300gsm_board"
    substrate_quantity_kg: float
    coating: Optional[str]         # "UV", "aqueous", None
    finishing: List[str]           # ["fold", "saddle_stitch", "die_cut"]
    deadline: datetime
    priority: int                  # 1=highest
    estimated_impressions: int
    setup_minutes: int = 30

@dataclass
class Press:
    press_id: str
    name: str
    max_colors: int
    max_sheet_size: str
    press_speed_sph: int           # sheets per hour
    has_uv: bool
    has_coater: bool
    current_substrate: Optional[str] = None
    current_colors: int = 0
    available_from: Optional[datetime] = None

@dataclass
class FinishingUnit:
    unit_id: str
    unit_type: str                 # "folder", "binder", "die_cutter", "laminator"
    capacity_sheets_hour: int
    available_from: Optional[datetime] = None

class ProductionSchedulingAgent:
    """AI agent for job scheduling, press allocation, and material planning."""

    SUBSTRATE_CHANGE_MIN = 45      # minutes for substrate change
    COLOR_WASHUP_MIN = 15          # per-station ink washup
    SIMILAR_JOB_SETUP_MIN = 15     # reduced setup for similar jobs
    RUSH_PREMIUM_FACTOR = 1.5

    def __init__(self, presses: List[Press], finishing: List[FinishingUnit]):
        self.presses = {p.press_id: p for p in presses}
        self.finishing = {f.unit_id: f for f in finishing}
        self.schedule = []
        self.material_forecast = {}

    def schedule_jobs(self, jobs: List[PrintJob]) -> Dict:
        """Optimize job sequence across presses minimizing setup and meeting deadlines."""
        sorted_jobs = sorted(jobs, key=lambda j: (j.priority, j.deadline))
        scheduled = []
        unscheduled = []
        total_setup_minutes = 0

        for job in sorted_jobs:
            best_assignment = None
            best_score = float("inf")

            for pid, press in self.presses.items():
                # Capability check
                if job.colors > press.max_colors:
                    continue
                if job.coating == "UV" and not press.has_uv:
                    continue
                if job.coating == "aqueous" and not press.has_coater:
                    continue
                if not self._sheet_size_fits(job.sheet_size, press.max_sheet_size):
                    continue

                # Calculate setup time based on previous job
                setup = self._calculate_setup(press, job)
                start_time = press.available_from or datetime.now()
                run_hours = job.estimated_impressions / press.press_speed_sph
                end_time = start_time + timedelta(minutes=setup, hours=run_hours)

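                # Check finishing availability. This is a trial evaluation, so
                # snapshot and restore unit availability: _schedule_finishing
                # reserves finishing capacity as a side effect.
                unit_state = {uid: u.available_from for uid, u in self.finishing.items()}
                finish_end = self._schedule_finishing(job, end_time)
                for uid, available in unit_state.items():
                    self.finishing[uid].available_from = available

                # Score: weighted sum of lateness, setup, and utilization
                lateness = max(0, (finish_end - job.deadline).total_seconds() / 3600)
                score = (lateness * 100        # heavy penalty for late
                         + setup * 2            # setup time cost
                         + run_hours * 0.5)     # prefer faster presses

                if score < best_score:
                    best_score = score
                    best_assignment = {
                        "press_id": pid,
                        "setup_minutes": setup,
                        "start_time": start_time,
                        "print_end": end_time,
                        "finish_end": finish_end,
                        "run_hours": round(run_hours, 1)
                    }

            if best_assignment:
                press = self.presses[best_assignment["press_id"]]
                press.available_from = best_assignment["print_end"]
                press.current_substrate = job.substrate
                press.current_colors = job.colors
                total_setup_minutes += best_assignment["setup_minutes"]
                # Reserve finishing capacity only for the chosen press
                best_assignment["finish_end"] = self._schedule_finishing(
                    job, best_assignment["print_end"]
                )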

                on_time = best_assignment["finish_end"] <= job.deadline
                scheduled.append({
                    "job_id": job.job_id,
                    "customer": job.customer,
                    "press": best_assignment["press_id"],
                    "start": best_assignment["start_time"].isoformat(),
                    "print_end": best_assignment["print_end"].isoformat(),
                    "delivery_ready": best_assignment["finish_end"].isoformat(),
                    "deadline": job.deadline.isoformat(),
                    "on_time": on_time,
                    "setup_min": best_assignment["setup_minutes"],
                    "run_hours": best_assignment["run_hours"]
                })
            else:
                unscheduled.append({
                    "job_id": job.job_id,
                    "reason": "No press meets requirements"
                })

        # Calculate utilization
        utilization = self._calculate_utilization(scheduled)

        return {
            "scheduled_jobs": len(scheduled),
            "unscheduled_jobs": len(unscheduled),
            "total_setup_minutes": total_setup_minutes,
            "on_time_pct": round(
                sum(1 for s in scheduled if s["on_time"]) / max(len(scheduled), 1) * 100, 1
            ),
            "press_utilization": utilization,
            "schedule": scheduled,
            "unscheduled": unscheduled
        }

    def forecast_materials(self, jobs: List[PrintJob]) -> Dict:
        """Forecast substrate and ink requirements for upcoming jobs."""
        substrate_needs = {}
        ink_forecast = {"C": 0, "M": 0, "Y": 0, "K": 0}

        for job in jobs:
            # Substrate with waste allowance
            waste_factor = 1.05 + (500 / max(job.quantity, 1))  # higher waste % for short runs
            total_kg = job.substrate_quantity_kg * waste_factor
            substrate_needs.setdefault(job.substrate, 0)
            substrate_needs[job.substrate] += total_kg

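            # Ink estimate: ~1.5g per impression across four process colors,
            # i.e. ~0.375g per impression per station (even split assumed).
            # Stations beyond CMYK (spot colors, varnish) are not forecast.
            ink_per_impression = 0.0015  # kg, all four process colors combined
            total_ink = job.estimated_impressions * ink_per_impression
            for c in ["C", "M", "Y", "K"][:job.colors]:
                ink_forecast[c] += total_ink * 0.25  # even distribution estimate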

        return {
            "substrate_kg": {k: round(v, 1) for k, v in substrate_needs.items()},
            "ink_kg": {k: round(v, 2) for k, v in ink_forecast.items()},
            "total_substrate_kg": round(sum(substrate_needs.values()), 1),
            "total_ink_kg": round(sum(ink_forecast.values()), 2),
            "purchase_alerts": self._check_stock_levels(substrate_needs, ink_forecast)
        }

    def _calculate_setup(self, press: Press, job: PrintJob) -> int:
        setup = job.setup_minutes
        if press.current_substrate and press.current_substrate != job.substrate:
            setup += self.SUBSTRATE_CHANGE_MIN
        if press.current_colors > 0:
            color_diff = abs(press.current_colors - job.colors)
            setup += color_diff * self.COLOR_WASHUP_MIN
        if (press.current_substrate == job.substrate
                and press.current_colors == job.colors):
            setup = self.SIMILAR_JOB_SETUP_MIN
        return setup

    def _schedule_finishing(self, job: PrintJob, print_end: datetime) -> datetime:
        end = print_end
        for step in job.finishing:
            unit = self._find_finishing_unit(step)
            if unit:
                processing_hours = job.quantity / max(unit.capacity_sheets_hour, 1)
                start = max(end, unit.available_from or datetime.now())
                end = start + timedelta(hours=processing_hours)
                unit.available_from = end
        return end

    def _find_finishing_unit(self, step_type: str) -> Optional[FinishingUnit]:
        for uid, unit in self.finishing.items():
            if unit.unit_type == step_type:
                return unit
        return None

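    def _sheet_size_fits(self, job_size: str, press_size: str) -> bool:
        size_order = {"SRA3": 1, "B2": 2, "B1": 3, "B0": 4}
        if job_size not in size_order or press_size not in size_order:
            return False  # unknown format: do not assume it fits
        return size_order[job_size] <= size_order[press_size]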

    def _calculate_utilization(self, scheduled: List[Dict]) -> Dict:
        press_hours = {}
        for s in scheduled:
            pid = s["press"]
            press_hours.setdefault(pid, 0)
            press_hours[pid] += s["run_hours"] + s["setup_min"] / 60
        shift_hours = 8  # per day
        return {pid: round(h / shift_hours * 100, 1) for pid, h in press_hours.items()}

    def _check_stock_levels(self, substrate: Dict, ink: Dict) -> List[str]:
        alerts = []
        # Placeholder — in production this checks against actual inventory
        for sub, qty in substrate.items():
            if qty > 500:
                alerts.append(f"Order {sub}: {qty:.0f}kg needed, verify stock")
        for color, qty in ink.items():
            if qty > 5:
                alerts.append(f"Order {color} ink: {qty:.1f}kg forecasted")
        return alerts
Key insight: The biggest scheduling gain comes from substrate grouping. Running all 130gsm coated jobs consecutively eliminates substrate changeovers that cost 45 minutes each. For a shop running 15 jobs per shift, avoiding three or four of those changeovers recovers 2-3 hours of productive press time daily, capacity that would otherwise have to come from overtime or another press.
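
The effect of grouping is easy to quantify by counting substrate changes in a job sequence. The sketch below assumes the 45-minute changeover constant from the scheduling agent; `changeover_minutes` and the sample queue are hypothetical, and sorting by substrate is the simplest possible grouping, ignoring deadlines.

```python
from itertools import groupby
from typing import List

SUBSTRATE_CHANGE_MIN = 45   # same value as ProductionSchedulingAgent.SUBSTRATE_CHANGE_MIN

def changeover_minutes(substrates: List[str]) -> int:
    """Minutes lost to substrate changes when jobs run in the given order."""
    runs = sum(1 for _ in groupby(substrates))   # consecutive runs of one substrate
    return max(runs - 1, 0) * SUBSTRATE_CHANGE_MIN

# Hypothetical queue in arrival order
arrival = ["130gsm_coated", "300gsm_board", "130gsm_coated",
           "300gsm_board", "130gsm_coated", "170gsm_silk"]
grouped = sorted(arrival)   # naive grouping; a real schedule must respect deadlines

saved = changeover_minutes(arrival) - changeover_minutes(grouped)
print(f"Arrival order: {changeover_minutes(arrival)} min, "
      f"grouped: {changeover_minutes(grouped)} min, saved: {saved} min")
```

In practice the grouping competes with deadlines, which is why the scheduling agent scores setup time and lateness together rather than sorting by substrate alone.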

5. Waste Reduction & Sustainability

Makeready waste is the largest controllable cost in commercial printing. Every time a press starts a new job, the first 200-500 sheets (offset) or 50-150 feet of web are consumed getting color density, registration, and coating weight dialed in. An AI agent that pre-sets ink keys based on plate coverage data, predicts register positions from previous job data, and learns from historical makeready patterns can reduce waste sheets by 30-50% per makeready, or 60-250 sheets each time. In a shop running 20 makereadies per day, that is 1,200-5,000 fewer waste sheets daily.

Beyond makeready, the agent tracks substrate waste throughout the entire production chain. Trim waste from cutting sheets to size, spoilage from defective impressions caught by inspection, finishing waste from mis-folds or binding errors, and remnant roll waste on web presses all contribute to total material loss. The agent calculates optimal cut-up plans to maximize sheet yield from parent rolls or pallets, and flags when a different parent size would reduce trim waste below a threshold. It also monitors makeready trends per press operator, providing data for targeted training.
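
A cut-up yield check can be sketched as follows. It assumes straight guillotine cuts with all pieces in one orientation, no gutters or gripper margin, and it ignores grain direction; `pieces_per_parent` is a hypothetical helper, and dimensions are in millimetres.

```python
from typing import Dict

def pieces_per_parent(parent_w: float, parent_h: float,
                      piece_w: float, piece_h: float) -> Dict:
    """Best single-orientation yield from one parent sheet (dimensions in mm)."""
    def fit(pw: float, ph: float) -> int:
        return int(parent_w // pw) * int(parent_h // ph)

    upright, rotated = fit(piece_w, piece_h), fit(piece_h, piece_w)
    count = max(upright, rotated)
    used_area = count * piece_w * piece_h
    trim_pct = (1 - used_area / (parent_w * parent_h)) * 100
    return {"pieces": count,
            "orientation": "upright" if upright >= rotated else "rotated",
            "trim_waste_pct": round(trim_pct, 1)}

# 297 x 210 mm pieces cut from two candidate parent sheets
b1 = pieces_per_parent(1000, 707, 297, 210)     # B1 parent
sra1 = pieces_per_parent(900, 640, 297, 210)    # SRA1 parent
print(b1, sra1)
```

Both parents yield the same number of pieces here, so comparing `trim_waste_pct` is what reveals the better parent size for the job.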

Sustainability reporting is increasingly required by brand owners and regulatory frameworks. The agent tracks energy consumption per job (press motor kWh, dryer energy for UV or heatset), calculates the carbon footprint using emission factors for each substrate and ink type, monitors recycled content percentages, and generates reports organized by GHG Protocol Scope 1, 2, and 3 categories. This data becomes a competitive advantage when responding to RFPs from sustainability-conscious brands.
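
Per-job reporting reduces to applying emission factors to each job's paper and energy totals. The sketch below is a minimal illustration: `co2_by_job` and the job IDs are hypothetical, and the two factors are the placeholder values used by the agent in this section, which should be replaced with supplier-specific and grid-specific figures.

```python
from typing import Dict

# Placeholder emission factors (same values as WasteReductionAgent in this section)
CO2_PER_KWH = 0.4          # kg CO2 per kWh
CO2_PER_KG_PAPER = 1.1     # kg CO2 per kg virgin paper

def co2_by_job(paper_kg: Dict[str, float],
               energy_kwh: Dict[str, float]) -> Dict[str, Dict[str, float]]:
    """Split emissions into paper and energy components for each job ID."""
    report = {}
    for job_id in sorted(set(paper_kg) | set(energy_kwh)):
        paper = paper_kg.get(job_id, 0.0) * CO2_PER_KG_PAPER
        energy = energy_kwh.get(job_id, 0.0) * CO2_PER_KWH
        report[job_id] = {
            "paper_co2_kg": round(paper, 1),
            "energy_co2_kg": round(energy, 1),
            "total_co2_kg": round(paper + energy, 1),
        }
    return report

report = co2_by_job(
    paper_kg={"J-1001": 850.0, "J-1002": 120.0},     # hypothetical jobs
    energy_kwh={"J-1001": 310.0, "J-1002": 95.0},
)
print(report)
```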

from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime

@dataclass
class MakereadyRecord:
    press_id: str
    job_id: str
    operator: str
    waste_sheets: int
    target_sheets: int            # planned makeready allowance
    ink_key_preset: bool          # was CIP3/CIP4 data used?
    register_predicted: bool
    time_minutes: float
    timestamp: datetime

@dataclass
class SubstrateUsage:
    job_id: str
    substrate_type: str
    ordered_kg: float
    consumed_kg: float
    good_output_kg: float
    trim_waste_kg: float
    spoilage_kg: float
    remnant_kg: float

@dataclass
class EnergyReading:
    press_id: str
    job_id: str
    motor_kwh: float
    dryer_kwh: float
    compressor_kwh: float
    duration_hours: float

class WasteReductionAgent:
    """AI agent for makeready optimization, waste tracking, and sustainability."""

    MAKEREADY_TARGET_OFFSET = 250     # target sheets for offset makeready
    MAKEREADY_TARGET_DIGITAL = 20     # target sheets for digital
    CO2_PER_KWH = 0.4                # kg CO2 per kWh (grid average)
    CO2_PER_KG_PAPER = 1.1           # kg CO2 per kg virgin paper
    CO2_PER_KG_RECYCLED = 0.7        # kg CO2 per kg recycled paper

    def __init__(self):
        self.makeready_history = []
        self.waste_log = []
        self.energy_log = []

    def preset_ink_keys(self, plate_coverage: Dict[str, List[float]],
                         press_profile: Dict) -> Dict:
        """Calculate ink key presets from CIP3/CIP4 plate coverage data."""
        presets = {}
        for channel, zones in plate_coverage.items():
            key_openings = []
            density_target = press_profile.get("density_targets", {}).get(channel, 1.4)

            for zone_coverage in zones:
                # Map coverage % to key opening %
                # Non-linear relationship: low coverage needs proportionally less ink
                if zone_coverage < 5:
                    opening = zone_coverage * 0.5
                elif zone_coverage < 30:
                    opening = zone_coverage * 0.8
                elif zone_coverage < 70:
                    opening = zone_coverage * 1.0
                else:
                    opening = min(zone_coverage * 1.1, 100)

                # Adjust for press-specific dot gain
                dot_gain = press_profile.get("dot_gain_50", {}).get(channel, 18)
                gain_factor = 1 - (dot_gain - 15) * 0.005
                opening *= gain_factor

                key_openings.append(round(max(0, min(100, opening)), 1))

            presets[channel] = key_openings

        return {
            "ink_key_presets": presets,
            "zones_per_channel": len(next(iter(plate_coverage.values()), [])),
            "expected_makeready_reduction": "30-50% fewer waste sheets",
            "ductor_setting": self._calculate_ductor(plate_coverage)
        }

    def analyze_makeready(self, records: List[MakereadyRecord]) -> Dict:
        """Analyze makeready performance and identify improvement opportunities."""
        self.makeready_history.extend(records)

        total_waste = sum(r.waste_sheets for r in records)
        total_target = sum(r.target_sheets for r in records)
        avg_waste = total_waste / max(len(records), 1)
        avg_time = sum(r.time_minutes for r in records) / max(len(records), 1)

        # Compare preset vs non-preset
        preset_records = [r for r in records if r.ink_key_preset]
        no_preset = [r for r in records if not r.ink_key_preset]

        preset_avg = (sum(r.waste_sheets for r in preset_records) /
                      max(len(preset_records), 1)) if preset_records else 0
        no_preset_avg = (sum(r.waste_sheets for r in no_preset) /
                         max(len(no_preset), 1)) if no_preset else 0

        # Operator comparison
        by_operator = {}
        for r in records:
            by_operator.setdefault(r.operator, []).append(r.waste_sheets)
        operator_stats = {
            op: {"avg_waste": round(sum(ws)/len(ws), 0), "makereadies": len(ws)}
            for op, ws in by_operator.items()
        }

        return {
            "total_makereadies": len(records),
            "avg_waste_sheets": round(avg_waste, 0),
            "avg_time_minutes": round(avg_time, 1),
            "waste_vs_target_pct": round(total_waste / max(total_target, 1) * 100, 1),
            "preset_avg_waste": round(preset_avg, 0),
            "no_preset_avg_waste": round(no_preset_avg, 0),
            # Savings are only meaningful when both groups have records;
            # otherwise an empty preset group would report 100% savings
            "preset_savings_pct": round(
                (1 - preset_avg / no_preset_avg) * 100, 1
            ) if preset_records and no_preset_avg > 0 else 0,
            "operator_performance": operator_stats,
            "recommendation": self._makeready_recommendation(avg_waste, preset_avg)
        }

    def track_substrate_waste(self, usages: List[SubstrateUsage]) -> Dict:
        """Track and analyze substrate waste across all sources."""
        self.waste_log.extend(usages)

        totals = {
            "consumed_kg": sum(u.consumed_kg for u in usages),
            "good_output_kg": sum(u.good_output_kg for u in usages),
            "trim_waste_kg": sum(u.trim_waste_kg for u in usages),
            "spoilage_kg": sum(u.spoilage_kg for u in usages),
            "remnant_kg": sum(u.remnant_kg for u in usages)
        }
        total_waste = totals["trim_waste_kg"] + totals["spoilage_kg"] + totals["remnant_kg"]
        waste_pct = total_waste / max(totals["consumed_kg"], 1) * 100

        # By substrate type
        by_substrate = {}
        for u in usages:
            by_substrate.setdefault(u.substrate_type, {"consumed": 0, "waste": 0})
            by_substrate[u.substrate_type]["consumed"] += u.consumed_kg
            by_substrate[u.substrate_type]["waste"] += (
                u.trim_waste_kg + u.spoilage_kg + u.remnant_kg
            )

        return {
            "total_consumed_kg": round(totals["consumed_kg"], 1),
            "total_waste_kg": round(total_waste, 1),
            "waste_pct": round(waste_pct, 1),
            "waste_breakdown": {
                "trim": round(totals["trim_waste_kg"], 1),
                "spoilage": round(totals["spoilage_kg"], 1),
                "remnant": round(totals["remnant_kg"], 1)
            },
            "by_substrate": {k: {"waste_pct": round(v["waste"]/max(v["consumed"],1)*100, 1)}
                            for k, v in by_substrate.items()},
            "cost_of_waste_usd": round(total_waste * 1.2, 0)  # avg $1.20/kg
        }

    def calculate_carbon_footprint(self, substrate_usage: List[SubstrateUsage],
                                     energy: List[EnergyReading],
                                     recycled_pct: float = 0.3) -> Dict:
        """Calculate carbon footprint per job and total."""
        # Substrate emissions
        total_paper_kg = sum(u.consumed_kg for u in substrate_usage)
        virgin_kg = total_paper_kg * (1 - recycled_pct)
        recycled_kg = total_paper_kg * recycled_pct
        paper_co2 = virgin_kg * self.CO2_PER_KG_PAPER + recycled_kg * self.CO2_PER_KG_RECYCLED

        # Energy emissions
        total_kwh = sum(e.motor_kwh + e.dryer_kwh + e.compressor_kwh for e in energy)
        energy_co2 = total_kwh * self.CO2_PER_KWH

        total_co2 = paper_co2 + energy_co2

        return {
            "total_co2_kg": round(total_co2, 1),
            "paper_co2_kg": round(paper_co2, 1),
            "energy_co2_kg": round(energy_co2, 1),
            "total_energy_kwh": round(total_kwh, 1),
            "recycled_content_pct": round(recycled_pct * 100, 1),
            # Assumes ~0.1 kg per sheet to convert paper weight to a sheet count,
            # then scales CO2 per sheet up to CO2 per 1,000 sheets
            "co2_per_1000_sheets": round(
                total_co2 / max(total_paper_kg / 0.1, 1) * 1000, 2
            ),
            "reduction_potential": self._sustainability_recommendations(
                recycled_pct, total_kwh, total_paper_kg
            )
        }

    def _calculate_ductor(self, coverage: Dict) -> Dict:
        avg_coverage = {}
        for ch, zones in coverage.items():
            avg_coverage[ch] = round(sum(zones) / max(len(zones), 1), 1)
        return avg_coverage

    def _makeready_recommendation(self, avg_waste: float, preset_avg: float) -> str:
        if avg_waste > 400:
            return "HIGH WASTE: Enforce CIP3 presets on all jobs and review operator training"
        elif avg_waste > 250:
            return "MODERATE: CIP3 presets cutting waste — expand to all presses"
        return "GOOD: Makeready waste within targets"

    def _sustainability_recommendations(self, recycled_pct: float,
                                         kwh: float, paper_kg: float) -> List[str]:
        recs = []
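        if recycled_pct < 0.5:
            # Compute the saving from the model's own emission factors
            # instead of quoting a fixed percentage
            current = ((1 - recycled_pct) * self.CO2_PER_KG_PAPER
                       + recycled_pct * self.CO2_PER_KG_RECYCLED)
            at_half = 0.5 * (self.CO2_PER_KG_PAPER + self.CO2_PER_KG_RECYCLED)
            saving_pct = (1 - at_half / current) * 100
            recs.append(
                f"Increase recycled content to 50%+ to reduce paper CO2 by about {saving_pct:.0f}%"
            )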
        if kwh / max(paper_kg, 1) > 2.0:
            recs.append("Energy intensity high — audit press idle time and dryer settings")
        recs.append("Consider carbon offset program for remaining emissions")
        return recs
Key insight: CIP3/CIP4 ink key presetting from plate coverage data is the single highest-ROI waste reduction investment. Shops that implement it consistently see makeready waste drop from 350-500 sheets to 150-250 sheets per job. At 20 makereadies per day, 260 working days per year, and $0.08 per sheet in substrate cost, saving 200-250 sheets per makeready is worth roughly $83,000-104,000 per year. If those 20 makereadies are spread across three presses, that is about $28,000-35,000 per press, which often pays for the entire CIP3 link infrastructure in under 6 months.

6. ROI Analysis: Commercial Printer (3 Presses, $15M Revenue)

The critical question for any commercial printer evaluating AI agents is whether the investment pencils out. Below is a detailed breakdown for a mid-size commercial printer operating 3 sheetfed offset presses (B1 format), handling approximately 5,000 jobs per year with a mix of commercial, packaging, and publication work.

Assumptions

Category | Improvement | Annual Savings
Prepress Automation | 60% fewer manual preflight hours, 85% auto-fix rate | $180,000 - $320,000
Color Quality (Fewer Reprints) | Reprints drop from 3% to 0.8% of jobs | $280,000 - $520,000
Makeready Waste Reduction | 400 sheets/job down to 200 sheets/job | $165,000 - $250,000
Scheduling Optimization | 15% less setup time through job grouping | $350,000 - $580,000
Inline Inspection (Spoilage Reduction) | Spoilage from 4.5% to 2.0% | $375,000 - $560,000
Substrate Optimization | Gang-run utilization from 70% to 88% | $220,000 - $380,000
Imposition & Trim Optimization | 3-5% trim waste reduction | $120,000 - $210,000
Energy & Sustainability Reporting | Idle time reduction, brand RFP wins | $110,000 - $380,000
Total Annual Savings | | $1,800,000 - $3,200,000

Implementation Cost vs. Return

from dataclasses import dataclass

@dataclass
class PrintShopROIModel:
    """Calculate ROI for AI agent deployment in a commercial print shop."""

    num_presses: int = 3
    annual_revenue: float = 15_000_000
    jobs_per_year: int = 5000
    makereadies_per_day: int = 20
    working_days: int = 260
    current_waste_sheets: int = 400
    improved_waste_sheets: int = 200
    sheet_cost_usd: float = 0.10
    press_rate_per_hour: float = 450
    current_reprint_pct: float = 0.03
    improved_reprint_pct: float = 0.008
    avg_reprint_cost: float = 2500
    current_spoilage_pct: float = 0.045
    improved_spoilage_pct: float = 0.020
    annual_substrate_spend: float = 4_500_000

    def calculate_prepress_savings(self) -> dict:
        """Prepress operator time savings from automated preflight."""
        manual_hours_per_job = 0.4
        automated_hours_per_job = 0.15
        prepress_rate = 45  # USD/hour
        saved_hours = (manual_hours_per_job - automated_hours_per_job) * self.jobs_per_year
        savings = saved_hours * prepress_rate
        return {
            "hours_saved": round(saved_hours, 0),
            "cost_savings_usd": round(savings, 0),
            "operator_capacity_freed_pct": round(
                (manual_hours_per_job - automated_hours_per_job) / manual_hours_per_job * 100, 0
            )
        }

    def calculate_color_savings(self) -> dict:
        """Savings from reduced reprints due to better color management."""
        current_reprints = self.jobs_per_year * self.current_reprint_pct
        improved_reprints = self.jobs_per_year * self.improved_reprint_pct
        avoided_reprints = current_reprints - improved_reprints
        savings = avoided_reprints * self.avg_reprint_cost
        return {
            "current_reprints_year": round(current_reprints, 0),
            "improved_reprints_year": round(improved_reprints, 0),
            "avoided_reprints": round(avoided_reprints, 0),
            "savings_usd": round(savings, 0)
        }

    def calculate_makeready_savings(self) -> dict:
        """Savings from reduced makeready waste sheets."""
        annual_makereadies = self.makereadies_per_day * self.working_days
        sheets_saved = (self.current_waste_sheets - self.improved_waste_sheets) * annual_makereadies
        savings = sheets_saved * self.sheet_cost_usd
        return {
            "annual_makereadies": annual_makereadies,
            "sheets_saved": sheets_saved,
            "savings_usd": round(savings, 0)
        }

    def calculate_scheduling_savings(self) -> dict:
        """Savings from reduced setup time through intelligent sequencing."""
        avg_setup_min = 35
        reduced_setup_min = 25
        saved_min_per_makeready = avg_setup_min - reduced_setup_min
        annual_makereadies = self.makereadies_per_day * self.working_days
        saved_hours = (saved_min_per_makeready * annual_makereadies) / 60
        savings = saved_hours * self.press_rate_per_hour
        return {
            "setup_time_reduced_min": saved_min_per_makeready,
            "annual_hours_recovered": round(saved_hours, 0),
            "savings_usd": round(savings, 0),
            "equivalent_extra_production_days": round(saved_hours / 8, 0)
        }

    def calculate_spoilage_savings(self) -> dict:
        """Savings from inline inspection reducing spoilage rate."""
        current_spoilage = self.annual_substrate_spend * self.current_spoilage_pct
        improved_spoilage = self.annual_substrate_spend * self.improved_spoilage_pct
        savings = current_spoilage - improved_spoilage
        return {
            "current_spoilage_usd": round(current_spoilage, 0),
            "improved_spoilage_usd": round(improved_spoilage, 0),
            "savings_usd": round(savings, 0)
        }

    def full_roi_analysis(self) -> dict:
        prepress = self.calculate_prepress_savings()
        color = self.calculate_color_savings()
        makeready = self.calculate_makeready_savings()
        scheduling = self.calculate_scheduling_savings()
        spoilage = self.calculate_spoilage_savings()

        total_annual_benefit = (
            prepress["cost_savings_usd"]
            + color["savings_usd"]
            + makeready["savings_usd"]
            + scheduling["savings_usd"]
            + spoilage["savings_usd"]
        )

        # Implementation costs
        setup_cost = 180_000         # integration, training, calibration
        annual_license = 96_000      # software licenses
        annual_support = 36_000      # support and maintenance
        total_annual_cost = annual_license + annual_support
        total_year1_cost = setup_cost + total_annual_cost

        roi_year1 = ((total_annual_benefit - total_year1_cost) / total_year1_cost) * 100
        roi_year2 = ((total_annual_benefit - total_annual_cost) / total_annual_cost) * 100
        payback_months = (total_year1_cost / max(total_annual_benefit, 1)) * 12

        return {
            "shop_profile": {
                "presses": self.num_presses,
                "revenue": self.annual_revenue,
                "jobs_per_year": self.jobs_per_year
            },
            "annual_benefits": {
                "prepress_automation": prepress["cost_savings_usd"],
                "color_quality": color["savings_usd"],
                "makeready_reduction": makeready["savings_usd"],
                "scheduling_optimization": scheduling["savings_usd"],
                "spoilage_reduction": spoilage["savings_usd"],
                "total": round(total_annual_benefit, 0)
            },
            "costs": {
                "year_1_total": total_year1_cost,
                "annual_recurring": total_annual_cost
            },
            "returns": {
                "roi_year_1_pct": round(roi_year1, 0),
                "roi_year_2_pct": round(roi_year2, 0),
                "payback_months": round(payback_months, 1),
                "net_benefit_year_1": round(total_annual_benefit - total_year1_cost, 0)
            }
        }

# Run the analysis
model = PrintShopROIModel(num_presses=3, annual_revenue=15_000_000)
results = model.full_roi_analysis()

print(f"Shop: {results['shop_profile']['presses']} presses, ${results['shop_profile']['revenue']:,.0f} revenue")
print(f"Total Annual Benefits: ${results['annual_benefits']['total']:,.0f}")
print(f"Year 1 Cost: ${results['costs']['year_1_total']:,.0f}")
print(f"Year 1 ROI: {results['returns']['roi_year_1_pct']}%")
print(f"Year 2 ROI: {results['returns']['roi_year_2_pct']}%")
print(f"Payback Period: {results['returns']['payback_months']} months")
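Bottom line: A 3-press commercial printer investing $312,000 in year one (setup plus annual costs) can expect $1.8-3.2M in annual benefits if every category in the table is realized, yielding a payback period under 3 months and a year-2 ROI exceeding 1,200% against $132,000 in recurring costs. Even at half the projected savings, the investment pays for itself within the first two quarters. The script is more conservative: it models five of the eight categories and, with its default parameters, returns about $938,000 in annual benefits, a payback of about 4 months, and a year-2 ROI of about 610%. In that model the largest single contributor is scheduling optimization, which recovers roughly 870 hours of productive press time annually.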
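
Because the return figures depend heavily on which savings actually materialize, a quick sensitivity check is worth running before committing. The sketch below reuses the cost figures from the script ($180,000 setup, $132,000 recurring); `returns_at` and the three scenario labels are hypothetical.

```python
SETUP_COST = 180_000
ANNUAL_COST = 96_000 + 36_000   # license plus support, as in the ROI script

def returns_at(annual_benefit: float) -> dict:
    """Payback and ROI for a given level of realized annual benefit (USD)."""
    year1_cost = SETUP_COST + ANNUAL_COST
    return {
        "payback_months": round(year1_cost / annual_benefit * 12, 1),
        "roi_year_1_pct": round((annual_benefit - year1_cost) / year1_cost * 100),
        "roi_year_2_pct": round((annual_benefit - ANNUAL_COST) / ANNUAL_COST * 100),
    }

scenarios = {
    label: returns_at(benefit)
    for label, benefit in [
        ("half_of_low", 900_000),      # half of the table's low estimate
        ("table_low", 1_800_000),
        ("table_high", 3_200_000),
    ]
}
for label, result in scenarios.items():
    print(label, result)
```

Substituting your own measured savings after each rollout phase turns this into a running check on whether the projection is holding.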

Getting Started: Implementation Roadmap

Deploying AI agents across a print operation works best as a phased rollout, starting with the highest-impact, lowest-disruption module:

  1. Month 1-2: Prepress automation. Deploy automated preflight and PDF correction on all incoming files. Measure auto-fix rates and operator time savings. Connect CIP3/CIP4 data to press consoles for ink key presetting.
  2. Month 3-4: Color management. Install or connect spectrophotometers on each press. Build press-specific ICC profiles. Implement Delta E tracking and G7 calibration workflows.
  3. Month 5-6: Production scheduling. Integrate the scheduling agent with your MIS/ERP system. Start with substrate grouping and setup time optimization. Measure on-time delivery improvement.
  4. Month 7-9: Inline inspection. Deploy camera systems and connect the defect detection agent. Establish SPC baselines. Train operators on the alert and corrective action workflows.
  5. Month 10-12: Waste tracking and sustainability. Implement comprehensive waste monitoring. Set up carbon footprint reporting. Use the data to pursue sustainability-conscious brand accounts.

The key to adoption is treating each agent as a decision support tool that makes press operators and prepress technicians more effective, not as a replacement. The operator still approves color, the scheduler still handles rush jobs, and the prepress tech still handles edge cases. The AI agent handles the repetitive analysis and optimization, freeing skilled workers to focus on the exceptions that require human judgment.
