【Kafka源码解读和使用指南】第88篇:日志收集平台的Kafka实战——百亿日志的接入、传输与清洗
上一篇【第87篇】电商订单系统的Kafka实战——从下单到通知的完整消息链路设计下一篇【第89篇】实时数据同步平台的Kafka实战——MySQL CDC与Kafka的最佳组合摘要日活千万的应用每天产生几百亿条日志——如果每个微服务直接把日志打到ElasticsearchES早就被打挂了。Kafka在这条链路上扮演缓冲海绵的角色前端的Filebeat/Logstash把日志快速投进Kafka后端的消费者按自己的节奏慢慢消费写入ES两端完全解耦谁也拖不垮谁。本文从零设计一个能扛住百亿日活的日志收集平台覆盖架构选型Filebeat vs Logstash谁放前端、Topic分区策略按日志级别/应用分组、高峰期自动扩容方案、JSON统一日志格式的落地规范以及采样与过滤降低80%存储成本的实战技巧。代码和配置都能直接抄作业。一、日志收集架构全景图【百亿日志收集平台架构】 ┌─────────────────────────────────────────────────────────────────────────┐ │ 日志产生层应用集群 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 服务 A │ │ 服务 B │ │ 服务 C │ │ 服务 D │ │ 服务 N │ │ │ │(容器化) │ │(容器化) │ │(容器化) │ │(容器化) │ │(容器化) │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │══════════════│══════════════│══════════════│══════════════│ │ │ │ │ │ │ │ │ │ │ stdout → │ 文件日志 → │ syslog → │ 自定义管道 → │ │ └───────┼──────────────┼──────────────┼──────────────┼──────────────┼──────┘ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 采集层轻量级Agent │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Filebeat 1│ │Filebeat 2│ │Filebeat 3│ │Filebeat 4│ │Filebeat N│ │ │ │ CPU: 1% │ │ CPU: 1% │ │ CPU: 1% │ │ CPU: 1% │ │ CPU: 1% │ │ │ │ MEM: 30M │ │ MEM: 30M │ │ MEM: 30M │ │ MEM: 30M │ │ MEM: 30M │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │══════════════│══════════════│══════════════│══════════════│ │ └───────┼──────────────┼──────────────┼──────────────┼──────────────┼──────┘ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ 缓冲层Kafka Cluster │ │ │ │ ┌────────────────────────────────────────────────────────────────┐ │ │ │ Topic: app-logs (按日志级别分区) │ │ │ │ ├── Partition 0-5: ERROR 日志 (高优先级先消费) │ │ │ │ ├── Partition 6-9: WARN 日志 │ │ │ │ ├── Partition 10-11: INFO 日志 (大部分数据) │ │ │ │ └── Partition 12-15: DEBUG 日志 (高吞吐可采样丢弃) │ │ │ └────────────────────────────────────────────────────────────────┘ │ │ │ │ ┌────────────────────────────────────────────────────────────────┐ │ │ │ Topic: app-logs-filtered (过滤后只有ERROR和WARN) │ │ │ │ └── Partition 0-3: 采样 过滤后的日志 │ │ │ └────────────────────────────────────────────────────────────────┘ │ │ │ └──────────────────────────────────┬──────────────────────────────────────┘ │ ┌──────────────┼──────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Logstash 1│ │Logstash 2│ │Logstash 3│ │ 清洗 │ │ 清洗 │ │ 清洗 │ │ 解析 │ │ 解析 │ │ 解析 │ │ 过滤 │ │ 过滤 │ │ 过滤 │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └──────────────┼──────────────┘ ▼ ┌──────────────────────┐ │ Elasticsearch │ │ ├── hot节点(SSD) │ │ ├── warm节点(HDD) │ │ └── cold节点(归档) │ └──────────┬───────────┘ ▼ ┌──────────────────────┐ │ Kibana / Grafana │ │ 日志查询 / 监控面板 │ └──────────────────────┘缓冲层为什么用Kafka而不用RedisRedis内存贵存不下百亿日志Kafka磁盘便宜顺序写快天然适合当日志缓冲。二、Topic与分区策略设计2.1 两种Topic设计方案对比方案设计优点缺点按日志级别ERROR/WARN/INFO/DEBUG各一个Topic优先级清晰、重要日志不被淹Topic多、管理复杂按应用分组每个服务一个Topic隔离性好、各自管理跨应用查询不方便统一Topic分区路由一个Topic按级别路由到不同分区管理简单、灵活流量大时单Topic压力大推荐方案一个统一的app-logsTopic用日志级别作为分区Key。// Producer端按日志级别路由分区publicclassLogPartitionerimplementsPartitioner{Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){Stringlevel(String)key;// key 日志级别intnumPartitionscluster.partitionCountForTopic(topic);// ERROR → 前几号分区优先消费// WARN → 中间分区// INFO/DEBUG → 尾部分区returnswitch(level){caseERROR-0;// 保证ERROR进入固定分区caseWARN-1;caseINFO-Math.abs(INFO.hashCode())%(numPartitions-2)2;caseDEBUG-Math.abs(DEBUG.hashCode())%(numPartitions-2)2;default-2;};}}2.2 分区数量规划日日志量推荐分区数副本数保留时间 1亿条627天1-10亿条1233天10-100亿条2431天 100亿条48312小时2.3 Filebeat配置前端采集# filebeat.ymlfilebeat.inputs:-type:logenabled:truepaths:-/var/log/app/*.log# 多行日志合并Java异常堆栈multiline:pattern:^\d{4}-\d{2}-\d{2}negate:truematch:after# 添加元数据fields:app:order-serviceenv:productiondatacenter:shanghaifields_under_root:true# 输出到Kafkaoutput.kafka:hosts:[broker1:9092,broker2:9092,broker3:9092]topic:app-logskey:%{[log.level]}# 用日志级别做Key → 确定分区路由partition.round_robin:reachable_only:false# 压缩与批处理compression:lz4max_message_bytes:1048576worker:4# 背压保护Kafka不可用时先写磁盘缓冲queue.mem:events:4096flush.min_events:2048flush.timeout:5s三、高峰期的分区扩容策略3.1 动态扩容流程【分区扩容操作流程】 Step 1: 发现预警 Consumer Lag 100万 OR Kafka磁盘使用率 80% │ ▼ Step 2: 执行扩容 kafka-topics.sh --alter --topic app-logs --partitions 24 (原12分区扩容到24) │ ▼ Step 3: 扩容消费者实例 kubectl scale deployment logstash-consumer --replicas 12 (消费者从6扩容到12) │ ▼ Step 4: 监控观察 等待Rebalance完成观察Lag是否下降 │ ▼ Step 5: 高峰期过后缩容 (消费者缩回6分区数不缩减——Kafka不支持减少分区)重要提醒Kafka支持在线增加分区--alter --partitions 24但不支持减少分区。所以初始分区数别设太大按需扩容即可。# 在线扩容无需重启Broker毫秒级生效kafka-topics.sh --bootstrap-server broker1:9092\--alter--topicapp-logs--partitions24# 扩容后立即监控消费Lagkafka-consumer-groups.sh --bootstrap-server broker1:9092\--grouplogstash-consumer--describe3.2 Logstash Consumer扩容后的无缝衔接# logstash-consumer.confinput{kafka{bootstrap_serversbroker1:9092,broker2:9092,broker3:9092topics[app-logs]group_idlogstash-consumerclient_idlogstash-consumer-${POD_NAME}# 用Pod名做client_idauto_offset_resetlatestenable_auto_commitfalsemax_poll_records500consumer_threads1# 每个Logstash实例1个消费者线程}}filter{# 过滤不必要的DEBUG日志降本80%if[log][level]DEBUG{drop{}}# JSON解析json{sourcemessageskip_on_invalid_jsontrue}# 提取关键字段mutate{add_field{kafka_topic%{[metadata][kafka][topic]}kafka_offset%{[metadata][kafka][offset]}kafka_partition%{[metadata][kafka][partition]}}}}output{elasticsearch{hosts[es-hot-1:9200,es-hot-2:9200,es-hot-3:9200]indexapp-logs-%{YYYY.MM.dd}template/etc/logstash/templates/app-logs.jsontemplate_nameapp-logsbulk_max_size500idle_flush_time5}}四、日志格式规范化——JSON是唯一标准4.1 统一日志规范不管是Java的logback、Python的logging还是Go的zap输出必须统一成JSON格式{timestamp:2026-05-30T10:30:00.123Z,level:ERROR,logger:com.example.order.OrderService,thread:http-nio-8080-exec-5,message:订单创建失败: 库存不足,service:order-service,instance:order-service-pod-abc12,traceId:trace_abc123def456,spanId:span_789,userId:U10086,orderId:ORD2026053000001,duration:150,exception:{class:com.example.InsufficientStockException,message:商品SKU001库存不足,stacktrace:com.example.order.OrderService.createOrder(OrderService.java:156)\n ...},tags:{env:production,region:shanghai}}4.2 Logback配置Java!-- logback-spring.xml --configurationappendernameCONSOLE_JSONclassch.qos.logback.core.ConsoleAppenderencoderclassnet.logstash.logback.encoder.LoggingEventCompositeJsonEncoderproviderstimestamptimeZoneUTC/timeZone/timestamplogLevel/loggerName/threadName/message/stackTracethrowableConverterclassnet.logstash.logback.stacktrace.ShortenedThrowableConvertermaxDepthPerThrowable30/maxDepthPerThrowablemaxLength2048/maxLengthshortenedClassNameLength20/shortenedClassNameLength/throwableConverter/stackTrace!-- MDC中的traceId、userId等 --mdc/context/!-- 自定义字段 --patternpattern{ service: order-service }/pattern/pattern/providers/encoder/appenderrootlevelINFOappender-refrefCONSOLE_JSON//root/configuration4.3 字段规范速查表字段类型必填说明timestampISO8601是日志产生时间UTC时区levelstring是ERROR/WARN/INFO/DEBUGmessagestring是日志内容servicestring是产生日志的服务名traceIdstring推荐分布式追踪ID透传userIdstring可选关联的用户ID方便排查durationnumber可选请求耗时毫秒exceptionobject可选异常信息classmessagestacktrace五、采样与过滤——降本的关键5.1 为什么需要采样百亿日志如果全量写入ES存储成本是天文数字。事实上80%的DEBUG和INFO日志可能永远不会被查看全部存下来就是浪费。5.2 分层过滤策略【日志分层过滤】 原始日志量 (100%) │ ▼ ┌─────────────────────────────┐ │ Level 1: 前端Filebeat过滤 │ │ 丢弃: 健康检查/心跳/静态资源 │ │ 结果: 过滤掉约15% │ └─────────────┬───────────────┘ │ 剩余约85% ▼ ┌─────────────────────────────┐ │ Level 2: Logstash清洗过滤 │ │ ERROR → 100%保留 │ │ WARN → 100%保留 │ │ INFO → 采样10% │ │ DEBUG → 完全丢弃 │ │ 结果: 再过滤掉约70% │ └─────────────┬───────────────┘ │ 剩余约15% ▼ ┌─────────────────────────────┐ │ Level 3: ES Rollover 冷热 │ │ hot(SSD,1天) → warm(HDD,7天) │ │ → cold(归档,30天) │ │ 结果: 存储成本降低90% │ └─────────────────────────────┘5.3 Filebeat端的过滤# filebeat.yml - 丢弃健康检查等无用日志filebeat.inputs:-type:logpaths:-/var/log/app/*.log# 排除健康检查日志exclude_lines:[/health,/metrics,/actuator,status:200.*/health]# 只采集大于200字节的日志排除空行include_lines:[.{200,}]5.4 Logstash端的采样# logstash-sampling.conffilter{# 判断日志级别决定是否保留ruby{code levelevent.get([log][level])iflevelERRORorlevelWARN# ERROR和WARN 100%保留event.tag(keep)elsiflevelINFO# INFO级别10%采样event.tag(keep)ifrand0.1end# DEBUG直接丢弃不tag keep}# 未标记keep的事件全部丢弃ifkeepnotin[tags]{drop{}}}六、关键优化技巧总结优化维度技术手段效果写入速度Filebeat异步批量压缩(lz4)单实例轻松10万条/秒Kafka缓冲适当保留时间(6-24小时)后端故障时有充足时间恢复消费吞吐消费者数量分区数max.poll.records500充分利用并行能力ES写入bulk批量写入调整refresh_interval写入速度提升3-5倍存储成本分级采样 冷热分离 索引生命周期降低80-90%成本查询性能合理的索引模板和字段映射查询速度提升10倍本篇小结一个能扛住百亿日志的收集平台核心设计原则只有三条Kafka做缓冲别让日志洪峰打垮后端Filebeat轻量采集→Kafka缓冲→Logstash消费→ES存储前端和后端完全解耦Topic按日志级别分区重要日志优先处理ERROR放前面分区DEBUG可以整批丢弃采样过滤降本核心ERROR/WARN全量保留INFO采10%DEBUG直接丢——不影响排查能力的同时降低80%存储成本记住日志平台的价值不在于存了多少而在于出了问题能多快定位。把最重要的日志保留好比什么都存更有意义。上一篇【第87篇】电商订单系统的Kafka实战——从下单到通知的完整消息链路设计下一篇【第89篇】实时数据同步平台的Kafka实战——MySQL CDC与Kafka的最佳组合