1. 程式人生 > >記一次kafka消費能力優化

記一次kafka消費能力優化

之前的程式碼:

有多個source:多個kafka,一個ES

1.消費者資料介面

interface Source {
    List<String> poll();
}

2.impl

class KafkaSource implement Source {

  List<String> poll() {
    ConsumerRecords<String,String> records = kafkaConsumer.poll(500);

    List<String> dataList = new ArrayList(); //linkedlist是否要好點
for(ConsumerRecord record : records ) { String data = Adaptor.adaptor(record); dataList.add(data); } return dataList ; } }

3.實際消費者

class Server {
    Source source;

    public Server(Source source) {
        this.source = source;
    }

    void run() {
        while(true
)
{ List<String> dataList = source.poll(); for(String data : dataList) { doSomething(data); } } } }

消費能力:10000條/s

1.消費者資料介面

改造(使用Vistor模式)

public interface Source {

    void consume(SourceVistor sourceVistor);

    interface SourceVistor{
        void
accept(Event event); } }

2.impl

class KafkaSource implement Source {

  public void consume(final SourceVistor sourceVistor) {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(pollMillions);
                for (ConsumerRecord<String, String> record : consumerRecords) 
                    {
                    sourceVistor.accept(Adaptor.adapt(record.value()));
                    }
                    consumer.commitAsync();
                }
            }
        });
    }
}

3.實際消費者

public class Server {
    @Override
    public void start() {
        //開始消費kafka
        for (Source source : sources) {
            source.consume(new MatchAccepter());
        }
    }}

    private class MatchAccepter implements Source.SourceVistor{
        public void accept(String data) {
            doSomething(data);
        }
    }
}

4.修改kafka消費配置,指定消費資料量

  <!-- 消費者通用配置 -->
  <util:properties id="commonConsumerConfig">
    <prop key="enable.auto.commit">${kafka.consumer.auto.commit.enable:false}</prop>
    <prop key="request.timeout.ms">${kafka.consumer.request.timeout.ms:50000}</prop>
    <prop key="auto.commit.interval.ms">${kafka.consumer.auto.commit.interval.ms:1000}</prop>
    <prop key="max.partition.fetch.bytes">${kafka.consumer.max.partition.fetch.bytes:1000000}</prop>
    <prop key="auto.offset.reset">${kafka.consumer.auto.offset.reset:latest}</prop>
    <prop key="heartbeat.interval.ms">${kafka.consumer.heartbeat.interval.ms:25000}</prop>
    <prop key="session.timeout.ms">${kafka.consumer.session.timeout.ms:30000}</prop>
    <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
    <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
  </util:properties>

消費能力: 70000/s

總結:
可以看出,之前的方式,讀取出來再封裝進List再迴圈讀取這種方式非常簡單,也是第一時間想到的,但是效率比後者訪問者模式低了一個數量級(可能也和kafka配置有關.)改了之後,只需要讀取一次就消費,邏輯上來講,減少了2/3的浪費!!!
對於kafka這種我們監控資料,每秒鐘可能10W條資料,因此一點點地方都要注意,何況這個地方是重中之重!改了之後,效能立馬飆升!

(記憶體2G)
最後:Vistor模式很不錯!