[Zhengzhou Campus] Real-Time Computing Development with Apache Storm Trident (Part 2)

2.5.5 Testing Kafka Message Production
If the console keeps printing messages like the following, the log lines are being produced into the designated Kafka topic:
2018-12-25 08:17||14.5.1.156||/trident/admin/ttt.html
2018-12-25 08:17||16.1.3.155||/trident/admin/taaa.html
2018-12-25 08:17||16.0.0.155||/trident/admin/att.html
2018-12-25 08:17||13.2.3.157||/trident/admin/katk.html
2018-12-25 08:17||14.2.1.156||/trident/admin/kaka.html
2018-12-25 08:17||10.0.3.155||/trident/admin/akta.html
2018-12-25 08:17||15.3.1.155||/trident/admin/att.html
2018-12-25 08:17||12.3.5.156||/trident/admin/ktaa.html
2018-12-25 08:17||14.3.2.157||/trident/admin/aatt.html
2018-12-25 08:17||15.4.4.156||/trident/admin/taaa.html
2018-12-25 08:17||12.5.0.157||/trident/admin/atka.html
2018-12-25 08:17||13.2.3.156||/trident/admin/tkat.html
2018-12-25 08:17||13.3.0.156||/trident/admin/kaa.html
2018-12-25 08:17||11.0.1.156||/trident/admin/kta.html
2018-12-25 08:17||14.4.3.157||/trident/admin/kttt.html
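Each line follows the format timestamp||ip||url; the topology's SplitFunc below splits on the || delimiter to recover the ip and url fields.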
In Kafka, use the following command to check that the data in the topic can be consumed:
bin/kafka-console-consumer.sh --zookeeper node-1:2181 --topic visit_url --from-beginning
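(The --zookeeper flag is the deprecated old-consumer path in Kafka 0.11; if your build rejects it, the new-consumer equivalent should be bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic visit_url --from-beginning, assuming the broker listens on the default port 9092 on node-1.)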
2.5.6 Integrating Trident with Kafka
Import the dependencies.
Note: the Kafka version number must match the installed version exactly, or the program will not run.
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
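Note that kafka_2.11 excludes zookeeper, slf4j-log4j12, and log4j, and storm-kafka excludes kafka-clients; this is the usual way to avoid classpath conflicts with the versions that storm-core and the kafka artifact already pull in.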
Integrating Trident with Kafka:
ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
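OpaqueTridentKafkaSpout is the opaque-transactional variant of the Trident Kafka spout: it tolerates a batch being replayed with different contents (for example after a broker failure), which is what keeps the downstream counting fault-tolerant.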
2.5.7 Topology Diagram: Defending Against Malicious Server Attacks with Trident
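The topology flows as follows: Kafka spout → SplitFunc (extract ip, url) → groupBy(ip, url) → SumAggregate (per-batch counts) → RedisFunc (accumulate totals in Redis) → CheckIpFunc (blacklist abnormal IPs).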
2.5.8 Developing the Trident Real-Time Program
JedisPoolClient.java obtains Jedis connections from a connection pool to operate on Redis:
package com.itheima.trident;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolClient {

    private static JedisPool pool = null;

    static {
        if (pool == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            // Controls how many Jedis instances the pool may allocate (obtained via pool.getResource()).
            // A value of -1 means unlimited; once maxTotal instances are out, the pool is exhausted.
            config.setMaxTotal(50);
            // Maximum number of idle Jedis instances kept in the pool.
            config.setMaxIdle(5);
            // Maximum time (ms) to wait when borrowing an instance; on timeout a
            // JedisConnectionException is thrown. A negative value blocks indefinitely (default -1).
            config.setMaxWaitMillis(1000 * 100);
            // Validate instances before lending them out; if true, every borrowed instance is usable.
            config.setTestOnBorrow(true);
            // Check connection liveness (ping()) when an instance is returned to the pool.
            config.setTestOnReturn(true);
            pool = new JedisPool(config, "node-1", 6379, 2000);
        }
    }

    public static Jedis getJedis() {
        return pool.getResource();
    }

    public static void returnResource(Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }
}
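As a quick sanity check, a connection can be borrowed and returned like this (a minimal usage sketch; the test:key name is just an example):

Jedis jedis = JedisPoolClient.getJedis();
try {
    jedis.set("test:key", "1");            // write a throwaway key
    System.out.println(jedis.get("test:key"));
} finally {
    JedisPoolClient.returnResource(jedis); // always return the connection to the pool
}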
IpWatcherTopology.java contains the entire Trident implementation:
package com.itheima.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;

import java.nio.charset.Charset;

public class IpWatcherTopology {
    public static void main(String[] args) {

        // Configure the ZooKeeper cluster addresses
        ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
        // Set up the Kafka config, consuming the visit_url topic
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
        // Create the Kafka spout
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

        // Create the TridentTopology
        TridentTopology tridentTopology = new TridentTopology();
        // Build a Trident stream on top of the Kafka spout
        tridentTopology.newStream("event", kafkaSpout)
                // Run SplitFunc on the input bytes field, emitting tuples with ip and url fields
                .each(new Fields("bytes"), new SplitFunc(), new Fields("ip", "url"))
                // Group by ip and url
                .groupBy(new Fields("ip", "url"))
                // Aggregate within each group, emitting ip1, url1, count
                // (note: output field names must differ from the input field names)
                .aggregate(new Fields("ip", "url"), new SumAggregate(), new Fields("ip1", "url1", "count"))
                // Save the aggregated ip, url, and count to Redis
                .each(new Fields("ip1", "url1", "count"), new RedisFunc(), new Fields("ip:url", "totalCount"))
                // Check whether the ip/url access pattern is abnormal; if so, add the IP to the blacklist
                .each(new Fields("ip:url", "totalCount"), new CheckIpFunc(), new Fields());

        StormTopology stormTopology = tridentTopology.build();

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("ipWatcher", new Config(), stormTopology);
    }

    /**
     * Splits the message and extracts the IP and URL
     */
    private static class SplitFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String message = new String(tuple.getBinary(0), Charset.forName("utf-8"));
            String[] fields = message.split("\\|\\|");

            if(fields.length >= 3) {
                System.out.println(fields[1]);
                collector.emit(new Values(fields[1], fields[2]));
            }
        }
    }

    /**
     * Aggregation: counts the tuples in each (ip, url) group
     */
    private static class SumAggregate extends BaseAggregator<SumAggregate.IpCount> {

        /**
         * Holds a group's intermediate result
         */
        static class IpCount {
            String ip;
            String url;
            long count;
        }

        @Override
        public IpCount init(Object batchId, TridentCollector collector) {
            return new IpCount();
        }

        // aggregate is called multiple times: every tuple within the group feeds into the computation
        @Override
        public void aggregate(IpCount ipCount, TridentTuple tuple, TridentCollector collector) {
            if(StringUtils.isNotBlank(ipCount.ip)) {
                ipCount.count += 1;
            }
            else {
                ipCount.ip = tuple.getString(0);
                ipCount.url = tuple.getString(1);
                ipCount.count = 1;
            }
        }

        // After the aggregation finishes, emit the result downstream
        @Override
        public void complete(IpCount ipCount, TridentCollector collector) {
            collector.emit(new Values(ipCount.ip, ipCount.url, ipCount.count));
        }
    }

    private static class RedisFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Get a Jedis connection from the pool
            Jedis jedis = JedisPoolClient.getJedis();
            jedis.select(0);

            String key = tuple.getString(0) + ":" + tuple.getString(1);

            if(jedis.exists(key)) {
                jedis.incrBy(key, tuple.getLong(2));
            }
            else {
                // Create the counter with a 60-second TTL (the monitoring window)
                jedis.setex(key, 60, tuple.getLong(2).toString());
            }

            String newCount = jedis.get(key);
            JedisPoolClient.returnResource(jedis);

            collector.emit(new Values(key, Long.parseLong(newCount)));
        }
    }

    private static class CheckIpFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Long totalCount = tuple.getLong(1);

            if(totalCount > 5) {
                Jedis jedis = JedisPoolClient.getJedis();
                jedis.select(1);

                String ip = tuple.getString(0).split(":")[0];

                if(!jedis.exists(ip)) {
                    jedis.setex(ip, 1800, "1");
                    System.out.println(ip + " added to the blacklist");
                }
                JedisPoolClient.returnResource(jedis);
            }
        }
    }
}
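Note how the two Redis-facing functions cooperate: RedisFunc accumulates a per-ip:url counter in db0 with a 60-second TTL, so each counter effectively measures visits per minute; CheckIpFunc then writes any IP whose count exceeds 5 in that window into db1 with an 1800-second TTL, i.e., the IP stays blacklisted for 30 minutes.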
2.5.9 Testing
Start Redis, then run the Trident topology directly; you can watch the program continuously write data into Redis.
db0 holds the visit counts for every ip/url combination.
db1 holds the blacklist, i.e., the "jail".
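To eyeball both databases from Java, the same pool can be reused (a minimal sketch that only dumps the keys):

Jedis jedis = JedisPoolClient.getJedis();
jedis.select(0);
System.out.println("visit counters: " + jedis.keys("*")); // ip:url -> count
jedis.select(1);
System.out.println("blacklist: " + jedis.keys("*"));      // blacklisted IPs
JedisPoolClient.returnResource(jedis);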
2.6 Setting Trident Parallelism
The parallelism of a Trident stream can be set after each stream operation:
parallelismHint(4)
This hint specifies how many executors run the stream operation, and each executor corresponds to one thread. Worker configuration is the same as in plain Storm: the number of workers is set in the Config.
config.setNumWorkers(2);
The following code sets the parallelism of the Trident topology:
package com.itheima.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.Jedis;

import java.nio.charset.Charset;

public class IpWatcherTopology {
    public static void main(String[] args) {

        // Configure the ZooKeeper cluster addresses
        ZkHosts zkHosts = new ZkHosts("node-1:2181,node-2:2181,node-3:2181");
        // Set up the Kafka config, consuming the visit_url topic
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, "visit_url");
        // Create the Kafka spout
        OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

        // Create the TridentTopology
        TridentTopology tridentTopology = new TridentTopology();
        // Build a Trident stream on top of the Kafka spout
        tridentTopology.newStream("event", kafkaSpout).parallelismHint(2)
                // Run SplitFunc on the input bytes field, emitting tuples with ip and url fields
                .each(new Fields("bytes"), new SplitFunc(), new Fields("ip", "url")).parallelismHint(2)
                // Group by ip and url
                .groupBy(new Fields("ip", "url"))
                // Aggregate within each group, emitting ip1, url1, count
                // (note: output field names must differ from the input field names)
                .aggregate(new Fields("ip", "url"), new SumAggregate(), new Fields("ip1", "url1", "count")).parallelismHint(2)
                // Save the aggregated ip, url, and count to Redis
                .each(new Fields("ip1", "url1", "count"), new RedisFunc(), new Fields("ip:url", "totalCount")).parallelismHint(4)
                // Check whether the ip/url access pattern is abnormal; if so, add the IP to the blacklist
                .each(new Fields("ip:url", "totalCount"), new CheckIpFunc(), new Fields()).parallelismHint(2);

        StormTopology stormTopology = tridentTopology.build();

        Config config = new Config();
        config.setNumWorkers(3);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("ipWatcher", , stormTopology);
    }
This configures a total of 12 threads (parallelism hints 2 + 2 + 2 + 4 + 2) across 3 workers; by default Storm spreads the executors evenly, 4 per worker.
3. Summary
Through this Trident case study we can see that Trident programming is much more concise than raw Storm, and that Trident processes data batch by batch, which greatly improves execution efficiency and reduces programming complexity. There are also drawbacks: every stream operation requires a hand-written implementation, which compares poorly with the Spark Streaming experience. On the plus side, Trident is written in Java, so it is friendlier for developers who know Java and for integration with Java libraries.
Key points for writing a Trident program:
  • Design the spout that integrates the external input source
  • Design the Trident topology
  • Design the input and output of each stream operation
  • Design the processing function for each stream operation (remember: this function is invoked repeatedly as each batch is processed)

