A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 我是楠楠 于 2020-4-27 14:32 编辑

【郑州校区】基于Apache Storm Trident实时计算开发 上

1. 概述
​        Apache Storm是一个经典的分布式流式计算编程框架,但编写程序比较麻烦。通常,一个基本的基于Storm的流式计算程序需要有以下几个部分:一个Spout、若干个Bolt、一个Topology。Spout和Bolt程序编写,以及Topology的编排不是太友好。今天,我要介绍的是,基于Storm的更高层次的抽象——Trident。它的API要比Storm更加简洁,而且支持迭代计算,不需要反复的创建Bolt来进行操作。
2. 主要内容2.1 Trident程序架构
简单解释下上图:
  • TridentSpout负责读取数据源,并将数据以batch为基本单位,发射到下游,一个batch包含若干个tuple
  • 每一个Trident的Stream操作都是针对一个batch来进行操作的,同时一些Stream操作需要实现特定的接口来自定义计算方式,并且需要在Stream操作中定义输入和输出的Field字段名。Stream操作可以将一个batch转换为另外一个batch(其实内部是tuple转换成另外一个tuple)
  • groupBy操作,groupBy操作会将指定字段的相同的tuple放到一个组里,方便后边的聚合算子(aggregate)进行计算,这个操作对tuple的结构并不会有影响

2.2 Trident程序设计
编写基于Storm Trident的程序,可以参考以下几个步骤:
  • 创建一个TridentTopology
    TridentTopology是Trident自己封装的流式拓扑结构抽象,它可以以链式操作的方式组织流式计算的具体步骤,语法结构对比Storm要更加简单。按照链式操作,将整个流式计算的过程简洁地描述出来。

[AppleScript] 纯文本查看 复制代码
tridentTopology.newStream("textLineSpout", textLineSpout)
    .each(new Fields("line"), new SplitFunc(), new Fields("word"))
    .groupBy(new Fields("word"))
    .aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
    .each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
    .parallelismHint(1);

  • 创建一个用于发射数据的Spout
    Trident对Spout有自己的封装,Spout的作用和Storm中一致,它是流式计算的源头,数据都是从Spout发射到其他计算任务的
  • 基于Trident算子构建TridentTopology
    前面那段代码中的each、groupBy、aggregate都是针对一个小批量Batch的操作,我们也看到了,这里出现了groupBy和aggregate等批处理计算中的函数,因为Trident可以以Batch的方式来操作流,这样可以在一定程度上提高计算效率
  • 定义用于Trident算子的函数操作(例如:累加、求个数等)
    当我们要使用each、aggregate这类操作时,Trident并不知道我们到底要如何进行遍历、分组或者聚合。所以,Trident提供了一系列的接口,这些接口的目的就是要让我们提供具体的实现
  • 将TridentTopology转换为StormTopology
    TridentTopology最终还是以StormTopology提交到Storm集群上运行,所以,Trident就是一套基于Storm的高层API,它让我们更加方便地编写Storm程序
  • 提交执行

2.3 重要概念
  • Trident Topology
    基于Storm Trident的流式计算拓扑图
  • Trident Spout
    Trident封装的Spout,就是流的源头,获取并发射数据的组件
  • Trident Function
    用户Trident流式操作函数。例如:

  • aggregate: 针对Tuple进行聚合运算
  • each: 遍历每一个Tuple,可以进行单独的业务处理
  • groupBy: 根据Tuple进行分组

  • Trident State
    在Trident中,数据是以batch的形式进行计算的,每一个batch都会有自己的一个transaction id。TridentState可以用于容错,如果有数据出现丢失,我们可以利用TridentState可以实现一个Batch的事务处理。

2.4 入门案例
pom.xml
[AppleScript] 纯文本查看 复制代码
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>
Topology
[AppleScript] 纯文本查看 复制代码
package com.itheima.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.HashMap;

public class WordCountTopology {

    public static void main(String[] args) {

        // 创建一个Trident的拓扑结构
        TridentTopology tridentTopology = new TridentTopology();

        // 创建一个模拟发送数据的TridentSpout
        FixedBatchSpout textLineSpout = new FixedBatchSpout(new Fields("line"), 10,
                new Values("provides support for the Apache Community of Open Source software projects which provide software products for the public good"),
                new Values("by collaborative consensus based processes an open pragmatic software license and a desire to create high quality software that leads the way in its field"),
                new Values("not simply a group of projects sharing a server but rather a community of developers and users"),
                new Values("not simply a group of of of of of of of a a a server server server a of community community community"));
//        textLineSpout.setCycle(true);

        // 创建一个TridentStream
        tridentTopology.newStream("textLineSpout", textLineSpout)
                // 遍历每一个文本行,并指定使用SplitFunc进行处理,输出字段名为word的tuple元组
                .each(new Fields("line"), new SplitFunc(), new Fields("word"))
                // 根据tuple的word字段进行分组
                .groupBy(new Fields("word"))
                // 指定聚合需要处理的tuple字段,并使用SumFunc进行聚合计算,输出字段为duWord何count的tuple元组
                .aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
                // 指定将tuple的duWord,count交给PrintFunc进行处理,输出空字段
                .each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
                .parallelismHint(1);

        // 基于TridentTopology构建StormTopology
        StormTopology stormTopology = tridentTopology.build();

        Config config = new Config();

        // 执行本地提交
        LocalCluster local = new LocalCluster();
        local.submitTopology("wordcount", config, stormTopology);

    }

    public static class SplitFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // 实现tuple的转换
            String line = tuple.getString(0);
            String[] wordArr = line.split(" ");

            for (String word : wordArr) {
                collector.emit(new Values(word, 1));
            }
        }
    }

    public static class SumFunc extends BaseAggregator {

        static class WordCountBean{
            String word = "";
            Long count = 0L;

        }

        @Override
        public Object init(Object batchId, TridentCollector collector) {
            // 创建用于分组计算的对象
            return new WordCountBean();
        }

        @Override
        public void aggregate(Object val, TridentTuple tuple, TridentCollector collector) {
            if(val instanceof WordCountBean) {
                // 累加单词数量
                WordCountBean wc = (WordCountBean)val;
                // 第一次执行初始化单词本身
                if(StringUtils.isNotBlank(wc.word)) {
                    wc.word = tuple.getString(0);
                }
                wc.count++;
            }
        }

        @Override
        public void complete(Object val, TridentCollector collector) {
            if(val instanceof WordCountBean) {
                // 将最终计算好的结果发射出去
                WordCountBean wc = (WordCountBean)val;
                collector.emit(new Values(wc.word, wc.count));
            }
        }
    }

    private static class PrintFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            System.out.println(tuple.getString(0) + " " + tuple.getLong(1));
            collector.emit(new Values(""));
        }
    }
}
2.5 实时防恶意服务器攻击系统2.5.1 需求分析
​        这个实时系统,主要是检测在一定时间内,某个IP访问某个些页面超过了一定的访问量,就会将IP拉入到黑名单。所以,每当一个IP访问了一次页面,就会将次IP的访问次数进行累加,并检测到超过一定数量的访问IP,就加入到“监狱”中让这个IP“反省”半个小时。本项目需要用到的技术点如下:
  • Kafka
  • Redis
  • Apache Storm Trident
  • Spring Boot
  • Spring Data Redis

2.5.2 架构图
2.5.3 创建topic
  • 启动Kafka集群
  • 创建名字为visit_url的topic
  • 指定partititon数量为3、副本数量为3

2.5.4 模拟发送数据到Kafka
这里使用Spring Boot整合Kafka定时生成模拟测试的日志到Kafka中
pom.xml
[AppleScript] 纯文本查看 复制代码
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-text</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>
application.properties
[AppleScript] 纯文本查看 复制代码
server.port=9999
# Kafka服务器
spring.kafka.producer.bootstrap-servers=node-1:9092,node-2:9092,node-3:9093
# key-value序列化、反序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
KafkaDataGen.java,定期随机生产消息到Kafka
[AppleScript] 纯文本查看 复制代码
package com.itheima;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@EnableScheduling
public class KafkaDataGen {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private RandomStringGenerator.Builder buider = new RandomStringGenerator.Builder();

    /**
     * 定时生产消息到Kafka
     *
     * 在Kafka集群中使用下列命令测试topic的消息是否能够被消费
     * bin/kafka-console-consumer.sh --zookeeper node-1:2181 --topic visit_url --from-beginning
     */
    @Scheduled(cron = "* * * * * ?")
    public void gen() {
        String date = sdf.format(new Date());
        String ip = randomIp();
        String pageUrl = randomUrl();

        String msg = date + "||" + ip + "||" + pageUrl;
        System.out.println(msg);

        kafkaTemplate.send("visit_url", msg);
    }

    /**
     * 生成一样的数据
     */
    @Scheduled(cron = "* * * * * ?")
    public void genViolation() {
        String date = sdf.format(new Date());
        String ip = randomIp1();
        String pageUrl = "/trident/admin/index.html";

        String msg = date + "||" + ip + "||" + pageUrl;
        System.out.println(msg);

        kafkaTemplate.send("visit_url", msg);
    }

    // 随机生成访问的URL
    private String randomUrl() {
        String prefix = "/trident/admin/";
        String url = buider.selectFrom('a', 't', 'k').build().generate(3, 4);

        return prefix + url + ".html";
    }

    // 随机生成IP地址
    private String randomIp() {
        int ip1 = RandomUtils.nextInt(10, 17);
        int ip2 = RandomUtils.nextInt(0, 6);
        int ip3 = RandomUtils.nextInt(0, 6);
        int ip4 = RandomUtils.nextInt(155, 158);

        return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
    }

    // 随机生成IP地址
    private String randomIp1() {
        int ip1 = 10;
        int ip2 = 0;
        int ip3 = 1;
        int ip4 = RandomUtils.nextInt(155, 156);

        return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
    }
}
Application.java,启动类
[AppleScript] 纯文本查看 复制代码
package com.itheima;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

1 个回复

倒序浏览
您需要登录后才可以回帖 登录 | 加入黑马