程式人生 > kafka監控獲取logSize, offset, lag等資訊

kafka監控獲取logSize, offset, lag等資訊

由於專案需要,需要檢視kafka消費資訊lag(lag = logSize - offset)

參考 https://www.aliyun.com/jiaocheng/775267.html 的實現方式,在有些場景下無法獲取offset的值(具體原因暫不知曉,後續研究下)

因此決定直接從zookeeper中取offset值

一、springboot專案新增依賴

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

二、相關程式碼

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for reading consumer lag (lag = logSize - offset) for a Kafka
 * consumer group whose offsets are committed to ZooKeeper.
 *
 * <p>logSize (the latest offset) is fetched from the brokers via
 * {@link KafkaConsumer#endOffsets}; the committed offset is read directly
 * from the {@code /consumers/<group>/offsets/<topic>/<partition>} znode.
 */
public class KafkaUtil {
	private static final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);
	private static final int ZOOKEEPER_TIMEOUT = 30000;

	/**
	 * Opens a ZooKeeper session and blocks until it reaches SyncConnected.
	 *
	 * @param connectionString ZooKeeper connect string, e.g. {@code "localhost:2181"}
	 * @return a connected {@link ZooKeeper} client, or {@code null} on failure
	 */
	public ZooKeeper getZookeeper(String connectionString) {
		// Method-local latch: the original instance-field latch stayed at zero
		// after the first call, so a second call would return an unconnected client.
		final CountDownLatch latch = new CountDownLatch(1);
		ZooKeeper zk = null;
		try {
			zk = new ZooKeeper(connectionString, ZOOKEEPER_TIMEOUT, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if (Event.KeeperState.SyncConnected.equals(event.getState())) {
						latch.countDown();
					}
				}
			});
			latch.await();
		} catch (IOException e) {
			logger.error("connect zookeeper failed: {}", connectionString, e);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can observe the interruption.
			Thread.currentThread().interrupt();
			logger.error("interrupted while connecting zookeeper: {}", connectionString, e);
		}
		return zk;
	}

	/**
	 * Builds the minimal consumer configuration needed for metadata /
	 * end-offset queries (no records are ever polled).
	 *
	 * @param groupId           consumer group id
	 * @param bootstrap_servers Kafka bootstrap servers, e.g. {@code "localhost:9092"}
	 * @return consumer {@link Properties}
	 */
	public static Properties getConsumerProperties(String groupId, String bootstrap_servers) {
		Properties props = new Properties();
		props.put("group.id", groupId);
		props.put("bootstrap.servers", bootstrap_servers);
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		return props;
	}

	/**
	 * Collects logSize, committed offset and lag per partition.
	 *
	 * @param zk                connected ZooKeeper client (offsets are read from znodes)
	 * @param bootstrap_servers Kafka bootstrap servers
	 * @param groupId           consumer group id
	 * @param topics            topics to inspect; {@code null} or empty means every
	 *                          topic the group has committed offsets for
	 * @param sorted            sort the result by topic, then partition
	 * @return one map per partition with keys group/topic/partition/logSize/offset/lag
	 * @throws Exception if the topic list or an offset znode cannot be read
	 */
	public List<Map<String, Object>> getLagByGroupAndTopic(ZooKeeper zk, String bootstrap_servers, String groupId,
			String[] topics, boolean sorted) throws Exception {

		List<Map<String, Object>> topicPatitionMapList = new ArrayList<>();

		// Topics to inspect: caller-supplied, or every topic this group has
		// committed offsets for (ZooKeeper keeps one child znode per topic).
		List<String> topicList;
		if (topics == null || topics.length == 0) {
			try {
				topicList = zk.getChildren("/consumers/" + groupId + "/offsets", false);
			} catch (KeeperException | InterruptedException e) {
				if (e instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
				logger.error("從zookeeper獲取topics失敗:zkState: {}, groupId:{}", zk.getState(), groupId, e);
				// Preserve the cause so callers can diagnose the failure.
				throw new Exception("從zookeeper中獲取topics失敗", e);
			}
		} else {
			topicList = Arrays.asList(topics);
		}

		Properties consumeProps = getConsumerProperties(groupId, bootstrap_servers);
		logger.info("consumer properties:{}", consumeProps);
		// try-with-resources: the consumer must be closed even when a ZooKeeper
		// read below throws (the original leaked it on any exception).
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps)) {
			for (String topic : topicList) {
				// Query one topic at a time to shrink the window between the
				// logSize lookup and the offset lookup (reduces negative lag).
				List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
				List<TopicPartition> topicPartitions = new ArrayList<>(partitionsFor.size());
				for (PartitionInfo partitionInfo : partitionsFor) {
					topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
				}

				// logSize == the latest (end) offset of each partition.
				Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
				for (Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
					TopicPartition topicPartition = entry.getKey();
					long logSize = entry.getValue();

					// The old consumer stores the committed offset as the data of
					// /consumers/<group>/offsets/<topic>/<partition>.
					String offsetPath = MessageFormat.format("/consumers/{0}/offsets/{1}/{2}", groupId,
							topicPartition.topic(), topicPartition.partition());
					byte[] data = zk.getData(offsetPath, false, null);
					// The znode holds a decimal string; decode with an explicit
					// charset instead of the platform default.
					long offset = Long.parseLong(new String(data, StandardCharsets.UTF_8));

					Map<String, Object> topicPatitionMap = new HashMap<>();
					topicPatitionMap.put("group", groupId);
					topicPatitionMap.put("topic", topicPartition.topic());
					topicPatitionMap.put("partition", topicPartition.partition());
					topicPatitionMap.put("logSize", logSize);
					topicPatitionMap.put("offset", offset);
					topicPatitionMap.put("lag", logSize - offset);
					topicPatitionMapList.add(topicPatitionMap);
				}
			}
		}

		if (sorted) {
			// Order by topic, then by partition number.
			topicPatitionMapList.sort(Comparator
					.comparing((Map<String, Object> m) -> (String) m.get("topic"))
					.thenComparing(m -> (Integer) m.get("partition")));
		}

		return topicPatitionMapList;
	}

	public static void main(String[] args) throws Exception {
		String bootstrap_servers = "localhost:9092";
		String groupId = "interface-group-new";
		String[] topics = null; // e.g. {"test1", "test2", "test3"}

		KafkaUtil kafkaUtil = new KafkaUtil();
		String connectionString = "localhost:2181";
		ZooKeeper zk = kafkaUtil.getZookeeper(connectionString);
		if (zk == null) {
			throw new RuntimeException("獲取zookeeper連線失敗");
		}
		try {
			List<Map<String, Object>> topicPatitionMapList = kafkaUtil.getLagByGroupAndTopic(zk, bootstrap_servers,
					groupId, topics, true);
			for (Map<String, Object> map : topicPatitionMapList) {
				System.out.println(map);
			}
		} finally {
			// Always release the ZooKeeper session, even if the lag query throws.
			zk.close();
		}
	}
}

三、說明

呼叫時引數topics為null或空陣列時,會獲取到groupId所有消費過的topic(zookeeper會儲存該groupId消費過的各topic的offset值)

List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
  獲取到List<PartitionInfo> 後要儘快查詢zookeeper對應的offset,避免由於繼續生產消費或時延導致offset > logSize