KafkaConsumer: assign vs. subscribe
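In Kafka, consumers that share the same group.id normally do not consume the same partition: at any given moment, a partition is consumed by only one of the consumers in that group. This mechanism is what gives Kafka two important properties: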
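- 1. Throughput can be raised by adding partitions and consumers (consumers beyond the number of partitions stay idle);
- 2. The same message is not consumed more than once within a group, because each partition has a single reader in it (redelivery after a crash or rebalance can still occur, depending on when offsets are committed).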
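To make the "one partition, one consumer per group" rule concrete, here is a hypothetical simulation (the class `RangeAssignSketch` and its method are made up for illustration). It assumes a single topic and reproduces, to the best of my understanding, the per-topic arithmetic of range-style assignment as used by Kafka's RangeAssignor. The real assignment runs inside the group protocol and depends on `partition.assignment.strategy`, so treat this as a sketch, not Kafka's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of range-style assignment for ONE topic inside ONE group.
// It only reproduces the arithmetic; real assignment runs inside Kafka's group protocol.
final class RangeAssignSketch {

    // Gives each consumer a contiguous block of partitions 0..numPartitions-1,
    // so every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumerIds.isEmpty()) {
            return result; // no members, nothing to assign
        }
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted); // deterministic order so every member computes the same plan
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size(); // the first `withExtra` consumers get one more
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 3));             // {c1=[0, 1], c2=[2]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3)); // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

With three partitions, a fourth consumer receives an empty list, which is why adding consumers only helps up to the partition count.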
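In the KafkaConsumer class (official API), a consumer can specify the topic-partitions it consumes in two ways: assign and subscribe. The relevant source code is quoted below.
The two methods seem to do the same job, but they differ in subtle ways that can confuse first-time users, so the differences are explained in detail below.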
Comparison
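- KafkaConsumer.subscribe(): partitions are assigned to the consumer automatically. Kafka's group management distributes the topic's partitions among the consumers of the same group according to the configured partition.assignment.strategy (for example range, round-robin, or sticky), and redistributes them (a rebalance) when members join or leave.
- KafkaConsumer.assign(): the topic-partitions to consume are specified manually and explicitly for this consumer. Group management is not used ("this method does not use the consumer's group management"), so group.id plays no part in deciding who reads which partition and no rebalance is ever triggered. The group.id is still used as the key under which offsets are committed, if the consumer commits them to Kafka.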
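The practical consequence of the comparison can be shown with a small hypothetical model (`DuplicateReadSketch` is an illustrative name, not a Kafka class). Each consumer is represented only by the set of partition numbers it owns; a partition owned by more than one consumer is read in full by each of them. The two hand-written maps mirror the scenario of the test code: a subscribe-style split of a two-partition topic, and both consumers assigned partition 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: a consumer is reduced to the set of partitions it owns.
// Any partition with more than one owner is delivered to each owner in full.
final class DuplicateReadSketch {

    // Returns, in ascending order, the partitions that more than one consumer reads.
    static Set<Integer> duplicatedPartitions(Map<String, Set<Integer>> assignment) {
        Map<Integer, Integer> readers = new HashMap<>();
        for (Set<Integer> partitions : assignment.values()) {
            for (int p : partitions) {
                readers.merge(p, 1, Integer::sum); // count owners per partition
            }
        }
        Set<Integer> duplicated = new TreeSet<>();
        for (Map.Entry<Integer, Integer> e : readers.entrySet()) {
            if (e.getValue() > 1) {
                duplicated.add(e.getKey());
            }
        }
        return duplicated;
    }

    public static void main(String[] args) {
        // subscribe(): group management gives each partition to exactly one member.
        Map<String, Set<Integer>> viaSubscribe = Map.of("c1", Set.of(0), "c2", Set.of(1));
        // assign(): both consumers explicitly take partition 0; group.id does not arbitrate.
        Map<String, Set<Integer>> viaAssign = Map.of("c1", Set.of(0), "c2", Set.of(0));
        System.out.println(duplicatedPartitions(viaSubscribe)); // []
        System.out.println(duplicatedPartitions(viaAssign));    // [0]
    }
}
```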
Test code
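import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaManualAssignTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaManualAssignTest.class);
    private static final String brokerList = "localhost:9092";
    private static final Properties props = new Properties();
    private static KafkaConsumer<String, String> c1, c2;

    static {
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "assignTest"); // both consumers share this group.id
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c1 = new KafkaConsumer<>(props);
        c2 = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic", 0);

        // assign(): partition 0 of "topic" is given explicitly to both consumers.
        // Although they share a group.id, each one reads the whole partition,
        // so every message is consumed twice.
        c1.assign(Arrays.asList(tp));
        c2.assign(Arrays.asList(tp));

        // subscribe(): group management assigns partitions automatically.
        // With a two-partition topic, each consumer gets one partition and
        // the records they receive complement each other without overlap.
        // c1.subscribe(Arrays.asList("topic"));
        // c2.subscribe(Arrays.asList("topic"));

        try {
            // A few polling rounds; the original while (true) loop called
            // System.exit(0) at the end of its first iteration anyway.
            for (int round = 0; round < 5; round++) {
                // poll(Duration) requires kafka-clients 2.0+; poll() never returns null.
                ConsumerRecords<String, String> msg1 = c1.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> m1 : msg1) {
                    logger.info("m1 offset : {} , value : {}", m1.offset(), m1.value());
                }
                logger.info("=====================");
                ConsumerRecords<String, String> msg2 = c2.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> m2 : msg2) {
                    logger.info("m2 offset : {} , value : {}", m2.offset(), m2.value());
                }
            }
        } finally {
            // Release sockets and, with auto-commit enabled, commit final offsets.
            c1.close();
            c2.close();
        }
    }
}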
Official API
The official Javadoc and implementation of subscribe:
/**
 * Subscribe to the given list of topics to get dynamically assigned partitions.
 * <b>Topic subscriptions are not incremental. This list will replace the current
 * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
 * with manual partition assignment through {@link #assign(Collection)}.
 *
 * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
 *
 * <p>
 * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
 * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
 * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
 * to be reset. You should also provide your own listener if you are doing your own offset
 * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
 *
 * @param topics The list of topics to subscribe to
 * @throws IllegalArgumentException If topics is null or contains null or empty elements
 * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
 *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
 *                               configured at-least one partition assignment strategy
 */
@Override
public void subscribe(Collection<String> topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}
The official Javadoc and implementation of assign:
/**
 * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
 * and will replace the previous assignment (if there is one).
 * <p>
 * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * Manual topic assignment through this method does not use the consumer's group management
 * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
 * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
 * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
 * <p>
 * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
 * assignment replaces the old one.
 *
 * @param partitions The list of partitions to assign this consumer
 * @throws IllegalArgumentException If partitions is null or contains null or empty topics
 * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
 *                               (without a subsequent call to {@link #unsubscribe()})
 */
@Override
public void assign(Collection<TopicPartition> partitions) {
    acquireAndEnsureOpen();
    try {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) {
            this.unsubscribe();
        } else {
            Set<String> topics = new HashSet<>();
            for (TopicPartition tp : partitions) {
                String topic = (tp != null) ? tp.topic() : null;
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                topics.add(topic);
            }

            // make sure the offsets of topic partitions the consumer is unsubscribing from
            // are committed since there will be no following rebalance
            this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());

            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
            this.subscriptions.assignFromUser(new HashSet<>(partitions));
            metadata.setTopics(topics);
        }
    } finally {
        release();
    }
}
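Both Javadocs state the same rules from opposite sides: neither call is incremental, an empty list acts like unsubscribe(), and the two modes cannot be mixed without an unsubscribe() in between. The following is a hypothetical model of just those documented rules (`SubscriptionModeSketch` and the topic names "orders"/"payments" are made up; it is not Kafka's internal SubscriptionState, and partitions are plain strings such as "orders-0" rather than TopicPartition objects).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the rules stated in the subscribe()/assign() Javadoc.
// NOT Kafka's SubscriptionState; partitions are plain strings like "orders-0".
final class SubscriptionModeSketch {

    enum Mode { NONE, AUTO_TOPICS, USER_ASSIGNED }

    private Mode mode = Mode.NONE;
    private final Set<String> current = new HashSet<>();

    void subscribe(Collection<String> topics) {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            unsubscribe(); // an empty list behaves like unsubscribe()
            return;
        }
        if (mode == Mode.USER_ASSIGNED) {
            throw new IllegalStateException("assign() was called previously; call unsubscribe() first");
        }
        mode = Mode.AUTO_TOPICS;
        current.clear(); // not incremental: the new list replaces the old one
        current.addAll(topics);
    }

    void assign(Collection<String> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partition collection to assign cannot be null");
        }
        if (partitions.isEmpty()) {
            unsubscribe(); // same rule as subscribe()
            return;
        }
        if (mode == Mode.AUTO_TOPICS) {
            throw new IllegalStateException("subscribe() was called previously; call unsubscribe() first");
        }
        mode = Mode.USER_ASSIGNED;
        current.clear(); // also not incremental
        current.addAll(partitions);
    }

    void unsubscribe() {
        mode = Mode.NONE;
        current.clear();
    }

    Mode mode() { return mode; }

    Set<String> current() { return Set.copyOf(current); }

    public static void main(String[] args) {
        SubscriptionModeSketch c = new SubscriptionModeSketch();
        c.subscribe(List.of("orders", "payments"));
        c.subscribe(List.of("orders"));   // replaces the list, does not add to it
        System.out.println(c.current());  // [orders]
        try {
            c.assign(List.of("orders-0")); // mixing the two modes is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        c.unsubscribe();
        c.assign(List.of("orders-0"));    // allowed after unsubscribe()
        System.out.println(c.mode());     // USER_ASSIGNED
    }
}
```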