1. 程式人生 > >java編寫Producer(執行緒池,kafka)

java編寫Producer(執行緒池,kafka)

1.將kafka帶的jar包匯入專案內

2.編寫Producer程式碼:使用執行緒池建立多個任務,每個任務向kafka傳送一條訊息

​public class TestThreadPool {
    public static void main(String args[]) {
        //線上程池中建立2個執行緒
        ExecutorService exec = Executors.newFixedThreadPool(2);
        //建立100個執行緒目標物件
        for (int index = 0; index < 100; index++) {
            Runnable run = new Runner(index);
            //執行執行緒目標物件
            exec.execute(run);
        }
        exec.shutdown();
    }
}

/**
 * Task object executed by the thread pool: sends a single message to the
 * Kafka topic {@link #topic}, echoes the same text to standard output, and
 * then sleeps for one second.
 */
class Runner implements Runnable {
    /** Sequence number of this task; it is embedded in the message text. */
    int index = 0;
    /** Name of the Kafka topic every task publishes to. */
    public static String topic = "test";

    /**
     * @param index sequence number that identifies this task in its message
     */
    public Runner(int index) {
        this.index = index;
    }

    /**
     * Builds the message, sends it through a freshly created producer,
     * prints it, and pauses for one second.
     */
    @Override
    public void run() {
        long time = (long) (Math.random() * 1000);
        // Build the text once so the sent message and the console line are identical.
        String text = "執行緒:" + Thread.currentThread().getName() + "(目標物件" + index + ")" + ":數字" + time;

        Producer<String, String> producer = createProducer();
        try {
            producer.send(new KeyedMessage<String, String>(topic, text));
        } finally {
            // Each task owns its producer; close it so its resources are
            // released even when send() throws.
            producer.close();
        }
        System.out.println(text);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates a producer configured with the string encoder and the
     * placeholder ZooKeeper / broker addresses below.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "xx.xx.xx.xx:2181"); // ZooKeeper ip + port
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", "xx.xx.xx.xx:9092"); // Kafka broker address
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}

        ​