程式人生 > Kafka 接收訊息 exactly once

Kafka 接收訊息 exactly once

– Start

下面的範例演示如何將 offset 儲存在資料庫中,並與訊息處理放在同一個資料庫事務中:訊息寫入與 offset 更新要麼一起提交,要麼一起回滾,因此可以真正實現 exactly once。

建立表

-- Holds, for each topic partition, the offset the consumer should read next.
-- The consumer seeks to this value on start-up and rewrites it (last processed
-- offset + 1) in the same transaction that stores the messages.
CREATE TABLE KAFKA_OFFSET
(
    TOPIC 		VARCHAR2(100),
    PARTITION	NUMBER(3, 0),
    OFFSET		NUMBER(18, 0) 
);
-- Seed row: start consuming partition 0 of 'topic0' from offset 0.
INSERT INTO KAFKA_OFFSET VALUES ('topic0', 0, 0);
COMMIT;


-- Stores every consumed record: where it came from (topic, partition, offset)
-- plus its key and value. Column order matters: the consumer inserts with a
-- positional "values (?, ?, ?, ?, ?)" statement.
CREATE TABLE KAFKA_MESSAGE
(
    TOPIC 		VARCHAR2(100),
    PARTITION	NUMBER(3, 0),
    OFFSET		NUMBER(18, 0),
    KEY			VARCHAR2(100),
    MESSAGE		VARCHAR2(100)
);

接收訊息程式碼

package shangbo.kafka.example6;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Entry point of the exactly-once consumer example.
 *
 * <p>Offsets are not committed to Kafka. They are read from the database at
 * start-up, the consumer is positioned on them with {@code seek}, and every
 * polled batch is handed to {@link Service#process}, which stores the messages
 * and the new offsets.
 */
public class App {
	/** Topic this example consumes from. */
	private static final String TOPIC = "topic0";

	/**
	 * Starts the consumer and polls forever.
	 *
	 * <p>Any exception thrown while polling or processing ends the loop; the
	 * consumer and the Spring context are then closed by try-with-resources
	 * before the exception propagates out of {@code main}.
	 *
	 * @param args unused
	 * @throws IllegalStateException if the database holds no offset row for the topic
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("group.id", "ConsumerGroup3"); // consumer group identifier
		props.put("enable.auto.commit", "false"); // offsets are tracked in the database, never auto-committed

		// Resources are closed in reverse order: consumer first, then the Spring
		// context (which in turn runs the DataSource destroy method).
		try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
				KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			Service service = context.getBean(Service.class);

			// Load the stored start position of every partition of the topic.
			List<PartitionOffset> partitionOffsets = service.queryPartitionOffset(TOPIC);
			if (partitionOffsets.isEmpty()) {
				// Nothing to assign; fail fast with a message that names the cause.
				throw new IllegalStateException("No stored offsets found for topic " + TOPIC);
			}

			// Manual assignment (no group rebalancing), one partition per stored row.
			consumer.assign(partitionOffsets.stream().map(p -> new TopicPartition(TOPIC, p.getPartition())).collect(Collectors.toList()));

			// Position each partition on the offset read from the database.
			for (PartitionOffset partitionOffset : partitionOffsets) {
				TopicPartition tp = new TopicPartition(TOPIC, partitionOffset.getPartition());
				consumer.seek(tp, partitionOffset.getOffset());
			}

			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				if (records.isEmpty()) {
					// Avoid calling the transactional service when there is nothing to store.
					continue;
				}
				service.process(records);
			}
		}
	}
}

package shangbo.kafka.example6;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Spring Java configuration for the example: an Oracle connection pool, a
 * JDBC transaction manager bound to it, and the {@link Service} bean.
 */
@Configuration
@EnableTransactionManagement // turn on annotation-driven transaction management
public class AppConfig {

	/**
	 * Creates the pooled data source; Spring calls {@code close()} on it when
	 * the context shuts down.
	 *
	 * @return the configured connection pool
	 */
	@Bean(destroyMethod = "close")
	public BasicDataSource dataSource() {
		BasicDataSource pool = new BasicDataSource();

		// Driver and target database.
		pool.setDriverClassName("oracle.jdbc.driver.OracleDriver");
		pool.setUrl("jdbc:oracle:thin:@localhost:1521:xe");

		// Credentials.
		pool.setUsername("hr");
		pool.setPassword("123456");

		return pool;
	}

	/**
	 * Creates the transaction manager used for the given data source.
	 *
	 * @param dataSource the data source whose transactions are managed
	 * @return the transaction manager
	 */
	@Bean
	public DataSourceTransactionManager txManager(DataSource dataSource) {
		DataSourceTransactionManager manager = new DataSourceTransactionManager();
		manager.setDataSource(dataSource);
		return manager;
	}

	/**
	 * Creates the service bean and hands it the data source.
	 *
	 * @param dataSource the data source the service works against
	 * @return the service instance
	 */
	@Bean
	public Service service(DataSource dataSource) {
		Service impl = new ServiceImpl();
		impl.setDataSource(dataSource);
		return impl;
	}

}

package shangbo.kafka.example6;

import java.util.List;

import javax.sql.DataSource;

import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Database-side operations of the consumer example: reading the stored
 * offsets and persisting polled records.
 */
public interface Service {
	/**
	 * Returns the stored partition/offset pairs for the given topic.
	 *
	 * @param topic topic name to look up
	 * @return one entry per stored partition of the topic
	 */
	List<PartitionOffset> queryPartitionOffset(String topic);

	/**
	 * Processes one batch of polled records.
	 *
	 * @param records the records returned by a single poll
	 */
	void process(ConsumerRecords<String, String> records);

	/**
	 * Supplies the data source the service works against.
	 *
	 * @param dataSource the data source to use
	 */
	void setDataSource(DataSource dataSource);
}

package shangbo.kafka.example6;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import javax.sql.DataSource;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;

public class ServiceImpl implements Service {
	private JdbcTemplate jdbcTemplate;

	@Transactional(readOnly = true)
	public List<PartitionOffset> queryPartitionOffset(String topic) {
		String sql = "select * from kafka_offset where topic = ?";
		return jdbcTemplate.query(sql, new Object[] { topic }, new BeanPropertyRowMapper<PartitionOffset>(PartitionOffset.class));
	}

	@Transactional
	public void process(ConsumerRecords<String, String> records) {
		Set<TopicPartition> topicPartitions = records.partitions();
		for (TopicPartition tp : topicPartitions) {
			final List<ConsumerRecord<String, String>> recs = records.records(tp);

			// 儲存訊息
			String insertKafkaMsgSql = "insert into KAFKA_MESSAGE values (?, ?, ?, ?, ?)";
			jdbcTemplate.batchUpdate(insertKafkaMsgSql, new BatchPreparedStatementSetter() {
				public void setValues(PreparedStatement ps, int i) throws SQLException {
					ps.setString(1, recs.get(i).topic());
					ps.setInt(2, recs.get(i).partition());
					ps.setLong(3, recs.get(i).offset());
					ps.setString(4, recs.get(i).key());
					ps.setString(5, recs.get(i).value());
				}

				public int getBatchSize() {
					return recs.size();
				}
			});

			// 儲存 offset
			Long maxOffset = recs.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();
			maxOffset += 1;
			String updateOffsetSql = "update kafka_offset set offset = ? where topic = ? and partition = ?";
			jdbcTemplate.update(updateOffsetSql, maxOffset, tp.topic(), tp.partition());
		}
	}

	//
	// Setter
	//
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

}

package shangbo.kafka.example6;

/**
 * Mutable bean pairing a partition number with an offset.
 *
 * <p>Both values are {@code null} until set.
 */
public class PartitionOffset {

	/** Partition number. */
	private Integer partition;

	/** Offset associated with the partition. */
	private Long offset;

	/** @return the partition number, or {@code null} if not set */
	public Integer getPartition() {
		return this.partition;
	}

	/** @param partition the partition number */
	public void setPartition(Integer partition) {
		this.partition = partition;
	}

	/** @return the offset, or {@code null} if not set */
	public Long getOffset() {
		return this.offset;
	}

	/** @param offset the offset */
	public void setOffset(Long offset) {
		this.offset = offset;
	}
}

– 更多參見:
– 聲 明:轉載請註明出處
– Last Edited on 2018-06-13
– Written by ShangBo on 2018-06-13
– End