黑马程序员技术交流社区

标题: 【郑州校区】基于Apache Storm Trident实时计算开发 上 [打印本页]

作者: 我是楠楠    时间: 2020-4-27 14:31
标题: 【郑州校区】基于Apache Storm Trident实时计算开发 上
本帖最后由 我是楠楠 于 2020-4-27 14:32 编辑

【郑州校区】基于Apache Storm Trident实时计算开发 上

1. 概述
​        Apache Storm是一个经典的分布式流式计算编程框架,但编写程序比较麻烦。通常,一个基本的基于Storm的流式计算程序需要有以下几个部分:一个Spout、若干个Bolt、一个Topology。Spout和Bolt程序编写,以及Topology的编排不是太友好。今天,我要介绍的是,基于Storm的更高层次的抽象——Trident。它的API要比Storm更加简洁,而且支持迭代计算,不需要反复的创建Bolt来进行操作。
2. 主要内容2.1 Trident程序架构
简单解释下上图:
2.2 Trident程序设计
编写基于Storm Trident的程序,可以参考以下几个步骤:
[AppleScript] 纯文本查看 复制代码
tridentTopology.newStream("textLineSpout", textLineSpout)
    .each(new Fields("line"), new SplitFunc(), new Fields("word"))
    .groupBy(new Fields("word"))
    .aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
    .each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
    .parallelismHint(1);

2.3 重要概念2.4 入门案例
pom.xml
[AppleScript] 纯文本查看 复制代码
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>
Topology
[AppleScript] 纯文本查看 复制代码
package com.itheima.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.HashMap;

public class WordCountTopology {

    public static void main(String[] args) {

        // 创建一个Trident的拓扑结构
        TridentTopology tridentTopology = new TridentTopology();

        // 创建一个模拟发送数据的TridentSpout
        FixedBatchSpout textLineSpout = new FixedBatchSpout(new Fields("line"), 10,
                new Values("provides support for the Apache Community of Open Source software projects which provide software products for the public good"),
                new Values("by collaborative consensus based processes an open pragmatic software license and a desire to create high quality software that leads the way in its field"),
                new Values("not simply a group of projects sharing a server but rather a community of developers and users"),
                new Values("not simply a group of of of of of of of a a a server server server a of community community community"));
//        textLineSpout.setCycle(true);

        // 创建一个TridentStream
        tridentTopology.newStream("textLineSpout", textLineSpout)
                // 遍历每一个文本行,并指定使用SplitFunc进行处理,输出字段名为word的tuple元组
                .each(new Fields("line"), new SplitFunc(), new Fields("word"))
                // 根据tuple的word字段进行分组
                .groupBy(new Fields("word"))
                // 指定聚合需要处理的tuple字段,并使用SumFunc进行聚合计算,输出字段为duWord何count的tuple元组
                .aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
                // 指定将tuple的duWord,count交给PrintFunc进行处理,输出空字段
                .each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
                .parallelismHint(1);

        // 基于TridentTopology构建StormTopology
        StormTopology stormTopology = tridentTopology.build();

        Config config = new Config();

        // 执行本地提交
        LocalCluster local = new LocalCluster();
        local.submitTopology("wordcount", config, stormTopology);

    }

    public static class SplitFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // 实现tuple的转换
            String line = tuple.getString(0);
            String[] wordArr = line.split(" ");

            for (String word : wordArr) {
                collector.emit(new Values(word, 1));
            }
        }
    }

    public static class SumFunc extends BaseAggregator {

        static class WordCountBean{
            String word = "";
            Long count = 0L;

        }

        @Override
        public Object init(Object batchId, TridentCollector collector) {
            // 创建用于分组计算的对象
            return new WordCountBean();
        }

        @Override
        public void aggregate(Object val, TridentTuple tuple, TridentCollector collector) {
            if(val instanceof WordCountBean) {
                // 累加单词数量
                WordCountBean wc = (WordCountBean)val;
                // 第一次执行初始化单词本身
                if(StringUtils.isNotBlank(wc.word)) {
                    wc.word = tuple.getString(0);
                }
                wc.count++;
            }
        }

        @Override
        public void complete(Object val, TridentCollector collector) {
            if(val instanceof WordCountBean) {
                // 将最终计算好的结果发射出去
                WordCountBean wc = (WordCountBean)val;
                collector.emit(new Values(wc.word, wc.count));
            }
        }
    }

    private static class PrintFunc extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            System.out.println(tuple.getString(0) + " " + tuple.getLong(1));
            collector.emit(new Values(""));
        }
    }
}
2.5 实时防恶意服务器攻击系统2.5.1 需求分析
​        这个实时系统,主要是检测在一定时间内,某个IP访问某个些页面超过了一定的访问量,就会将IP拉入到黑名单。所以,每当一个IP访问了一次页面,就会将次IP的访问次数进行累加,并检测到超过一定数量的访问IP,就加入到“监狱”中让这个IP“反省”半个小时。本项目需要用到的技术点如下:
2.5.2 架构图
2.5.3 创建topic2.5.4 模拟发送数据到Kafka
这里使用Spring Boot整合Kafka定时生成模拟测试的日志到Kafka中
pom.xml
[AppleScript] 纯文本查看 复制代码
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.0.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-text</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>
application.properties
[AppleScript] 纯文本查看 复制代码
server.port=9999
# Kafka服务器
spring.kafka.producer.bootstrap-servers=node-1:9092,node-2:9092,node-3:9093
# key-value序列化、反序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
KafkaDataGen.java,定期随机生产消息到Kafka
[AppleScript] 纯文本查看 复制代码
package com.itheima;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@EnableScheduling
public class KafkaDataGen {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    private RandomStringGenerator.Builder buider = new RandomStringGenerator.Builder();

    /**
     * 定时生产消息到Kafka
     *
     * 在Kafka集群中使用下列命令测试topic的消息是否能够被消费
     * bin/kafka-console-consumer.sh --zookeeper node-1:2181 --topic visit_url --from-beginning
     */
    @Scheduled(cron = "* * * * * ?")
    public void gen() {
        String date = sdf.format(new Date());
        String ip = randomIp();
        String pageUrl = randomUrl();

        String msg = date + "||" + ip + "||" + pageUrl;
        System.out.println(msg);

        kafkaTemplate.send("visit_url", msg);
    }

    /**
     * 生成一样的数据
     */
    @Scheduled(cron = "* * * * * ?")
    public void genViolation() {
        String date = sdf.format(new Date());
        String ip = randomIp1();
        String pageUrl = "/trident/admin/index.html";

        String msg = date + "||" + ip + "||" + pageUrl;
        System.out.println(msg);

        kafkaTemplate.send("visit_url", msg);
    }

    // 随机生成访问的URL
    private String randomUrl() {
        String prefix = "/trident/admin/";
        String url = buider.selectFrom('a', 't', 'k').build().generate(3, 4);

        return prefix + url + ".html";
    }

    // 随机生成IP地址
    private String randomIp() {
        int ip1 = RandomUtils.nextInt(10, 17);
        int ip2 = RandomUtils.nextInt(0, 6);
        int ip3 = RandomUtils.nextInt(0, 6);
        int ip4 = RandomUtils.nextInt(155, 158);

        return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
    }

    // 随机生成IP地址
    private String randomIp1() {
        int ip1 = 10;
        int ip2 = 0;
        int ip3 = 1;
        int ip4 = RandomUtils.nextInt(155, 156);

        return ip1 + "." + ip2 + "." + ip3 + "." + ip4;
    }
}
Application.java,启动类
[AppleScript] 纯文本查看 复制代码
package com.itheima;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}






欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2