程式人生 > RabbitMQ四種Exchange型別之Direct (Erlang)

RabbitMQ四種Exchange型別之Direct (Erlang)

%% @doc gen_server that consumes messages from a queue bound to the
%% `<<"direct">>' exchange under one routing key and prints every payload it
%% receives. One process is started per routing key via start_link/1.
-module(mod_direct_receive).

-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

%% Public API.
-export([start_link/1]).
%% NOTE(review): presumably supplies ?HOST, ?USER_NAME, ?PASSWORD,
%% ?EXCHANGE_TYPE_DIRECT and the AMQP record definitions used below -
%% the header is not visible here, confirm.
-include("common.hrl").

%% Server state: the routing key (a binary, empty by default) this receiver
%% was started with; it is only read back when printing messages.
-record(state, {routing_key = <<"">>}).


%% @doc Starts a receiver for `RoutingKey' (a binary) and links it to the
%% caller.
%%
%% The process is registered locally under an atom made of the module name
%% immediately followed by the routing key, so each routing key gets its own
%% named server.
%%
%% NOTE(review): every distinct routing key creates a new atom; keep the set
%% of routing keys small and trusted.
start_link(RoutingKey) ->
    Suffix = erlang:binary_to_list(RoutingKey),
    RegisteredName = erlang:list_to_atom(erlang:atom_to_list(?MODULE) ++ Suffix),
    gen_server:start_link({local, RegisteredName}, ?MODULE, [RoutingKey], []).


%% @doc gen_server init callback: subscribes this process to the broker for
%% `RoutingKey' and records the key in the server state.
%%
%% start/1 yields the broker's `basic.consume_ok' reply only when the
%% subscription was actually set up. Any other result means the server would
%% never receive a delivery, so initialisation is aborted with
%% `{stop, {subscribe_failed, Result}}' (start_link/1 then returns
%% `{error, {subscribe_failed, Result}}') instead of reporting a healthy but
%% idle process.
init([RoutingKey]) ->
    case start(RoutingKey) of
        {'basic.consume_ok', _Tag} ->
            {ok, #state{routing_key = RoutingKey}};
        Failure ->
            {stop, {subscribe_failed, Failure}}
    end.

%% @doc gen_server call callback. No synchronous requests are defined: every
%% call is answered with `ok' and the state is left untouched.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.


%% @doc gen_server cast callback. Casts carry no meaning for this server and
%% are dropped without changing the state.
handle_cast(_Ignored, Unchanged) ->
    {noreply, Unchanged}.

%% @doc gen_server info callback: handles the raw messages that arrive in
%% this process's mailbox.
%%
%% <ul>
%%   <li>`{'basic.consume_ok', _}' - ignored.</li>
%%   <li>`{#'basic.deliver'{}, #amqp_msg{}}' - the message payload is printed
%%       together with this receiver's routing key.</li>
%%   <li>anything else - printed as an unknown message.</li>
%% </ul>
%% The state is returned unchanged in every case.
handle_info({'basic.consume_ok', _Tag}, State) ->
    {noreply, State};
handle_info({#'basic.deliver'{}, #amqp_msg{payload = Payload}}, State) ->
    Key = State#state.routing_key,
    io:format(" [routing_key = ~p] receive  messages is ~p~n", [Key, Payload]),
    {noreply, State};
handle_info(Unexpected, State) ->
    Key = State#state.routing_key,
    io:format("[routing_key = ~p] unknown messages is ~p~n", [Key, Unexpected]),
    {noreply, State}.


%% @doc gen_server terminate callback. Performs no cleanup and returns `ok'.
%%
%% NOTE(review): the state record holds only the routing key, so the AMQP
%% connection and channel opened by start/1 cannot be closed from here.
terminate(_Why, _FinalState) ->
    ok.

%% @doc gen_server code-upgrade callback. The state needs no conversion
%% between versions, so it is handed back as is.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.

%% @doc Connects to the broker and subscribes the calling process to a fresh
%% queue bound to the `<<"direct">>' exchange under `RoutingKey'.
%%
%% The queue is declared without a name so that the broker generates one and
%% reports it in the `queue.declare_ok' reply. A name derived from a
%% wall-clock timestamp in whole seconds is not unique: two receivers started
%% within the same second would declare the same queue and share its
%% messages regardless of their routing keys.
%%
%% Each declare/bind reply is matched against its `*_ok' record, so any other
%% reply fails here with a badmatch instead of being silently ignored.
%%
%% Returns the result of `amqp_channel:subscribe/3' once the consumer has
%% been registered, or `{error, Reason}' when the connection cannot be
%% opened (the reason is also printed).
start(RoutingKey) ->
    Params = #amqp_params_network{host = ?HOST, username = ?USER_NAME, password = ?PASSWORD},
    case amqp_connection:start(Params) of
        {ok, Connection} ->
            {ok, Channel} = amqp_connection:open_channel(Connection),
            %% Declare the queue; an empty name asks the broker to pick a unique one.
            #'queue.declare_ok'{queue = QueueName} =
                amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}),
            %% Declare the exchange.
            #'exchange.declare_ok'{} =
                amqp_channel:call(
                    Channel,
                    #'exchange.declare'{
                        auto_delete = true,
                        exchange = <<"direct">>,
                        type = ?EXCHANGE_TYPE_DIRECT
                    }
                ),
            %% Bind the queue to the exchange under the requested routing key.
            #'queue.bind_ok'{} =
                amqp_channel:call(
                    Channel,
                    #'queue.bind'{
                        queue = QueueName,
                        exchange = <<"direct">>,
                        routing_key = RoutingKey
                    }
                ),
            io:format(" [routing_key = ~p] Waiting for messages......~n", [RoutingKey]),
            %% Deliveries are sent to the calling process; no_ack means the
            %% broker does not wait for acknowledgements.
            amqp_channel:subscribe(
                Channel,
                #'basic.consume'{queue = QueueName, no_ack = true},
                self()
            );
        {error, Reason} ->
            io:format("[routing_key = ~p] connection rabbit error: ~p~n", [RoutingKey, Reason]),
            {error, Reason}
    end.

生產者: