1. 程式人生 > spark記錄(19)SparkStreaming之從kafkaBroker和zookeeper獲取offset,和使用zookeeper管理offset

spark記錄(19)SparkStreaming之從kafkaBroker和zookeeper獲取offset,和使用zookeeper管理offset

col ext js ryu 配置 map readv meta gdi rgs

一、從kafkaBroker獲取offset

/**
 * 測試之前需要啟動kafka
 * @author root
 *
 */
public class GetTopicOffsetFromKafkaBroker {
    public static void main(String[] args) {
        
        Map<TopicAndPartition, Long> topicOffsets = getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");
        Set
<Entry<TopicAndPartition, Long>> entrySet = topicOffsets.entrySet(); for(Entry<TopicAndPartition, Long> entry : entrySet) { TopicAndPartition topicAndPartition = entry.getKey(); Long offset = entry.getValue(); String topic = topicAndPartition.topic();
int partition = topicAndPartition.partition(); System.out.println("topic = "+topic+",partition = "+partition+",offset = "+offset); } } /** * 從kafka集群中得到當前topic,生產者在每個分區中生產消息的偏移量位置 * @param KafkaBrokerServer * @param topic * @return */
public static Map<TopicAndPartition,Long> getTopicOffsets(String KafkaBrokerServer, String topic){ Map<TopicAndPartition,Long> retVals = new HashMap<TopicAndPartition,Long>(); for(String broker:KafkaBrokerServer.split(",")){ SimpleConsumer simpleConsumer = new SimpleConsumer(broker.split(":")[0],Integer.valueOf(broker.split(":")[1]), 64*10000,1024,"consumer"); TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic)); TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { for (PartitionMetadata part : metadata.partitionsMetadata()) { Broker leader = part.leader(); if (leader != null) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId()); PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 10000); OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); if (!offsetResponse.hasError()) { long[] offsets = offsetResponse.offsets(topic, part.partitionId()); retVals.put(topicAndPartition, offsets[0]); } } } } simpleConsumer.close(); } return retVals; } }

二、從zookeeper獲取offset

/**
 * Reads the offsets stored in ZooKeeper for a consumer group and topic, and prints them.
 */
public class GetTopicOffsetFromZookeeper {

    /**
     * Reads the offset stored for each partition under
     * {@code /consumers/<groupID>/offsets/<topic>} in ZooKeeper.
     *
     * <p>This is best-effort: any exception raised while reading is printed to stderr
     * and whatever was collected so far is returned.
     *
     * @param zkServers ZooKeeper connect string
     * @param groupID   consumer group whose offsets are read
     * @param topic     topic whose partitions are read
     * @return map from topic/partition to the stored offset; empty when the node does
     *         not exist or nothing could be read
     */
    public static Map<TopicAndPartition,Long> getConsumerOffsets(String zkServers,String groupID, String topic) {
        Map<TopicAndPartition,Long> retVals = new HashMap<TopicAndPartition,Long>();

        ObjectMapper objectMapper = new ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(zkServers).connectionTimeoutMs(1000)
                .sessionTimeoutMs(10000).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();

        curatorFramework.start();

        try {
            String nodePath = "/consumers/" + groupID + "/offsets/" + topic;
            if (curatorFramework.checkExists().forPath(nodePath) != null) {
                // Each child node is named after a partition id.
                List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                for (String partition : partitions) {
                    byte[] raw = curatorFramework.getData().forPath(nodePath + "/" + partition);
                    if (raw == null || raw.length == 0) {
                        // A node without data carries no offset; skip it rather than let
                        // the parse failure abort the remaining partitions.
                        continue;
                    }
                    int partitionId = Integer.parseInt(partition);
                    Long offset = objectMapper.readValue(raw, Long.class);
                    retVals.put(new TopicAndPartition(topic, partitionId), offset);
                }
            }
        } catch (Exception e) {
            // Deliberately best-effort: report and fall through with what was read.
            e.printStackTrace();
        } finally {
            // Release the ZooKeeper connection on every exit path.
            curatorFramework.close();
        }

        return retVals;
    }


    public static void main(String[] args) {
        Map<TopicAndPartition, Long> consumerOffsets = getConsumerOffsets("node3:2181,node4:2181,node5:2181","zhy","mytopic");
        Set<Entry<TopicAndPartition, Long>> entrySet = consumerOffsets.entrySet();
        for (Entry<TopicAndPartition, Long> entry : entrySet) {
            TopicAndPartition topicAndPartition = entry.getKey();
            String topic = topicAndPartition.topic();
            int partition = topicAndPartition.partition();
            Long offset = entry.getValue();
            System.out.println("topic = "+topic+",partition = "+partition+",offset = "+offset);
        }
    }
}

三、使用zookeeper管理offset

public class UseZookeeperManageOffset {
    /**
     * log4j logger; the class passed to {@code getLogger} identifies where log
     * entries originate.
     */
    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);


    /**
     * Works out the starting offsets and runs the streaming job from them until it
     * terminates.
     */
    public static void main(String[] args) {
        // Load the log4j configuration first so that logging works from here on.
        ProjectUtil.LoadLogConfig();
        logger.info("project is starting...");

        Map<TopicAndPartition, Long> startingOffsets = resolveStartingOffsets();

        // To consume from the very beginning instead, set the value of every entry in
        // startingOffsets to 0L at this point.

        // Build the Spark Streaming job so that it starts consuming at those offsets.
        JavaStreamingContext streamingContext =
                SparkStreamingDirect.getStreamingContext(startingOffsets, "zhy");
        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }

    /**
     * Combines broker-side and ZooKeeper-side offsets into the offsets to start from.
     *
     * <p>When ZooKeeper holds offsets for the consumer group, those take precedence.
     * Otherwise the group is consuming the topic for the first time and starts at the
     * largest offset reported by the brokers.
     */
    private static Map<TopicAndPartition, Long> resolveStartingOffsets() {
        // Largest produced offset of each partition, taken from the Kafka cluster.
        Map<TopicAndPartition, Long> latestProduced =
                GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");

        // Offsets the consumer group has recorded in ZooKeeper for each partition.
        Map<TopicAndPartition, Long> consumed =
                GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181","zhy","mytopic");

        boolean hasConsumedOffsets = consumed != null && !consumed.isEmpty();
        if (hasConsumedOffsets) {
            latestProduced.putAll(consumed);
        }
        return latestProduced;
    }
}

spark記錄(19)SparkStreaming之從kafkaBroker和zookeeper獲取offset,和使用zookeeper管理offset