本帖最后由 我是楠楠 于 2018-10-10 16:16 编辑
【郑州校区】Storm学习笔记之案例:实时日志监控告警系统
8.1、课程目标- 介绍企业监控系统的一种实现方式
- 集成Flume+Kafka+Storm+Redis
- 配置Flume脚本及自定义Flume的拦截器
- 规划Kafka集群及设计Topic的分片&副本数量
- Storm整合Mysql数据库
- Storm集成发送短信、邮件功能
- 定时将数据从mysql数据中更新到Storm程序
- 学生动手开发一套流式计算的程序
8.2、开发步骤1)创建数据库的表架构、初始化业务系统名称、业务系统需要监控的字段、业务系统的开发人员(手机号、邮箱地址) 2)编写Flume脚本去收集数据 3)创建Kafka的topic 4)编写Storm程序。 8.3、数据库表结构1)用户表(开发人员表)用户编号、用户名称、用户手机号、用户邮箱地址、是否可用 2)应用程序表用来保存应用的信息,包括应用名称、应用描述、应用是否在线等信息 3)规则表(每个应用系统要监控哪些规则)4)结果表用来保存触发规则后的记录,包括告警编号、是否短信告知、是否邮件告知、告警明细等信息。 8.4、Flume+Kafka整合1)启动zookeeper 2)启动 Kafka 3)创建 Kafka Topic kafka-topics.sh --create --zookeeper zk01:2181 --topic system_log --partitions 6 --replication-factor 2 4)配置 Flume配置文件 [AppleScript] 纯文本查看 复制代码 [root@node01 nginx_flume_kafka]# cat exec.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume/click_log/data.log
a1.sources.r1.channels = c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = system_log
a1.sinks.k1.brokerList = kafka01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1 | |
5)启动 Flume [AppleScript] 纯文本查看 复制代码 flume-ng agent -n a1 -c /export/servers/flume/conf -f /export/servers/flume/myconfig/nginx_flume_kafka/exec.conf -Dflume.root.logger=INFO,console |
6)启动模拟器生成日志 [AppleScript] 纯文本查看 复制代码 [root@node01 nginx_flume_kafka]# pwd
/export/servers/flume/myconfig/nginx_flume_kafka
[root@node01 nginx_flume_kafka]# cat click_log_out.sh
for((i=0;i<=500000;i++));
do echo "i am lilei "+$i >> /export/data/flume/click_log/data.log;
done |
7)启动Kafka Consumer [AppleScript] 纯文本查看 复制代码 kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning –topic system_log |
8.5、Flume拦截器使用1)上传资料中的文件到 flume的lib包下 2)生产模拟日志生成器和flume配置文件到集群上 上传到(没有目录就创建) /export/servers/flume/myconfig/app_interceptor 3)启动模拟日志生成器 4)创建topic kafka-topics.sh --create --zookeeper zk01:2181 --topic log_monitor --partitions 6 --replication-factor 2 5)启动Flume 6)启动Kafka 消费者 kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic log_monitor 8.6、Flume拦截器实现1)配置 在flumesource上配置拦截器 2)使用反编译工具查看 AppInterceptorBuilder Configure方法在类初始化之后会被调用,由flume框架传入context上下文对象 从context上下文中获取配置文件中配置的appid配置项 使用build方法构建AppInterceptor类,并将appid传入到AppInterceptor 3) 使用 反编译工具查看AppInterceptor 通过构造器获得AppInterceptorBuilder传入的appid 在拦截器方法调用的时候,对原始数据进行获取,然后添加内容。 8.7 Storm程序worker&Task数量设置 1) KafkaSpout并行度初始化状态 2) KafkaSpout读取数据的5个字段 8.8 业务逻辑1)KafkaSpout 负责读取数据。并行度设置6个,在一个消费组中启动6个消费者(Task) 2)ProcessBolt 负责检验数据中是否包含关键词 读取上游发送的数据,上游的数据会有五个字段。Value字段就是我们需要的。 aid:1||msg:error java.lang.ArrayIndexOutOfBoundsException 对数据进行分割,得到应用的编号和错误的信息。 通过应用编号获得应用所有的规则信息。 迭代所有的规则信息,如果有一个规则被触发了,直接返回这个规则的编号。 返回规则编号,如果不是0000,就向下游发送数据。 3)notifyBolt 发送短信和发送邮件等信息 先判断这个应用的这个规则,是否是在五分钟内已经发送过短信。 如果没有发过,发送短信(聚合数据API,3分钱一条)、发送邮件(自己编写) 拼装以下触发规则的信息,准备保存到数据库。 4)save2db 将触发规则的信息保存到数据库 传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|