1.概述

Kafka是一個分佈表示實時資料流平臺,可獨立部署在單臺伺服器上,也可部署在多臺伺服器上構成叢集。它提供了釋出與訂閱的功能,使用者可以傳送資料到Kafka叢集中,也可以從Kafka叢集中讀取資料。之前在Kafka 2.8.0版本時,Kafka社群提出了KRaft協議的概念,現在社群釋出了Kafka 3.0,裡面涉及優化和新增了很多功能,其中就包含KRaft協議的改機。今天,筆者就給大家介紹一下Kafka 3.0新增了哪些特性以及優化了哪些功能。

2.內容

在 Kafka 3.0 中包含了許多重要的新功能,其中比較顯著的變化如下所示:

  • 棄用對Java 8 和Scala 2.12 的支援;
  • Kafka Raft 支援元資料主題的快照以及自動管理仲裁中的其他改進;
  • 預設情況下為Kafka 生產者提供更加強大的交付保證;
  • 棄用訊息格式 v0 和 v1;
  • OffsetFetch 和 FindCoordinator 請求中的優化;
  • 更靈活的 Mirror Maker 2 配置和 Mirror Maker 1 的棄用;
  • 能夠在 Kafka Connect 中的單個呼叫中重新其中聯結器的任務;
  • 現在預設啟用聯結器日誌上下文和聯結器客戶單覆蓋;
  • Kafka Streams 中時間戳同步的增強語義;
  • 改進了 Stream 和 TaskId 的公共 API;
  • Kafka 中的預設 serde 變為 null。

2.1 關於升級到 Kafka 3.0

在Kafka 3.0中,社群對於Zookeeper的版本已經升級到3.6.3了,其中我們可以預覽 KRaft 模式,但是無法從 2.8 或者更早的版本升級到該模式。許多實現依賴 jar 現在在執行時類路勁中可用,而不是在編譯和執行時類路勁中。升級後的編譯錯誤可以通過顯示新增缺少的依賴 jar 或更新應用程式以不使用內部類來修復。

消費者配置的預設值 session.timeout.ms 從10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已經被啟用。兩種配置的值始終假定為 3.0 或者更高,通過 inter.broker.protocol.version 來配置。如果設定了 log.message.format.version 或者 message.format.version 建議在升級到 3.0的同時清理掉這兩個屬性,同時設定 inter.broker.protocol.version 值為 3.0 。

Streams API 刪除了在 2.5.0 或者更早版本中棄用的所有棄用 API,Kafka Streams 不再對“connect:json”模組有編譯時的依賴,依賴此傳遞依賴項的專案必須明確宣告它。

現在,通過指定的自定義主體構建起實現 principal.builder.class 現在必須實現 KafkaPrincipalSerde 介面以允許Broker 之間的轉發。另外,一些過時的類,方法和工具以及從clients、connect、core、和tools模組進行了刪除。

該Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被棄用。請使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)來替換,ConsumerGroupMetadata 可以通過檢索KafkaConsumer#groupMetadata()更強大的語義。需要注意的是,完整的消費者組元資料集只有 Brokers 或 2.5 或更高版本才能支援,因此你必須升級你的 Kafka 叢集以獲得更強的語義。否則,你可以通過new ConsumerGroupMetadata(consumerGroupId)與較老版本的Broker進行互動。

聯結器中 internal.key.converter 和 internal.value.converter 屬性已被完全刪除。自版本 2.0.0 起,不推薦使用這些 Connect 工作器屬性。現在被硬編碼為使用 schemas.enable 設定為的 JSON 轉換器false。如果你的叢集一直在使用不同的內部鍵或值轉換器,你可以按照官網文件中概述的遷移步驟,將你的 Connect 叢集安全地升級到 3.0。 基於 Connect 的 MirrorMaker (MM2) 包括對支援的更改IdentityReplicationPolicy,無需重新命名 Topic 即可啟用複製。DefaultReplicationPolicy預設情況下仍然使用現有的,但可以通過 replication.policy 配置屬性啟用身份複製 。這對於從舊版 MirrorMaker (MM1) 遷移的使用者,或者對於不希望 Topic 重新命名的具有簡單單向複製拓撲的用例特別有用。請注意IdentityReplicationPolicy與 DefaultReplicationPolicy 不同,無法根據 Topic 名稱阻止複製迴圈,因此在構建複製拓撲時要注意避免迴圈。

2.1.1 目的

雖然 internal.key.converter 和 internal.value.converter 中 Connect 工作器屬性,以及以這些名稱為字首的所有屬性都已棄用,但是有時候使用者仍會嘗試使用這些屬性進行除錯,在與未棄用的Key 和 Value轉化器相關的屬性意外混淆後,或者只是對其進行盲目的配置後,進行除錯。這些實驗的結果可能會產生不好的後果,配置了新內保轉換器卻無法讀取具有較舊內部轉換器的內保 Topic 資料,這最多會導致偏移量和連機器配置的丟失。

以下連線屬性會將被刪除:

  • internal.key.converter
  • internal.value.converter
  • internal.key.converter.   # 以工作器內部金鑰轉換器為字首的屬性
  • internal.value.converter.   # 以工作執行緒的內部值轉換器為字首的屬性

Connect 的行為就好像上面沒有提供一樣。具體來說,對於它的鍵和值轉換器,它將使用開箱即用的 JsonConverter,配置為 schemas.enable 屬性值為 false 。

2.1.2 升級步驟

執行未使用JsonConverter 並對 schemas.enable 設定 false 的 Connect 叢集使用者,可以按照以下步驟將其 Connect 叢集升級到 3.0:

  1. 停止叢集上的所有工作執行緒
  2. 對於每個內部主題(配置、偏移量和狀態):
    1. 建立一個新主題來代替現有主題
    2. 對於現有主題中的每條訊息:
      1. 使用 Connect 叢集的舊內部鍵和值轉換器反序列化訊息的鍵和值
      2. 使用 禁用模式的JSON 轉換器序列化訊息的鍵和值(通過將schemas.enable屬性設定為false)
      3. 用新的鍵和值向新的內部主題寫一條訊息
  3. 重新配置每個 Connect worker 以使用步驟 2 中新建立的內部主題
  4. 啟動叢集上的所有worker

2.2 新功能

在本次 Kafka 3.0  版本中新增了以下功能:

  • 添加了InsertHeader 和 DropHeader 連線轉換
  • 在 KRaft 模式中實現 createPartitions
  • 如果分割槽從 fetcher 中刪除,副本 fetcher 不應在發散時期更新分割槽狀態

2.2.1 新增 InsertHeader 和 DropHeader

之前在核心 Kafka 產品中引入了 Headers,在 Kafka Connect Framework 中公開它們將是有利的。Kafka 的 Header 是帶有二進位制值的簡單名稱,而 Connect API 已經有一個非常有用的層來處理不同型別的資料。Connect 的 Header 支援應該使用像 Kafka 這樣的字串名稱,但使用與 Connect 記錄鍵和值相同的型別來表示值。這將提供與 Connect 框架的其餘部分的一致性,並使聯結器和轉換能夠輕鬆地訪問、修改和建立記錄上的 Header。

Kafka 將 Header 定義為具有字串名稱和二進位制值,但 Connect 將使用用於記錄鍵和值的相同機制來表示 Header 值。每個 Header 值可能有一個對應的 Schema,允許聯結器和轉換以一致的方式處理 Header 值、記錄鍵和記錄值。Connect 將定義一種 HeaderConverter 機制以類似於Converter框架的方式序列化和反序列化標頭值 ,這樣現有的 Converter實現也可以實現 HeaderConverter. 由於來自不同供應商的聯結器和轉換可能被組合到單個管道中,因此不同的聯結器和轉換可以輕鬆地將 Header 值從原始形式轉換為聯結器和/或轉換期望的型別,這一點很重要。

注意:
為了簡潔和清晰,顯示的程式碼不包括 JavaDoc,但提議的更改確實包括所有公共 API 和方法的 JavaDoc。

1.Connect Header 和 Header API

org.apache.kafka.connect.Header 將新增一個新介面並用作記錄上單個標頭的公共 API。該介面為鍵、值和值的模式定義了簡單的 getter。這些是不可變物件,還有一些方法可以建立Header具有不同名稱或值的新物件。程式碼片段如下所示:

package org.apache.kafka.connect.header;
public interface Header { // Access the key and value
String key(); // never null
Schema schema(); // may be null
Object value(); // may be null // Methods to create a copy
Header with(Schema schema, Object value);
Header rename(String key);
}

org.apache.kafka.connect.Headers 還將新增一個新介面並用作記錄標題有序列表的公共 API。這是在 Kafka 客戶端的 org.apache.kafka.common.header.Headers介面之後作為標題的有序列表進行模式化的,其中允許多個具有相同名稱的標題。Connect Headers介面定義了Header按順序和/或按名稱訪問各個 物件以及獲取有關Header物件數量的資訊的方法 。它還定義了Header使用各種簽名來新增、刪除和保留 物件的方法,這些簽名將易於聯結器和轉換使用。由於多個Header物件可以具有相同的名稱,因此轉換需要一種簡單的方法來修改和/或刪除現有Header物件, apply(HeaderTransform) 並且apply(String, HeaderTransform) 方法可以輕鬆使用自定義 lambda 函式來執行此操作。程式碼片段如下所示:

package org.apache.kafka.connect.header;
public interface Headers extends Iterable<Header> { // Information about the Header instances
int size();
boolean isEmpty();
Iterator<Header> allWithName(String key);
Header lastWithName(String key); // Add Header instances to this object
Headers add(Header header);
Headers add(String key, SchemaAndValue schemaAndValue);
Headers add(String key, Object value, Schema schema);
Headers addString(String key, String value);
Headers addBoolean(String key, boolean value);
Headers addByte(String key, byte value);
Headers addShort(String key, short value);
Headers addInt(String key, int value);
Headers addLong(String key, long value);
Headers addFloat(String key, float value);
Headers addDouble(String key, double value);
Headers addBytes(String key, byte[] value);
Headers addList(String key, List<?> value, Schema schema);
Headers addMap(String key, Map<?, ?> value, Schema schema);
Headers addStruct(String key, Struct value);
Headers addDecimal(String key, BigDecimal value);
Headers addDate(String key, java.util.Date value);
Headers addTime(String key, java.util.Date value);
Headers addTimestamp(String key, java.util.Date value); // Remove and/or retain the latest Header
Headers clear();
Headers remove(String key);
Headers retainLatest(String key);
Headers retainLatest(); // Create a copy of this Headers object
Headers duplicate(); // Apply transformations to named or all Header objects
Headers apply(HeaderTransform transform);
Headers apply(String key, HeaderTransform transform); interface HeaderTransform {
Header apply(Header header);
}
}

2.Connect Records

每條 Kafka 訊息都包含零個或多個標頭名稱-值對,因此 Connect 記錄類將被修改為具有Headers可以就地修改的非空物件。現有的 ConnectRecord 抽象類是兩個基類 SourceRecord和 SinkRecord,並且將被改變為具有新的 headers填充欄位 ConnectHeaders物件。所有現有建構函式和方法的簽名都將保持不變以保持後向相容性,但現有建構函式將headers使用ConnectHeaders物件填充新欄位。而且, toString(), hashCode()和 equalTo(Object)方法將改為使用新的 headers領域。
一個新的建構函式和幾個新方法將被新增到這個現有的類中,程式碼片段如下所示:

package org.apache.kafka.connect.connector;
public abstract class ConnectRecord<R extends ConnectRecord<R>> { /* The following will be added to this class */ private final Headers headers;
public ConnectRecord(String topic, Integer kafkaPartition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
if (headers == null) {
this.headers = new ConnectHeaders();
} else if (headers instanceof ConnectHeaders) {
this.headers = (ConnectHeaders)headers;
} else {
this.headers = new ConnectHeaders(headers);
}
} public Headers headers() {
return headers;
} public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema,
Object key, Schema valueSchema, Object value, Long timestamp,
Iterable<Header> headers);
}

現有的 SourceRecord類將被修改以新增一個新的建構函式並實現附加 newRecord(...)方法。同樣,所有現有建構函式和方法的簽名將保持不變以保持向後相容性。程式碼片段如下所示:

package org.apache.kafka.connect.source;
public class SourceRecord extends ConnectRecord<SourceRecord> { /* The following will be added to this class */ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
} @Override
public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
}
}

同樣,SinkRecord 將修改現有 類以新增新的建構函式並實現附加 newRecord(...) 方法。同樣,所有現有建構函式和方法的簽名將保持不變以保持向後相容性。程式碼片段如下所示:

package org.apache.kafka.connect.sink;
public class SinkRecord extends ConnectRecord<SinkRecord> { /* The following will be added to this class */ public SinkRecord(String topic, int partition,
Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.kafkaOffset = kafkaOffset;
this.timestampType = timestampType;
} @Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
}
}

3.序列化與反序列化

本次更新中添加了一個新 org.apache.kafka.connect.storage.HeaderConverter 介面,該org.apache.kafka.connect.storage.Converter介面在現有介面的基礎上進行了模式化, 但具有特定於 Header 的方法名稱和簽名。程式碼片段如下所示:

package org.apache.kafka.connect.storage;
public interface HeaderConverter extends Configurable, Closeable { /**
* Convert the header name and byte array value into a {@link Header} object.
* @param topic the name of the topic for the record containing the header
* @param headerKey the header's key; may not be null
* @param value the header's raw value; may be null
* @return the {@link SchemaAndValue}; may not be null
*/
SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value); /**
* Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation.
* @param topic the name of the topic for the record containing the header
* @param headerKey the header's key; may not be null
* @param schema the schema for the header's value; may be null
* @param value the header's value to convert; may be null
* @return the byte array form of the Header's value; may be null if the value is null
*/
byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value); /**
* Configuration specification for this set of header converters.
* @return the configuration specification; may not be null
*/
ConfigDef config();
}

需要注意的是,不同的是 Converter,新 HeaderConverter介面擴充套件了 Configurable 現在對於可能具有附加配置屬性的 Connect 介面通用的介面。

現有實現 Converter 也可能實現 HeaderConverter,並且ConverterConnect 中的所有三個現有 實現都將相應地更改以通過序列化/反序列化 Header 值來實現這個新介面,類似於它們序列化/反序列化鍵和值的方式:

  • StringConverter
  • ByteArrayConverter
  • JsonConverter

HeaderConverter 將新增一個新實現來將所有內建原語、陣列、對映和結構與字串表示形式相互轉換。與StringConverter使用 toString()方法的不同 SimpleHeaderConverter,除了不帶引號的簡單字串值之外, 使用類似 JSON 的表示形式表示基本型別、陣列、對映和結構。這種形式直接對應於許多開發人員認為將值序列化為字串的方式,並且可以 SimpleHeaderConverter解析這些任何和所有這樣的值,並且大部分時間來推斷正確的模式。因此,這將用於HeaderConverterConnect 工作程式中使用的預設值 。

下表描述了SimpleHeaderConverter將如何持久化這些值,表格如下:

型別 描述 例子
BOOLEAN true或者false  
 BYTE_ARRAY 位元組陣列的Base64編碼字串  
INT8 Java位元組的字串表示形式  
INT16 Java Short的字串表示形式  
INT32 Java Int的字串表示形式  
INT64 Java Long的字串表示形式  
FLOAT32 Java 浮點數的字串表示形式  
FLOAT64 Java Double的字串表示形式  
STRING 字串的UTF-8表示  
ARRAY 陣列的類似 JSON 的表示形式。陣列值可以是任何型別,包括基本型別和非基本型別。  
MAP 類似 JSON 的表示形式。儘管大多數正確建立的對映都具有相同型別的鍵和值,但也支援具有任何鍵和值的對映。對映值可以是任何型別,包括基本型別和非基本型別。 { "foo": "value", "bar": "strValue", "baz": "other" }
STRUCT 類似 JSON 的表示形式。Struct 物件可以序列化,但反序列化時將始終解析為對映,因為模式不包含在序列化形式中。 { "foo": true, "bar": "strValue", "baz": 1234 }
DECIMAL 對應的字串表示java.math.BigDecimal。  
TIME IOS-8601 時間表示,格式為“HH:mm:ss.SSS'Z'”。 16:31:05.387UTC
DATE 日期的 ISO-8601 表示,格式為“YYYY-MM-DD”。 2021-09-25
TIMESTAMP 時間戳的 ISO-8601 表示,格式為“YYYY-MM-DD'T'HH:mm:ss.SSS'Z'”。

2021-09-25T 16:31:05.387UTC

4.屬性配置

Connect 工作器需要配置為使用 HeaderConverter 實現,因此header.converter 將定義一個名為的附加工作器配置 ,預設為 SimpleHeaderConverter. 具有相同名稱和預設值的類似配置屬性將新增到聯結器配置中,允許聯結器覆蓋工作程式的 Header 轉換器。請注意,每個聯結器任務都有自己的標頭轉換器例項,就像鍵和值轉換器一樣。

5.轉換 Header 值

每個 Header 都有一個可由接收器聯結器和簡單訊息轉換使用的值。但是,標頭值的型別首先取決於標頭的建立方式以及它們的序列化和反序列化方式。將新增一組新的轉換實用程式方法,使 SMT 和接收器聯結器可以輕鬆地將標頭值轉換為易於使用的型別。這些轉換可能需要原始架構和值。與字串之間的轉換使用與上述相同的機制SimpleHeaderConverter。
例如,SMT 或接收器聯結器可能期望標頭值為 long,並且可以使用這些實用方法來轉換任何數值(例如,int、short、String、BigDecimal 等)。或者,接收器聯結器可能需要 Timestamp 邏輯資料型別,因此它可以使用該 Values.convertToTimestamp(s,v) 方法從時間戳或日期的任何 ISO-8601 格式字串表示轉換,或表示為 long 或字串的過去紀元的毫秒數。
這些實用方法可用於 Header 值或鍵、值或結構、陣列和對映中的任何值。程式碼片段如下所示:

package org.apache.kafka.connect.data;
public class Values { // All methods return null when value is null, and throw a DataException
// if the value cannot be converted to the desired type.
// If the value is already the desired type, these methods simply return it.
public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...}
public static Byte convertToByte(Schema schema, Object value) throws DataException {...}
public static Short convertToShort(Schema schema, Object value) throws DataException {...}
public static Integer convertToInteger(Schema schema, Object value) throws DataException {...}
public static Long convertToLong(Schema schema, Object value) throws DataException {...}
public static Float convertToFloat(Schema schema, Object value) throws DataException {...}
public static Double convertToDouble(Schema schema, Object value) throws DataException {...}
public static String convertToString(Schema schema, Object value) {...}
public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...}
public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...}
public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...}
public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...} // These only support converting from a compatible string form, which is the same
// format used in the SimpleHeaderConverter described above
public static List<?> convertToList(Object value) {...}
public static Map<?, ?> convertToMap(Object value) {...} // Only supports returning the value if it already is a Struct.
public static Struct convertToStruct(Object value) {...}
}

3.優化與調整

在 Kafka 3.0 中優化和調整了以下內容:

  • [ KAFKA-3745 ] - 考慮向 ValueJoiner 介面新增連線鍵
  • [ KAFKA-4793 ] - Kafka Connect: POST /connectors/(string: name)/restart 不會啟動失敗的任務
  • [ KAFKA-5235 ] - GetOffsetShell:支援多個主題和消費者配置覆蓋
  • [ KAFKA-6987 ] - 用 CompletableFuture 重新實現 KafkaFuture
  • [ KAFKA-7458 ] - 在引導階段避免強制處理
  • [ KAFKA-8326 ] - 新增 Serde> 支援
  • [ KAFKA-8372 ] - 刪除不推薦使用的 RocksDB#compactRange API
  • [ KAFKA-8478 ] - 在強制處理之前輪詢更多記錄
  • [ KAFKA-8531 ] - 更改預設複製因子配置
  • [ KAFKA-8613 ] -對流中的視窗操作強制使用寬限期
  • [ KAFKA-8897 ] - RocksDB 增加版本
  • [ KAFKA-9559 ] - 將預設的“預設 serde”從 ByteArraySerde 更改為 null
  • [ KAFKA-9726 ] - MM2 模仿 MM1 的 IdentityReplicationPolicy
  • [ KAFKA-10062 ] - 新增一種方法來檢索 Streams 應用程式已知的當前時間戳
  • [ KAFKA-10201 ] - 更新程式碼庫以使用更具包容性的術語
  • [ KAFKA-10449 ] - Connect-distributed 示例配置檔案沒有針對偵聽器的說明
  • [ KAFKA-10585 ] - Kafka Streams 應該從清理中清理狀態儲存目錄
  • [ KAFKA-10619 ] - Producer 將預設啟用 EOS
  • [ KAFKA-10675 ] - 來自 ConnectSchema.validateValue() 的錯誤訊息應包括架構的名稱。
  • [ KAFKA-10697 ] - 刪除 ProduceResponse.responses
  • [ KAFKA-10746 ] - 消費者輪詢超時到期應記錄為警告而不是資訊。
  • [ KAFKA-10767 ] - 為 ThreadCacheTest 中缺少的方法新增單元測試用例
  • [ KAFKA-10769 ] - 刪除 JoinGroupRequest#containsValidPattern 因為它與 Topic#containsValidPattern 重複
  • [ KAFKA-10885 ] - 重構 MemoryRecordsBuilderTest/MemoryRecordsTest 以避免大量(不必要的)被忽略的測試用例
  • [ KAFKA-12177 ] - 保留不是冪等的
  • [ KAFKA-12234 ] - 擴充套件 OffsetFetch 請求以接受多個組 ID。
  • [ KAFKA-12287 ] - 當按時間戳或持續時間重置偏移量找不到偏移量並預設為最新時,在消費者組上新增警告日誌記錄。
  • [ KAFKA-12288 ] - 刪除任務級檔案系統鎖
  • [ KAFKA-12294 ] - 考慮使用轉發機制來建立元資料自動主題
  • [ KAFKA-12313 ] - 考慮棄用 default.windowed.serde.inner.class 配置
  • [ KAFKA-12329 ] - 當主題不存在時,kafka-reassign-partitions 命令應該給出更好的錯誤資訊
  • [ KAFKA-12335 ] - 將 junit 從 5.7.0 升級到 5.7.1
  • [ KAFKA-12344 ] - 在 Scala API 中支援 SlidingWindows
  • [ KAFKA-12347 ] - 提高 Kafka Streams 跟蹤進度的能力
  • [ KAFKA-12349 ] - 跟進 KIP-500 中的 PartitionEpoch
  • [ KAFKA-12362 ] - 確定任務是否空閒
  • [ KAFKA-12379 ] - KIP-716:允許使用 MirrorMaker2 配置 offsetsync 主題的位置
  • [ KAFKA-12396 ] - 收到空金鑰時kstreams 的專用異常
  • [ KAFKA-12398 ] - 修復脆弱的測試 `ConsumerBounceTest.testClose`
  • [ KAFKA-12408 ] - 文件省略了 ReplicaManager 指標
  • [ KAFKA-12409 ] - ReplicaManager 中的計量器洩漏
  • [ KAFKA-12415 ] - 為 Gradle 7.0 做準備並限制非 api 依賴項的傳遞範圍
  • [ KAFKA-12419 ] - 刪除 3.0 中棄用的 Kafka Streams API
  • [ KAFKA-12436 ] - 棄用 MirrorMaker v1
  • [ KAFKA-12439 ] - 在 KIP-500 模式下,我們應該能夠為被圍欄的節點分配新的分割槽
  • [ KAFKA-12442 ] - 將 ZSTD JNI 從 1.4.8-4 升級到 1.4.9-1
  • [ KAFKA-12454 ] - 噹噹前 kafka 叢集中不存在給定的 brokerIds 時,在 kafka-log-dirs 上新增錯誤日誌記錄
  • [ KAFKA-12464 ] - 增強約束粘性分配演算法
  • [ KAFKA-12479 ] - 在 ConsumerGroupCommand中將分割槽偏移請求合併為單個請求
  • [ KAFKA-12483 ] - 預設情況下在聯結器配置中啟用客戶端覆蓋
  • [ KAFKA-12484 ] - 預設情況下啟用 Connect 的聯結器日誌上下文
  • [ KAFKA-12499 ] - 根據 Streams EOS 上的提交間隔調整事務超時
  • [ KAFKA-12509 ] - 加強 StateDirectory 執行緒鎖定
  • [ KAFKA-12541 ] - 擴充套件 ListOffset 以獲取具有最大時間戳的偏移量 (KIP-734)
  • [ KAFKA-12573 ] - 刪除了不推薦使用的`Metric#value`
  • [ KAFKA-12574 ] - 棄用 eos-alpha
  • [ KAFKA-12577 ] - 刪除不推薦使用的 `ConfigEntry` 建構函式
  • [ KAFKA-12584 ] - 刪除不推薦使用的 `Sum` 和 `Total` 類
  • [ KAFKA-12591 ] - 刪除不推薦使用的 `quota.producer.default` 和 `quota.consumer.default` 配置
  • [ KAFKA-12612 ] - 從 3.0 中的 ConsumerRecord/RecordMetadata 中刪除校驗和
  • [ KAFKA-12614 ] - 使用 Jenkinsfile 進行主幹和釋出分支構建
  • [ KAFKA-12620 ] - 控制器生成的生產者 ID
  • [ KAFKA-12637 ] - 刪除不推薦使用的 PartitionAssignor 介面
  • [ KAFKA-12662 ] - 為 ProducerPerformance 新增單元測試
  • [ KAFKA-12663 ] - 更新 FindCoordinator 以一次解析多個 Coordinator
  • [ KAFKA-12675 ] - 提高粘性通用分配器的可擴充套件性和效能
  • [ KAFKA-12779 ] - TaskMetadata 應該返回實際的 TaskId 而不是純字串
  • [ KAFKA-12788 ] - 改進 KRaft 副本放置
  • [ KAFKA-12803 ] - 支援在 KRaft 模式下重新分配分割槽
  • [ KAFKA-12819 ] - 測試的生活質量改進
  • [ KAFKA-12849 ] - 考慮將 TaskMetadata 遷移到與內部實現的介面
  • [ KAFKA-12874 ] - 將預設消費者會話超時增加到 45 秒 (KIP-735)
  • [ KAFKA-12906 ] - 消費者應在反序列化異常中包含分割槽和偏移量
  • [ KAFKA-12909 ] - 允許使用者選擇加入虛假的左/外流流加入改進
  • [ KAFKA-12921 ] - 將 ZSTD JNI 從 1.4.9-1 升級到 1.5.0-1
  • [ KAFKA-12922 ] - MirrorCheckpointTask 應該關閉主題過濾器
  • [ KAFKA-12931 ] - KIP-746:修改 KRaft 元資料記錄
  • [ KAFKA-12934 ] - 將一些控制器類移動到元資料包
  • [ KAFKA-12981 ] - 確保同步讀取/更新 LogSegment.maxTimestampSoFar 和 LogSegment.offsetOfMaxTimestampSoFar
  • [ KAFKA-13000 ] - 改進 MockClient 中 UnsupportedVersionException 的處理
  • [ KAFKA-13021 ] - 從 KIP-633 改進 API 更改和地址跟進的 Javadocs
  • [ KAFKA-13026 ] - 冪等生產者 (KAFKA-10619) 後續測試
  • [ KAFKA-13041 ] - 支援使用 ducker-ak 除錯系統測試
  • [ KAFKA-13209 ] - 升級碼頭伺服器以修復 CVE-2021-34429
  • [ KAFKA-13258 ] - AlterClientQuotas 響應失敗時不包含錯誤
  • [ KAFKA-13259 ] - DescribeProducers 響應在失敗時不包含錯誤
  • [ KAFKA-13260 ] - FindCoordinator errorCounts 不處理 v4

4.BUG修復

在 Kafka 3.0 中修復瞭如下BUG:

  • [ KAFKA-3968 ] - 將新 FileMessageSet 重新整理到磁碟時,不會在父目錄上呼叫 fsync()
  • [ KAFKA-5146 ] - Kafka Streams:刪除對 connect-json 的編譯依賴
  • [ KAFKA-6435 ] - 應用程式重置工具可能會刪除不正確的內部主題
  • [ KAFKA-7421 ] - 類載入期間 Kafka Connect 中的死鎖
  • [ KAFKA-8315 ] - 歷史連線問題
  • [ KAFKA-8562 ] - 儘管 KAFKA-5051,SASL_SSL 仍然執行反向 DNS 查詢
  • [ KAFKA-8784 ] - 刪除 RocksDBConfigSetter#close 的預設實現
  • [ KAFKA-8940 ] - 片狀測試 SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
  • [ KAFKA-9186 ] - Kafka Connect 用可能來自 DelegatingClassLoader 的錯誤訊息淹沒日誌
  • [ KAFKA-9189 ] - 如果與 Zookeeper 的連線丟失,則會阻止關閉
  • [ KAFKA-9295 ] - KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
  • [ KAFKA-9527 ] - 當 --to-datetime 或 --by-duration 在具有空分割槽的 --input-topics 上執行時,應用程式重置工具返回 NPE
  • [ KAFKA-9672 ] - ISR 中的死代理導致 isr-expiration 失敗並出現異常
  • [ KAFKA-9858 ] - bzip2 1.0.6 中 bzip2recover 中的 CVE-2016-3189 釋放後使用漏洞允許遠端攻擊者通過精心製作的 bzip2 檔案導致拒絕服務(崩潰),與設定為之前的塊端相關塊的開始。
  • [ KAFKA-10046 ] - 棄用的 PartitionGrouper 配置被忽略
  • [ KAFKA-10192 ] - 片狀測試 BlockingConnectorTest#testBlockInConnectorStop
  • [ KAFKA-10340 ] - 在嘗試為不存在的主題生成記錄而不是永遠掛起時,源聯結器應該報告錯誤
  • [ KAFKA-10614 ] - 選舉/辭職的組協調員應防範領導時代
  • [ KAFKA-12170 ] - Connect Cast 無法正確處理“位元組”型別的欄位
  • [ KAFKA-12252 ] - 當工人失去領導權時,分散式牧人滴答執行緒快速迴圈
  • [ KAFKA-12262 ] - 當擁有金鑰的追隨者成為領導者時,永遠不會分發新的會話金鑰
  • [ KAFKA-12297 ] - MockProducer 的實現與非同步傳送回撥的文件相矛盾
  • [ KAFKA-12303 ] - 當存在空值時,Flatten SMT 會刪除一些欄位
  • [ KAFKA-12308 ] - ConfigDef.parseType 死鎖
  • [ KAFKA-12330 ] - 當 FetchResponse 已滿時,FetchSessionCache 可能會導致分割槽飢餓
  • [ KAFKA-12336 ] - 使用命名的 Consumed 引數呼叫 stream[K, V](topicPattern: Pattern) API 時自定義流命名不起作用
  • [ KAFKA-12350 ] - 關於refresh.topics.interval.seconds預設值不正確的文件
  • [ KAFKA-12393 ] - 記錄多租戶注意事項
  • [ KAFKA-12426 ] - 缺少在 RaftReplicaManager 中建立 partition.metadata 檔案的邏輯
  • [ KAFKA-12427 ] - Broker 不會關閉帶有緩衝資料的靜音空閒連線
  • [ KAFKA-12474 ] - 如果無法寫入新的會話金鑰,Worker 可能會死
  • [ KAFKA-12492 ] - 示例 RocksDBConfigSetter 的格式混亂
  • [ KAFKA-12514 ] - SubscriptionState 中的 NPE
  • [ KAFKA-12520 ] - 在啟動時不必要地重建生產者狀態
  • [ KAFKA-12522 ] - Cast SMT 應該允許空值記錄通過
  • [ KAFKA-12548 ] - 無效的記錄錯誤訊息未傳送到應用程式
  • [ KAFKA-12557 ] - org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse 間歇性地無限期掛起
  • [ KAFKA-12611 ] - 修復了在 ProducerPerformance 中錯誤地使用隨機負載的問題
  • [ KAFKA-12619 ] - 確保在初始化高水印之前提交 LeaderChange 訊息
  • [ KAFKA-12650 ] - InternalTopicManager#cleanUpCreatedTopics 中的 NPE
  • [ KAFKA-12655 ] - CVE-2021-28165 - 將碼頭升級到 9.4.39
  • [ KAFKA-12660 ] - 追加失敗後不更新偏移提交感測器
  • [ KAFKA-12661 ] - 當值不為空時,ConfigEntry#equal 不比較其他欄位
  • [ KAFKA-12667 ] - StateDirectory 關閉時錯誤日誌不正確
  • [ KAFKA-12672 ] - 執行 test -kraft -server-start 導致錯誤
  • [ KAFKA-12677 ] - raftCluster 總是傳送到錯誤的活動控制器並且從不更新
  • [ KAFKA-12684 ] - 有效的分割槽列表被成功選擇的分割槽列表錯誤地替換
  • [ KAFKA-12686 ] - AlterIsr 響應處理中的競爭條件
  • [ KAFKA-12691 ] - TaskMetadata timeSinceIdlingStarted 未正確報告
  • [ KAFKA-12700 ] - admin.listeners 配置在文件中有不穩定的有效值
  • [ KAFKA-12702 ] - InterBrokerSendThread 中捕獲的未處理異常
  • [ KAFKA-12718 ] - SessionWindows 過早關閉
  • [ KAFKA-12730 ] - 單個 Kerberos 登入失敗會導致 Java 9 以後的所有連線失敗
  • [ KAFKA-12747 ] - 片狀測試 RocksDBStoreTest.shouldReturnUUIDsWithStringPrefix
  • [ KAFKA-12749 ] - 被抑制的 KTable 上的更新日誌主題配置丟失
  • [ KAFKA-12752 ] - CVE-2021-28168 將球衣升級到 2.34 或 3.02
  • [ KAFKA-12754 ] - 讀取偏移量時,TaskMetadata endOffsets 不會更新
  • [ KAFKA-12777 ] - AutoTopicCreationManager 不處理響應錯誤
  • [ KAFKA-12782 ] - Javadocs 搜尋將您傳送到一個不存在的 URL
  • [ KAFKA-12792 ] - 修復指標錯誤並引入 TimelineInteger
  • [ KAFKA-12815 ] - KTable.transformValue 可能有不正確的記錄元資料
  • [ KAFKA-12835 ] - 代理上的主題 ID 可能不匹配(代理間協議版本更新後)
  • [ KAFKA-12851 ] - 片狀測試 RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
  • [ KAFKA-12856 ] - 將 Jackson 升級到 2.12.3
  • [ KAFKA-12865 ] - 描述 ACL 中管理客戶端 API 的文件錯誤
  • [ KAFKA-12866 ] - 即使使用 chroot Kafka 也需要 ZK root 訪問許可權
  • [ KAFKA-12867 ] - Trogdor ConsumeBenchWorker 使用 maxMessages 配置提前退出
  • [ KAFKA-12870 ] - RecordAccumulator 卡在重新整理狀態
  • [ KAFKA-12880 ] - 在 3.0 中刪除不推薦使用的 Count 和 SampledTotal
  • [ KAFKA-12889 ] - 日誌清理組考慮空日誌段以避免留下空日誌
  • [ KAFKA-12890 ] - 消費者組陷入“CompletingRebalance”
  • [ KAFKA-12896 ] - 由重複的組長 JoinGroups 引起的組重新平衡迴圈
  • [ KAFKA-12897 ] - KRaft 控制器無法在單個代理叢集上建立具有多個分割槽的主題
  • [ KAFKA-12898 ] - 訂閱中擁有的分割槽必須排序
  • [ KAFKA-12904 ] - Connect 的驗證 REST 端點使用不正確的超時
  • [ KAFKA-12914 ] - StreamSourceNode.toString() 丟擲 StreamsBuilder.stream(Pattern) ctor
  • [ KAFKA-12925 ] - 中間介面缺少字首掃描
  • [ KAFKA-12926 ] - 執行 kafka-consumer-groups.sh 時,ConsumerGroupCommand 的 java.lang.NullPointerException 出現負偏移
  • [ KAFKA-12945 ] - 刪除 3.0 中的埠、主機名和相關配置
  • [ KAFKA-12948 ] - 節點處於連線狀態的 NetworkClient.close(node) 使 NetworkClient 無法使用
  • [ KAFKA-12949 ] - TestRaftServer 的 scala.MatchError:test-kraft-server-start.sh 上的 null
  • [ KAFKA-12951 ] - 恢復 GlobalKTable 時的無限迴圈
  • [ KAFKA-12964 ] - 損壞的段恢復可以刪除新的生產者狀態快照
  • [ KAFKA-12983 ] - 在加入組之前並不總是呼叫 onJoinPrepare
  • [ KAFKA-12984 ] - 合作粘性分配器可能會因無效的 SubscriptionState 輸入元資料而卡住
  • [ KAFKA-12991 ] - 修復對 `AbstractCoordinator.state` 的不安全訪問
  • [ KAFKA-12993 ] - Streams“記憶體管理”文件的格式混亂
  • [ KAFKA-12996 ] - 當獲取偏移量小於領導者起始偏移量時,未正確處理 OffsetOutOfRange 以用於發散時期
  • [ KAFKA-13002 ] - 對於非 MAX_TIMESTAMP 規範,listOffsets 必須立即降級
  • [ KAFKA-13003 ] - KafkaBroker 通告套接字埠而不是配置的通告埠
  • [ KAFKA-13007 ] - KafkaAdminClient getListOffsetsCalls 為每個主題分割槽構建叢集快照
  • [ KAFKA-13008 ] - 流將在等待分割槽延遲時長時間停止處理資料
  • [ KAFKA-13010 ] - 片狀測試 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
  • [ KAFKA-13029 ] - FindCoordinators 批處理可能會在滾動升級期間破壞消費者
  • [ KAFKA-13033 ] - 協調器不可用錯誤應該導致新增到取消對映列表中進行新的查詢
  • [ KAFKA-13037 ] - “執行緒狀態已經是 PENDING_SHUTDOWN” 日誌垃圾郵件
  • [ KAFKA-13053 ] - KRaft 記錄的凹凸框架版本
  • [ KAFKA-13056 ] - 當控制器共同駐留時,代理不應生成快照
  • [ KAFKA-13057 ] - 許多代理 RPC 在 KRaft 模式下未啟用
  • [ KAFKA-13058 ] - `AlterConsumerGroupOffsetsHandler` 不能正確處理分割槽錯誤。
  • [ KAFKA-13073 ] - 由於 MockLog 的實現不一致,模擬測試失敗
  • [ KAFKA-13078 ] - 過早關閉 FileRawSnapshotWriter
  • [ KAFKA-13080 ] - 獲取快照請求未定向到控制器中的kraft
  • [ KAFKA-13092 ] - LISR 請求中的效能迴歸
  • [ KAFKA-13096 ] - 新增/刪除/替換執行緒時不會更新 QueryableStoreProvider 呈現 IQ 不可能
  • [ KAFKA-13098 ] - 在元資料日誌目錄中恢復快照時沒有此類檔案異常
  • [ KAFKA-13099 ] - 使 transactionalIds 過期時訊息太大錯誤
  • [ KAFKA-13100 ] - 控制器無法恢復到記憶體快照
  • [ KAFKA-13104 ] - 控制器應在 RaftClient 辭職時通知它
  • [ KAFKA-13112 ] - 控制器提交的偏移量與 raft 客戶端偵聽器上下文不同步
  • [ KAFKA-13119 ] - 在啟動時驗證 KRaft controllerListener 配置
  • [ KAFKA-13127 ] - 修復雜散分割槽查詢邏輯
  • [ KAFKA-13129 ] - 修復與 ConfigCommand 更改相關的損壞系統測試
  • [ KAFKA-13132 ] - 在 LISR 請求中升級到主題 ID 在 3.0 中引入了差距
  • [ KAFKA-13137 ] - KRaft 控制器指標 MBean 名稱被錯誤引用
  • [ KAFKA-13139 ] - 在沒有任務的情況下請求重新啟動聯結器後的空響應導致 NPE
  • [ KAFKA-13141 ] - 如果存在分歧時期,領導者不應更新追隨者獲取偏移量
  • [ KAFKA-13143 ] - 禁用 KRaft 控制器的元資料端點
  • [ KAFKA-13160 ] - 修復了在使用 KRaft 時呼叫代理的配置處理程式以傳遞預期預設資源名稱的程式碼。
  • [ KAFKA-13161 ] - 在 KRaft 中分割槽更改後未更新跟隨者領導者和 ISR 狀態
  • [ KAFKA-13167 ] - KRaft 代理應在受控關閉期間立即心跳
  • [ KAFKA-13168 ] - KRaft 觀察者不應該有副本 ID
  • [ KAFKA-13173 ] - KRaft 控制器不能正確處理同時代理到期
  • [ KAFKA-13198 ] - TopicsDelta 在處理 PartitionChangeRecord 時不會更新已刪除的主題
  • [ KAFKA-13214 ] - 消費者在斷開連線後不應重置組狀態
  • [ KAFKA-13215 ] - 片狀測試 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
  • [ KAFKA-13219 ] - BrokerState 指標不適用於 KRaft 叢集
  • [ KAFKA-13262 ] - 模擬客戶端現在有最終的 close() 方法
  • [ KAFKA-13266 ] - 從提取器中刪除分割槽後應建立“InitialFetchState”
  • [ KAFKA-13270 ] - Kafka 可能無法連線到 ZooKeeper,永遠重試,永遠不會啟動
  • [ KAFKA-13276 ] - 公共 DescribeConsumerGroupsResult 建構函式指的是 KafkaFutureImpl
  • [ KAFKA-13277 ] - 請求/響應中長標記字串的序列化丟擲 BufferOverflowException

5.任務

在 Kafka 3.0 中的開發任務如下:

  • [ KAFKA-8405 ] - 刪除不推薦使用的 `kafka-preferred-replica-election` 命令
  • [ KAFKA-8734 ] - 刪除 PartitionAssignorAdapter 和不推薦使用的 PartitionAssignor 介面
  • [ KAFKA-10070 ] - 引數化連線單元測試以刪除程式碼重複
  • [ KAFKA-10091 ] - 改善任務空閒
  • [ KAFKA-12482 ] - 刪除不推薦使用的 rest.host.name 和 rest.port Connect worker 配置
  • [ KAFKA-12519 ] - 考慮刪除舊的內建指標版本的流
  • [ KAFKA-12578 ] - 刪除不推薦使用的安全類/方法
  • [ KAFKA-12579 ] - 從 3.0 的客戶端中刪除各種不推薦使用的方法
  • [ KAFKA-12581 ] - 刪除不推薦使用的 Admin.electPreferredLeaders
  • [ KAFKA-12588 ] - 在 shell 命令中刪除不推薦使用的 --zookeeper
  • [ KAFKA-12590 ] - 刪除不推薦使用的 SimpleAclAuthorizer
  • [ KAFKA-12592 ] - 刪除不推薦使用的 LogConfig.Compact
  • [ KAFKA-12600 ] - 刪除客戶端配置`client.dns.lookup`的棄用配置值`default`
  • [ KAFKA-12625 ] - 修復通知檔案
  • [ KAFKA-12717 ] - 刪除內部轉換器配置屬性
  • [ KAFKA-12724 ] - 將 2.8.0 新增到系統測試和流升級測試
  • [ KAFKA-12794 ] - DescribeProducersRequest.json 中的尾隨 JSON 令牌可能會導致某些 JSON 解析器中的解析錯誤
  • [ KAFKA-12800 ] - 配置 jackson 以拒絕生成器中的尾隨輸入
  • [ KAFKA-12820 ] - 升級 maven-artifact 依賴以解決 CVE-2021-26291
  • [ KAFKA-12976 ] - 從刪除主題呼叫中刪除UNSUPPORTED_VERSION錯誤
  • [ KAFKA-12985 ] - CVE-2021-28169 - 將碼頭升級到 9.4.42
  • [ KAFKA-13035 ] - Kafka Connect:更新 POST /connectors/(string: name)/restart 文件以包含任務重啟行為
  • [ KAFKA-13051 ] - 需要為 3.0 定義 Principal Serde
  • [ KAFKA-13151 ] - 在 KRaft 中禁止策略配置

6.總結

Kafka 3.0 的釋出標誌著社群對 Kafka 專案邁向了一個新的里程牌。另外,感謝Kafka PMC對Kafka Eagle監控系統的認可,為了維護Apache社群的商標權益,現在對Kafka Eagle正式改名為EFAKEagle For Apache Kafka),EFAK會持續更新迭代優化,為大家管理Kafka叢集和使用Kafka應用提供便利,歡迎大家使用EFAK,也可以到Github或者EAFK官網上關注 EFAK 的最新動態。

7.結束語

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大資料探勘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學視訊。