Java使用nanomsg消息中間件
阿新 • • 發佈:2018-06-18
com except 設置 jna cat tac pri bus .com 一,引用的包
linux:在類路徑下創建目錄 linux-x86-64,並將libnanomsg.so文件放入其下
<dependency>
<groupId>jnanomsg</groupId>
<artifactId>jnanomsg</artifactId>
<version>0.4.3</version>
</dependency>
二,nanomsg的使用,需要針對不同的平臺編譯並得到動態庫,然後引入項目。當前自己在window10-x86-64 和 linux-x86-64的環境下編譯了兩個動態庫。(本次使用的是nanomsg5.1.0)
windows:直接將nanomsg.dll和nanomsg.lib置於類路徑下
下載路徑:https://pan.baidu.com/s/18-wE_QMeYDkwi4kYQfrAaw
三,借用別人的代碼
1,PAIR 模式
public class Pair { private static String url = "tcp://127.0.0.1:7789"; public static void main(String[] args) { node0(); node1(); } private static void node0() { PairSocket socket = new PairSocket(); socket.connect(url); send(socket); recv(socket, "node0"); } private static void node1() { PairSocket socket = new PairSocket(); socket.bind(url); send(socket); recv(socket, "node1"); } private static void recv(final PairSocket socket, final String nodeName) { socket.setRecvTimeout(2000); // 設置執行recv的超時時間 new Thread(new Runnable() { public void run() { while (true) { try { System.out.println(nodeName + ":" + socket.recvString()); // 阻塞socket,直到超時或者有響應 Thread.sleep(1000); } catch (IOException e) { // 忽略超時 e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } private static void send(final PairSocket socket) { socket.setSendTimeout(1100); // 設置執行send的超時時間 new Thread(new Runnable() { public void run() { while (true) { try { socket.send("hello"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }).start(); } }
2,Pipeline模式
public class Pipeline { private static String url = "tcp://127.0.0.1:7789"; public static void main(String[] args) { node1(); node0(); } private static void node1() { final PullSocket socket = new PullSocket(); socket.bind(url); new Thread(new Runnable() { public void run() { while (true) { try { System.out.println(socket.recvString()); // 阻塞socket,直到超時或者有響應 Thread.sleep(1000); } catch (IOException e) { // 忽略超時 // e.printStackTrace(); } catch (InterruptedException e) { // e.printStackTrace(); } } } }).start(); } private static void node0() { final PushSocket socket = new PushSocket(); socket.connect(url); socket.send("hello"); } }
3,ReqRep模式
public class ReqRep {
private static String url = "tcp://127.0.0.1:7789";
public static void main(String[] args) {
node1();
node0();
}
private static void node1() {
final RepSocket socket = new RepSocket();
socket.bind(url);
new Thread(new Runnable() {
public void run() {
while (true) {
try {
System.out.println( "node1:" + socket.recvString()); // 阻塞socket,直到超時或者有響應
Thread.sleep(1000);
socket.send("world");
} catch (IOException e) { // 忽略超時
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
private static void node0() {
final ReqSocket socket = new ReqSocket();
socket.connect(url);
new Thread(new Runnable() {
public void run() {
while (true) {
try {
socket.send("hello");
Thread.sleep(1000);
System.out.println( "node0:" + socket.recvString()); // 阻塞socket,直到超時或者有響應
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
// e.printStackTrace();
}
}
}
}).start();
}
}
4,pubsub模式
public class PubSub {
private static String url = "tcp://127.0.0.1:7789";
public static void main(String[] args) {
service();
client0();
client1();
client2();
}
private static void client0() {
client("client0");
}
private static void client1() {
client("client1");
}
private static void client2() {
client("client2");
}
private static void client(final String name) {
final SubSocket socket = new SubSocket();
socket.connect(url);
socket.subscribe("test"); // jnanomsg中 頻道的匹配是匹配recv為^test的消息
new Thread(new Runnable() {
public void run() {
while (true) {
try{
System.out.println(name + ":" + socket.recvString());
}catch (IOException e) { // 忽略超時
//e.printStackTrace();
}
}
}
}).start();
}
private static void service() {
final PubSocket socket = new PubSocket();
socket.bind(url);
new Thread(new Runnable() {
public void run() {
while (true) {
try{
socket.send("test1 msg");
Thread.sleep(2000);
}catch (IOException e) { // 忽略超時
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
5,bus 模式
public class Bus {
private static String url0 = "tcp://127.0.0.1:7780";
private static String url1 = "tcp://127.0.0.1:7781";
private static String url2 = "tcp://127.0.0.1:7782";
private static String url3 = "tcp://127.0.0.1:7783";
public static void main(String[] args) {
BusSocket s0 = node("node0", url0, new String[]{url1, url2, url3});
BusSocket s1 = node("node1", url1, new String[]{url2, url3});
BusSocket s2 = node("node2", url2, new String[]{url3});
BusSocket s3 = node("node3", url3, new String[]{});
s0.send("client0 send a");
s1.send("client1 send a");
s2.send("client2 send a");
s3.send("client3 send a");
}
private static BusSocket node(final String name, String self, String[] other) {
final BusSocket socket = new BusSocket();
socket.bind(self);
for (String s : other){
socket.connect(s);
}
new Thread(new Runnable() {
public void run() {
while (true) {
try {
System.out.println(name + ":"+ socket.recvString()); // 阻塞socket,直到超時或者有響應
Thread.sleep(1);
} catch (IOException e) { // 忽略超時
// e.printStackTrace();
} catch (InterruptedException e) {
// e.printStackTrace();
}
}
}
}).start();
return socket;
// socket.connect();
}
}
Java使用nanomsg消息中間件