1. 程式人生 > >kafka-python的API簡單介紹

kafka-python的API簡單介紹

在上一篇文章中說明了kafka-python的API使用的理論概念,這篇文章來說明API的實際使用。

在官方文件詳細列出了kafka-python的API介面https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

對於生成者我們著重於介紹一個send方法,其餘的方法提到的時候會說明,在官方文件中有許多可配置引數可以檢視,也可以檢視上一篇博文中的引數。

#send方法的詳細說明,send用於向主題傳送資訊
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic (str)
– topic where the message will be published,指定向哪個主題傳送訊息。
value (optional) – message value. Must be type bytes, or be serializable to bytes via configured value_serializer. If value is None, key is required and message acts as a ‘delete’.
#value為要傳送的訊息值,必須為bytes型別,如果這個值為空,則必須有對應的key值,並且空值被標記為刪除。可以通過配置value_serializer引數序列化為位元組型別。

key (optional) – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default),
then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be
type bytes, or be serializable to bytes via configured key_serializer.
         #key與value對應的鍵值,必須為bytes型別
。kafka根據key值確定訊息發往哪個分割槽(如果分割槽被指定則發往指定的分割槽),具有相同key的訊息被髮往同一個分割槽,如果key
#為NONE則隨機選擇分割槽,可以使用key_serializer引數序列化為位元組型別。
headers (optional) – a list of header key value pairs. List items are tuples of str key and bytes value.
#鍵值對的列表頭部,列表項是str(key)和bytes(value)。
timestamp_ms (int, optional) – epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
#時間戳

訊息傳送成功,返回的是RecordMetadata的物件;否則的話引發KafkaTimeoutError異常

在進行實際測試前,先建立一個topics,這裡我們利用控制檯建立:

[[email protected] bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --create --topic kafkatest --replication-factor 3 --partitions 3
Created topic "kafkatest".
[[email protected] bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --list --topic kafkatest
kafkatest
[[email protected] bin]# ./kafka-topics.sh --zookeeper=10.0.102.204:2181,10.0.102.214:2181 --describe --topic kafkatest
Topic:kafkatest    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: kafkatest    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: kafkatest    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: kafkatest    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
[[email protected] bin]#  

#主題有3個分割槽,3個複製係數,主題名為kafkatest.

 

一個簡易的生產者demo如下:(摘自:https://blog.csdn.net/luanpeng825485697/article/details/81036028

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["10.0.102.214:9092"])

i = 20
while True:
    i += 1
    msg = "producer1+%d" % i
    print(msg)
    producer.send('kafkatest', key=bytes(str(i), value=msg.encode('utf-8'))
    time.sleep(1)

producer.close()

#就是一個簡易的while迴圈,不停的向kafka傳送訊息,一定要注意send傳送的key和value的值均為bytes型別。

一個消費者的demo接收上面生產者傳送的資料。

from kafka import KafkaConsumer

consumer = KafkaConsumer("kafkatest", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='latest')
for msg in consumer:
    key = msg.key.decode(encoding="utf-8")               #因為接收到的資料時bytes型別,因此需要解碼
    value = msg.value.decode(encoding="utf-8")
    print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))

#這是一個阻塞的過程,當生產者有訊息傳來的時候,就會讀取訊息,若是沒有訊息就會阻塞等待
#auto_offset_reset引數表示重置偏移量,有兩個取值,latest表示讀取訊息佇列中最新的訊息,另一個取值earliest表示讀取最早的訊息。

執行上面的兩個demo,得到的結果如下:

消費者群組

在上一篇博文中,說明了消費者群組與消費者的概念,這裡我們來定義一個消費者群組。

一個群組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分割槽的訊息。每個消費者接收主題一部分分割槽的訊息

建立一個消費者群組如下:

from kafka import KafkaConsumer
import time

#消費者群組中有一個group_id引數, consumer
= KafkaConsumer("kafkatest", group_id="test1", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='latest') for msg in consumer: key = msg.key.decode(encoding="utf-8") value = msg.value.decode(encoding="utf-8") print("%s-%d-%d key=%s value=%s" % (msg.topic, msg.partition, msg.offset, key, value))

消費者群組中的消費者總是消費訂閱主題的部分資料。

在pycharm中把上面的程式碼複製一份,這樣在一個test1群組中就有了兩個消費者,同時執行。

分析: kafkatest主題有3個分割槽,3個分割槽會被分配給test1群組中的兩個消費者,在上面一篇博文中提到,預設的分配策略時range。也就是說一個消費者可能由2個分割槽,另一個消費者只有一個分割槽;執行結果如下:

下面會通過例項來說明幾個消費者的方法的使用

 kafka-python的API官方文件介紹的很清楚,可以檢視:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer("kafkatest", group_id="test1", bootstrap_servers=["10.0.102.204:9092"])
>>> consumer.topics() #獲取主主題列表,返回的是一個set集合 {'kafkatest', 'lianxi', 'science'} >>> consumer.partitions_for_topic("kafkatest") #獲取主題的分割槽資訊 {0, 1, 2} >>> consumer.subscription() #獲取當前消費者訂閱的主題 {'kafkatest'}

>>> consumer.position((0,))      #得到下一個記錄的偏移量

TypeError: partition must be a TopicPartition namedtuple

#需要注意的是position方法需要傳入的是一個kafka-python自帶的一種資料結構TopicPartition,這種資料結構的定義如下,在使用的時候需要匯入
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

>>> consumer.position(TopicPartition(topic='kafkatest', partition=1))
17580

下面說明poll()方法的用法:

poll(timeout_ms=0, max_records=None)方法: 從指定的主題/分割槽中獲取資料
Records are fetched and returned in batches by topic-partition. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek() or automatically set as the last committed offset for the subscribed list of partitions.
#通過主題-分割槽分批獲取和返回記錄,在每一個輪詢中,消費者將會使用最後消費的偏移量作為開始然後順序fetch資料。最後消費的偏移量可以使用seek()手動設定,或者自動設定為訂閱
#的分割槽列表的最後提交的偏移量。 Incompatible with iterator interface – use one or the other, not both. 與迭代器的介面是對立的。 timeout_ms (
int, optional) – Milliseconds spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any
                records that are available currently in the buffer, else returns empty. Must not be negative. Default: 0 max_records (int, optional) – The maximum number of records returned in a single call to poll(). Default: Inherit value from max_poll_records.
預設從max_poll_records繼承值。
#一個簡答的例項從kafka拉取資料
from kafka import KafkaConsumer import
time consumer = KafkaConsumer("kafkatest", bootstrap_servers=['10.0.102.204:9092']) while True: msg = consumer.poll(timeout_ms=5) print(msg) time.sleep(2)


#執行結果如下,返回的是一個字典,consumerRecord物件包含著訊息的一些元資料資訊
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21929, timestamp=1545978879892, timestamp_type=0, key=b'138', value=b'producer1+138', checksum=-660348132, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22064, timestamp=1545978882893, timestamp_type=0, key=b'141', value=b'producer1+141', checksum=-1803506349, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21930, timestamp=1545978880892, timestamp_type=0, key=b'139', value=b'producer1+139', checksum=-1863433503, serialized_key_size=3, serialized_value_size=13), ConsumerRecord(topic='kafkatest', partition=2, offset=21931, timestamp=1545978881893, timestamp_type=0, key=b'140', value=b'producer1+140', checksum=-280146643, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21932, timestamp=1545978884894, timestamp_type=0, key=b'143', value=b'producer1+143', checksum=1459018748, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=1): [ConsumerRecord(topic='kafkatest', partition=1, offset=22046, timestamp=1545978883894, timestamp_type=0, key=b'142', value=b'producer1+142', checksum=-2023137030, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22065, timestamp=1545978885894, timestamp_type=0, key=b'144', value=b'producer1+144', checksum=1999922748, serialized_key_size=3, serialized_value_size=13)]}

seek()方法的用法:

seek(partition, offset)    
    
Manually specify the fetch offset for a TopicPartition.
#手動指定拉取主題的偏移量

Overrides the fetch offsets that the consumer will use on the next poll(). If this API is invoked for the same partition more than once, 
the latest offset will be used on the next poll(). #覆蓋下一個消費者使用poll()拉取的偏移量。如果這個API對同一個分割槽執行了多次,那麼最後一個次的結果將會被使用。 Note: You may lose data
if this API is arbitrarily used in the middle of consumption to reset the fetch offsets. #如果在消費過程中任意使用此API以重置提取偏移,則可能會丟失資料。


#例項如下
>>> consumer.position(TopicPartition(topic="kafkatest",partition=1))
22103

#使用seek()設定偏移量
>>> consumer.seek(partition=TopicPartition("kafkatest",1),offset=22222)
#需要說明的是seek函式有一個partition引數,但是這個引數必須是TopicPartition型別的。

>>> consumer.position(TopicPartition(topic="kafkatest",partition=1))
22222

與seek相關的還有兩個方法:

seek_to_beginning(*partitions)
#尋找分割槽最早可用的偏移量
seek_to_end(*partitions)
#尋找分割槽最近可用的偏移量

>>> consumer.seek_to_beginning(TopicPartition("kafkatest",1))
>>> consumer.seek_to_end(TopicPartition("kafkatest",1))

#注意這兩個方法的引數都是TopicPartition型別。

subscribe()方法,給當前消費者訂閱主題。

Subscribe to a list of topics, or a topic regex pattern.
#訂閱一個主體列表,或者主題的正則表示式
Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).
#分割槽將會通過分割槽協調器自動分配。主題訂閱不是增量的,這個列表將會替換已經存在的主題。

This method is incompatible with assign().    
#這個方法與assign()方法是不相容的。    

#說明一下listener引數:監聽回撥,該回調將在每次重新平衡操作之前和之後呼叫。
作為組管理的一部分,消費者將跟蹤屬於特定組的使用者列表,並在以下事件之一觸發時觸發重新平衡操作:

  任何訂閱主題的分割槽數都會發生變化   主題已建立或刪除   消費者組織的現有成員死亡   將新成員新增到使用者組 觸發任何這些事件時,將首先呼叫提供的偵聽器以指示已撤消使用者的分配,然後在收到新分配時再次呼叫。請注意,此偵聽器將立即覆蓋先前對subscribe的呼叫中設定的任何偵聽器。

但是,可以保證通過此介面撤消/分配的分割槽來自此呼叫中訂閱的主題。

>>> consumer.subscription()                       #當前消費者訂閱的主題
{'lianxi'}
>>> consumer.subscribe(("kafkatest","lianxi"))    #訂閱主題,會覆蓋之前的主題
>>> consumer.subscription()                       #可以看到已經覆蓋
{'lianxi', 'kafkatest'}

unsubscribe() :取消訂閱所有主題並清除所有已分配的分割槽。

assign(partitions)

Manually assign a list of TopicPartitions to this consumer.
#手動將TopicPartitions指定給此消費者。
#這個函式和subscribe函式不能同時使用
>>> consumer.assign(TopicPartition("kafkatest",1))

assignment():

Get the TopicPartitions currently assigned to this consumer.
如果分割槽是使用assign()直接分配的,那麼這將只返回先前分配的相同分割槽。如果使用subscribe()訂閱了主題,那麼這將給出當前分配給使用者的主題分割槽集(如果分配尚未發生,
或者分割槽正在重新分配的過程中,則可能是None)

beginning_offsets(partitions)

Get the first offset for the given partitions.     #得到給定分割槽的第一個偏移量

This method does not change the current consumer position of the partitions. #這個方法不會改變當前消費者的偏移量

This method may block indefinitely if the partition does not exist.  #這個方法可能會阻塞,如果給定的分割槽沒有出現。

partitions引數仍然是TopicPartition型別。
>>> consumer.beginning_offsets(TopicPartition("kafkatest",1))
#這個方法在kafka-python-1.3.1中沒有

close(autocommit=True)

Close the consumer, waiting indefinitely for any needed cleanup.   #關閉消費者,阻塞等待所需要的清理。

Keyword Arguments:
     autocommit (bool) – If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any 
pending consumed offsets prior to close. Default: True
#如果為此使用者配置了自動提交,則此可選標誌會導致使用者在關閉之前嘗試提交任何待處理的消耗偏移量。預設值:True

commit(offsets=None)

Commit offsets to kafka, blocking until success or error.
#提交偏移量到kafka,阻塞直到成功或者出錯
這隻向Kafka提交偏移量。使用此API提交的偏移量將在每次重新平衡之後的第一次取出時以及在啟動時使用。因此,如果需要在Kafka以外的任何地方儲存偏移,則不應該使用此API。
為了避免在重新啟動使用者時重新處理讀取的最後一條訊息,提交的偏移量應該是應用程式應該使用的下一條訊息,即:last_offset
+1

Parameters:    offsets (dict, optional) – {TopicPartition: OffsetAndMetadata} dict to commit with the configured group_id. Defaults to
currently consumed offsets for all subscribed partitions.

commit_async(offsets=None, callback=None)

Commit offsets to kafka asynchronously, optionally firing callback.
#非同步提交,可選擇的觸發回撥,其餘的和上面的commit一樣。

committed(partition)

Get the last committed offset for the given partition.

This offset will be used as the position for the consumer in the event of a failure.

如果有問題的分割槽未分配給此使用者,或者使用者尚未初始化其已提交偏移量快取,則此呼叫可能會阻止執行遠端呼叫。
>>> consumer.committed(TopicPartition("kafkatest",1))
22103

pase, pased和resume

pase:暫停當前正在進行的請求。需要使用resume恢復
pased:獲取使用pase暫停時的分割槽資訊
resume: 從pase狀態恢復。
除了pased之外,其餘兩個方法的引數均為TopicPartation型別

kafka-python除了有消費者和生成者之外,還有一個客戶端,下面我們來說明客戶端API。

客戶端API

客戶端API的官方文件為: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

簡單說明怎麼使用客戶端API建立主題。

>>> from kafka.client import KafkaClient
>>> kc = KafkaClient(bootstrap_servers="10.0.102.204:9092")
>>> kc.config        #配置還是蠻多的
{'bootstrap_servers': '10.0.102.204:9092', 'client_id': 'kafka-python-1.3.1', 'request_timeout_ms': 40000, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_password': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'selector': <class 'selectors.SelectSelector'>, 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None}
#這些引數的具體意思可以檢視上面的官方文件。

>>> kc.add_topic("clent-1") #新增主題
<kafka.future.Future object at 0x0000000003A92320>

kafka-python還提供了其餘兩個API,broker連線API和叢集連線API