1. 程式人生 > >kafka-streams進行簡單的資料清洗

kafka-streams進行簡單的資料清洗

package com.terry.kafkastream;

	import org.apache.kafka.streams.KafkaStreams;
	import org.apache.kafka.streams.StreamsConfig;
	import org.apache.kafka.streams.Topology;
	import org.apache.kafka.streams.processor.Processor;
	import org.apache.kafka.streams.processor.ProcessorSupplier;

	import java.util.Properties;

	/**
	 * 需求:對資料進行清洗操作
	 *
	 * 思路:terry-henshuai 把-清洗掉
	 */
	public class Application {
	    public static void a(String[] args) {
	        //1、定義主題 傳送到另外一個主題 資料清洗
	        String oneTopic = "t1";
	        String twoTopic = "t2";

	        //2、設定屬性
	        Properties properties = new Properties();
	        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
	        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata:9092");

	        //3、例項物件
	        StreamsConfig streamsConfig = new StreamsConfig(properties);

	        //4、流計算
	        Topology topology = new Topology();

	        //5、定義kafka元件資料來源
	        topology.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
	            @Override
	            public Processor<byte[], byte[]> get() {
	                return new LogProcessor();
	            }
	        },"Source").addSink("Sink",twoTopic,"Processor");

	        //6、例項化
	        KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
	        kafkaStreams.start();


	    }

	}

 

	package com.terry.kafkastream;

	import org.apache.kafka.streams.processor.Processor;
	import org.apache.kafka.streams.processor.ProcessorContext;

	/**
	 * 資料清洗
	 */
	public class LogProcessor  implements Processor<byte[], byte[]> {

	    private ProcessorContext processorContext;

	    @Override
	    public void init(ProcessorContext processorContext) {
	        //傳輸
	        this.processorContext=processorContext;

	    }

	    @Override
	    public void process(byte[] key, byte[] value) {
	        //1、拿到訊息資料,專程字串
	        String s = new String(value);

	        //2、如果包含-,則取出
	        if(s.contains("-")){
	            String[] split = s.split("-");
	            s = split[1];
	        }

	        processorContext.forward(key,s.getBytes());

	    }


	    @Override
	    public void close() {

	    }
	}