A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 我是楠楠 于 2018-10-10 16:16 编辑

【郑州校区】Storm学习笔记之案例:实时日志监控告警系统

8.1、课程目标
  • 介绍企业监控系统的一种实现方式
  • 集成Flume+Kafka+Storm+Redis
  • 配置Flume脚本及自定义Flume的拦截器
  • 规划Kafka集群及设计Topic的分片&副本数量
  • Storm整合Mysql数据库
  • Storm集成发送短信、邮件功能
  • 定时将数据从mysql数据中更新到Storm程序
  • 学生动手开发一套流式计算的程序
8.2、开发步骤
1)创建数据库的表架构、初始化业务系统名称、业务系统需要监控的字段、业务系统的开发人员(手机号、邮箱地址)
2)编写Flume脚本去收集数据
3)创建Kafka的topic
4)编写Storm程序。
8.3、数据库表结构1)用户表(开发人员表)
用户编号、用户名称、用户手机号、用户邮箱地址、是否可用
2)应用程序表
用来保存应用的信息,包括应用名称、应用描述、应用是否在线等信息
3)规则表(每个应用系统要监控哪些规则)
4)结果表
用来保存触发规则后的记录,包括告警编号、是否短信告知、是否邮件告知、告警明细等信息。
8.4、Flume+Kafka整合
1)启动zookeeper
2)启动 Kafka
3)创建 Kafka Topic
        kafka-topics.sh --create --zookeeper zk01:2181 --topic system_log --partitions 6 --replication-factor 2
4)配置 Flume配置文件
[AppleScript] 纯文本查看 复制代码
[root@node01 nginx_flume_kafka]# cat exec.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

 

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /export/data/flume/click_log/data.log

a1.sources.r1.channels = c1

 

a1.channels.c1.type=memory

a1.channels.c1.capacity=10000

a1.channels.c1.transactionCapacity=100

 

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic = system_log

a1.sinks.k1.brokerList = kafka01:9092

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.batchSize = 20

a1.sinks.k1.channel = c1
5)启动 Flume
[AppleScript] 纯文本查看 复制代码
flume-ng agent -n a1 -c /export/servers/flume/conf -f /export/servers/flume/myconfig/nginx_flume_kafka/exec.conf -Dflume.root.logger=INFO,console
6)启动模拟器生成日志
[AppleScript] 纯文本查看 复制代码
[root@node01 nginx_flume_kafka]# pwd

/export/servers/flume/myconfig/nginx_flume_kafka

[root@node01 nginx_flume_kafka]# cat click_log_out.sh

for((i=0;i<=500000;i++));

do echo "i am lilei "+$i >> /export/data/flume/click_log/data.log;

done
7)启动Kafka Consumer
[AppleScript] 纯文本查看 复制代码
kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning –topic system_log
8.5Flume拦截器使用
1)上传资料中的文件到 flume的lib包下
2)生产模拟日志生成器和flume配置文件到集群上
上传到(没有目录就创建)
/export/servers/flume/myconfig/app_interceptor
3)启动模拟日志生成器
4)创建topic
kafka-topics.sh --create --zookeeper zk01:2181 --topic log_monitor --partitions 6 --replication-factor 2
5)启动Flume
6)启动Kafka 消费者
kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic log_monitor
8.6Flume拦截器实现
1)配置 在flumesource上配置拦截器
2)使用反编译工具查看 AppInterceptorBuilder
        Configure方法在类初始化之后会被调用,由flume框架传入context上下文对象
        从context上下文中获取配置文件中配置的appid配置项
        使用build方法构建AppInterceptor类,并将appid传入到AppInterceptor
3) 使用 反编译工具查看AppInterceptor
        通过构造器获得AppInterceptorBuilder传入的appid
        在拦截器方法调用的时候,对原始数据进行获取,然后添加内容。
8.7 Storm程序worker&Task数量设置
1) KafkaSpout并行度初始化状态
2) KafkaSpout读取数据的5个字段
8.8 业务逻辑
1)KafkaSpout 负责读取数据。并行度设置6个,在一个消费组中启动6个消费者(Task)
2)ProcessBolt 负责检验数据中是否包含关键词
        读取上游发送的数据,上游的数据会有五个字段。Value字段就是我们需要的。
        aid:1||msg:error java.lang.ArrayIndexOutOfBoundsException
        对数据进行分割,得到应用的编号和错误的信息。
        通过应用编号获得应用所有的规则信息。
        迭代所有的规则信息,如果有一个规则被触发了,直接返回这个规则的编号。
        返回规则编号,如果不是0000,就向下游发送数据。
3)notifyBolt 发送短信和发送邮件等信息
        先判断这个应用的这个规则,是否是在五分钟内已经发送过短信。
        如果没有发过,发送短信(聚合数据API,3分钱一条)、发送邮件(自己编写)
        拼装以下触发规则的信息,准备保存到数据库。
4)save2db 将触发规则的信息保存到数据库
传智播客·黑马程序员郑州校区地址
河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
联系电话 0371-56061160/61/62
来校路线  地铁一号线梧桐街站A口出

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马