本帖最后由 我是楠楠 于 2020-4-27 14:32 编辑
【郑州校区】基于Apache Storm Trident实时计算开发 上
1. 概述 Apache Storm是一个经典的分布式流式计算编程框架,但编写程序比较麻烦。通常,一个基本的基于Storm的流式计算程序需要有以下几个部分:一个Spout、若干个Bolt、一个Topology。Spout和Bolt程序编写,以及Topology的编排不是太友好。今天,我要介绍的是,基于Storm的更高层次的抽象——Trident。它的API要比Storm更加简洁,而且支持迭代计算,不需要反复的创建Bolt来进行操作。 2. 主要内容2.1 Trident程序架构
简单解释下上图: TridentSpout负责读取数据源,并将数据以batch为基本单位,发射到下游,一个batch包含若干个tuple 每一个Trident的Stream操作都是针对一个batch来进行操作的,同时一些Stream操作需要实现特定的接口来自定义计算方式,并且需要在Stream操作中定义输入和输出的Field字段名。Stream操作可以将一个batch转换为另外一个batch(其实内部是tuple转换成另外一个tuple) groupBy操作,groupBy操作会将指定字段的相同的tuple放到一个组里,方便后边的聚合算子(aggregate)进行计算,这个操作对tuple的结构并不会有影响
2.2 Trident程序设计编写基于Storm Trident的程序,可以参考以下几个步骤: [AppleScript] 纯文本查看 复制代码 tridentTopology.newStream("textLineSpout", textLineSpout)
.each(new Fields("line"), new SplitFunc(), new Fields("word"))
.groupBy(new Fields("word"))
.aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
.each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
.parallelismHint(1);
创建一个用于发射数据的Spout Trident对Spout有自己的封装,Spout的作用和Storm中一致,它是流式计算的源头,数据都是从Spout发射到其他计算任务的 基于Trident算子构建TridentTopology 前面那段代码中的each、groupBy、aggregate都是针对一个小批量Batch的操作,我们也看到了,这里出现了groupBy和aggregate等批处理计算中的函数,因为Trident可以以Batch的方式来操作流,这样可以在一定程度上提高计算效率 定义用于Trident算子的函数操作(例如:累加、求个数等) 当我们要使用each、aggregate这类操作时,Trident并不知道我们到底要如何进行遍历、分组或者聚合。所以,Trident提供了一系列的接口,这些接口的目的就是要让我们提供具体的实现 将TridentTopology转换为StormTopology TridentTopology最终还是以StormTopology提交到Storm集群上运行,所以,Trident就是一套基于Storm的高层API,它让我们更加方便地编写Storm程序 提交执行
2.3 重要概念2.4 入门案例pom.xml [AppleScript] 纯文本查看 复制代码 <dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build> Topology [AppleScript] 纯文本查看 复制代码 package com.itheima.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
public class WordCountTopology {
public static void main(String[] args) {
// 创建一个Trident的拓扑结构
TridentTopology tridentTopology = new TridentTopology();
// 创建一个模拟发送数据的TridentSpout
FixedBatchSpout textLineSpout = new FixedBatchSpout(new Fields("line"), 10,
new Values("provides support for the Apache Community of Open Source software projects which provide software products for the public good"),
new Values("by collaborative consensus based processes an open pragmatic software license and a desire to create high quality software that leads the way in its field"),
new Values("not simply a group of projects sharing a server but rather a community of developers and users"),
new Values("not simply a group of of of of of of of a a a server server server a of community community community"));
// textLineSpout.setCycle(true);
// 创建一个TridentStream
tridentTopology.newStream("textLineSpout", textLineSpout)
// 遍历每一个文本行,并指定使用SplitFunc进行处理,输出字段名为word的tuple元组
.each(new Fields("line"), new SplitFunc(), new Fields("word"))
// 根据tuple的word字段进行分组
.groupBy(new Fields("word"))
// 指定聚合需要处理的tuple字段,并使用SumFunc进行聚合计算,输出字段为duWord何count的tuple元组
.aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
// 指定将tuple的duWord,count交给PrintFunc进行处理,输出空字段
.each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
.parallelismHint(1);
// 基于TridentTopology构建StormTopology
StormTopology stormTopology = tridentTopology.build();
Config config = new Config();
// 执行本地提交
LocalCluster local = new LocalCluster();
local.submitTopology("wordcount", config, stormTopology);
}
public static class SplitFunc extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// 实现tuple的转换
String line = tuple.getString(0);
String[] wordArr = line.split(" ");
for (String word : wordArr) {
collector.emit(new Values(word, 1));
}
}
}
public static class SumFunc extends BaseAggregator {
static class WordCountBean{
String word = "";
Long count = 0L;
}
@Override
public Object init(Object batchId, TridentCollector collector) {
// 创建用于分组计算的对象
return new WordCountBean();
}
@Override
public void aggregate(Object val, TridentTuple tuple, TridentCollector collector) {
if(val instanceof WordCountBean) {
// 累加单词数量
WordCountBean wc = (WordCountBean)val;
// 第一次执行初始化单词本身
if(StringUtils.isNotBlank(wc.word)) {
wc.word = tuple.getString(0);
}
wc.count++;
}
}
@Override
public void complete(Object val, TridentCollector collector) {
if(val instanceof WordCountBean) {
// 将最终计算好的结果发射出去
WordCountBean wc = (WordCountBean)val;
collector.emit(new Values(wc.word, wc.count));
}
}
}
private static class PrintFunc extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println(tuple.getString(0) + " " + tuple.getLong(1));
collector.emit(new Values(""));
}
}
} 2.5 实时防恶意服务器攻击系统2.5.1 需求分析 这个实时系统,主要是检测在一定时间内,某个IP访问某个些页面超过了一定的访问量,就会将IP拉入到黑名单。所以,每当一个IP访问了一次页面,就会将次IP的访问次数进行累加,并检测到超过一定数量的访问IP,就加入到“监狱”中让这个IP“反省”半个小时。本项目需要用到的技术点如下: Kafka Redis Apache Storm Trident Spring Boot Spring Data Redis
2.5.2 架构图
2.5.3 创建topic启动Kafka集群 创建名字为visit_url的topic 指定partititon数量为3、副本数量为3
2.5.4 模拟发送数据到Kafka这里使用Spring Boot整合Kafka定时生成模拟测试的日志到Kafka中 pom.xml [AppleScript] 纯文本查看 复制代码 <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.6</version>
</dependency>
</dependencies> application.properties [AppleScript] 纯文本查看 复制代码 server.port=9999
# Kafka服务器
spring.kafka.producer.bootstrap-servers=node-1:9092,node-2:9092,node-3:9093
# key-value序列化、反序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer KafkaDataGen.java,定期随机生产消息到Kafka [AppleScript] 纯文本查看 复制代码 package com.itheima;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@EnableScheduling
public class KafkaDataGen {
@Autowired
private KafkaTemplate kafkaTemplate;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
private RandomStringGenerator.Builder buider = new RandomStringGenerator.Builder();
/**
* 定时生产消息到Kafka
*
* 在Kafka集群中使用下列命令测试topic的消息是否能够被消费
* bin/kafka-console-consumer.sh --zookeeper node-1:2181 --topic visit_url --from-beginning
*/
@Scheduled(cron = "* * * * * ?")
public void gen() {
String date = sdf.format(new Date());
String ip = randomIp();
String pageUrl = randomUrl();
String msg = date + "||" + ip + "||" + pageUrl;
System.out.println(msg);
kafkaTemplate.send("visit_url", msg);
}
/**
* 生成一样的数据
*/
@Scheduled(cron = "* * * * * ?")
public void genViolation() {
String date = sdf.format(new Date());
String ip = randomIp1();
String pageUrl = "/trident/admin/index.html";
String msg = date + "||" + ip + "||" + pageUrl;
System.out.println(msg);
kafkaTemplate.send("visit_url", msg);
}
// 随机生成访问的URL
private String randomUrl() {
String prefix = "/trident/admin/";
String url = buider.selectFrom('a', 't', 'k').build().generate(3, 4);
return prefix + url + ".html";
}
// 随机生成IP地址
private String randomIp() {
int ip1 = RandomUtils.nextInt(10, 17);
int ip2 = RandomUtils.nextInt(0, 6);
int ip3 = RandomUtils.nextInt(0, 6);
int ip4 = RandomUtils.nextInt(155, 158);
return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
}
// 随机生成IP地址
private String randomIp1() {
int ip1 = 10;
int ip2 = 0;
int ip3 = 1;
int ip4 = RandomUtils.nextInt(155, 156);
return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
}
} Application.java,启动类 [AppleScript] 纯文本查看 复制代码 package com.itheima;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
|