AI 自动化运维:从 Runbook 编排到智能决策的运维 Agent 架构
AI 自动化运维从 Runbook 编排到智能决策的运维 Agent 架构一、Runbook 的困境当标准操作手册遇上非标准故障运维团队维护了 200 多个 Runbook覆盖了从服务重启到数据库切换的各种标准操作流程。但现实是60% 的故障不在 Runbook 覆盖范围内需要人工分析判断30% 的故障虽然匹配了 Runbook但执行过程中遇到了 Runbook 未预料的异常需要人工介入只有 10% 的故障能完全按照 Runbook 自动化执行。更深层的问题是Runbook 是静态的——它描述的是已知故障的标准解法而运维的核心挑战恰恰是处理未知故障。AI 自动化运维的目标不是替代 Runbook而是在 Runbook 的基础上增加智能决策层——当故障匹配到 Runbook 时自动执行当故障超出 Runbook 范围时通过 Agent 的推理能力给出处置建议由人工确认后执行。本文将从运维 Agent 的架构设计出发深入分析 AI 自动化运维的工程实现。二、运维 Agent 架构从规则执行到智能推理的分层设计运维 Agent 的核心架构是感知→推理→执行的闭环。感知层采集系统状态推理层基于状态做出决策执行层将决策转化为操作。关键设计原则是人在回路——高风险操作必须经过人工确认低风险操作可以自动执行。flowchart TD subgraph 感知层[感知层多源数据采集] S1[指标数据br/Prometheus] S2[日志数据br/ELK] S3[告警数据br/Alertmanager] S4[变更数据br/Git / CMDB] S5[拓扑数据br/服务网格] end subgraph 推理层[推理层智能决策引擎] R1[故障分类器br/匹配已知故障模式] R2[Runbook 匹配br/查找对应的自动化流程] R3[LLM 推理br/处理未知故障模式] R4[风险评估br/评估操作的影响和风险] end subgraph 执行层[执行层分级操作执行] E1[L0: 自动执行br/低风险操作br/服务重启/日志轮转] E2[L1: 人工确认br/中风险操作br/扩缩容/配置变更] E3[L2: 人工执行br/高风险操作br/数据库切换/版本回滚] end 感知层 -- 推理层 推理层 -- 执行层 执行层 -- |执行结果反馈| 感知层 R1 -- |匹配成功| R2 R1 -- |匹配失败| R3 R2 -- R4 R3 -- R4 R4 -- |风险低| E1 R4 -- |风险中| E2 R4 -- |风险高| E3感知层是 Agent 的眼睛。多源数据采集确保 Agent 获得完整的系统状态视图。单一数据源如仅依赖告警会导致推理不完整——告警只告诉你出了什么问题但指标和日志才能告诉你为什么出了问题。推理层是 Agent 的大脑。故障分类器将当前故障与已知模式匹配匹配成功则执行对应 Runbook匹配失败则交由 LLM 推理基于多源数据生成处置建议。风险评估是推理的关键环节——每个操作都必须评估其影响范围和风险等级决定执行方式。执行层是 Agent 的手。分级执行确保高风险操作不会误执行。L0 级操作如重启一个无状态服务可以自动执行L1 级操作如扩缩容需要人工确认L2 级操作如数据库主从切换需要人工执行。三、生产级运维 Agent 实现#!/usr/bin/env python3 运维 Agent 核心引擎 感知 → 推理 → 执行 的闭环实现 import json import time import hashlib from dataclasses import dataclass, field from typing import Optional from enum import Enum from collections import defaultdict from datetime import datetime class RiskLevel(Enum): 操作风险等级 L0 auto # 自动执行 L1 confirm # 人工确认 L2 manual # 人工执行 class FaultCategory(Enum): 故障分类 RESOURCE_EXHAUSTION resource_exhaustion # 资源耗尽 SERVICE_UNAVAILABLE service_unavailable # 服务不可用 NETWORK_PARTITION network_partition # 网络分区 CONFIGURATION_ERROR configuration_error # 配置错误 DEPENDENCY_FAILURE dependency_failure # 依赖故障 UNKNOWN unknown # 未知故障 dataclass class SystemState: 系统状态快照感知层的输出 alerts: list[dict] # 活跃告警 metrics: dict[str, float] # 关键指标 topology: dict[str, list[str]] # 服务拓扑 recent_changes: list[dict] # 近期变更记录 timestamp: datetime field(default_factorydatetime.now) dataclass class Diagnosis: 诊断结果推理层的输出 fault_category: FaultCategory root_service: str affected_services: list[str] confidence: float # 诊断置信度 0-1 evidence: list[str] # 支撑诊断的证据 recommended_actions: list[dict] # 推荐操作列表 runbook_id: Optional[str] None # 匹配的 Runbook ID dataclass class Action: 操作定义 action_id: str action_type: str # restart / scale / config_change / failover / drain target: str # 目标服务或节点 parameters: dict # 操作参数 risk_level: RiskLevel description: str estimated_impact: str # 预估影响描述 rollback_command: str # 回滚命令 class FaultClassifier: 故障分类器 基于规则 指标模式匹配将故障归入已知类别 def classify(self, state: SystemState) - FaultCategory: 根据系统状态判断故障类别 优先级资源耗尽 依赖故障 服务不可用 网络分区 配置错误 metrics state.metrics # 资源耗尽CPU/内存/磁盘超过阈值 if (metrics.get(cpu_usage_percent, 0) 90 or metrics.get(memory_usage_percent, 0) 90 or metrics.get(disk_usage_percent, 0) 85): return FaultCategory.RESOURCE_EXHAUSTION # 依赖故障上游服务异常导致下游连锁反应 alert_services [a.get(service, ) for a in state.alerts] if self._is_cascade_failure(alert_services, state.topology): return FaultCategory.DEPENDENCY_FAILURE # 服务不可用单个服务异常 critical_alerts [ a for a in state.alerts if a.get(severity) critical ] if critical_alerts: return FaultCategory.SERVICE_UNAVAILABLE # 网络分区连接超时或丢包 if (metrics.get(packet_loss_percent, 0) 1 or metrics.get(connection_timeout_rate, 0) 0.05): return FaultCategory.NETWORK_PARTITION # 近期有变更可能是配置错误 if state.recent_changes: return FaultCategory.CONFIGURATION_ERROR return FaultCategory.UNKNOWN def _is_cascade_failure( self, alert_services: list[str], topology: dict[str, list[str]] ) - bool: 判断告警是否呈级联模式上游故障影响下游 if len(alert_services) 2: return False # 检查告警服务之间是否存在依赖关系 for svc in alert_services: deps topology.get(svc, []) for dep in deps: if dep in alert_services: return True return False class RunbookMatcher: Runbook 匹配器 根据故障类别和受影响服务查找对应的自动化流程 def __init__(self): # Runbook 注册表category service → runbook_id self._runbooks: dict[str, str] {} def register(self, category: FaultCategory, service: str, runbook_id: str): 注册 Runbook key f{category.value}:{service} self._runbooks[key] runbook_id def match( self, category: FaultCategory, service: str ) - Optional[str]: 查找匹配的 Runbook # 精确匹配类别 服务 key f{category.value}:{service} if key in self._runbooks: return self._runbooks[key] # 模糊匹配类别 通配符 wildcard_key f{category.value}:* if wildcard_key in self._runbooks: return self._runbooks[wildcard_key] return None class RiskAssessor: 操作风险评估器 根据操作类型和目标服务评估风险等级 # 服务关键度决定操作的风险等级 SERVICE_CRITICALITY { mysql-primary: critical, redis-cluster: high, kafka: high, api-gateway: medium, user-service: medium, order-service: medium, payment-service: critical, } # 操作类型的基础风险 ACTION_BASE_RISK { restart: RiskLevel.L0, scale: RiskLevel.L1, config_change: RiskLevel.L1, failover: RiskLevel.L2, drain: RiskLevel.L1, } def assess(self, action: Action) - RiskLevel: 评估操作的风险等级 规则基础风险 服务关键度修正 base_risk self.ACTION_BASE_RISK.get( action.action_type, RiskLevel.L2 ) # 服务关键度修正关键服务的操作升级一个风险等级 criticality self.SERVICE_CRITICALITY.get( action.target, medium ) if criticality critical: # 关键服务L0 → L1, L1 → L2, L2 保持 if base_risk RiskLevel.L0: return RiskLevel.L1 if base_risk RiskLevel.L1: return RiskLevel.L2 return base_risk class OperationsAgent: 运维 Agent 核心引擎 串联感知、推理、执行三层 def __init__(self): self.classifier FaultClassifier() self.runbook_matcher RunbookMatcher() self.risk_assessor RiskAssessor() self._setup_runbooks() def _setup_runbooks(self): 注册标准 Runbook # 资源耗尽类 Runbook self.runbook_matcher.register( FaultCategory.RESOURCE_EXHAUSTION, *, RB-RES-001 ) # 服务不可用类 Runbook self.runbook_matcher.register( FaultCategory.SERVICE_UNAVAILABLE, api-gateway, RB-SVC-API-001 ) self.runbook_matcher.register( FaultCategory.SERVICE_UNAVAILABLE, user-service, RB-SVC-USER-001 ) # 依赖故障类 Runbook self.runbook_matcher.register( FaultCategory.DEPENDENCY_FAILURE, *, RB-DEP-001 ) def diagnose(self, state: SystemState) - Diagnosis: 执行诊断感知 → 推理 返回诊断结果包含故障类别、根因和推荐操作 # 第一步故障分类 category self.classifier.classify(state) # 第二步定位根因服务 root_service self._locate_root_service(state, category) # 第三步确定影响范围 affected self._find_affected_services(root_service, state.topology) # 第四步匹配 Runbook runbook_id self.runbook_matcher.match(category, root_service) # 第五步生成推荐操作 actions self._generate_actions( category, root_service, affected, state ) # 第六步收集诊断证据 evidence self._collect_evidence(state, category, root_service) # 第七步计算置信度 confidence self._compute_confidence(category, runbook_id, evidence) return Diagnosis( fault_categorycategory, root_serviceroot_service, affected_servicesaffected, confidenceconfidence, evidenceevidence, recommended_actionsactions, runbook_idrunbook_id, ) def execute_action(self, action: Action) - dict: 执行操作根据风险等级决定执行方式 # 重新评估风险等级 assessed_risk self.risk_assessor.assess(action) action.risk_level assessed_risk if assessed_risk RiskLevel.L0: # L0自动执行 return self._auto_execute(action) elif assessed_risk RiskLevel.L1: # L1需要人工确认 return { status: pending_confirmation, action: action, message: ( f操作 [{action.description}] 风险等级 L1 f需要人工确认后执行 ), } else: # L2需要人工执行 return { status: manual_required, action: action, message: ( f操作 [{action.description}] 风险等级 L2 f需要人工执行。回滚命令: {action.rollback_command} ), } def _locate_root_service( self, state: SystemState, category: FaultCategory ) - str: 定位根因服务 if category FaultCategory.RESOURCE_EXHAUSTION: # 资源耗尽找到资源使用率最高的服务 max_metric max_service for key, value in state.metrics.items(): if usage_percent in key and value (state.metrics.get(max_metric, 0)): max_metric key max_service key.split(_)[0] return max_service or unknown if category FaultCategory.DEPENDENCY_FAILURE: # 依赖故障找到告警中最上游的服务 alert_services set(a.get(service, ) for a in state.alerts) for svc in alert_services: deps state.topology.get(svc, []) if not any(d in alert_services for d in deps): return svc # 默认取第一个 Critical 告警的服务 for alert in state.alerts: if alert.get(severity) critical: return alert.get(service, unknown) return unknown def _find_affected_services( self, root: str, topology: dict[str, list[str]] ) - list[str]: 查找受影响的服务 affected [root] visited {root} queue [root] while queue: current queue.pop(0) for svc, deps in topology.items(): if current in deps and svc not in visited: affected.append(svc) visited.add(svc) queue.append(svc) return affected def _generate_actions( self, category: FaultCategory, root: str, affected: list[str], state: SystemState, ) - list[dict]: 根据故障类别生成推荐操作 actions [] if category FaultCategory.RESOURCE_EXHAUSTION: # 资源耗尽扩容 清理 actions.append({ action_type: scale, target: root, parameters: {replicas: 2}, description: f扩容 {root} 增加 2 个副本, estimated_impact: 扩容期间服务可用性不受影响, rollback_command: fkubectl scale deployment {root} --replicas当前值-2, }) actions.append({ action_type: restart, target: root, parameters: {}, description: f重启 {root} 释放内存碎片, estimated_impact: 短暂不可用约 10 秒, rollback_command: 无需回滚, }) elif category FaultCategory.SERVICE_UNAVAILABLE: # 服务不可用重启 检查依赖 actions.append({ action_type: restart, target: root, parameters: {}, description: f重启不可用服务 {root}, estimated_impact: 服务短暂不可用, rollback_command: 无需回滚, }) elif category FaultCategory.DEPENDENCY_FAILURE: # 依赖故障修复根因服务 actions.append({ action_type: restart, target: root, parameters: {}, description: f重启根因服务 {root}, estimated_impact: 依赖链上的服务可能短暂受影响, rollback_command: 无需回滚, }) elif category FaultCategory.CONFIGURATION_ERROR: # 配置错误回滚最近的变更 if state.recent_changes: latest state.recent_changes[0] actions.append({ action_type: config_change, target: latest.get(service, unknown), parameters: {revert_to: latest.get(previous_version)}, description: f回滚配置变更: {latest.get(description, )}, estimated_impact: 服务需要重启以加载旧配置, rollback_command: fgit revert {latest.get(commit, )}, }) # 将操作字典转化为 Action 对象并评估风险 result [] for a in actions: action Action( action_idhashlib.md5( f{a[action_type]}:{a[target]}:{time.time()}.encode() ).hexdigest()[:8], action_typea[action_type], targeta[target], parametersa[parameters], risk_levelRiskLevel.L0, # 初始值后续由 assessor 修正 descriptiona[description], estimated_impacta[estimated_impact], rollback_commanda[rollback_command], ) action.risk_level self.risk_assessor.assess(action) result.append({ action_id: action.action_id, action_type: action.action_type, target: action.target, risk_level: action.risk_level.value, description: action.description, estimated_impact: action.estimated_impact, rollback_command: action.rollback_command, }) return result def _collect_evidence( self, state: SystemState, category: FaultCategory, root: str, ) - list[str]: 收集诊断证据 evidence [] evidence.append(f故障类别: {category.value}) evidence.append(f根因服务: {root}) for key, value in state.metrics.items(): if usage_percent in key and value 80: evidence.append(f指标异常: {key} {value:.1f}%) for alert in state.alerts: if alert.get(severity) in (critical, warning): evidence.append( f告警: [{alert.get(severity)}] f{alert.get(service)} - {alert.get(summary, )} ) return evidence def _compute_confidence( self, category: FaultCategory, runbook_id: Optional[str], evidence: list[str], ) - float: 计算诊断置信度 confidence 0.5 # 基础置信度 # 有匹配的 Runbook 提升置信度 if runbook_id: confidence 0.2 # 故障类别不是 UNKNOWN 提升置信度 if category ! FaultCategory.UNKNOWN: confidence 0.1 # 证据充分提升置信度 if len(evidence) 3: confidence 0.1 return min(confidence, 1.0) def _auto_execute(self, action: Action) - dict: 自动执行操作L0 级别 # 生产环境应替换为实际的执行逻辑 # 如调用 kubectl API、Ansible playbook 等 return { status: executed, action_id: action.action_id, action_type: action.action_type, target: action.target, message: f已自动执行: {action.description}, timestamp: datetime.now().isoformat(), } # 使用示例 if __name__ __main__: agent OperationsAgent() # 模拟系统状态 state SystemState( alerts[ {service: mysql-primary, severity: critical, summary: MySQL 主库连接池耗尽}, {service: user-service, severity: warning, summary: 用户服务查询超时}, {service: order-service, severity: warning, summary: 订单服务查询超时}, ], metrics{ mysql_cpu_usage_percent: 92.5, mysql_memory_usage_percent: 88.3, mysql_disk_usage_percent: 72.1, api-gateway_cpu_usage_percent: 45.0, }, topology{ api-gateway: [user-service, order-service], user-service: [mysql-primary, redis-cluster], order-service: [mysql-primary, kafka], mysql-primary: [], redis-cluster: [], kafka: [], }, recent_changes[], ) # 执行诊断 diagnosis agent.diagnose(state) print(f诊断结果:) print(f 故障类别: {diagnosis.fault_category.value}) print(f 根因服务: {diagnosis.root_service}) print(f 影响范围: {diagnosis.affected_services}) print(f 置信度: {diagnosis.confidence:.2f}) print(f Runbook: {diagnosis.runbook_id}) print(f 证据:) for e in diagnosis.evidence: print(f - {e}) print(f 推荐操作:) for action in diagnosis.recommended_actions: print(f - [{action[risk_level]}] {action[description]}) print(f 影响: {action[estimated_impact]})四、运维 Agent 的边界自动化与可控性的永恒张力LLM 推理的不可靠性当故障超出 Runbook 覆盖范围时Agent 需要依赖 LLM 生成处置建议。但 LLM 的输出不可预测——可能生成错误的操作命令如删除生产数据可能遗漏关键步骤可能对故障的严重性判断错误。解决方案是LLM 生成 规则校验——LLM 的输出必须经过规则引擎校验如命令白名单、参数范围检查校验通过后才能进入执行流程。级联操作的风险放大Agent 执行一个操作后可能触发新的告警Agent 再次诊断并执行操作形成级联。如果第一次操作的方向错误级联效应会放大错误的影响。解决方案是设置操作冷却期——同一服务在 5 分钟内只允许执行一次自动操作后续操作需要人工确认。状态感知的完整性Agent 的推理质量取决于感知层的数据完整性。如果指标数据缺失、拓扑数据过期、告警数据延迟Agent 的诊断可能基于不完整的信息做出错误决策。生产环境必须确保感知层的数据质量——指标采集的完整性、拓扑数据的实时性、告警数据的准确性。人在回路的效率瓶颈L1/L2 级操作需要人工确认但人工确认的响应时间通常在 5-15 分钟。如果故障快速恶化等待确认的时间窗口可能导致故障扩大。解决方案是引入渐进式自动化——随着 Agent 的诊断准确率提升逐步将 L1 操作降级为 L0 自动执行但 L2 操作始终保持人工确认。五、总结运维 Agent 的核心架构是感知→推理→执行的闭环关键设计原则是人在回路——低风险操作自动执行提升效率高风险操作人工确认保障安全。故障分类器和 Runbook 匹配器处理已知故障模式LLM 推理处理未知故障模式风险评估器决定操作执行方式。但 Agent 的可靠性受限于感知数据的质量、LLM 推理的不可靠性和级联操作的风险放大必须在自动化与可控性之间找到平衡。落地路线建议第一步实现感知层和故障分类器验证诊断准确率第二步注册核心 Runbook实现 L0 级自动执行第三步引入 LLM 推理处理未知故障但所有输出必须经过规则校验第四步持续度量 Agent 的诊断准确率和操作成功率逐步扩大自动化范围。Agent 的能力边界必须清晰——它是一个辅助工具不是替代运维工程师的方案。