JavaWeb項目架構之Kafka分布式日誌隊列
阿新 • • 發佈:2018-02-06
進行 strong tco serve ktr tex 模擬 print eat
架構、分布式、日誌隊列,標題自己都看著唬人,其實就是一個日誌收集的功能,只不過中間加了一個Kafka做消息隊列罷了。
kafka介紹
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。
特性
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
- 通過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
- 支持通過Kafka服務器和消費機集群來分區消息。
- 支持Hadoop並行數據加載。
主要功能
發布和訂閱消息流,這個功能類似於消息隊列,這也是kafka歸類為消息隊列框架的原因
以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流
可以再消息發布的時候進行處理
使用場景
在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能
構建實時的流數據處理程序來變換或處理數據流,數據處理功能
消息傳輸流程
相關術語介紹
- Broker
Kafka集群包含一個或多個服務器,這種服務器被稱為broker - Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處) - Partition
Partition是物理上的概念,每個Topic包含一個或多個Partition. - Producer
負責發布消息到Kafka broker - Consumer
消息消費者,向Kafka broker讀取消息的客戶端。 - Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)
Kafka安裝
環境
Linux、JDK、Zookeeper
下載二進制程序
wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
安裝
tar -zxvf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1
目錄說明
bin 啟動,停止等命令
config 配置文件
libs 類庫
參數說明
#########################參數解釋##############################
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.1.170 #這個參數默認是關閉的
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #設置zookeeper的連接端口、如果非集群配置一個地址即可
#########################參數解釋##############################
啟動kafka
啟動kafka之前要啟動相應的zookeeper集群、自行安裝,這裏不做說明。
#進入到kafka的bin目錄
./kafka-server-start.sh -daemon ../config/server.properties
Kafka集成
環境
spring-boot、elasticsearch、kafka
pom.xml引入:
<!-- kafka 消息隊列 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
生產者
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* 生產者
* 創建者 科幫網
* 創建時間 2018年2月4日
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
消費者
mport java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
* 消費者
* 創建者 科幫網
* 創建時間 2018年2月4日
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
日誌監聽
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.itstyle.es.common.utils.JsonMapper;
import com.itstyle.es.log.entity.SysLogs;
import com.itstyle.es.log.repository.ElasticLogRepository;
/**
* 掃描監聽
* 創建者 科幫網
* 創建時間 2018年2月4日
*/
@Component
public class Listener {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ElasticLogRepository elasticLogRepository;
@KafkaListener(topics = {"itstyle"})
public void listen(ConsumerRecord<?, ?> record) {
logger.info("kafka的key: " + record.key());
logger.info("kafka的value: " + record.value());
if(record.key().equals("itstyle_log")){
try {
SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);
logger.info("kafka保存日誌: " + log.getUsername());
elasticLogRepository.save(log);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
測試日誌傳輸
/**
* kafka 日誌隊列測試接口
*/
@GetMapping(value="kafkaLog")
public @ResponseBody String kafkaLog() {
SysLogs log = new SysLogs();
log.setUsername("紅薯");
log.setOperation("開源中國社區");
log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
log.setIp("192.168.1.80");
log.setGmtCreate(new Timestamp(new Date().getTime()));
log.setExceptionDetail("開源中國社區");
log.setParams("{‘name‘:‘碼雲‘,‘type‘:‘開源‘}");
log.setDeviceType((short)1);
log.setPlatFrom((short)1);
log.setLogType((short)1);
log.setDeviceType((short)1);
log.setId((long)200000);
log.setUserId((long)1);
log.setTime((long)1);
//模擬日誌隊列實現
String json = JsonMapper.toJsonString(log);
kafkaTemplate.send("itstyle", "itstyle_log",json);
return "success";
}
Kafka與Redis
之前簡單的介紹過,JavaWeb項目架構之Redis分布式日誌隊列,有小夥伴們聊到, Redis PUB/SUB沒有任何可靠性保障,也不會持久化。當然了,原項目中僅僅是記錄日誌,並不是十分重要的信息,可以有一定程度上的丟失
Kafka與Redis PUB/SUB之間最大的區別在於Kafka是一個完整的分布式發布訂閱消息系統,而Redis PUB/SUB只是一個組件而已。
使用場景
- Redis PUB/SUB
消息持久性需求不高、吞吐量要求不高、可以忍受數據丟失 - Kafka
高可用、高吞吐、持久性、多樣化的消費處理模型
開源項目源碼(參考):https://gitee.com/52itstyle/spring-boot-elasticsearch
JavaWeb項目架構之Kafka分布式日誌隊列