A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 我是楠楠 黑马粉丝团   /  2018-10-8 14:56  /  2016 人查看  /  0 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

【郑州校区】WordCount案例分析

2.1、 功能说明
设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。
整个topology分为三个部分:
RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。
SplitSentenceBolt:负责将单行文本记录(句子)切分成单词
WordCountBolt:负责对单词的频率进行累加
导入最新的依赖包
[AppleScript] 纯文本查看 复制代码
[/align]<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
</dependency>
2.2、TopologyMain 驱动类
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime;


import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

/**
 * 组装应用程序--驱动类
 */
public class WordCountTopology {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        //1、创建一个job(topology)
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //2、设置job的详细内容
        topologyBuilder.setSpout("ReadFileSpout",new ReadFileSpout(),1);
        topologyBuilder.setBolt("SentenceSplitBolt",new SentenceSplitBolt(),1).shuffleGrouping("ReadFileSpout");
        topologyBuilder.setBolt("WordCountBolt",new WordCountBolt(),1).shuffleGrouping("SentenceSplitBolt");
        //准备配置项
        Config config = new Config();
        config.setDebug(false);
        //3、提交job
        //提交由两种方式:一种本地运行模式、一种集群运行模式。
        if (args != null && args.length > 0) {
            //运行集群模式
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        }
    }
}
2.3、 ReadFileSpout
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Spout 需要继承一个模板
 */
public class ReadFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    /**
     * Map conf 应用程序能够读取的配置文件
     * TopologyContext context 应用程序的上下文
     * SpoutOutputCollector collector Spout输出的数据丢给SpoutOutputCollector。
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //1、Kafka 连接  / MYSQL 连接  /Redis 连接
        //todo
        //2、将SpoutOutputCollector复制给成员变量
        this.collector = collector;
    }

    /**
     * storm框架有个while循环,一直在nextTuple
     */
    @Override
    public void nextTuple() {
        // 发送数据,使用collector.emit方法
        // Values extends ArrayList<Object>
        collector.emit(new Values("i love u"));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("biaobai"));
    }
}
2.4、 SentenceSplitBolt
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * 切割单词
 */
public class SentenceSplitBolt extends BaseRichBolt {
    private OutputCollector collector;
    /**
     *  初始化方法
     *  Map stormConf 应用能够得到的配置文件
     *  TopologyContext context 上下文 一般没有什么用
     *  OutputCollector collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        //todo 连接数据 连接redis 连接hdfs
    }

    /**
     *  有个while不停的调用execute方法,每次调用都会发一个数据进行来。
     */
    @Override
    public void execute(Tuple input) {
//        String sentence = input.getString(0);
        // 底层先通过 biaobai 这个字段在map中找到对应的index角标值,然后再valus中获取对应数据。
        String sentence = input.getStringByField("biaobai");
        // todo 切割
        String[] strings = sentence.split(" ");
        for (String word : strings) {
            // todo 输出数据
            collector.emit(new Values(word,1));
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 声明 输出的是什么字段
        declarer.declare(new Fields("word","num"));
    }
}
2.5、 WordCountBolt
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * 计数
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Integer> wordCountMap;

    /**
     * 初始化方法
     * Map stormConf 应用能够得到的配置文件
     * TopologyContext context 上下文 一般没有什么用
     * OutputCollector collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        //todo 连接数据 连接redis 连接hdfs
        wordCountMap = new HashMap<String, Integer>();
    }

    /**
     * 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer num = input.getIntegerByField("num");
        // 先判断这个单词是否出现过
        if (wordCountMap.containsKey(word)) {
            Integer oldNum = wordCountMap.get(word);
            wordCountMap.put(word, oldNum + num);
        } else {
            wordCountMap.put(word, num);
        }
        System.out.println(wordCountMap);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 声明 输出的是什么字段
        declarer.declare(new Fields("fenshou"));
    }
}
2.6、 pom依赖
[AppleScript] 纯文本查看 复制代码
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <!-- 目前<scope>可以使用5个值:
    * compile,缺省值,适用于所有阶段,会随着项目一起发布。
    * provided,类似compile,期望JDK、容器或使用者会提供这个依赖。如servlet.jar。
    * runtime,只在运行时使用,如JDBC驱动,适用运行和测试阶段。
    * test,只在测试时使用,用于编译和运行测试代码。不会随项目发布。
    * system,类似provided,需要显式提供包含依赖的jar,Maven不会在Repository中查找它。  -->
        <!--<scope>provided</scope>-->
    </dependency>
</dependencies>
2.7、 项目编译
[AppleScript] 纯文本查看 复制代码
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>realtime.WordCountTopology</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2.8、 Component生命周期 Spout生命周期
Bolt生命周期
Bolt的两个抽象类
BaseRichBolt 需要手动调ack方法
BaseBasicBolt由storm框架自动调ack方法
详见《第九章》
2.9、 StreamGrouping
MKgrouper
[AppleScript] 纯文本查看 复制代码
public Map<List<Integer>, List<MsgInfo>> grouperBatch(List<MsgInfo> batch) {
    Map<List<Integer>, List<MsgInfo>> ret = new HashMap<List<Integer>, List<MsgInfo>>();
    //optimize fieldGrouping & customGrouping
    if (GrouperType.local_or_shuffle.equals(grouptype)) {
       ret.put(local_shuffer_grouper.grouper(null), batch);
    }  else if (GrouperType.global.equals(grouptype)) {
        // send to task which taskId is 0
        ret.put(JStormUtils.mk_list(out_tasks.get(0)), batch);
    } else if (GrouperType.fields.equals(grouptype)) {
        fields_grouper.batchGrouper(batch, ret);
    } else if (GrouperType.all.equals(grouptype)) {
        // send to every task
        ret.put(out_tasks, batch);
    } else if (GrouperType.shuffle.equals(grouptype)) {
        // random, but the random is different from none
        ret.put(shuffer.grouper(null), batch);
    } else if (GrouperType.none.equals(grouptype)) {
        int rnd = Math.abs(random.nextInt() % out_tasks.size());
        ret.put(JStormUtils.mk_list(out_tasks.get(rnd)), batch);
    } else if (GrouperType.custom_obj.equals(grouptype) || GrouperType.custom_serialized.equals(grouptype)) {
        for (int i = 0; i < batch.size(); i++ ) {
            MsgInfo msg = batch.get(i);
            List<Integer> out = custom_grouper.grouper(msg.values);
            List<MsgInfo> customBatch = ret.get(out);
            if (customBatch == null) {
                customBatch = JStormUtils.mk_list();
                ret.put(out, customBatch);
            }
            customBatch.add(msg);
        }
    } else if (GrouperType.localFirst.equals(grouptype)) {
        ret.put(localFirst.grouper(null), batch);
    } else {
        LOG.warn("Unsupportted group type");
    }
    return ret;
}
2.10、 Tuple是什么
2.11、 并行度是什么
传智播客·黑马程序员郑州校区地址
河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
联系电话 0371-56061160/61/62
来校路线  地铁一号线梧桐街站A口出

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马