目录摘要一、异常检测引擎概述1.1 什么是异常检测引擎1.2 异常检测引擎特点1.3 适用场景二、创建异常检测引擎2.1 基本语法2.2 创建简单引擎2.3 输出格式三、异常检测规则3.1 阈值规则3.2 统计规则3.3 组合规则3.4 复杂规则四、窗口模式4.1 无窗口模式4.2 有窗口模式五、实战案例5.1 设备温度告警系统5.2 多指标组合告警5.3 统计异常检测六、引擎管理6.1 查看引擎状态6.2 删除引擎6.3 引擎监控七、告警通知7.1 告警推送7.2 告警聚合八、总结参考资料摘要本文深入讲解DolphinDB异常检测引擎。从引擎原理到创建配置从规则定义到告警输出从多规则组合到实战应用全面介绍异常检测引擎的核心功能。通过丰富的代码示例帮助读者掌握实时告警系统的核心技能。一、异常检测引擎概述1.1 什么是异常检测引擎异常检测引擎是DolphinDB内置的流计算引擎用于实时检测数据异常异常检测引擎匹配不匹配数据流异常规则规则匹配输出告警正常数据规则类型阈值规则统计规则组合规则1.2 异常检测引擎特点特点说明实时检测毫秒级延迟多规则支持多规则组合灵活配置Lambda表达式定义规则自动输出异常数据自动输出1.3 适用场景场景说明设备告警设备参数异常告警阈值监控实时阈值监控趋势异常数据趋势异常检测组合告警多条件组合告警二、创建异常检测引擎2.1 基本语法//创建异常检测引擎 aggcreateAnomalyDetectionEngine(engine_name,//引擎名称 metrics,//异常检测规则Lambda表达式列表 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列可选[windowSize],//窗口大小[garbageSize]//垃圾回收阈值)2.2 创建简单引擎//创建输入流表 share streamTable(1:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,device_idtimestamptemperatureanomaly_type,[INT,TIMESTAMP,DOUBLE,SYMBOL])asoutput_table//创建异常检测引擎 aggcreateAnomalyDetectionEngine(anomaly_engine,[temperature30,//规则1温度30temperature10],//规则2温度10output_table,timestamp,device_id)//订阅流表 subscribeTable(,input_stream,anomaly_detect,-1,agg,true)2.3 输出格式//异常检测输出格式//-device_id:分组列//-timestamp:时间列//-temperature:检测列//-anomaly_type:异常类型规则索引//例如//device_id|timestamp|temperature|anomaly_type//1|10:00:00|35.0|0(规则0匹配温度30)//2|10:00:01|5.0|1(规则1匹配温度10)三、异常检测规则3.1 阈值规则//阈值规则简单的阈值判断 aggcreateAnomalyDetectionEngine(threshold_engine,[temperature30,//高温告警 temperature10,//低温告警 humidity80,//高湿度告警 humidity20],//低湿度告警 output_table,timestamp,device_id)3.2 统计规则//统计规则基于统计指标//需要配合窗口使用 aggcreateAnomalyDetectionEngine(stats_engine,[temperatureavg(temperature)3*std(temperature),//超过3倍标准差 temperatureavg(temperature)-3*std(temperature)],//低于3倍标准差 output_table,timestamp,device_id,60000)//窗口大小60秒3.3 组合规则//组合规则多条件组合 aggcreateAnomalyDetectionEngine(combo_engine,[temperature30andhumidity70,//高温高湿 temperature10andhumidity30,//低温低湿 temperature35orhumidity90],//高温或高湿 output_table,timestamp,device_id)3.4 复杂规则//复杂规则使用函数 aggcreateAnomalyDetectionEngine(complex_engine,[abs(temperature-prev(temperature))5,//温度突变5度 temperature30andtemperatureprev(temperature)*1.2],//温度突增20%output_table,timestamp,device_id)四、窗口模式4.1 无窗口模式//无窗口每条数据独立判断 aggcreateAnomalyDetectionEngine(no_window_engine,[temperature30],output_table,timestamp,device_id)//适用场景//-简单阈值判断//-不需要历史数据4.2 有窗口模式//有窗口基于窗口内数据判断 aggcreateAnomalyDetectionEngine(window_engine,[temperatureavg(temperature)3*std(temperature)],output_table,timestamp,device_id,60000)//60秒窗口//适用场景//-统计规则//-需要历史数据五、实战案例5.1 设备温度告警系统//1.创建流表share streamTable(100000:0,device_idtimestamptemperaturehumiditypressure,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE])assensor_stream//2.创建告警输出表share table(1:0,device_idtimestampvalueanomaly_typealert_level,[INT,TIMESTAMP,DOUBLE,SYMBOL,INT])asalert_table//3.创建异常检测引擎aggcreateAnomalyDetectionEngine(temp_alert_engine,[temperature35,//规则0严重高温 temperature30,//规则1一般高温 temperature5,//规则2严重低温 temperature10],//规则3一般低温 alert_table,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,temp_alert,-1,agg,true)//5.添加告警级别share table(1:0,device_idtimestampvalueanomaly_typealert_level,[INT,TIMESTAMP,DOUBLE,SYMBOL,INT])asfinal_alert_table subscribeTable(,alert_table,level_handler,-1,def(msg){resultselect device_id,timestamp,value,anomaly_type,casewhen anomaly_typein[0,2]then2//严重else1endasalert_levelfrommsg final_alert_table.append!(result)},true)//6.模拟数据defsimulateAlert(){for(iin1..100){sensor_stream.append!(table(take(1..10,100)asdevice_id,take(now(),100)astimestamp,rand(0.0..40.0,100)astemperature,//包含异常温度 rand(40.0..60.0,100)ashumidity,rand(1000.0..1020.0,100)aspressure))sleep(100)}}simulateAlert()//查看告警 select*fromfinal_alert_table order by timestamp desc limit205.2 多指标组合告警//1.创建流表share streamTable(100000:0,device_idtimestamptemperaturehumidityvibration,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE])assensor_stream//2.创建告警输出表share table(1:0,device_idtimestampanomaly_typedescription,[INT,TIMESTAMP,SYMBOL,STRING])ascombo_alert_table//3.创建组合告警引擎aggcreateAnomalyDetectionEngine(combo_alert_engine,[temperature30andhumidity70,//高温高湿 temperature30andvibration5,//高温高振动 humidity80andvibration3,//高湿高振动 temperature35orvibration10],//严重异常 combo_alert_table,timestamp,device_id)//4.订阅流表subscribeTable(,sensor_stream,combo_alert,-1,agg,true)5.3 统计异常检测//1.创建流表share streamTable(100000:0,device_idtimestamptemperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//2.创建告警输出表share table(1:0,device_idtimestamptemperatureavg_tempstd_tempanomaly_type,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,SYMBOL])asstats_alert_table//3.创建统计异常引擎aggcreateAnomalyDetectionEngine(stats_alert_engine,[temperatureavg(temperature)3*std(temperature),temperatureavg(temperature)-3*std(temperature)],stats_alert_table,timestamp,device_id,300000)//5分钟窗口//4.订阅流表subscribeTable(,sensor_stream,stats_alert,-1,agg,true)六、引擎管理6.1 查看引擎状态//查看所有引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat(anomaly_engine)6.2 删除引擎//删除引擎 dropStreamEngine(anomaly_engine)6.3 引擎监控//引擎监控函数defmonitorAnomalyEngine(){statgetStreamEngineStat()for(rowinstat){if(row.typeAnomalyDetectionEngine){print(异常检测引擎: row.name)print( 状态: row.status)print( 规则数: string(row.numMetrics))print( 处理行数: string(row.processedRows))print( 告警数: string(row.outputRows))}}}monitorAnomalyEngine()七、告警通知7.1 告警推送//告警推送函数defpushAlert(alert){//发送邮件//sendEmail(alert)//发送短信//sendSMS(alert)//发送微信//sendWeChat(alert)print(告警推送: alert.device_id - alert.anomaly_type)}//订阅告警表 subscribeTable(,alert_table,push_handler,-1,def(msg){for(rowinmsg){pushAlert(row)}},true)7.2 告警聚合//告警聚合避免告警风暴 share table(1:0,device_idanomaly_typefirst_timelast_timecount,[INT,SYMBOL,TIMESTAMP,TIMESTAMP,LONG])asaggregated_alerts//聚合逻辑defaggregateAlerts(msg){for(rowinmsg){existingselect*fromaggregated_alerts where device_idrow.device_idandanomaly_typerow.anomaly_typeandlast_timenow()-60000//1分钟内if(existing.rows()0){update aggregated_alertssetlast_timerow.timestamp,countcount1where device_idrow.device_idandanomaly_typerow.anomaly_typeandlast_timenow()-60000}else{insert into aggregated_alerts values(row.device_id,row.anomaly_type,row.timestamp,row.timestamp,1)}}}subscribeTable(,alert_table,aggregate_handler,-1,aggregateAlerts,true)八、总结本文详细介绍了DolphinDB异常检测引擎引擎原理实时异常检测创建方法简单引擎、分组引擎检测规则阈值规则、统计规则、组合规则窗口模式无窗口、有窗口实战应用温度告警、组合告警、统计异常告警通知告警推送、告警聚合思考题如何设计合理的告警规则如何避免告警风暴统计异常检测适合什么场景参考资料DolphinDB异常检测引擎DolphinDB流计算