Maven pom.xml configuration (plain-text listing; the "[AppleScript]" label is a forum-highlighter artifact, not the actual language):
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
Java source — Storm Trident word-count topology (plain-text listing; the "[AppleScript]" label is a forum-highlighter artifact):
package com.itheima.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
public class WordCountTopology {
public static void main(String[] args) {
// 创建一个Trident的拓扑结构
TridentTopology tridentTopology = new TridentTopology();
// 创建一个模拟发送数据的TridentSpout
FixedBatchSpout textLineSpout = new FixedBatchSpout(new Fields("line"), 10,
new Values("provides support for the Apache Community of Open Source software projects which provide software products for the public good"),
new Values("by collaborative consensus based processes an open pragmatic software license and a desire to create high quality software that leads the way in its field"),
new Values("not simply a group of projects sharing a server but rather a community of developers and users"),
new Values("not simply a group of of of of of of of a a a server server server a of community community community"));
// textLineSpout.setCycle(true);
// 创建一个TridentStream
tridentTopology.newStream("textLineSpout", textLineSpout)
// 遍历每一个文本行,并指定使用SplitFunc进行处理,输出字段名为word的tuple元组
.each(new Fields("line"), new SplitFunc(), new Fields("word"))
// 根据tuple的word字段进行分组
.groupBy(new Fields("word"))
// 指定聚合需要处理的tuple字段,并使用SumFunc进行聚合计算,输出字段为duWord何count的tuple元组
.aggregate(new Fields("word"), new SumFunc(), new Fields("duWord", "count"))
// 指定将tuple的duWord,count交给PrintFunc进行处理,输出空字段
.each(new Fields("duWord", "count"), new PrintFunc(), new Fields("none"))
.parallelismHint(1);
// 基于TridentTopology构建StormTopology
StormTopology stormTopology = tridentTopology.build();
Config config = new Config();
// 执行本地提交
LocalCluster local = new LocalCluster();
local.submitTopology("wordcount", config, stormTopology);
}
public static class SplitFunc extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// 实现tuple的转换
String line = tuple.getString(0);
String[] wordArr = line.split(" ");
for (String word : wordArr) {
collector.emit(new Values(word, 1));
}
}
}
public static class SumFunc extends BaseAggregator {
static class WordCountBean{
String word = "";
Long count = 0L;
}
@Override
public Object init(Object batchId, TridentCollector collector) {
// 创建用于分组计算的对象
return new WordCountBean();
}
@Override
public void aggregate(Object val, TridentTuple tuple, TridentCollector collector) {
if(val instanceof WordCountBean) {
// 累加单词数量
WordCountBean wc = (WordCountBean)val;
// 第一次执行初始化单词本身
if(StringUtils.isNotBlank(wc.word)) {
wc.word = tuple.getString(0);
}
wc.count++;
}
}
@Override
public void complete(Object val, TridentCollector collector) {
if(val instanceof WordCountBean) {
// 将最终计算好的结果发射出去
WordCountBean wc = (WordCountBean)val;
collector.emit(new Values(wc.word, wc.count));
}
}
}
private static class PrintFunc extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
System.out.println(tuple.getString(0) + " " + tuple.getLong(1));
collector.emit(new Values(""));
}
}
}