1. 程式人生 > >RabbitMQ - Work queues

RabbitMQ - Work queues

print () rabbitmq exit con consumer def todo reat

Producer:

    private static void newTask(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE, 
true, false, false, null); String message = getMessage(args); channel.basicPublish("", TASK_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println("[x] send ‘" + message + "‘"); } catch (Exception e) {
// TODO: handle exception } } private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }

build.bat

rem Classpath: current directory plus the amqp-client and slf4j-api jars.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Compile Sender.java; -d . writes the class files under the current directory.
javac -cp %CP% Sender.java -d .

run.bat

rem Classpath: current directory plus the amqp-client and slf4j-api jars.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Send a task whose text contains no dots.
java -cp %CP% rm_producer.Sender hello,world
rem Same classpath, set again for the second invocation.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Send a task ending in three dots; the worker's doWork sleeps one second per '.'.
java -cp %CP% rm_producer.Sender hello,world...

  

Consumer:

/**
     * Connects to the broker at {@code localhost} and starts consuming tasks
     * from {@code TASK_QUEUE} with manual acknowledgements.
     *
     * <p>Each delivery is decoded as UTF-8, passed to {@code doWork}, and then
     * acknowledged in a {@code finally} block, so the acknowledgement is sent
     * whether or not {@code doWork} completes normally.
     *
     * <p>On success the method returns with the connection still open, so
     * deliveries keep being handled after it returns. If setup fails, the
     * connection is closed and the failure is propagated to the caller.
     *
     * @throws IOException      if creating the connection or channel, declaring
     *                          the queue, or registering the consumer fails
     * @throws TimeoutException if connecting to the broker fails with a timeout
     */
    private static void taskWorker() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // Arguments after the queue name: durable, exclusive, autoDelete, arguments.
            channel.queueDeclare(TASK_QUEUE, true, false, false, null);
            System.out.println("[*] Waiting for message, to exit press CTRL+C");
            // Prefetch count of 1: see Channel.basicQos in the client API docs.
            channel.basicQos(1);

            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("[x] Receive message ‘" + message + "‘");
                    try {
                        doWork(message);
                    } finally {
                        // println (not print) so the next "Receive" line starts on its own line.
                        System.out.println("[x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Second argument false = autoAck off; acknowledgements are sent in handleDelivery.
            channel.basicConsume(TASK_QUEUE, false, consumer);
        } catch (IOException | RuntimeException e) {
            // Setup failed: release the connection, then report the failure
            // to the caller instead of swallowing it.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // The connection is intentionally not closed on the success path:
        // closing it here would end consumption immediately.
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == ‘.‘) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

build.bat

rem Classpath: current directory plus the amqp-client and slf4j-api jars.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Compile Receiver.java; -d . writes the class files under the current directory.
javac -cp %CP% Receiver.java -d .

run.bat

rem Classpath: current directory plus the amqp-client and slf4j-api jars.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Start a worker with its standard output redirected to log.txt.
java -cp %CP% rm_consumer.Receiver > log.txt
rem Same classpath, set again for the second invocation.
set CP=.;amqp-client-5.0.0.jar;slf4j-api-1.7.25.jar;
rem Start a worker that prints to the console.
java -cp %CP% rm_consumer.Receiver

  

RabbitMQ - Work queues