1. 程式人生 > >kafka的流計算框架

kafka的流計算框架

需求:

producer: 傳送例如  aa-zz
consumer:收到zz
通過-切分得到後面的,如果沒有-就正常輸出

Processor

public class LogProcessor implements Processor<byte [],byte []> {
    ProcessorContext context ;
    //初始化
    public void init(ProcessorContext context) {
        //傳輸
        this.context = context;
    }
    //具體業務邏輯
    public void process(byte[] key, byte[] value) {
        //獲取資料
        String line = new String(value);
        //切分資料
        if (line.contains("-")){
            line = line.split("-")[1];
        }

        //輸出資料
        context.forward(key,line.getBytes());

    }
    //釋放資源
    public void close() {

    }
}

application

public class Application {
    public static void main(String[] args) {
        String onetopic = "t1";
        String twotopic = "t2";
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.232.132:9092,192.168.232.133:9092,192.168.232.134:9092");
        StreamsConfig s = new StreamsConfig(prop);
        //初始化拓撲
        Topology Builder = new Topology();
        Builder.addSource("source",onetopic).addProcessor("processor", new ProcessorSupplier() {
            public Processor<byte[],byte[]> get() {
                return new LogProcessor();
            }
        },"source").addSink("Sink",twotopic,"processor");

        //輸出
        KafkaStreams ka = new KafkaStreams(Builder,prop); //可以是Builder,s,我試過,好像過時了.....
        ka.start();
    }
}

kafka可以做流計算,但是不適合