1. 程式人生 > >netty實現一個簡單的服務端和客戶端通訊

netty實現一個簡單的服務端和客戶端通訊

netty java Nio網路通訊框架,能接收TCP,http,webSocket,UDP

elk包括:logStash,Elasticsearch,Kibana
LogStash:tcp監聽netty伺服器傳送的資訊,並把訊息處理好後轉發給ES
Elasticsearch:接受LogStash傳送來的資料,並進行分詞構建索引。
kibana:主要是從ES中獲取相關索引的資料進行檢索,分析。預設埠5601
順序:pc網頁 通過http--->Netty高效能伺服器-->LogStash資料收集器(過濾處理)-->Elasticsearch搜尋引擎(構建索引)-->kibana展示分析
手機app 通過tcp---->同上


netty 3個核心功能:
1.Extensible Event Model可以擴充套件的事件模型。
2.Universal communication api 普通的通訊API。
3.zero-copy-Capable Rich Byte Buffer 零拷貝


案例1.1
netty服務端:ServerBootstrap.java是netty的啟動輔助類
public class NettyServer{
public static void main(String[]args){
1.繫結兩個執行緒組分別用來處理客戶端通道的accept和讀寫事件
2.繫結服務端通道NioServerSocketChannel
3.給讀寫事件的執行緒通道 繫結handler 去真正處理讀寫
4.監聽埠
ServerBootstrap server = new ServerBootstrap();//netty啟動類
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childgroup = new NioEventLoopGroup();
server.group(parentGroup,childgroup);//parentGroup用來處理accept事件,childgroup用來處理通道的讀寫事件
server.channel(NioServerSocketChannel.class);//第2部繫結某個通道
server.childHandler(new ChannelInitializer<SocketChannel>(){//繫結handler,真正的處理,childHandler給子執行緒組繫結handler, ChannelInitializer是給通道初始化
@Override
protected void initChannel(SocketChannel ch) throws Exception{ //處理他的讀寫事件
//ch.pipeline().addLast(new DelimiterBasedFrameDecoder(   //解碼器,一定要加在SimpleServerHandler 的上面
//Integer.MAX_VALUE,Delimiters.lineDelimiter()[0])) ;//maxFrameLength表示這一貞最大的大小 ,delimiter分隔符 0就是 \r \n的組合或者-->Unpooled.wrappedBuffer(new byte[]{'\r','\n'})
//解碼器,接收的資料 進行解碼
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(//長度解碼器
Integer.MAX_VALUE,0,4,0,4)); //引數1這一貞最大長度,引數2 lengthFieldOffset長度偏移量,引數3 lengthFieldLength 傳遞的長度 獲取前面4個字元,引數4 lengthAdjustment 長度需要調節 0為不需要調節 1為增加一個位元組,引數5 initialBytesToStrip 需要跳過多少長度 跳過4個位元組,int是4個位元組

ch.pipeline().addLast(new SimpleServerHandler());
//下面是2個編碼器,向socket傳遞的資料("is ok")進行編碼,順序是倒過來執行的。因為是個雙向連結串列 ??
ch.pipeline().addLast(new LengthFieldPrepender(4,false)); //這個編碼器只能編碼byteBuf. 引數1 長度 4個位元組, 引數2 lengthIncludesLengthFieldLength 長度是否包含了這個長度 false不包含   
ch.pipeline().addLast(new StringEncoder()); //把傳過來的資料 轉換成byteBuf
}
});
ChannelFuture future = server.bind(8080).sync();//第4步 繫結埠
future.channel().closeFuture().sync();//當通道關閉了,就繼續往下走
}
}
public class SimpleServerHandler extends ChannelInboundHandlerAdapter{
//可以在這裡面寫一套類似springMVC的框架
//讓SimpleServerHandler不跟任何業務有關,可以封裝一套框架
@Override  //讀取客戶端通道的資料
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
//super.channelRead(ctx,msg);
if(msg instanceof ByteBuf){
System.out.println(((ByteBuf)msg).toString(Charset.defaultCharset()));
}

//業務邏輯程式碼處理框架。。。

//告訴客戶端,我已經讀到你的資料了
ctx.channel().writeAndFlush("is ok");
}
}
//使用下面語句測試上面程式碼。1.開啟上面main方法。2.執行一下命令
c:>telnet localhost 8080
隨便輸入,會發現eclipse控制檯會監聽到輸入的內容,有個問題 接收時一個字一個字接受,可以讓他一行一行接收


socket端,向上面的netty傳送資料
public class SocketClient{

public static void main(String[]args){
Socket socket = new Socket("loclhost",8080);//netty和socket通訊
DataOutputStream output = new DataOutputStream(socket.getOutputStream());//netty和socket通訊
output.writeInt(4);//寫入4 ,
output.writeBytes("ssssss\r\naaaaaaa");   //寫資料給netty伺服器
output.flush();
//socket接受netty傳過來的資料
DataInputStream input = new DataInputStream(socket.getInputStream());
int length = input.readInt(); //獲取傳來的資料 的長度
byte[] b = new byte[length];
input.readFully(b,0,length);
System.out.println(new String(b));
socket.close();
}
}


案例1.2:netty客戶端 向cs傳送資料
使用Bootstrap.java啟動netty
public class NettyClient{
public static vlid main(String[]args){
Bootstrap client = new Bootstrap();//netty的啟動類
//繫結執行緒組,處理讀寫和連結事件
EventLoopGroup group = new NioEventLoopGroup();
cilent.group(group);//第1步.繫結執行緒組//netty只有一個執行緒組管理,因為構造沒有accept實現?
//第2步 繫結客戶端通道
client.channel(NioSocketChannel.class);
//第3步 給NIoSocketChannel初始化handler,真正的處理讀寫事件
client.handler(new ChannelInitializer<NioSocketChannel>(){ //通道是NioSocketChannel
@Override
protected void initChannel(NioSocketChannel ch){
ch.pipeline().addLast(new SimpleClientHandler()); //找到他的管道 增加他的handler

ch.pipelien().addLast(new LengthFieldPrepender(4,false));//是個長度編碼器
ch.pipelien().addLast(new StringEchoder());//字串編碼器

}
});
//第4步 只要連結即可
ChannelFuture future = client.connect("loaclhost",8080).sync(); //同步返回sync()
//傳送資料給伺服器
String msg = "hello netty";//需要加字串編碼器
future.channel().writeAndFlush(msg);

future.channel().closeFuture.sync();
System.out.println("客戶端執行完畢");
}
}
public class SimpleClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
//接收伺服器端傳過來的資料
if(msg instanceof ByteBuf){
System.out.println( ((ByteBuf)msg).toString(Charset.defaultCharset()));
}
ctx.channel().close(); //把客戶端的通道關閉

}
}
測試:啟動NettyServer.java,再啟動NettyClient.java ,netty伺服器會接收到資訊


案例2 netty和spring整合:
@Configuration   //相當於把該類作為spring的xml配置檔案中的<beans>
@ComponentScan("bauble2")
public class SpringServer{
public static void main(){
//使用ApplicationContext構建spring容器 //ApplicationContext藉口沒有start方法所以使用AnnotationConfigApplicationContext定義deriving definition
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringServer.class);
context.start();
//啟動之後 想要主執行緒掛到這裡,增加同步機制
synchronized (SpringServer.class){
try{
SpringServer.class.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
www.itjoin.org/course/findLatest
//跟netty整合, 可以在這裡面寫一套類似springMVC的框架
@Component //注入到spring容器裡面
public class InitNetty implements ApplicationListener<ContextRefreshedEvent>{ //ContextRefreshedEvent當整個容器重新整理的時候觸發
@Override
public void onApplicationEvent(ContextRefreshedEvent event){
new NettyThread().start();
}
}
//new 一個執行緒
public class NettyThread extends Thread{
@Override
public void run(){
//啟動netty
dostart();
}
private void dostart(){
1.繫結兩個執行緒組分別用來處理客戶端通道的accept和讀寫事件
2.繫結服務端通道NioServerSocketChannel
3.給讀寫事件的執行緒通道 繫結handler 去真正處理讀寫
4.監聽埠
try{
ServerBootstrap server = new ServerBootstrap();//netty啟動類
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childgroup = new NioEventLoopGroup();
server.group(parentGroup,childgroup);//parentGroup用來處理accept事件,childgroup用來處理通道的讀寫事件
server.channel(NioServerSocketChannel.class);//第2部繫結某個通道
server.childHandler(new ChannelInitializer<SocketChannel>(){//繫結handler,真正的處理,childHandler給子執行緒組繫結handler, ChannelInitializer是給通道初始化
@Override
protected void initChannel(SocketChannel ch) throws Exception{ //處理他的讀寫事件
//ch.pipeline().addLast(new DelimiterBasedFrameDecoder(   //解碼器,一定要加在SimpleServerHandler 的上面
//Integer.MAX_VALUE,Delimiters.lineDelimiter()[0])) ;//maxFrameLength表示這一貞最大的大小 ,delimiter分隔符 0就是 \r \n的組合或者-->Unpooled.wrappedBuffer(new byte[]{'\r','\n'})
//解碼器,接收的資料 進行解碼
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(//長度解碼器
Integer.MAX_VALUE,0,4,0,4)); //引數1這一貞最大長度,引數2 lengthFieldOffset長度偏移量,引數3 lengthFieldLength 傳遞的長度 獲取前面4個字元,引數4 lengthAdjustment 長度需要調節 0為不需要調節 1為增加一個位元組,引數5 initialBytesToStrip 需要跳過多少長度 跳過4個位元組,int是4個位元組

ch.pipeline().addLast(new SimpleServerHandler());
//下面是2個編碼器,向socket傳遞的資料("is ok")進行編碼,順序是倒過來執行的。因為是個雙向連結串列 ??
ch.pipeline().addLast(new LengthFieldPrepender(4,false)); //這個編碼器只能編碼byteBuf. 引數1 長度 4個位元組, 引數2 lengthIncludesLengthFieldLength 長度是否包含了這個長度 false不包含   
ch.pipeline().addLast(new StringEncoder()); //把傳過來的資料 轉換成byteBuf
}
});
ChannelFuture future = server.bind(8080).sync();//第4步 繫結埠
future.channel().closeFuture().sync();//當通道關閉了,就繼續往下走
}catch(InterruptedException e){
e.printStackTrace();
}
}
}

測試:啟動SpringServer.jave ,再 啟動NettyClient.java,會有資訊返回。 InitNetty 會在spring啟動的時候啟動,因為ContextRefreshedEvent監聽 spring的啟動

------

假如寫一套ElasticSearch底層分散式RPC,    或者Dubbo分散式RPC

需要寫netty RPC架構
netty客戶端非同步獲取相應結果到主執行緒     《伺服器架構實現   客戶端與服務端架構通訊》
netty做長連結的時候 需要注意哪些地方。
1。需要心跳檢測機制,保證連結的穩定。 
2.考慮重連,容易丟包。
3.採用連線池,netty自帶的連線池
netty底層實現原理,(netty執行緒模型---讀寫半包處理---記憶體池管理)
netty UerverBootstrap如何啟動和處理channel的Accept事件
NioEventLoop接收channel read事件剖析。如何處理netty的讀寫事件
如何處理讀寫半包