
JStorm KafkaSpout fails to implement the fail/resend mechanism

The problem surfaced because the Kafka consumer offset stored in ZooKeeper had not been updated for a long time. Adding log statements revealed the cause:

lastOffset is actually read from pendingOffsets, the set that stores the offset of every emitted message (in the source code it is just a plain TreeSet; Alibaba made ack and fail asynchronous, yet surprisingly did not use a ConcurrentSkipListSet). Entries are removed from pendingOffsets only after a message is acked. When a message at some offset fails, a second TreeSet is used instead, and the fail handler simply calls remove on it; the problem is that nothing was ever added to that set, so the remove is a silent no-op. See the two screenshots below:
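In other words, the original fail path boils down to a remove on a set that was never filled, so the call returns false and the failed message is simply dropped. A minimal demo of that no-op (TreeSet and the offset value here stand in for the real spout state):

```java
import java.util.TreeSet;

public class NoOpRemoveDemo {
    public static void main(String[] args) {
        TreeSet<Long> failedOffsets = new TreeSet<>();
        // The original fail handler removes from a set it never added to,
        // so the call is a silent no-op and the message is lost.
        boolean removed = failedOffsets.remove(42L);
        System.out.println("removed=" + removed); // removed=false: nothing was tracked
    }
}
```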

Solution:

Replace the original TreeSet with a ConcurrentSkipListSet.
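ConcurrentSkipListSet keeps the same sorted-set behavior the spout relies on (reading the smallest un-acked offset) while making add/remove safe under the asynchronous ack/fail calls. A small sketch of that property (field name mirrors the post; the offsets are made up):

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class SkipListSetDemo {
    public static void main(String[] args) {
        ConcurrentSkipListSet<Long> pendingOffsets = new ConcurrentSkipListSet<>();
        pendingOffsets.add(30L);
        pendingOffsets.add(10L);
        pendingOffsets.add(20L);
        // Like TreeSet, the set stays sorted, so the spout can still read
        // the smallest in-flight offset; unlike TreeSet, mutation is thread-safe.
        System.out.println("lastOffset candidate: " + pendingOffsets.first()); // 10
        pendingOffsets.remove(10L); // simulate an ack
        System.out.println("after ack: " + pendingOffsets.first()); // 20
    }
}
```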

The fail method:

public void fail(KafkaMessageId fail) {
    // Record the failed message so it can be re-fetched and re-emitted,
    // and drop it from the in-flight set so it no longer blocks lastOffset.
    failedOffsets.add(fail);
    pendingOffsets.remove(fail.getOffset());
}

Add the following lines at the top of PartitionConsumer's emit method:

if (!failedOffsets.isEmpty()) {
    fillFailMessage();
}
private void fillFailMessage() {
    try {
        if (failedOffsets.isEmpty()) {
            return;
        }
        // Fetch from the smallest failed offset. Use first() rather than
        // pollFirst(): polling would remove that id before the retry list
        // below is built, so its own message would never be re-emitted.
        KafkaMessageId kafkaMessageId = failedOffsets.first();
        ByteBufferMessageSet msgs =
                consumer.fetchMessages(kafkaMessageId.getPartition(), kafkaMessageId.getOffset());
        List<Long> failedOffset = failedOffsets.stream()
                .mapToLong(KafkaMessageId::getOffset)
                .boxed()
                .collect(Collectors.toList());
        for (MessageAndOffset msg : msgs) {
            if (failedOffset.contains(msg.offset())) {
                LOG.info("failToSend data: partition: " + partition + ", offset: " + msg.offset()
                        + ", failedOffsets size: " + failedOffset.size());
                // Track the re-emitted message's own offset (not the first
                // failed offset) so a later ack clears the right entry.
                pendingOffsets.add(msg.offset());
                emittingMessages.add(msg);
                failedOffsets.removeIf(k -> k.getOffset() == msg.offset());
            }
        }
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
    }
}
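Putting the two pieces together, the retry flow is: fail() moves an offset from pendingOffsets to failedOffsets, and the refill step re-fetches from the broker and moves matching offsets back into pendingOffsets for re-emission. A toy simulation with the consumer stubbed out (the fetch is faked with a fixed offset list, and KafkaMessageId is reduced to a bare Long):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

public class RetryFlowDemo {
    static ConcurrentSkipListSet<Long> pendingOffsets = new ConcurrentSkipListSet<>();
    static ConcurrentSkipListSet<Long> failedOffsets = new ConcurrentSkipListSet<>();

    public static void main(String[] args) {
        pendingOffsets.addAll(Arrays.asList(10L, 11L, 12L));
        // Tuples at offsets 10 and 12 fail:
        fail(10L);
        fail(12L);
        // Stubbed fetchMessages: the broker returns everything from offset 10 on.
        List<Long> fetched = Arrays.asList(10L, 11L, 12L, 13L);
        for (long offset : fetched) {
            if (failedOffsets.contains(offset)) {
                pendingOffsets.add(offset);   // back in flight, awaiting ack
                failedOffsets.remove(offset); // no longer marked as failed
            }
        }
        System.out.println("pending=" + pendingOffsets + " failed=" + failedOffsets);
        // prints: pending=[10, 11, 12] failed=[]
    }

    static void fail(long offset) {
        failedOffsets.add(offset);
        pendingOffsets.remove(offset);
    }
}
```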

Here you can decide for yourself whether to also filter out messages that have already been sent. That's all.