1. 程式人生 > Kafka分區與消費者的關係

Kafka分區與消費者的關係

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    //    主題與消費者的對映                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map
<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic
= topicEntry.getKey(); // 主題 List<String> consumersForTopic = topicEntry.getValue(); // 消費者列表 // partitionsPerTopic表示主題和分割槽數的對映 // 獲取主題下有多少個分割槽 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null
) continue; // 消費者按字典序排序 Collections.sort(consumersForTopic); // 分割槽數量除以消費者數量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // 取模,餘數就是額外的分割槽 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); // 分配分割槽 assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; }