
Kafka Partition Assignment Computation (Partitioner)


When KafkaProducer's send method is called to send a message to the broker, the message first passes through the interceptors (ProducerInterceptors), then through the Serializer, and then reaches the Partitioner stage, i.e., the partition assignment computation stage. In some application scenarios the business logic needs to control which partition each message lands in, while in other cases the default assignment rule is good enough. When KafkaProducer computes the assignment, it first looks at the partition number specified in the partition field of the ProducerRecord. ProducerRecord may look vaguely familiar; no matter, let's first look at a snippet of a Kafka producer example:

Producer<String,String> producer = new KafkaProducer<String,String>(properties);
String message = "kafka producer demo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
try {
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

That's right, ProducerRecord is just an object that wraps a message. ProducerRecord has six member variables in total, namely:

private final String topic;//the topic to send to
private final Integer partition;//the specified partition number
private final Headers headers;//a set of key-value pairs, similar to headers in RabbitMQ; introduced in Kafka 0.11.x
private final K key;//the message key
private final V value;//the message value, i.e., the message body
private final Long timestamp;//the message timestamp, which can be either CreateTime or LogAppendTime; more on this in a later article
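As a quick illustration, the partition field can be set directly through one of ProducerRecord's overloaded constructors; the topic name and partition number below are just assumptions for the example. When partition is specified this way, the partitioner described next is bypassed for that record:

// assumes a topic named "demo-topic" that has at least 3 partitions
ProducerRecord<String, String> record =
        new ProducerRecord<String, String>("demo-topic", 2, "demo-key", "kafka producer demo");
// record.partition() returns 2, so KafkaProducer will not invoke the partitioner for this record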

In the KafkaProducer source code (1.0.0), the following partition() method is called to compute the partition:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

As you can see, the method indeed first checks whether the partition field of the ProducerRecord is specified; only if it is not does it go on to compute the partition. The partitioner in the code above is, by default, Kafka's built-in implementation org.apache.kafka.clients.producer.internals.DefaultPartitioner, whose partition() method is implemented as follows:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

From the source above we can see how the partition is computed:

  • If the key is null, the partition is chosen in a round-robin fashion over the available partitions, starting from a random per-topic counter (see nextValue() above).
  • If the key is not null, the partition is computed by hashing the serialized key with MurmurHash2 (a non-cryptographic hash function with high performance and a low collision rate) and taking the result modulo the total number of partitions, as sketched below.
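The following is a minimal sketch of the non-null-key branch, assuming the kafka-clients jar (1.0.0) is on the classpath so that Utils.murmur2 and Utils.toPositive are accessible as they appear in the source above; the key and partition count are made up for the example. It shows that a given key always maps to the same partition as long as the partition count does not change:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class KeyPartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 4; // assumed total partition count of the topic
        byte[] keyBytes = "order-1001".getBytes(StandardCharsets.UTF_8); // hypothetical key
        // same computation as DefaultPartitioner's non-null-key branch:
        // murmur2 hash, masked to a non-negative value, modulo the partition count
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key 'order-1001' maps to partition " + partition);
    }
}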

KafkaProducer also supports custom partition assignment. Just like org.apache.kafka.clients.producer.internals.DefaultPartitioner, first implement the org.apache.kafka.clients.producer.Partitioner interface, and then set partitioner.class in the KafkaProducer configuration to the corresponding custom partitioner, i.e.:

properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

The custom DemoPartitioner mainly implements the Partitioner interface's public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) method. DemoPartitioner slightly modifies DefaultPartitioner's computation; the details are as follows:

package com.hidden.partitioner;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length < 1) {
            // no key: simple round-robin over all partitions
            // (mask keeps the value non-negative if the counter ever overflows)
            return (atomicInteger.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // hash computed the same way as String.hashCode()
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        // mask to non-negative before the modulo, otherwise a negative hash
        // would yield a negative (invalid) partition number
        return (hash & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}
}

This custom partitioner's implementation is fairly simple, and readers can implement the partition assignment computation flexibly according to their own business needs. For example, large e-commerce platforms usually have multiple warehouses; the warehouse name or ID can be used as the key so that product messages for the same warehouse are recorded in the same partition, as sketched below.
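Here is a minimal sketch of that idea; the topic name, warehouse ID, and message body are all made up for the example, the producer is the one created in the earlier snippet, and it is assumed to be configured with a key-based partitioner such as the DemoPartitioner above (or the default one):

// key product messages by warehouse ID so that one warehouse always maps to one partition
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
        "product-topic",                      // hypothetical topic
        "warehouse-42",                       // warehouse ID used as the key
        "{\"sku\":\"A100\",\"stock\":500}");  // hypothetical message body
producer.send(record);
// all records keyed "warehouse-42" land in the same partition,
// so they preserve their relative order for consumers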
