【郑州校区】WordCount案例分析
2.1、 功能说明设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。 整个topology分为三个部分: RandomSentenceSpout:数据源,在已知的英文句子中,随机发送一条句子出去。 SplitSentenceBolt:负责将单行文本记录(句子)切分成单词 WordCountBolt:负责对单词的频率进行累加 导入最新的依赖包 [AppleScript] 纯文本查看 复制代码 [/align]<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency> 2.2、TopologyMain 驱动类[AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
/**
* 组装应用程序--驱动类
*/
public class WordCountTopology {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
//1、创建一个job(topology)
TopologyBuilder topologyBuilder = new TopologyBuilder();
//2、设置job的详细内容
topologyBuilder.setSpout("ReadFileSpout",new ReadFileSpout(),1);
topologyBuilder.setBolt("SentenceSplitBolt",new SentenceSplitBolt(),1).shuffleGrouping("ReadFileSpout");
topologyBuilder.setBolt("WordCountBolt",new WordCountBolt(),1).shuffleGrouping("SentenceSplitBolt");
//准备配置项
Config config = new Config();
config.setDebug(false);
//3、提交job
//提交由两种方式:一种本地运行模式、一种集群运行模式。
if (args != null && args.length > 0) {
//运行集群模式
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
}
}
} | 2.3、 ReadFileSpout [AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* Spout 需要继承一个模板
*/
public class ReadFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
/**
* Map conf 应用程序能够读取的配置文件
* TopologyContext context 应用程序的上下文
* SpoutOutputCollector collector Spout输出的数据丢给SpoutOutputCollector。
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//1、Kafka 连接 / MYSQL 连接 /Redis 连接
//todo
//2、将SpoutOutputCollector复制给成员变量
this.collector = collector;
}
/**
* storm框架有个while循环,一直在nextTuple
*/
@Override
public void nextTuple() {
// 发送数据,使用collector.emit方法
// Values extends ArrayList<Object>
collector.emit(new Values("i love u"));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("biaobai"));
}
} |
2.4、 SentenceSplitBolt [AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* 切割单词
*/
public class SentenceSplitBolt extends BaseRichBolt {
private OutputCollector collector;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
//todo 连接数据 连接redis 连接hdfs
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
// String sentence = input.getString(0);
// 底层先通过 biaobai 这个字段在map中找到对应的index角标值,然后再valus中获取对应数据。
String sentence = input.getStringByField("biaobai");
// todo 切割
String[] strings = sentence.split(" ");
for (String word : strings) {
// todo 输出数据
collector.emit(new Values(word,1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明 输出的是什么字段
declarer.declare(new Fields("word","num"));
}
} |
2.5、 WordCountBolt [AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* 计数
*/
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap<String, Integer> wordCountMap;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
//todo 连接数据 连接redis 连接hdfs
wordCountMap = new HashMap<String, Integer>();
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer num = input.getIntegerByField("num");
// 先判断这个单词是否出现过
if (wordCountMap.containsKey(word)) {
Integer oldNum = wordCountMap.get(word);
wordCountMap.put(word, oldNum + num);
} else {
wordCountMap.put(word, num);
}
System.out.println(wordCountMap);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明 输出的是什么字段
declarer.declare(new Fields("fenshou"));
}
} | 2.6、 pom依赖[AppleScript] 纯文本查看 复制代码 <dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<!-- 目前<scope>可以使用5个值:
* compile,缺省值,适用于所有阶段,会随着项目一起发布。
* provided,类似compile,期望JDK、容器或使用者会提供这个依赖。如servlet.jar。
* runtime,只在运行时使用,如JDBC驱动,适用运行和测试阶段。
* test,只在测试时使用,用于编译和运行测试代码。不会随项目发布。
* system,类似provided,需要显式提供包含依赖的jar,Maven不会在Repository中查找它。 -->
<!--<scope>provided</scope>-->
</dependency>
</dependencies> | 2.7、 项目编译[AppleScript] 纯文本查看 复制代码 <build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>realtime.WordCountTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build> |
2.8、 Component生命周期 Spout生命周期 Bolt生命周期 Bolt的两个抽象类BaseRichBolt 需要手动调ack方法 BaseBasicBolt由storm框架自动调ack方法 详见《第九章》 2.9、 StreamGroupingMKgrouper [AppleScript] 纯文本查看 复制代码 public Map<List<Integer>, List<MsgInfo>> grouperBatch(List<MsgInfo> batch) {
Map<List<Integer>, List<MsgInfo>> ret = new HashMap<List<Integer>, List<MsgInfo>>();
//optimize fieldGrouping & customGrouping
if (GrouperType.local_or_shuffle.equals(grouptype)) {
ret.put(local_shuffer_grouper.grouper(null), batch);
} else if (GrouperType.global.equals(grouptype)) {
// send to task which taskId is 0
ret.put(JStormUtils.mk_list(out_tasks.get(0)), batch);
} else if (GrouperType.fields.equals(grouptype)) {
fields_grouper.batchGrouper(batch, ret);
} else if (GrouperType.all.equals(grouptype)) {
// send to every task
ret.put(out_tasks, batch);
} else if (GrouperType.shuffle.equals(grouptype)) {
// random, but the random is different from none
ret.put(shuffer.grouper(null), batch);
} else if (GrouperType.none.equals(grouptype)) {
int rnd = Math.abs(random.nextInt() % out_tasks.size());
ret.put(JStormUtils.mk_list(out_tasks.get(rnd)), batch);
} else if (GrouperType.custom_obj.equals(grouptype) || GrouperType.custom_serialized.equals(grouptype)) {
for (int i = 0; i < batch.size(); i++ ) {
MsgInfo msg = batch.get(i);
List<Integer> out = custom_grouper.grouper(msg.values);
List<MsgInfo> customBatch = ret.get(out);
if (customBatch == null) {
customBatch = JStormUtils.mk_list();
ret.put(out, customBatch);
}
customBatch.add(msg);
}
} else if (GrouperType.localFirst.equals(grouptype)) {
ret.put(localFirst.grouper(null), batch);
} else {
LOG.warn("Unsupportted group type");
}
return ret;
} |
2.10、 Tuple是什么2.11、 并行度是什么传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|