1. 程式人生 > >簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)

簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)

maven依賴

<dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.5</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
<groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.5</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId
>
</exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.clojure</groupId
>
<artifactId>clojure</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency> </dependencies>

訂單資訊類:OrderInfo.java

package com.xt.kafka2storm.order;

import com.google.gson.Gson;

import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

/**
 * Describe: 簡單的支付資訊,實際業務更復雜,比如父子訂單,一個訂單多個商品,一個支付訂單多個訂單等情況。
 */
public class OrderInfo implements Serializable {
    private String orderId;//訂單編號
    private Date createOrderTime;//訂單建立時間
    private String paymentId;//支付編號
    private Date paymentTime;//支付時間
    private String productId;//商品編號
    private String productName;//商品名稱
    private long productPrice;//商品價格
    private long promotionPrice;//促銷價格
    private String shopId;//商鋪編號
    private String shopName;//商鋪名稱
    private String shopMobile;//商品電話
    private long payPrice;//訂單支付價格
    private int num;//訂單數量

    public OrderInfo() {
    }

    public OrderInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() {
        this.productId = "12121212";
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = new Random().nextInt(1000);
        this.promotionPrice = new Random().nextInt(500);
        this.payPrice = new Random().nextInt(480);

        String date = "2015-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }

        return new Gson().toJson(this);
    }
}

Kafka生產模擬訂單資訊:OrderMqSender.java

package com.xt.kafka2storm.order;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class OrderMqSender {
    public static void main(String[] args) {
        String TOPIC = "orderMq";
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("request.required.acks", "1");
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "",new OrderInfo().random() ));
//            try {
//                Thread.sleep(100);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        }
    }

}

Storm處理從kafka獲取的資料Bolt程式碼:ParserOrderMqBolt.java

package com.xt.kafka2storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.google.gson.Gson;
import com.xt.kafka2storm.order.OrderInfo;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashMap;
import java.util.Map;

public class ParserOrderMqBolt extends BaseRichBolt {
    private JedisPool pool;
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //change "maxActive" -> "maxTotal" and "maxWait" -> "maxWaitMillis" in all examples
        JedisPoolConfig config = new JedisPoolConfig();
        //控制一個pool最多有多少個狀態為idle(空閒的)的jedis例項。
        config.setMaxIdle(5);
        //控制一個pool可分配多少個jedis例項,通過pool.getResource()來獲取;
        //如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis例項,則此時pool的狀態為exhausted(耗盡)。
        //在borrow一個jedis例項時,是否提前進行validate操作;如果為true,則得到的jedis例項均是可用的;
        config.setMaxTotal(1000 * 100);
        //表示當borrow(引入)一個jedis例項時,最大的等待時間,如果超過等待時間,則直接丟擲JedisConnectionException;
        config.setMaxWaitMillis(30);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        /**
         *如果你遇到 java.net.SocketTimeoutException: Read timed out exception的異常資訊
         *請嘗試在構造JedisPool的時候設定自己的超時值. JedisPool預設的超時時間是2秒(單位毫秒)
         */
        pool = new JedisPool(config, "127.0.0.1", 6379);
    }

    public void execute(Tuple input) {
        Jedis jedis = pool.getResource();
        //獲取kafkaSpout傳送過來的資料,是一個json
        String string = new String((byte[]) input.getValue(0));
        //解析json
        OrderInfo orderInfo = (OrderInfo) new  Gson().fromJson(string, OrderInfo.class);
        //整個網站,各個業務線,各個品類,各個店鋪,各個品牌,每個商品
        //獲取整個網站的金額統計指標
//        String totalAmount =  jedis.get("totalAmount");
        jedis.incrBy("totalAmount",orderInfo.getProductPrice());
        //獲取商品所屬業務線的指標資訊
        String bid =  getBubyProductId(orderInfo.getProductId(),"b");
//        String bAmout =  jedis.get(bid+"Amout");
        jedis.incrBy(bid+"Amount",orderInfo.getProductPrice());
        jedis.close();
    }

    private String getBubyProductId(String productId,String type) {
//        key:value
        //index:productID:info---->Map
        //  productId-----<各個業務線,各個品類,各個店鋪,各個品牌,每個商品>
        Map<String,String> map =  new HashMap<String,String>();
        map.put("b","3c");
        map.put("c","phone");
        map.put("s","121");
        map.put("p","iphone");
        return map.get(type);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

Storm啟動入口,結合KafkaSpout消費資料:KafkaAndStormTopologyMain.java

package com.xt.kafka2storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

/**
 * Created by XT on 2018/3/31.
 */
public class KafkaAndStormTopologyMain {

    public static void main(String[] args) throws Exception {
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("kafkaSpout",new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop1:2181,hadoop2:2181,hadoop3:2181"),"orderMq","/myKafka","kafkaSpout")),1);
        tb.setBolt("bolt1",new MyBolt1(),1).shuffleGrouping("kafkaSpout");

        Config config = new Config();
        config.setNumWorkers(1);

        //提交任務
        if (args.length > 0){
            //叢集執行
            StormSubmitter.submitTopology(args[0],config,tb.createTopology());
        }else {
            //本地執行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm2kafka",config,tb.createTopology());
        }
    }

}