java編寫Producer(執行緒池,kafka)
阿新 • • 發佈:2019-02-09
1. 將 Kafka 自帶的 jar 包匯入專案內。
2. 編寫 Producer 程式碼(使用執行緒池向 Kafka 傳送訊息):
public class TestThreadPool { public static void main(String args[]) { //線上程池中建立2個執行緒 ExecutorService exec = Executors.newFixedThreadPool(2); //建立100個執行緒目標物件 for (int index = 0; index < 100; index++) { Runnable run = new Runner(index); //執行執行緒目標物件 exec.execute(run); } exec.shutdown(); } } //執行緒目標物件 class Runner implements Runnable { int index = 0; public static String topic = "test"; public Runner(int index) { this.index = index; } public void run() { long time = (long) (Math.random() * 1000); Producer producer = createProducer(); KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, "執行緒:" + Thread.currentThread().getName() + "(目標物件" + index + ")" + ":數字" + time); producer.send(keyedMessage); // System.out.println("執行緒:" + Thread.currentThread().getName() + "(目標物件" + index + ")" + ":數字" + time); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } private Producer createProducer() { Properties properties = new Properties(); properties.put("zookeeper.connect", "xx.xx.xx.xx:2181");//宣告zk ip+埠 properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "xx.xx.xx.xx:9092");//宣告kafka broker return new Producer<Integer, String>(new ProducerConfig(properties)); } }