[Zhengzhou Campus] Real-Time Computation Development with Apache Storm Trident (Part 2)
2.5.5 Testing Kafka Message Production

If the console keeps printing messages like the ones below, the logs are being produced into the designated Kafka topic:

2018-12-25 08:17||14.5.1.156||/trident/admin/ttt.html
2018-12-25 08:17||16.1.3.155||/trident/admin/taaa.html
2018-12-25 08:17||16.0.0.155||/trident/admin/att.html
2018-12-25 08:17||13.2.3.157||/trident/admin/katk.html
2018-12-25 08:17||14.2.1.156||/trident/admin/kaka.html
2018-12-25 08:17||10.0.3.155||/trident/admin/akta.html
2018-12-25 08:17||15.3.1.155||/trident/admin/att.html
2018-12-25 08:17||12.3.5.156||/trident/admin/ktaa.html
2018-12-25 08:17||14.3.2.157||/trident/admin/aatt.html
2018-12-25 08:17||15.4.4.156||/trident/admin/taaa.html
2018-12-25 08:17||12.5.0.157||/trident/admin/atka.html
2018-12-25 08:17||13.2.3.156||/trident/admin/tkat.html
2018-12-25 08:17||13.3.0.156||/trident/admin/kaa.html
2018-12-25 08:17||11.0.1.156||/trident/admin/kta.html
2018-12-25 08:17||14.4.3.157||/trident/admin/kttt.html

In Kafka, use the following command to verify that the data in the topic can be consumed:

bin/kafka-console-consumer.sh --zookeeper node-1:2181 --topic visit_url --from-beginning

2.5.6 Integrating Trident with Kafka

Import the dependencies. Make absolutely sure the Kafka version number matches the installed version, otherwise the program will not run:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.1.1</version>
        <!-- Exclude the transitive kafka-clients so the version brought in by the kafka_2.11 dependency below is used -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.11.0.0</version>
        <!-- Storm provides its own ZooKeeper and logging bindings; exclude them to avoid classpath conflicts -->
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>
</build>
Integrating Trident with Kafka:

ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

2.5.7 Trident Topology Diagram for Guarding Against Malicious Server Attacks
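The original diagram image is not reproduced here. As a rough sketch reconstructed from the code in 2.5.8 (not the original figure), the data flows like this:

Kafka topic "visit_url"
        |
OpaqueTridentKafkaSpout        emits ("bytes")
        |
SplitFunc                      emits ("ip", "url")
        |
groupBy("ip", "url")
        |
SumAggregate                   emits ("ip1", "url1", "count")
        |
RedisFunc  (db0: counters)     emits ("ip:url", "totalCount")
        |
CheckIpFunc                    (db1: blacklist when count > 5)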
2.5.8 Developing the Trident Real-Time Program

JedisPoolClient.java uses a Jedis connection pool to obtain Jedis connections for operating on Redis:

package com.itheima.trident;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisPoolClient {
    private static JedisPool pool = null;

    static {
        if (pool == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            // Controls how many Jedis instances the pool may allocate; instances are obtained via pool.getResource().
            // A value of -1 means unlimited; once the pool has allocated maxTotal instances, its state becomes "exhausted".
            config.setMaxTotal(50);
            // Controls the maximum number of idle Jedis instances in the pool.
            config.setMaxIdle(5);
            // Maximum wait time (in milliseconds) when borrowing a Jedis instance; if exceeded,
            // a JedisConnectionException is thrown. A negative value blocks indefinitely (default -1).
            config.setMaxWaitMillis(1000 * 100);
            // Whether to validate a Jedis instance before borrowing it; if true, every instance obtained is usable.
            config.setTestOnBorrow(true);
            // Whether to check connection availability (ping()) when returning a Jedis instance to the pool.
            config.setTestOnReturn(true);
            pool = new JedisPool(config, "node-1", 6379, 2000);
        }
    }

    public static Jedis getJedis() {
        return pool.getResource();
    }
    public static void returnResource(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }
}
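The original does not show a standalone usage example; the following is a minimal sketch (key names and values are illustrative) of the intended get/use/return pattern, with the connection handed back in a finally block so errors cannot exhaust the pool:

// Minimal usage sketch (not part of the original code)
Jedis jedis = JedisPoolClient.getJedis();
try {
    jedis.select(0);                        // switch to db0
    jedis.set("demo:key", "1");             // write an illustrative value
    System.out.println(jedis.get("demo:key"));
} finally {
    JedisPoolClient.returnResource(jedis);  // hand the connection back to the pool
}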
IpWatcherTopology.java contains the entire Trident implementation:

package com.itheima.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import java.nio.charset.Charset;
public class IpWatcherTopology {
    public static void main(String[] args) {
        // Configure the ZooKeeper cluster address
        ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
        // Set up the Kafka config, specifying the topic to consume
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
        // Create the KafkaSpout
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
        // Create the TridentTopology
        TridentTopology tridentTopology = new TridentTopology();
        // Create a Trident stream based on the KafkaSpout
        tridentTopology.newStream("event", kafkaSpout)
                // Run SplitFunc on the incoming "bytes" field, emitting tuples with "ip" and "url" fields
                .each(new Fields("bytes"), new SplitFunc(), new Fields("ip", "url"))
                // Group by ip and url
                .groupBy(new Fields("ip", "url"))
                // Aggregate within each group, emitting ip1, url1, count
                // (note: output field names must not collide with input field names)
                .aggregate(new Fields("ip", "url"), new SumAggregate(), new Fields("ip1", "url1", "count"))
                // Save the aggregated ip, url and count to Redis
                .each(new Fields("ip1", "url1", "count"), new RedisFunc(), new Fields("ip:url", "totalCount"))
                // Check whether an ip/url shows abnormal access; if so, put it on the blacklist
                .each(new Fields("ip:url", "totalCount"), new CheckIpFunc(), new Fields());
        StormTopology stormTopology = tridentTopology.build();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("ipWatcher", new Config(), stormTopology);
    }
    /**
     * Split the message to extract the IP and URL
     */
    private static class SplitFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String message = new String(tuple.getBinary(0), Charset.forName("utf-8"));
            // Messages look like: 2018-12-25 08:17||14.5.1.156||/trident/admin/ttt.html
            String[] fields = message.split("\\|\\|");
            if (fields.length >= 3) {
                System.out.println(fields[1]);
                collector.emit(new Values(fields[1], fields[2]));
            }
        }
    }
    /**
     * Aggregation computation
     */
    private static class SumAggregate extends BaseAggregator<SumAggregate.IpCount> {
        /**
         * Holds the intermediate result for a group
         */
        static class IpCount {
            String ip;
            String url;
            long count;
        }

        @Override
        public IpCount init(Object batchId, TridentCollector collector) {
            return new IpCount();
        }

        // aggregate is called multiple times; every tuple in the group takes part in the computation
        @Override
        public void aggregate(IpCount ipCount, TridentTuple tuple, TridentCollector collector) {
            if (StringUtils.isNotBlank(ipCount.ip)) {
                ipCount.count += 1;
            } else {
                ipCount.ip = tuple.getString(0);
                ipCount.url = tuple.getString(1);
                ipCount.count = 1;
            }
        }

        // After aggregation completes, send the result downstream
        @Override
        public void complete(IpCount ipCount, TridentCollector collector) {
            collector.emit(new Values(ipCount.ip, ipCount.url, ipCount.count));
        }
    }
    private static class RedisFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Get a Jedis connection from the pool
            Jedis jedis = JedisPoolClient.getJedis();
            jedis.select(0);
            String key = tuple.getString(0) + ":" + tuple.getString(1);
            if (jedis.exists(key)) {
                jedis.incrBy(key, tuple.getLong(2));
            } else {
                // Set the key with a 60-second expiry
                jedis.setex(key, 60, tuple.getLong(2).toString());
            }
            String newCount = jedis.get(key);
            JedisPoolClient.returnResource(jedis);
            collector.emit(new Values(key, Long.parseLong(newCount)));
        }
    }
    private static class CheckIpFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Long totalCount = tuple.getLong(1);
            if (totalCount > 5) {
                Jedis jedis = JedisPoolClient.getJedis();
                jedis.select(1);
                String ip = tuple.getString(0).split(":")[0];
                if (!jedis.exists(ip)) {
                    // Blacklist the IP for 30 minutes
                    jedis.setex(ip, 1800, "1");
                    System.out.println(ip + " added to the blacklist");
                }
                JedisPoolClient.returnResource(jedis);
            }
        }
    }
}
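One caveat when running this listing, which the original does not mention: in local mode, main() returns right after submitTopology() is called. A common pattern in local-mode examples (a sketch, with an arbitrary duration) is to keep the JVM alive at the end of main() and then shut the local cluster down cleanly:

// Sketch: keep the local topology running for a while, then clean up
Utils.sleep(60 * 1000);                 // org.apache.storm.utils.Utils
localCluster.killTopology("ipWatcher");
localCluster.shutdown();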
2.5.9 Testing

Start Redis and run the Trident program directly; you can watch it continuously writing data into Redis.

db0 holds the visit counts for every ip/url combination; db1 holds the blacklist, that is, the "jail".
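To spot-check what the topology has written, here is a small verification sketch (not in the original) that dumps both databases using the JedisPoolClient from 2.5.8. KEYS is fine for a test setup like this, though it should be avoided on production Redis instances:

// Verification sketch: print the db0 counters and the db1 blacklist
Jedis jedis = JedisPoolClient.getJedis();
try {
    jedis.select(0);                               // db0: "ip:url" -> visit count
    for (String key : jedis.keys("*")) {
        System.out.println(key + " = " + jedis.get(key));
    }
    jedis.select(1);                               // db1: blacklisted IPs
    System.out.println("blacklist: " + jedis.keys("*"));
} finally {
    JedisPoolClient.returnResource(jedis);
}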
2.6 Setting Trident Parallelism

Trident parallelism can be set after each stream operation:

parallelismHint(4)

This setting specifies how many executors run that stream operation, with each executor corresponding to one thread. Worker configuration is the same as in plain Storm: the number of workers is set in the Config.

config.setNumWorkers(2);

The following code sets the parallelism of the Trident topology:

package com.itheima.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import java.nio.charset.Charset;
public class IpWatcherTopology {
    public static void main(String[] args) {
        // Configure the ZooKeeper cluster address
        ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
        // Set up the Kafka config, specifying the topic to consume
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
        // Create the KafkaSpout
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
        // Create the TridentTopology
        TridentTopology tridentTopology = new TridentTopology();
        // Create a Trident stream based on the KafkaSpout
        tridentTopology.newStream("event", kafkaSpout).parallelismHint(2)
                // Run SplitFunc on the incoming "bytes" field, emitting tuples with "ip" and "url" fields
                .each(new Fields("bytes"), new SplitFunc(), new Fields("ip", "url")).parallelismHint(2)
                // Group by ip and url
                .groupBy(new Fields("ip", "url"))
                // Aggregate within each group, emitting ip1, url1, count
                // (note: output field names must not collide with input field names)
                .aggregate(new Fields("ip", "url"), new SumAggregate(), new Fields("ip1", "url1", "count")).parallelismHint(2)
                // Save the aggregated ip, url and count to Redis
                .each(new Fields("ip1", "url1", "count"), new RedisFunc(), new Fields("ip:url", "totalCount")).parallelismHint(4)
                // Check whether an ip/url shows abnormal access; if so, put it on the blacklist
                .each(new Fields("ip:url", "totalCount"), new CheckIpFunc(), new Fields()).parallelismHint(2);
        StormTopology stormTopology = tridentTopology.build();
        Config config = new Config();
        config.setNumWorkers(3);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("ipWatcher", config, stormTopology);
    }
    // SplitFunc, SumAggregate, RedisFunc and CheckIpFunc are the same as in 2.5.8 and are omitted here.
}

In total this configures 2 + 2 + 2 + 4 + 2 = 12 executor threads, running in 3 workers.

3. Summary

As this Trident case study shows, Trident programming is much more concise than raw Storm, and because it processes data in batches it greatly improves execution efficiency while reducing programming complexity. There are also shortcomings: every stream operation requires implementing a custom class, which compares poorly with the Spark Streaming experience. On the plus side, Trident is written in Java, so it is friendlier for developers used to Java and for integration with Java libraries.

Key points for writing Trident programs (recapping the warnings made earlier in this tutorial):
- The Kafka dependency version in the pom must match the installed Kafka version, otherwise the program will not run.
- The output field names of an aggregate must not collide with its input field names.
- The Kafka spout emits raw messages in a field named "bytes", which must be decoded and split by a function such as SplitFunc.
- Parallelism is set with parallelismHint() after a stream operation, and the worker count via config.setNumWorkers().