程式人生 > Java 非阻塞通訊的例子

Java 非阻塞通訊的例子

package 建立非阻塞的EchoClient;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;


/**
 * Non-blocking echo client.
 *
 * <p>One thread ({@link #LocalRead()}) reads lines from standard input and queues them in
 * {@code sendBuffer}. The thread running {@link #talk()} multiplexes the socket through a
 * {@link Selector}: it flushes queued bytes to the server and prints every complete line the
 * server sends back. Text is encoded with GBK and lines are terminated by CRLF.
 *
 * <p>NOTE(review): the channel is registered for OP_WRITE permanently, so the selector loop
 * never blocks for long; this keeps the original design but burns CPU while idle.
 */
public class EchoClient1 {
    /** Bytes queued for the server. Shared by two threads, guarded by its own monitor. */
    private final ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    /** Bytes received but not yet printed. Always left in write mode between calls. */
    private final ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private final int port = 8000;
    private SocketChannel socketChannel;
    private Selector selector;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Connects (blocking) to the echo server on the local host, then switches the channel to
     * non-blocking mode and opens the selector.
     *
     * @throws java.io.UncheckedIOException if the connection or the selector cannot be set up;
     *     failing here avoids handing out a half-initialised client
     */
    public EchoClient1() {
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            socketChannel.configureBlocking(false);
            System.out.println("客戶端已經與伺服器建立連線!");
            selector = Selector.open();
        } catch (IOException e) {
            releaseResources();
            throw new java.io.UncheckedIOException(
                    "Cannot connect to echo server on port " + port, e);
        }
    }

    /**
     * Reads lines from standard input and queues each one, followed by CRLF, for sending.
     * Returns after the line {@code bye} has been queued or when standard input ends.
     *
     * @throws IOException if reading standard input fails
     * @throws java.nio.BufferOverflowException if the queued, not yet sent bytes would exceed
     *     the 1024-byte send buffer
     */
    public void LocalRead() throws IOException {
        BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
        String msg;
        while ((msg = localReader.readLine()) != null) {
            ByteBuffer encoded = charset.encode(msg + "\r\n");
            synchronized (sendBuffer) {
                sendBuffer.put(encoded);
            }
            if (msg.equals("bye")) {
                break;
            }
        }
    }

    /**
     * Runs the selector loop: registers the channel for read and write readiness and dispatches
     * to {@link #Receive(SelectionKey)} and {@link #Send(SelectionKey)}. An I/O error is
     * reported on standard error and ends the loop.
     */
    public void talk() {
        try {
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    // A cancelled key throws CancelledKeyException from the isXxx() tests.
                    if (key.isValid() && key.isReadable()) {
                        Receive(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        Send(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes as much of the queued data as the socket accepts; whatever is left stays queued.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the write fails
     */
    void Send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (sendBuffer) {
            sendBuffer.flip();
            channel.write(sendBuffer);
            sendBuffer.compact();
        }
    }

    /**
     * Reads available bytes and prints every complete line received so far. Shuts the client
     * down when the server echoes {@code bye} or closes the connection.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the read or the shutdown fails
     */
    void Receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        // read() returns -1 once the peer has closed the connection.
        boolean serverClosed = channel.read(receiveBuffer) == -1;

        String outputData;
        while ((outputData = extractLine(receiveBuffer, charset)) != null) {
            System.out.println(outputData);
            // The extracted line still carries its CRLF terminator.
            if (outputData.equals("echo:bye\r\n")) {
                disconnect(key);
                return;
            }
        }

        if (!receiveBuffer.hasRemaining()) {
            // A line longer than the buffer would block all further reads: print what we have.
            receiveBuffer.flip();
            System.out.println(charset.decode(receiveBuffer));
            receiveBuffer.clear();
        }

        if (serverClosed) {
            disconnect(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buffer}.
     *
     * <p>The buffer must be in write mode (position = end of data) and is left in write mode:
     * when a line is returned the remaining bytes are moved to the front, otherwise the buffer
     * is unchanged so that the next read appends to the partial line.
     *
     * @param buffer accumulated bytes, in write mode
     * @param charset charset used to decode the bytes and to measure the consumed line
     * @return the line including its CRLF, or {@code null} if no complete line is buffered
     */
    static String extractLine(ByteBuffer buffer, Charset charset) {
        if (buffer.position() == 0) {
            return null;
        }
        buffer.flip();
        String data = charset.decode(buffer).toString();
        int terminator = data.indexOf("\r\n");
        if (terminator == -1) {
            // Undo the flip so the next read appends instead of reading zero bytes.
            buffer.position(buffer.limit());
            buffer.limit(buffer.capacity());
            return null;
        }
        String line = data.substring(0, terminator + 2);
        // Re-encode to learn how many bytes the line occupied in the buffer.
        int consumed = Math.min(charset.encode(line).limit(), buffer.limit());
        buffer.position(consumed);
        buffer.compact();
        return line;
    }

    /** Cancels the key, closes channel and selector, and terminates the JVM. */
    private void disconnect(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("與伺服器斷開連線");
        selector.close();
        // The stdin thread is blocked in readLine(), so the JVM has to be stopped explicitly.
        System.exit(0);
    }

    /** Best-effort cleanup used when construction fails. */
    private void releaseResources() {
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException ignored) {
            // Nothing useful can be done; the original failure is what gets reported.
        }
        try {
            if (socketChannel != null) {
                socketChannel.close();
            }
        } catch (IOException ignored) {
            // Nothing useful can be done; the original failure is what gets reported.
        }
    }

    /**
     * Starts the stdin reader on its own thread and runs the selector loop on the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoClient1 client = new EchoClient1();
        new Thread(() -> {
            try {
                client.LocalRead();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        client.talk();
    }
}

package 混合模式;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.net.*;


/**
 * Echo server in "mixed" mode: a dedicated thread accepts connections with blocking calls
 * ({@link #accept()}), while the main thread receives and sends data for all clients through
 * a non-blocking {@link Selector} loop ({@link #service()}).
 *
 * <p>Each client key carries a 1024-byte {@link ByteBuffer} attachment that accumulates the
 * bytes received from that client. The buffer is always left in write mode between calls.
 * Text is GBK encoded and lines are terminated by CRLF.
 */
public class EchoServer {
    private Selector selector = null;
    private ServerSocketChannel serverSocketChannel = null;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");
    /** Lock used to hold the selector thread off while the accept thread registers a channel. */
    private final Object gate = new Object();

    /**
     * Opens the selector and binds the (blocking) listening channel to port 8000.
     *
     * @throws java.io.UncheckedIOException if the selector or the listening socket cannot be
     *     set up
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // SO_REUSEADDR only has a defined effect when it is enabled before bind().
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("伺服器已經啟動!");
        } catch (IOException e) {
            safeClose(serverSocketChannel);
            safeClose(selector);
            throw new java.io.UncheckedIOException(
                    "Cannot start echo server on port " + port, e);
        }
    }

    /**
     * Accepts clients forever (blocking) and registers each one with the selector for read and
     * write readiness. A failure while preparing one client only drops that client; a failure
     * of {@code accept()} itself is reported and ends the loop.
     */
    public void accept() {
        try {
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println(
                        "接受來自:" + socketChannel.socket().getInetAddress()
                                + "埠:" + socketChannel.socket().getPort());
                try {
                    socketChannel.configureBlocking(false);
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    synchronized (gate) {
                        // Kick the selector out of select(); the service thread then waits on
                        // the gate until the registration below has finished.
                        selector.wakeup();
                        socketChannel.register(
                                selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    safeClose(socketChannel);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the selector loop forever, dispatching ready keys to {@link #receive(SelectionKey)}
     * and {@link #send(SelectionKey)}. An I/O error on one connection closes only that
     * connection; the remaining ready keys are still processed.
     */
    public void service() {
        while (true) {
            try {
                // Blocks while the accept thread is inside its registration section.
                synchronized (gate) {
                }
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
            // Always drain the selected-key set: keys left behind would not be reported as
            // updated by the next select() and would never be handled.
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                try {
                    // A cancelled key throws CancelledKeyException from the isXxx() tests.
                    if (key.isValid() && key.isReadable()) {
                        receive(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    key.cancel();
                    safeClose(key.channel());
                }
            }
        }
    }

    /**
     * Appends the bytes currently available on the connection to the key's buffer, and closes
     * the connection when the client has closed its side.
     *
     * @param key selection key of a client connection with a {@link ByteBuffer} attachment
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // Reading straight into the attachment can never overflow it: read() is bounded by
        // the space remaining. It returns -1 once the peer has closed the connection.
        if (socketChannel.read(buffer) == -1) {
            closeConnection(key);
        }
    }

    /**
     * Echoes every complete line held in the key's buffer back to the client, prefixed with
     * {@code echo:}. Closes the connection after echoing {@code bye}, or when the buffer is
     * full without containing a complete line.
     *
     * @param key selection key of a client connection with a {@link ByteBuffer} attachment
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();

        String outputData;
        while ((outputData = extractLine(buffer, charset)) != null) {
            System.out.println(outputData);
            ByteBuffer reply = charset.encode("echo:" + outputData);
            // NOTE(review): spins until the socket accepts the whole reply, so one slow
            // client can stall the loop.
            while (reply.hasRemaining()) {
                socketChannel.write(reply);
            }
            if (outputData.equals("bye\r\n")) {
                closeConnection(key);
                return;
            }
        }

        if (!buffer.hasRemaining()) {
            // Line longer than the buffer: no further byte could ever be read, so drop the
            // client instead of keeping a dead connection around.
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buffer}.
     *
     * <p>The buffer must be in write mode (position = end of data) and is left in write mode:
     * when a line is returned the remaining bytes are moved to the front, otherwise the buffer
     * is unchanged so that the next read appends to the partial line.
     *
     * @param buffer accumulated bytes, in write mode
     * @param charset charset used to decode the bytes and to measure the consumed line
     * @return the line including its CRLF, or {@code null} if no complete line is buffered
     */
    static String extractLine(ByteBuffer buffer, Charset charset) {
        if (buffer.position() == 0) {
            return null;
        }
        buffer.flip();
        String data = charset.decode(buffer).toString();
        int terminator = data.indexOf("\r\n");
        if (terminator == -1) {
            // Undo the flip so the next read appends after the partial line.
            buffer.position(buffer.limit());
            buffer.limit(buffer.capacity());
            return null;
        }
        String line = data.substring(0, terminator + 2);
        // Re-encode to learn how many bytes the line occupied in the buffer.
        int consumed = Math.min(charset.encode(line).limit(), buffer.limit());
        buffer.position(consumed);
        buffer.compact();
        return line;
    }

    /** Cancels the key and closes its channel. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("關閉與客戶端的連線!");
    }

    /** Closes {@code resource} if it is non-null, reporting (not propagating) any failure. */
    private static void safeClose(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the accept thread and runs the selector loop on the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoServer server = new EchoServer();
        new Thread(server::accept).start();
        server.service();
    }
}

package non_blocking;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;;


/**
 * Fully non-blocking echo server: a single thread accepts connections and serves every client
 * through one {@link Selector}.
 *
 * <p>Each client key carries a 1024-byte {@link ByteBuffer} attachment that accumulates the
 * bytes received from that client. The buffer is always left in write mode between calls.
 * Text is GBK encoded and lines are terminated by CRLF.
 */
public class EchoServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Opens the selector and binds a non-blocking listening channel to port 8000.
     *
     * @throws java.io.UncheckedIOException if the selector or the listening socket cannot be
     *     set up
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            // open() is static; call it on the type rather than through a null reference.
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            System.out.println("服務端已經啟動!");
        } catch (IOException e) {
            safeClose(serverSocketChannel);
            safeClose(selector);
            throw new java.io.UncheckedIOException(
                    "Cannot start echo server on port " + port, e);
        }
    }

    /**
     * Runs the selector loop: accepts new clients and dispatches ready client keys to
     * {@link #receive(SelectionKey)} and {@link #send(SelectionKey)}.
     *
     * <p>An I/O error on one client connection closes only that connection. A failure of the
     * selector or of the listening channel is reported and ends the loop.
     */
    public void service() {
        try {
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    // A cancelled key throws CancelledKeyException from the isXxx() tests.
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // Errors on the listening channel are fatal and reach the outer catch.
                        acceptClient(key);
                        continue;
                    }
                    serveClient(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Accepts one pending connection and registers it for read and write readiness. */
    private void acceptClient(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = ssc.accept();
        if (socketChannel == null) {
            // A non-blocking accept() returns null when no connection is pending.
            return;
        }
        System.out.println("接受來自" + socketChannel.socket().getInetAddress()
                + "埠:" + socketChannel.socket().getPort());
        try {
            socketChannel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.register(
                    selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
        } catch (IOException e) {
            // Only this client is affected; keep the listener running.
            e.printStackTrace();
            safeClose(socketChannel);
        }
    }

    /** Handles read/write readiness of one client, isolating its failures from the others. */
    private void serveClient(SelectionKey key) {
        try {
            if (key.isReadable()) {
                receive(key);
            }
            if (key.isValid() && key.isWritable()) {
                send(key);
            }
        } catch (IOException e) {
            e.printStackTrace();
            key.cancel();
            safeClose(key.channel());
        }
    }

    /**
     * Appends the bytes currently available on the connection to the key's buffer, and closes
     * the connection when the client has closed its side.
     *
     * @param key selection key of a client connection with a {@link ByteBuffer} attachment
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // Reading straight into the attachment can never overflow it: read() is bounded by
        // the space remaining. It returns -1 once the peer has closed the connection.
        if (socketChannel.read(buffer) == -1) {
            closeConnection(key);
        }
    }

    /**
     * Echoes every complete line held in the key's buffer back to the client, prefixed with
     * {@code echo:}. Closes the connection after echoing {@code bye}, or when the buffer is
     * full without containing a complete line.
     *
     * @param key selection key of a client connection with a {@link ByteBuffer} attachment
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();

        String outputData;
        while ((outputData = extractLine(buffer, charset)) != null) {
            System.out.println(outputData);
            ByteBuffer echoBuffer = charset.encode("echo:" + outputData);
            // NOTE(review): spins until the socket accepts the whole reply, so one slow
            // client can stall the loop.
            while (echoBuffer.hasRemaining()) {
                socketChannel.write(echoBuffer);
            }
            if (outputData.equals("bye\r\n")) {
                closeConnection(key);
                return;
            }
        }

        if (!buffer.hasRemaining()) {
            // Line longer than the buffer: no further byte could ever be read, so drop the
            // client instead of keeping a dead connection around.
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buffer}.
     *
     * <p>The buffer must be in write mode (position = end of data) and is left in write mode:
     * when a line is returned the remaining bytes are moved to the front, otherwise the buffer
     * is unchanged so that the next read appends to the partial line.
     *
     * @param buffer accumulated bytes, in write mode
     * @param charset charset used to decode the bytes and to measure the consumed line
     * @return the line including its CRLF, or {@code null} if no complete line is buffered
     */
    static String extractLine(ByteBuffer buffer, Charset charset) {
        if (buffer.position() == 0) {
            return null;
        }
        buffer.flip();
        String data = charset.decode(buffer).toString();
        int terminator = data.indexOf("\r\n");
        if (terminator == -1) {
            // Undo the flip so the next read appends after the partial line.
            buffer.position(buffer.limit());
            buffer.limit(buffer.capacity());
            return null;
        }
        String line = data.substring(0, terminator + 2);
        // Re-encode to learn how many bytes the line occupied in the buffer.
        int consumed = Math.min(charset.encode(line).limit(), buffer.limit());
        buffer.position(consumed);
        buffer.compact();
        return line;
    }

    /** Cancels the key and closes its channel. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("關閉伺服器之間的連線");
    }

    /** Closes {@code resource} if it is non-null, reporting (not propagating) any failure. */
    private static void safeClose(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates the server and runs its selector loop on the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new EchoServer().service();
    }
}