深度解析安全监控与响应:Python 实现 SIEM 日志分析实战
深度解析安全监控与响应Python 实现 SIEM 日志分析实战1. 技术分析1.1 安全监控概述安全监控是实时检测和响应安全事件的实践监控类型 日志监控: 分析日志数据 网络监控: 监控网络流量 主机监控: 监控主机活动 用户行为监控: 监控用户行为 监控目的: 实时检测 异常识别 威胁预警 快速响应1.2 安全响应响应流程 检测阶段: 发现异常 分析阶段: 评估威胁 响应阶段: 采取行动 恢复阶段: 恢复正常 响应类型: 主动响应: 自动阻断 被动响应: 人工干预 预防性响应: 提前防护1.3 SIEM系统SIEM功能 日志收集: 聚合日志 威胁检测: 识别威胁 安全分析: 分析事件 报告生成: 生成报告 SIEM特性: 实时监控 关联分析 威胁情报 合规报告2. 核心功能实现2.1 安全监控系统import time import threading class SecurityMonitor: def __init__(self): self.rules [] self.alerts [] self.is_running False def add_rule(self, rule): self.rules.append(rule) def start_monitoring(self): self.is_running True self._monitor_loop() def stop_monitoring(self): self.is_running False def _monitor_loop(self): while self.is_running: for rule in self.rules: if rule.check(): alert { timestamp: time.time(), rule_name: rule.name, severity: rule.severity, message: rule.message } self.alerts.append(alert) self._handle_alert(alert) time.sleep(1) def _handle_alert(self, alert): print(fALERT [{alert[severity].upper()}]: {alert[message]}) def get_alerts(self, severityNone): if severity: return [a for a in self.alerts if a[severity] severity] return self.alerts def generate_summary(self): summary { total_alerts: len(self.alerts), by_severity: { critical: 0, high: 0, medium: 0, low: 0 } } for alert in self.alerts: sev alert[severity] if sev in summary[by_severity]: summary[by_severity][sev] 1 return summary2.2 规则引擎class SecurityRule: def __init__(self, name, condition, message, severitymedium): self.name name self.condition condition self.message message self.severity severity def check(self): return self.condition() class RuleEngine: def __init__(self): self.rules [] def add_rule(self, rule): self.rules.append(rule) def evaluate(self, event): matches [] for rule in self.rules: if rule.condition(event): matches.append(rule) return matches def evaluate_all(self, events): results [] for event in events: matching_rules self.evaluate(event) if matching_rules: results.append({ event: event, matches: matching_rules }) return results2.3 威胁情报系统class ThreatIntelligenceSystem: def __init__(self): self.threats {} def add_threat(self, threat_id, info): self.threats[threat_id] info def get_threat(self, threat_id): return self.threats.get(threat_id) def search_threats(self, **filters): results [] for threat_id, info in self.threats.items(): match True for key, value in filters.items(): if info.get(key) ! value: match False break if match: results.append(info) return results def check_ip(self, ip): malicious_ips [192.168.1.100, 10.0.0.5] if ip in malicious_ips: return { threat: True, category: malicious_ip, confidence: high } return { threat: False, category: None, confidence: None } def check_domain(self, domain): malicious_domains [malicious.com, phishing.net] if domain in malicious_domains: return { threat: True, category: malicious_domain, confidence: high } return { threat: False, category: None, confidence: None }2.4 安全响应自动化class SecurityResponseAutomation: def __init__(self): self.actions {} def register_action(self, name, action): self.actions[name] action def execute_action(self, action_name, **params): if action_name in self.actions: return self.actions[action_name](**params) raise ValueError(fUnknown action: {action_name}) def execute_response(self, alert): severity alert[severity] if severity critical: self.execute_action(block_ip, ipalert.get(source_ip)) self.execute_action(notify_admin) elif severity high: self.execute_action(quarantine) self.execute_action(log_event, eventalert) else: self.execute_action(log_event, eventalert) def block_ip(self, ip): print(fBlocking IP: {ip}) return True def quarantine(self): print(Quarantining affected system) return True def notify_admin(self): print(Notifying administrator) return True def log_event(self, event): print(fLogging event: {event}) return True3. 性能对比3.1 监控类型对比类型覆盖范围实时性复杂度日志监控高中低网络监控中高中主机监控低高中3.2 SIEM系统对比系统功能价格易用性Splunk全面高中ELK Stack开源低中IBM QRadar企业级高低3.3 响应类型对比类型速度准确性风险自动响应快中中人工响应慢高低4. 最佳实践4.1 监控配置示例def configure_monitor(): monitor SecurityMonitor() failed_login_count [0] def check_failed_logins(): failed_login_count[0] 1 return failed_login_count[0] 5 rule SecurityRule( nameBruteForceDetection, conditioncheck_failed_logins, messageMultiple failed login attempts detected, severityhigh ) monitor.add_rule(rule) monitor.start_monitoring() time.sleep(3) summary monitor.generate_summary() print(fMonitor summary: {summary}) monitor.stop_monitoring()4.2 威胁情报示例def threat_intelligence_example(): tis ThreatIntelligenceSystem() tis.add_threat(CVE-2021-34527, { name: Print Spooler Vulnerability, severity: critical, description: Remote code execution vulnerability }) ip_check tis.check_ip(192.168.1.100) print(fIP check result: {ip_check}) domain_check tis.check_domain(malicious.com) print(fDomain check result: {domain_check})5. 总结安全监控与响应是实时保护系统安全的关键安全监控实时检测异常规则引擎定义检测规则威胁情报获取威胁信息响应自动化自动响应安全事件对比数据如下网络监控实时性最高Splunk功能最全面自动响应速度最快推荐多层次监控体系安全监控与响应需要结合人工和自动化建立快速响应机制。