
Kafka Authentication and Authorization Configuration (Adding Users Dynamically)


  I previously wrote a post, "Kafka ACL in Practice", which demonstrated how to configure SASL PLAINTEXT + ACL to secure a Kafka cluster with authentication and authorization. One question about that setup comes up again and again: does it support adding and removing authenticated users dynamically? The definitive answer is: no! All user credentials are configured in a static JAAS file, so there is no way to add or remove users without restarting the brokers. This time, using Kafka 2.1.0 as the example, I will show how to configure SASL SCRAM + ACL so that users can be added and removed dynamically. I will also clean up a few shortcomings of the previous post (for instance, using an environment variable to apply settings instead of modifying the stock .sh scripts).
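As a preview of the environment-variable approach, instead of editing kafka-server-start.sh you can simply export the JVM option before launching the broker (the JAAS file path below is just a placeholder):

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka-broker-jaas.conf"
$ bin/kafka-server-start.sh config/server.properties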

1. Environment Setup

Kafka server: one cloud VM with 4 cores, 8 GB RAM, and 1 Gbps bandwidth

Kafka client: another cloud VM

The client and the server communicate over the internal network.

2. Cluster Topology

We run two Kafka brokers. Since I am demonstrating everything on a single cloud VM, both broker instances are started on the same host. On the client side, the console-producer and console-consumer scripts stand in for client applications.

3. Creating Users

  We use kafka-configs.sh to create the users; Kafka's SCRAM implementation stores user credentials in ZooKeeper. Suppose I want to create three users, admin, writer, and reader, used for inter-broker communication, producing messages, and consuming messages, respectively. Let's start the actual configuration: first start ZooKeeper, but do not start the Kafka brokers yet. Once ZooKeeper is up, run the following commands to create the three users:

Create the writer user with password writer-pwd:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer

Completed Updating config for entity: user-principal 'writer'.

Create the reader user with password reader-pwd:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader
Completed Updating config for entity: user-principal 'reader'.

Create the admin user with password admin:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.

All three test users are now created. Let's use kafka-configs.sh to look at the writer user's information:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --describe --entity-type users --entity-name writer
Configs for user-principal 'writer' are SCRAM-SHA-512=salt=dTlvNzl4Y3BvZ3BuMmx5ODY0aWlzN2RsZg==,stored_key=Yc02SwxDkAKDQH01W98bkJLJcVO24q9vR5tS0nWaq5Jg2Z7DtzwrOt6J2Cr8Oib+dHq7TUIeG+NLiCAMnRlfVg==,server_key=Tu+iiosvJrDemOvjaDdzrh2GhLRg6r9zoTRDdvXZCMA7n7+D8DYsUz6Gnugcczsnz5Ut/jkkklEOXYRXIqOLCg==,iterations=4096,SCRAM-SHA-256=salt=Y2dpcnB4aTU5NWNwMDZjNmFvbHluMWJpOQ==,stored_key=GGMhtO1PhxZFpEHOaDiqA4AM16Ma19nky1UV/gFoC1s=,server_key=L0R1xkcULaWcGMu6TdtWi5mf5lu1VTS8imWvKPlM3i4=,iterations=8192

The output contains, for each of the SCRAM-SHA-256 and SCRAM-SHA-512 algorithms, the writer user's salt, server_key, stored_key, and iteration count; these are all standard SCRAM terms.
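Under the hood, these credentials are stored in ZooKeeper under the /config/users path. As a quick sanity check (not required for the setup), you can inspect the raw node with zookeeper-shell.sh:

$ bin/zookeeper-shell.sh 172.21.0.9:2181 get /config/users/writer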

4. Broker-Side Configuration

  As with the SASL PLAINTEXT setup, we still need a JAAS file for each broker. Note: since my two broker instances run on the same cloud VM, I only create a single JAAS file; in a real deployment you would create one for each physical broker machine.

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

Save the content above as kafka-broker-jaas.conf. Note the two semicolons at the end, and do not add any extra whitespace. The admin user configured here is used for inter-broker communication. Next, configure the broker-side server.properties with the following entries:

# Enable the ACL authorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# Make admin a super user for this example
super.users=User:admin

# Enable the SCRAM mechanism, using the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512

# Use SCRAM-SHA-512 for inter-broker communication as well
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

# Inter-broker traffic uses SASL over PLAINTEXT; SSL is not covered in this example
security.inter.broker.protocol=SASL_PLAINTEXT

# Configure listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://172.21.0.9:9092

# Configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://172.21.0.9:9092

  The other broker's configuration is essentially identical, except for a different port (e.g. 9093), broker.id, and log.dirs. Now start the two broker instances; if everything is configured correctly, both should start normally. Note how the JAAS file is supplied: -Djava.security.auth.login.config is set via the KAFKA_OPTS environment variable, with no script modification needed.
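For reference, the second broker's server2.properties might differ from the first only in lines like these (the log.dirs path here is hypothetical):

broker.id=1
listeners=SASL_PLAINTEXT://172.21.0.9:9093
advertised.listeners=SASL_PLAINTEXT://172.21.0.9:9093
log.dirs=/xfs/bigdata/kafka-logs-1

With both property files in place, start the brokers: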

$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server1.properties
......

[2019-02-05 17:12:08,365] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:12:08,365] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:12:08,367] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server2.properties
......

[2019-02-05 17:22:12,970] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,970] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,971] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Now create a test topic; a single-partition, single-replica topic is enough for this example:

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.9:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".

5. Client-Side Configuration

  Okay, everything is in place. Let's first try sending messages with the console-producer script:

$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test
>hello, world
[2019-02-05 18:17:19,005] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

The send failed because no valid credentials were supplied. Let's switch to the writer user. For that I create a configuration file named producer.conf for the producer to use, with the following content:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";

Then run the console-producer script:

$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello
[2019-02-05 18:25:40,272] WARN [Producer clientId=console-producer] Bootstrap broker 172.21.0.9:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

The error has changed: now the client reports that it cannot establish a connection. This is because the writer user has no Write permission on the test topic, so we need to grant it:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer --operation Write --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *

Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
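As an optional verification step, you can list the ACLs on the topic at any time:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=172.21.0.9:2181 --list --topic test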

Run the console-producer script again:

$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello
>Kafka

The messages go through!

Next we configure the consumer. As with the producer, create a consumer.conf for the reader user, and grant it Read permission on the topic:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:reader has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
User:reader has Allow permission for operations: Read from hosts: *

Run the console-consumer script:

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka_2.12-2.1.0/consumer.conf --group test-group
[2019-02-05 18:55:57,272] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group

The error shows that the reader user is not authorized to access the consumer group, so grant that too:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *

Run the console-consumer script again:

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka_2.12-2.1.0/consumer.conf --group test-group
hello
Kafka
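As an aside, the same client settings can also be passed directly on the command line with --consumer-property instead of a config file; a sketch of the equivalent invocation:

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092 --topic test --from-beginning --group test-group --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-512 --consumer-property 'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";'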

6. Adding/Removing Users Dynamically

Now, without restarting any broker, let's add two new users, writer1 and reader1, and grant them Write and Read permission on the test topic, respectively:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer1-pwd],SCRAM-SHA-512=[password=writer1-pwd]' --entity-type users --entity-name writer1
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader1-pwd],SCRAM-SHA-512=[password=reader1-pwd]' --entity-type users --entity-name reader1


$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer1 --operation Write --topic test

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --topic test

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --group test-group1

At the same time, delete the original writer user:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
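To confirm the deletion took effect, describe the user again; the returned config list for writer should now be empty:

$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --describe --entity-type users --entity-name writer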

Now verify that the writer user can no longer produce messages:

$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello by writer
[2019-02-06 09:30:54,492] ERROR [Producer clientId=console-producer] Connection to node -2 (172.21.0.9/172.21.0.9:9093) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2019-02-06 09:30:54,492] ERROR Error when sending message to topic test with key: null, value: 15 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2019-02-06 09:30:54,493] ERROR [Producer clientId=console-producer] Connection to node -1 (172.21.0.9/172.21.0.9:9092) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)

Finally, change writer to writer1 in producer.conf and verify that writer1 is allowed to produce messages:
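The updated producer.conf would look like this:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer1" password="writer1-pwd";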

$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello by writer1
>successful
>

And that's it: we now have a Kafka security configuration that supports adding and removing users dynamically.
