【郑州校区】案例:实时交易数据统计
3.1 业务背景(重要)根据订单mq,快速计算双11当天的订单量、销售金额。 3.2 架构设计及思路支付系统+kafka+storm/Jstorm集群+redis集群 1、支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、订单数量 2、将计算的实时结果保存在redis中 3、外部程序访问redis的数据实时展示结果 3.3 数据准备订单编号、订单时间、支付编号、支付时间、商品编号、商家名称、商品价格、优惠价格、支付金额 3.4 业务口径 l 订单总数:一条支付信息当一条订单处理,假设订单信息不会重发(实际情况要考虑订单去重的情况,父子订单等多种情况),计算接收到MQ的总条数,即当做订单数。 l 销售额:累加所有的订单中商品的价格 l 支付金额:累加所有订单中商品的支付价格 l 用户人数:一条支付信息当一个人处理,假设订单一个人只下一单(实际情况要考虑用户去重的情况)。 整体淘宝的业务指标,每个品类,每个产品线,每个淘宝店,每个商品 Redis Key如何设计? Index:{}pinlei}:{date} value Index:1290:20160526 value Index:1291:20160526 value Index:1292:20160526 value Index:1293:20160526 value Index:1294:20160526 value 3.5 数据展示读取redis中的数据,每秒进行展示,打印在控制台。 3.6 工程设计l 数据产生:编写kafka数据生产者,模拟订单系统发送mq l 数据输入:使用PaymentSpout消费kafka中的数据 l 数据计算:使用CountBolt对数据进行统计 l 数据存储:使用Sava2RedisBolt对数据进行存储,将结果数据存储到redis中 l 数据展示:编写java app客户端,访问redis,对数据进行展示,展示方式为打印在控制台。 1、获取外部数据源,MQSpout----Open(连接你的RMQ)---nextTuple()-----emit(json) 2、ParserPaymentInfoBolt()----execute(Tuple)------解析Json----JavaBean productId,orderId,time,price(原价,订单价,优惠价,支付价),user,收货地址 total:原价、total:订单价、total:订单人数…… 3、Save2ReidsBolt,保存相关业务指标 问题: 在redis中存放整个网站销售的原价, b:t:p:20160410 ---> value redis: String----> value1+value2 + value3 + value4 incrBy b:t:p:20160410 b:t:p:20161111 b:t:p:20160412 3.7 代码开发3.7.1 项目依赖[AppleScript] 纯文本查看 复制代码 <!-- storm core-->
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
<!-- storm kafka KafkaSpout-->
<!-- use new kafka spout code -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<!-- redis jedis 依赖-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
<!-- json Gson/fastjson-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.4</version>
</dependency>
</dependencies> |
3.7.2 数据生产[AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime.kanban.producer;
import cn.itcast.realtime.kanban.domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class PaymentInfoProducer {
public static void main(String[] args){
//1、准备配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
/**
* 当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数。
* 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
* 当一起使用时,min.insync.replicas和acks允许您执行更大的耐久性保证。
* 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用“全部”选项来产生。
* 这将确保生产者如果大多数副本没有收到写入引发异常。
*/
props.put("acks", "all");
/**
* 设置一个大于零的值,将导致客户端重新发送任何失败的记录
*/
props.put("retries", 0);
/**
* 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。
* 这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。
*/
props.put("batch.size", 16384);
/**
*在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。
* 这个设置通过添加少量的人工延迟来实现这一点,即不是立即发出一个记录,
* 而是等待达到给定延迟的记录,以允许发送其他记录,以便发送可以一起批量发送
*/
props.put("linger.ms", 1);
/**
* 生产者可用于缓冲等待发送到服务器的记录的总字节数。
* 如果记录的发送速度比发送给服务器的速度快,那么生产者将会阻塞,max.block.ms之后会抛出异常。
* 这个设置应该大致对应于生产者将使用的总内存,但不是硬性限制,
* 因为不是所有生产者使用的内存都用于缓冲。
* 一些额外的内存将被用于压缩(如果压缩被启用)以及用于维护正在进行的请求。
*/
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、创建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
while (true){
//3、发送数据
kafkaProducer.send(new ProducerRecord<String, String>("itcast_shop_order",new PaymentInfo().random()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} | 3.7.3 驱动类[AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime.kanban.Storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
/**
* 组装应用程序--驱动类
*/
public class KanBanTopology {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
//1、创建一个job(topology)
TopologyBuilder topologyBuilder = new TopologyBuilder();
//2、设置job的详细内容
KafkaSpoutConfig.Builder<String, String> builder = KafkaSpoutConfig.builder("node01:9092","itcast_shop_order");
builder.setGroupId("bigdata_kanban_order");
KafkaSpoutConfig<String, String> kafkaSpoutConfig = builder.build();
topologyBuilder.setSpout("KafkaSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);
topologyBuilder.setBolt("ETLBolt",new ETLBolt(),1).shuffleGrouping("KafkaSpout");
topologyBuilder.setBolt("ProcessBolt",new ProcessBolt(),1).shuffleGrouping("ETLBolt");
//准备配置项
Config config = new Config();
config.setDebug(false);
//3、提交job
//提交由两种方式:一种本地运行模式、一种集群运行模式。
if (args != null && args.length > 0) {
//运行集群模式
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("KanBanTopology", config, topologyBuilder.createTopology());
}
}
} |
3.7.4 ETLBolt [AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime.kanban.Storm;
import cn.itcast.realtime.kanban.domain.PaymentInfo;
import com.google.gson.Gson;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class ETLBolt extends BaseRichBolt {
private OutputCollector collector;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
String json = input.getString(4);
json = input.getStringByField("value");
// 将json串转成 Java对象
Gson gson = new Gson();
PaymentInfo paymentInfo = gson.fromJson(json, PaymentInfo.class);
// 其它的操作,比如说根据商品id查询商品的一级分类,二级分类,三级分类
if(paymentInfo!=null){
collector.emit(new Values(paymentInfo));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明 输出的是什么字段
declarer.declare(new Fields("paymentInfo"));
}
} |
3.7.5 processBolt[AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime.kanban.Storm;
import cn.itcast.realtime.kanban.domain.PaymentInfo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import java.util.Map;
public class ProcessBolt extends BaseRichBolt {
private Jedis jedis;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
jedis = new Jedis("redis", 6379);
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
//获取上游发送的javabean
PaymentInfo value = (PaymentInfo) input.getValue(0);
//先计算总数据 来一条算一条
jedis.incrBy("kanban:total:ordernum",1);
jedis.incrBy("kanban:total:orderPrice",value.getPayPrice());
jedis.incrBy("kanban:total:orderuser",1);
//计算商家(店铺的销售情况)
String shopId = value.getShopId();
jedis.incrBy("kanban:shop:"+shopId+":ordernum",1);
jedis.incrBy("kanban:shop:"+shopId+":orderPrice",value.getPayPrice());
jedis.incrBy("kanban:shop:"+shopId+":orderuser",1);
//计算每个品类(品类id)一级品类
String Level1 = value.getLevel1();
jedis.incrBy("kanban:shop:"+Level1+":ordernum",1);
jedis.incrBy("kanban:shop:"+Level1+":orderPrice",value.getPayPrice());
jedis.incrBy("kanban:shop:"+Level1+":orderuser",1);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
} |
3.7.6 看板 [AppleScript] 纯文本查看 复制代码 package cn.itcast.realtime.kanban.view;
import redis.clients.jedis.Jedis;
public class Kanban {
public static void main(String[] args) {
Jedis jedis = new Jedis("redis",6379);
while (true){
System.out.println("kanban:total:ordernum 指标是"+jedis.get("kanban:total:ordernum"));
System.out.println("kanban:total:orderPrice指标是"+jedis.get("kanban:total:orderPrice"));
System.out.println("kanban:total:orderuser指标是"+jedis.get("kanban:total:orderuser"));
System.out.println("---------------------------");
System.out.println();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} |
3.7.7 打包运行[AppleScript] 纯文本查看 复制代码 <build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>cn.itcast.realtime.kanban.Storm.KanBanTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build> | 3.7.8 数据对象[AppleScript] 纯文本查看 复制代码 import com.google.gson.Gson;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo implements Serializable {
private static final long serialVersionUID = -7958315778386204397L;
private String orderId;//订单编号
private Date createOrderTime;//订单创建时间
private String paymentId;//支付编号
private Date paymentTime;//支付时间
private String productId;//商品编号
private String productName;//商品名称
private long productPrice;//商品价格
private long promotionPrice;//促销价格
private String shopId;//商铺编号
private String shopName;//商铺名称
private String shopMobile;//商品电话
private long payPrice;//订单支付价格
private int num;//订单数量
/**
* <Province>19</Province>
* <City>1657</City>
* <County>4076</County>
*/
private String province; //省
private String city; //市
private String county;//县
//102,144,114
private String catagorys;
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getCounty() {
return county;
}
public void setCounty(String county) {
this.county = county;
}
public String getCatagorys() {
return catagorys;
}
public void setCatagorys(String catagorys) {
this.catagorys = catagorys;
}
public PaymentInfo() {
}
public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
this.orderId = orderId;
this.createOrderTime = createOrderTime;
this.paymentId = paymentId;
this.paymentTime = paymentTime;
this.productId = productId;
this.productName = productName;
this.productPrice = productPrice;
this.promotionPrice = promotionPrice;
this.shopId = shopId;
this.shopName = shopName;
this.shopMobile = shopMobile;
this.payPrice = payPrice;
this.num = num;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Date getCreateOrderTime() {
return createOrderTime;
}
public void setCreateOrderTime(Date createOrderTime) {
this.createOrderTime = createOrderTime;
}
public String getPaymentId() {
return paymentId;
}
public void setPaymentId(String paymentId) {
this.paymentId = paymentId;
}
public Date getPaymentTime() {
return paymentTime;
}
public void setPaymentTime(Date paymentTime) {
this.paymentTime = paymentTime;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public long getProductPrice() {
return productPrice;
}
public void setProductPrice(long productPrice) {
this.productPrice = productPrice;
}
public long getPromotionPrice() {
return promotionPrice;
}
public void setPromotionPrice(long promotionPrice) {
this.promotionPrice = promotionPrice;
}
public String getShopId() {
return shopId;
}
public void setShopId(String shopId) {
this.shopId = shopId;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
public String getShopMobile() {
return shopMobile;
}
public void setShopMobile(String shopMobile) {
this.shopMobile = shopMobile;
}
public long getPayPrice() {
return payPrice;
}
public void setPayPrice(long payPrice) {
this.payPrice = payPrice;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return "PaymentInfo{" +
"orderId='" + orderId + '\'' +
", createOrderTime=" + createOrderTime +
", paymentId='" + paymentId + '\'' +
", paymentTime=" + paymentTime +
", productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", productPrice=" + productPrice +
", promotionPrice=" + promotionPrice +
", shopId='" + shopId + '\'' +
", shopName='" + shopName + '\'' +
", shopMobile='" + shopMobile + '\'' +
", payPrice=" + payPrice +
", num=" + num +
'}';
}
public String random() {
this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
this.productPrice = new Random().nextInt(1000);
this.promotionPrice = new Random().nextInt(500);
this.payPrice = new Random().nextInt(480);
this.shopId = new Random().nextInt(200000)+"";
this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
this.province = new Random().nextInt(23)+"";
this.city = new Random().nextInt(265)+"";
this.county = new Random().nextInt(1489)+"";
String date = "2015-11-11 12:22:12";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
this.createOrderTime = simpleDateFormat.parse(date);
} catch (ParseException e) {
e.printStackTrace();
}
return new Gson().toJson(this);
}
} |
传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|