
Integrating Spark Streaming with Kafka in Java for Stream Processing

  • Background: Most of the Spark Streaming material online is written in Scala, but our e-commerce real-time recommendation project is mainly Java, so we hit a few pitfalls and ended up with this Java implementation. The code is somewhat stream-of-consciousness; go easy on it, and discussion is welcome.
  • Flow: Spark Streaming reads real-time user click data from Kafka, filters it, reads the item-similarity matrix from Redis and the user's historical behavior from the DB, computes interest scores in real time, then writes one copy of the result to Redis for the API layer to read and display, and one copy to HDFS for offline precision/recall evaluation.
  • Note: From what I understand, in large real-time recommendation systems collaborative filtering is usually only used to generate the candidate set; the interest computation is replaced by a rerank driven by CTR and other strategies, with calculateInterest calling an online rerank service to do the ordering.
  • Update 12/13: Recall is unchanged; we currently use CTR prediction plus rule-based ordering, with LTR to follow.

  • Enough talk, here's the code:

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class Main {
    static final String ZK_QUORUM = "*.*.*.*:2181,*.*.*.*:2181,*.*.*.*:2181/kafka";
    static final String GROUP = "test-consumer-group";
    static final String TOPICSS = "user_trace";
    static final String NUM_THREAD = "64";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("main.java.computingCenter");
        // Create the context with a 2 second batch size, i.e. read from Kafka every two seconds
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(NUM_THREAD);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = TOPICSS.split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String lines) {
                // Kafka message format:
                // "{\"Topic\":\"user_trace\",\"PartitionKey\":\"0\",\"TimeStamp\":1471524044018,\"Data\":\"0=163670589171371918%3A196846178238302087\",\"LogId\":\"0\",\"ContentType\":\"application/x-www-form-urlencoded\"}";
                List<String> arr = new ArrayList<String>();
                for (String s : lines.split(" ")) {
                    Map j = JSON.parseObject(s);
                    String s1 = "";
                    String s2 = "";
                    try {
                        s1 = URLDecoder.decode(j.get("Data").toString(), "UTF-8");
                        s2 = s1.split("=")[1];
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    arr.add(s2);
                }
                return arr;
            }
        });

        JavaPairDStream<String, String> goodsSimilarityLists = words.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                // Drop malformed records; we expect "userId:goodsId"
                if (s.split(":").length == 2) {
                    return true;
                }
                return false;
            }
        }).mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
            // Process the pairs partition by partition
            @Override
            public Iterable<Tuple2<String, String>> call(Iterator<String> s) throws Exception {
                ArrayList<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
                while (s.hasNext()) {
                    String x = s.next();
                    String userId = x.split(":")[0];
                    String goodsId = x.split(":")[1];
                    System.out.println(x);
                    LinkedHashMap<Long, Double> recommendMap = null;
                    try {
                        // This service reads from Redis, computes real-time interest scores,
                        // and writes the recommendation result to Redis for the API layer
                        CalculateInterestService calculateInterestService = new CalculateInterestService();
                        try {
                            recommendMap = calculateInterestService.calculateInterest(userId, goodsId);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        String text = "";
                        int count = 0;
                        for (Map.Entry<Long, Double> entry : recommendMap.entrySet()) {
                            text = text + entry.getKey();
                            if (count == recommendMap.size() - 1) {
                                break;
                            }
                            count = count + 1;
                            text = text + "{/c}";
                        }
                        text = System.currentTimeMillis() + ":" + text;
                        result.add(new Tuple2<String, String>(userId, text));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                return result;
            }
        });

        goodsSimilarityLists.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // Print the RDD to make debugging easier
                System.out.println(rdd.collect());
                return null;
            }
        });

        JavaPairDStream<Text, Text> goodsSimilarityListsText =
                goodsSimilarityLists.mapToPair(new PairFunction<Tuple2<String, String>, Text, Text>() {
            @Override
            public Tuple2<Text, Text> call(Tuple2<String, String> ori) throws Exception {
                // Convert each tuple to org.apache.hadoop.io.Text so saveAsHadoopFiles can write it to HDFS
                return new Tuple2(new Text(ori._1), new Text(ori._2));
            }
        });

        // Write to HDFS
        goodsSimilarityListsText.saveAsHadoopFiles("/user/hadoop/recommend_list/rl", "123",
                Text.class, Text.class, SequenceFileOutputFormat.class);

        jssc.start();
        jssc.awaitTermination();
    }
}
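To make the message format concrete, here is a minimal standalone sketch of the decode step the flatMap above performs on one Kafka record. The sample payload is the one from the comment in the code; the demo class name is my own and it assumes fastjson is on the classpath:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.net.URLDecoder;

public class RecordDecodeDemo {
    public static void main(String[] args) throws Exception {
        // One raw Kafka record as produced by the tracking side
        String raw = "{\"Topic\":\"user_trace\",\"PartitionKey\":\"0\",\"TimeStamp\":1471524044018,"
                + "\"Data\":\"0=163670589171371918%3A196846178238302087\",\"LogId\":\"0\","
                + "\"ContentType\":\"application/x-www-form-urlencoded\"}";
        JSONObject record = JSON.parseObject(raw);
        // "Data" is URL-encoded; %3A decodes to ':'
        String decoded = URLDecoder.decode(record.getString("Data"), "UTF-8"); // "0=163670589171371918:196846178238302087"
        String userAndGoods = decoded.split("=")[1];                           // "163670589171371918:196846178238302087"
        String userId = userAndGoods.split(":")[0];
        String goodsId = userAndGoods.split(":")[1];
        System.out.println(userId + " clicked " + goodsId);
    }
}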
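CalculateInterestService below does the actual scoring. For each candidate item g taken from the clicked item's similarity list, the predicted interest is a similarity-weighted average of the user's action scores over g's own neighbors, i.e. interest(g) = Σ sim(g, gi) · score(user, gi) / Σ sim(g, gi), where the action scores in the code are click = 0.2, add_cart = 0.6, order = 0.8, and 0.1 by default when the user has no recorded action on gi. The candidates are then sorted by this value, written to Redis per user, and returned to the streaming job.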
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CalculateInterestService {

    private String dictKey = "greate_item_sim_2.0";
    private String recommendTable = "great_recommend_table_2.0";
    static final String HIGO_BASE_URL = "jdbc:mysql://*.*.*.*:3212/*";
    static final String HIGO_BASE_USER = "*";
    static final String HIGO_BASE_PASS = "*";

    public LinkedHashMap<Long, Double> calculateInterest(String userId, String traceGoodsId) {
        LinkedHashMap<Long, Double> sortedMap = new LinkedHashMap<Long, Double>();
        String[] simGoods = RedisHelper.getInstance().hget(dictKey, traceGoodsId).split(",");
        // The user's history should be stored as action:goodsId:timestamp; needs refactoring so BI writes it into a separate table
        HashMap<Long, String> userTrace = null;
        try {
            userTrace = getUserTrace(userId);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return sortedMap;
        }
        HashMap<Long, Double> recommendMap = new HashMap<Long, Double>();
        String[] simGoodsIds = new String[simGoods.length];
        for (int i = 0; i < simGoods.length; i++) {
            simGoodsIds[i] = simGoods[i].split(":")[0];
        }
        List<String> pSimGoodsIds = RedisHelper.getInstance().hmget(dictKey, simGoodsIds);
        HashMap<Long, String> predictSimGoodsIds = new HashMap<Long, String>();
        for (int i = 0; i < simGoodsIds.length; i++) {
            predictSimGoodsIds.put(Long.parseLong(simGoodsIds[i]), pSimGoodsIds.get(i));
        }
        for (String item : simGoods) {
            // needs optimization

            Double totalSum = 0.0;
            Double sum = 0.0;
            Long originGoodsId = Long.parseLong(item.split(":")[0]);
            for (String predictGoods : predictSimGoodsIds.get(originGoodsId).split(",")) {
                Long goodsId = Long.parseLong(predictGoods.split(":")[0].toString());
                Double sim = Double.valueOf(predictGoods.split(":")[1].toString());
                totalSum = totalSum + sim;
                Double score = 0.0;
                if (!userTrace.containsKey(goodsId)) {
                    //TODO The user rating matrix is too sparse; SVD should fill in the missing ratings. For now an item with no rating falls back to the default score 0.1
                    userTrace.put(goodsId, "default");
                }
                String action = userTrace.get(goodsId);


                if (action.equals("click")) {
                    score = 0.2;
                } else if (action.equals("favorate")) {
                    // The original left this branch empty, so favorites scored 0.0 (below the 0.1 default);
                    // 0.4 is an assumed weight that follows the click < favorite < add_cart < order pattern
                    score = 0.4;
                } else if (action.equals("add_cart")) {
                    score = 0.6;
                } else if (action.equals("order")) {
                    score = 0.8;
                } else if (action.equals("default")) {
                    score = 0.1;
                }
                // The similarity dictionary should store goodsId:sim; needs refactoring
                sum = sum + score * sim;
            }

            Double predictResult = sum / totalSum;
            recommendMap.put(originGoodsId, predictResult);
        }

        //sort recommend list
        List<Map.Entry<Long, Double>> list = new ArrayList<Map.Entry<Long, Double>>(recommendMap.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<Long, Double>>() {
            @Override
            public int compare(Map.Entry<Long, Double> o1, Map.Entry<Long, Double> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        Map.Entry<Long, Double> tmpEntry = null;
        Iterator<Map.Entry<Long, Double>> iter = list.iterator();
        while (iter.hasNext()) {
            tmpEntry = iter.next();
            sortedMap.put(tmpEntry.getKey(), tmpEntry.getValue());
        }

        writeRecommendListToRedis(userId, sortedMap);

        return sortedMap;

    }

    private HashMap<Long, String> getUserTrace(String userId) throws ClassNotFoundException {
        //SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
        Class.forName("com.mysql.jdbc.Driver");
        PreparedStatement stmt = null;
        Connection conn = null;
        UserTrace userTrace = new UserTrace();
        try {
            conn = DriverManager.getConnection(HIGO_BASE_URL, HIGO_BASE_USER, HIGO_BASE_PASS);
            // Bind the id as a parameter instead of concatenating it into the SQL string
            String sql = "select * from t_pandora_goods_record where account_id=?";
            stmt = conn.prepareStatement(sql);
            stmt.setString(1, userId);
            ResultSet rs = stmt.executeQuery();
            while(rs.next()) {
                userTrace.setId(Long.parseLong(rs.getString(1)));
                userTrace.setAccountId(Long.parseLong(rs.getString(2)));
                userTrace.setGoodsIds(rs.getString(3));
                userTrace.setMtime(rs.getString(4));
            }
            stmt.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String[] goodsActionTimestamp = userTrace.getGoodsIds().split(",");
        HashMap<Long, String> hm = new HashMap<Long, String>();
        for (String ac : goodsActionTimestamp) {
            Long goodsId = Long.parseLong(ac.split(":")[0]);
            //String action = ac.split(":")[1];
            //String timestamp = ac.split(":")[2];
            //hack: next step is for BI to write user history into a table in action:goodsId:timestamp format; the timestamp will later feed into the weighting
            String action = "click";
            hm.put(goodsId, action);
        }
        return hm;
    }

    private void writeRecommendListToRedis(String userId, LinkedHashMap<Long, Double> sortedMap) {
        String recommendList = "";
        int count = 0;
        for (Map.Entry<Long, Double> entry : sortedMap.entrySet()) {
            recommendList = recommendList + entry.getKey();
            if (count == sortedMap.size() - 1) {
                break;
            }
            count = count + 1;
            recommendList = recommendList + ",";
        }
        RedisHelper.getInstance().hset(recommendTable, userId, recommendList);
    }

}
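For completeness, here is a rough sketch of how the API layer could read the list back out of Redis. This is an assumption about the read side, not part of the original project; it reuses the same RedisHelper singleton and the comma-joined format produced by writeRecommendListToRedis above, and the RecommendListReader class name is hypothetical:

import java.util.ArrayList;
import java.util.List;

public class RecommendListReader {
    private String recommendTable = "great_recommend_table_2.0";

    public List<Long> read(String userId) {
        List<Long> goodsIds = new ArrayList<Long>();
        // writeRecommendListToRedis stores a comma-joined list of goods ids, highest interest first
        String raw = RedisHelper.getInstance().hget(recommendTable, userId);
        if (raw == null || raw.isEmpty()) {
            return goodsIds;
        }
        for (String id : raw.split(",")) {
            goodsIds.add(Long.parseLong(id));
        }
        return goodsIds;
    }
}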