A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

【郑州校区】案例:实时交易数据统计

3.1 业务背景(重要)
根据订单mq,快速计算双11当天的订单量、销售金额。
3.2 架构设计及思路
支付系统+kafka+storm/Jstorm集群+redis集群
1、支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、订单数量
2、将计算的实时结果保存在redis中
3、外部程序访问redis的数据实时展示结果
        
3.3 数据准备
订单编号、订单时间、支付编号、支付时间、商品编号、商家名称、商品价格、优惠价格、支付金额
3.4 业务口径
l 订单总数:一条支付信息当一条订单处理,假设订单信息不会重发(实际情况要考虑订单去重的情况,父子订单等多种情况),计算接收到MQ的总条数,即当做订单数。
l 销售额:累加所有的订单中商品的价格
l 支付金额:累加所有订单中商品的支付价格
l 用户人数:一条支付信息当一个人处理,假设订单一个人只下一单(实际情况要考虑用户去重的情况)。
整体淘宝的业务指标,每个品类,每个产品线,每个淘宝店,每个商品
Redis Key如何设计?
Index:{}pinlei}:{date}   value
Index:1290:20160526   value
Index:1291:20160526   value
Index:1292:20160526   value
Index:1293:20160526   value
Index:1294:20160526   value
3.5 数据展示
读取redis中的数据,每秒进行展示,打印在控制台。
3.6 工程设计
l 数据产生:编写kafka数据生产者,模拟订单系统发送mq
l 数据输入:使用PaymentSpout消费kafka中的数据
l 数据计算:使用CountBolt对数据进行统计
l 数据存储:使用Sava2RedisBolt对数据进行存储,将结果数据存储到redis中
l 数据展示:编写java app客户端,访问redis,对数据进行展示,展示方式为打印在控制台。
1、获取外部数据源,MQSpout----Open(连接你的RMQ)---nextTuple()-----emit(json)
2、ParserPaymentInfoBolt()----execute(Tuple)------解析Json----JavaBean
   productId,orderId,time,price(原价,订单价,优惠价,支付价),user,收货地址
   total:原价、total:订单价、total:订单人数……
3、Save2ReidsBolt,保存相关业务指标
        问题:   在redis中存放整个网站销售的原价,  b:t:p:20160410 ---> value
           redis:   String----> value1+value2 + value3 + value4  incrBy
b:t:p:20160410
b:t:p:20161111
b:t:p:20160412
3.7 代码开发3.7.1 项目依赖
[AppleScript] 纯文本查看 复制代码
<!-- storm core-->
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
        <scope>provided</scope>
    </dependency>
    <!-- storm kafka KafkaSpout-->
    <!--  use new kafka spout code -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <!-- redis  jedis 依赖-->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- json Gson/fastjson-->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.4</version>
    </dependency>
</dependencies>
3.7.2 数据生产
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime.kanban.producer;
import cn.itcast.realtime.kanban.domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PaymentInfoProducer {
    public static void main(String[] args){
        //1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        /**
         * 当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数。
         * 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
         * 当一起使用时,min.insync.replicas和acks允许您执行更大的耐久性保证。
         * 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用“全部”选项来产生。
         * 这将确保生产者如果大多数副本没有收到写入引发异常。
         */
        props.put("acks", "all");
        /**
         * 设置一个大于零的值,将导致客户端重新发送任何失败的记录
         */
        props.put("retries", 0);
        /**
         * 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。
         * 这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。
         */
        props.put("batch.size", 16384);
        /**
         *在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。
         * 这个设置通过添加少量的人工延迟来实现这一点,即不是立即发出一个记录,
         * 而是等待达到给定延迟的记录,以允许发送其他记录,以便发送可以一起批量发送
         */
        props.put("linger.ms", 1);
        /**
         * 生产者可用于缓冲等待发送到服务器的记录的总字节数。
         * 如果记录的发送速度比发送给服务器的速度快,那么生产者将会阻塞,max.block.ms之后会抛出异常。
         * 这个设置应该大致对应于生产者将使用的总内存,但不是硬性限制,
         * 因为不是所有生产者使用的内存都用于缓冲。
         * 一些额外的内存将被用于压缩(如果压缩被启用)以及用于维护正在进行的请求。
         */
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        while (true){
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("itcast_shop_order",new PaymentInfo().random()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
3.7.3 驱动类
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime.kanban.Storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

/**
 * 组装应用程序--驱动类
 */
public class KanBanTopology {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        //1、创建一个job(topology)
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //2、设置job的详细内容
        KafkaSpoutConfig.Builder<String, String> builder = KafkaSpoutConfig.builder("node01:9092","itcast_shop_order");
        builder.setGroupId("bigdata_kanban_order");
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = builder.build();
        topologyBuilder.setSpout("KafkaSpout",new KafkaSpout<String,String>(kafkaSpoutConfig), 1);
        topologyBuilder.setBolt("ETLBolt",new ETLBolt(),1).shuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("ProcessBolt",new ProcessBolt(),1).shuffleGrouping("ETLBolt");
        //准备配置项
        Config config = new Config();
        config.setDebug(false);
        //3、提交job
        //提交由两种方式:一种本地运行模式、一种集群运行模式。
        if (args != null && args.length > 0) {
            //运行集群模式
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("KanBanTopology", config, topologyBuilder.createTopology());
        }
    }
}
3.7.4 ETLBolt
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime.kanban.Storm;

import cn.itcast.realtime.kanban.domain.PaymentInfo;
import com.google.gson.Gson;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class ETLBolt extends BaseRichBolt {
    private OutputCollector collector;

    /**
     * 初始化方法
     * Map stormConf 应用能够得到的配置文件
     * TopologyContext context 上下文 一般没有什么用
     * OutputCollector collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
     */
    @Override
    public void execute(Tuple input) {
        String json = input.getString(4);
        json = input.getStringByField("value");
        // 将json串转成 Java对象
        Gson gson = new Gson();
        PaymentInfo paymentInfo = gson.fromJson(json, PaymentInfo.class);
        // 其它的操作,比如说根据商品id查询商品的一级分类,二级分类,三级分类
        if(paymentInfo!=null){
            collector.emit(new Values(paymentInfo));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 声明 输出的是什么字段
        declarer.declare(new Fields("paymentInfo"));
    }
}
3.7.5 processBolt
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime.kanban.Storm;

import cn.itcast.realtime.kanban.domain.PaymentInfo;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.Map;

public class ProcessBolt extends BaseRichBolt {
    private Jedis jedis;

    /**
     * 初始化方法
     * Map stormConf 应用能够得到的配置文件
     * TopologyContext context 上下文 一般没有什么用
     * OutputCollector collector 数据收集器
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        jedis = new Jedis("redis", 6379);
    }

    /**
     * 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
     */
    @Override
    public void execute(Tuple input) {
        //获取上游发送的javabean
        PaymentInfo value = (PaymentInfo) input.getValue(0);
        //先计算总数据 来一条算一条
        jedis.incrBy("kanban:total:ordernum",1);
        jedis.incrBy("kanban:total:orderPrice",value.getPayPrice());
        jedis.incrBy("kanban:total:orderuser",1);

        //计算商家(店铺的销售情况)
        String shopId = value.getShopId();
        jedis.incrBy("kanban:shop:"+shopId+":ordernum",1);
        jedis.incrBy("kanban:shop:"+shopId+":orderPrice",value.getPayPrice());
        jedis.incrBy("kanban:shop:"+shopId+":orderuser",1);

        //计算每个品类(品类id)一级品类
        String Level1 = value.getLevel1();
        jedis.incrBy("kanban:shop:"+Level1+":ordernum",1);
        jedis.incrBy("kanban:shop:"+Level1+":orderPrice",value.getPayPrice());
        jedis.incrBy("kanban:shop:"+Level1+":orderuser",1);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
3.7.6 看板
[AppleScript] 纯文本查看 复制代码
package cn.itcast.realtime.kanban.view;

import redis.clients.jedis.Jedis;

public class Kanban {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("redis",6379);
        while (true){
            System.out.println("kanban:total:ordernum 指标是"+jedis.get("kanban:total:ordernum"));
            System.out.println("kanban:total:orderPrice指标是"+jedis.get("kanban:total:orderPrice"));
            System.out.println("kanban:total:orderuser指标是"+jedis.get("kanban:total:orderuser"));
            System.out.println("---------------------------");
            System.out.println();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
3.7.7 打包运行
[AppleScript] 纯文本查看 复制代码
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>cn.itcast.realtime.kanban.Storm.KanBanTopology</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
3.7.8 数据对象
[AppleScript] 纯文本查看 复制代码
import com.google.gson.Gson;

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

public class PaymentInfo implements Serializable {
    private static final long serialVersionUID = -7958315778386204397L;
    private String orderId;//订单编号
    private Date createOrderTime;//订单创建时间
    private String paymentId;//支付编号
    private Date paymentTime;//支付时间
    private String productId;//商品编号
    private String productName;//商品名称
    private long productPrice;//商品价格
    private long promotionPrice;//促销价格
    private String shopId;//商铺编号
    private String shopName;//商铺名称
    private String shopMobile;//商品电话
    private long payPrice;//订单支付价格
    private int num;//订单数量

    /**
     * <Province>19</Province>
     * <City>1657</City>
     * <County>4076</County>
     */
    private String province; //省
    private String city; //市
    private String county;//县

    //102,144,114
    private String catagorys;

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCounty() {
        return county;
    }

    public void setCounty(String county) {
        this.county = county;
    }

    public String getCatagorys() {
        return catagorys;
    }

    public void setCatagorys(String catagorys) {
        this.catagorys = catagorys;
    }

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() {
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = new Random().nextInt(1000);
        this.promotionPrice = new Random().nextInt(500);
        this.payPrice = new Random().nextInt(480);
        this.shopId = new Random().nextInt(200000)+"";

        this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
        this.province = new Random().nextInt(23)+"";
        this.city = new Random().nextInt(265)+"";
        this.county = new Random().nextInt(1489)+"";

        String date = "2015-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }

        return new Gson().toJson(this);
    }
}
传智播客·黑马程序员郑州校区地址
河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
联系电话 0371-56061160/61/62
来校路线  地铁一号线梧桐街站A口出

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马