1. 程式人生 > >高性能存儲項目筆記-netty1

高性能存儲項目筆記-netty1

little project too 定義 pri 編程 inbound 應用 ddl

大四畢業準研一的項目,項目主要用於接收udp,tcp,dns等數據,進行分析存盤。存盤後用於數據挖掘試著找出有異常行為的僵屍網絡主機。底層網絡框架使用netty。

netty的簡介:

Netty是由JBOSS提供的一個Java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。也就是說,Netty是一個基於NIO的客戶端、服務器端編程框架,使用Netty可以確保你快速、簡單地開發出一個網絡應用,例如實現了某種協議的客戶端、服務端應用。Netty極大地簡化和流線化了網絡應用的編程開發過程,例如TCP和UDP的socket服務開發。“快速”和“簡單”並不意味著會產生維護性或性能上的問題。Netty是一個吸收了多種協議(包括FTP、SMTP、HTTP以及各種二進制、文本協議)的實現經驗,並經過相當精心設計的項目。最終,Netty成功地找到了一種方式,在保證易於開發的同時還保證了其應用的性能、穩定性和伸縮性。

netty的maven:

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>

netty的性能測試:

項目測試時用700Mb/s(忘記是MB還是Mb了,應該是Mb)的數據跑了一晚上沒有掉包。(之前有掉包的情況出現,原因是路由器的最大幀長度為1500字節,而傳輸的測試數據幀長2000+字節,甚至有4000字節的,即巨幀。)用1Gb/s以上(幾乎已經達到測試網線的最大速率)的速度跑過幾分鐘,也沒有掉包。性能可以說是非常高了。

客戶端由C++編寫,代碼這裏略。(需要注意的是,C++傳遞結構體時會進行數據對齊,Java按字節讀取時,補齊的字節會讀為0。)

接收線程服務器端源碼:

  1 /*
  2  * To change this license header, choose License Headers in Project Properties.
  3  * To change this template file, choose Tools | Templates
  4  * and open the template in the editor.
  5  */
  6 package packetserver;
7 8 9 import commonclasses.RawData;//原始數據 10 import commonclasses.ServerProperties;//配置文件 11 import commonclasses.StatisticData;//統計數據 12 import io.netty.bootstrap.ServerBootstrap; 13 import io.netty.channel.ChannelInitializer; 14 import io.netty.channel.ChannelPipeline; 15 import io.netty.channel.socket.SocketChannel; 16 17 import java.nio.ByteOrder;//java和linux默認的小端大端順序不一樣,需要設置 18 import io.netty.handler.codec.ByteToMessageDecoder; 19 20 import io.netty.buffer.ByteBuf; 21 import io.netty.channel.ChannelFuture; 22 import io.netty.channel.ChannelHandlerContext; 23 import io.netty.channel.ChannelInboundHandlerAdapter; 24 import io.netty.channel.ChannelOption; 25 import io.netty.channel.EventLoopGroup; 26 import io.netty.channel.nio.NioEventLoopGroup; 27 import io.netty.channel.socket.nio.NioServerSocketChannel; 28 import io.netty.handler.codec.MessageToByteEncoder; 29 import java.net.InetAddress; 30 import java.net.NetworkInterface; 31 import java.util.Enumeration; 32 import java.util.List; 33 34 /** 35 * 36 * @author gaoxiang 37 */ 38 public class RawDataNetAgent implements Runnable { 39 40 static int port = ServerProperties.rawDataServerPort; 41 static String hostadd = ServerProperties.recevierServerIP; 42 RawDataServer s1 = new RawDataServer(); 43 44 RawDataAgent fileAgent; 45 private Thread workerThread; 46 47 RawDataNetAgent(RawDataAgent fileAgent) { 48 this.fileAgent = fileAgent; 49 50 workerThread = new Thread(this); 51 workerThread.start(); 52 } 53 54 @Override 55 public void run() { 56 try { 57 s1.bind(); 58 System.out.println("NetAgent ended..."); 59 } catch (Exception e) { 60 // TODO Auto-generated catch block 61 e.printStackTrace(); 62 } 63 } 64 65 class RawDataServerInitializer extends ChannelInitializer<SocketChannel> { 66 67 @Override 68 protected void initChannel(SocketChannel ch) throws Exception { 69 ChannelPipeline pipeline = ch.pipeline(); 70 pipeline.addLast(new RawDataServerDecoder()); 71 // pipeline.addLast(new ServerEncoder()); 72 
pipeline.addLast(new RawDataServerHandler()); 73 74 } 75 } 76 77 class RawDataServerEncoder extends MessageToByteEncoder { 78 79 @Override 80 protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { 81 byte[] body = (byte[]) (msg); 82 int dataLength = body.length; 83 out.writeInt(dataLength); 84 out.writeBytes(body); 85 } 86 87 } 88 89 class RawDataServerDecoder extends ByteToMessageDecoder { 90 91 final int socketHeaderLength = 16; 92 final int rawHeaderLength = 74; 93 94 @Override 95 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 96 if (in.readableBytes() < socketHeaderLength) { 97 return; 98 } 99 in.markWriterIndex(); 100 ByteBuf buf = in.order(ByteOrder.LITTLE_ENDIAN); 101 102 int ssyn = buf.readInt(); 103 if (ssyn != 0xfae9dafc) { 104 in.readerIndex(in.readerIndex() - 3); 105 in.discardReadBytes(); 106 System.out.println("ssyn != 0xfae9dafc"); 107 return; 108 } 109 110 int totallength = buf.readInt(); 111 if (totallength > in.readableBytes() + 8) { 112 in.resetReaderIndex(); 113 return; 114 } 115 out.add(in.readBytes(totallength - socketHeaderLength + 8)); 116 in.discardReadBytes(); 117 118 } 119 120 } 121 122 class RawDataServerHandler extends ChannelInboundHandlerAdapter { 123 124 int num = 0; 125 126 @Override 127 public void channelActive(ChannelHandlerContext ctx) throws Exception { 128 System.out.println("channelActive work"); 129 } 130 131 @Override 132 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 133 cause.printStackTrace(); 134 ctx.close(); 135 } 136 137 public int verseInt(int i) { 138 int j; 139 j = (i >> 24) & 0xff | (i >> 8) & 0xff00 | (i << 8) & 0Xff0000 | (i << 24) & 0xff000000; 140 return j; 141 } 142 143 @Override 144 public void channelRead(ChannelHandlerContext ctx, Object _msg) throws Exception { 145 ByteBuf bytes = ((ByteBuf) _msg).order(ByteOrder.LITTLE_ENDIAN); 146 int type = bytes.readUnsignedShort(); 147 
int code = bytes.readUnsignedShort(); 148 long capip = bytes.readUnsignedInt(); 149 bytes.discardReadBytes(); 150 if (type == 1000 && code == 1001) { 151 SoKeyRz msg = new SoKeyRz(bytes); 152 RawData rf = new RawData(); 153 rf.protocolNumber = (byte) msg.tuple5.protocol; 154 rf.sPort = (short) msg.tuple5.source; 155 rf.dPort = (short) msg.tuple5.dest; 156 rf.sIP = verseInt((int) msg.tuple5.saddr); 157 rf.dIP = verseInt((int) msg.tuple5.daddr); 158 rf.timeStamp = msg.t_s * 1000 + msg.t_ms / 1000;// Caution, just for 159 // test!!!!!!! 160 rf.payload = msg.data; 161 /** 162 * have been changed to fit mapdb 163 */ 164 Object[] object = new Object[]{ 165 rf.sIP, rf.dIP, rf.sPort, rf.dPort, rf.protocolNumber, rf.timeStamp, 166 }; 167 AddRawDataToQueue(rf); 168 169 } 170 bytes.release(); 171 } 172 173 public void AddRawDataToQueue(RawData rf) { 174 fileAgent.offer(rf); 175 } 176 } 177 178 class RawDataServer { 179 180 ChannelFuture f; 181 182 public void closeServer() { 183 f.channel().close(); 184 } 185 186 public void bind() throws Exception { 187 EventLoopGroup bossGroup = new NioEventLoopGroup(); 188 EventLoopGroup workerGroup = new NioEventLoopGroup(); 189 try { 190 ServerBootstrap b = new ServerBootstrap(); 191 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// .localAddress(inetHost, 192 // inetPort) 193 .childHandler(new RawDataServerInitializer()).option(ChannelOption.SO_BACKLOG, 2048) 194 .childOption(ChannelOption.SO_KEEPALIVE, true); 195 196 f = b.bind(RawDataNetAgent.hostadd, RawDataNetAgent.port).sync(); 197 f.channel().closeFuture().sync(); 198 } finally { 199 bossGroup.shutdownGracefully(); 200 workerGroup.shutdownGracefully(); 201 } 202 } 203 204 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 205 206 @Override 207 protected void initChannel(SocketChannel arg0) throws Exception { 208 System.out.println("server initChannel.."); 209 arg0.pipeline().addLast(new RawDataServerHandler()); 210 } 211 } 212 213 
public String getAddr(String rcvName) { 214 Enumeration<NetworkInterface> netInterfaces = null; 215 try { 216 netInterfaces = NetworkInterface.getNetworkInterfaces(); 217 218 while (netInterfaces.hasMoreElements()) { 219 220 NetworkInterface ni = netInterfaces.nextElement(); 221 222 if (ni.getDisplayName().equals(rcvName)) { 223 Enumeration<InetAddress> ips = ni.getInetAddresses(); 224 ips.nextElement(); 225 return ips.nextElement().getHostAddress(); 226 } 227 } 228 } catch (Exception e) { 229 e.printStackTrace(); 230 } 231 return "localhost"; 232 233 } 234 235 } 236 237 class SoKeyRz {//包的格式定義,包名公司定的 238 239 long soid; 240 long cap_ip; 241 int cap_port; 242 long t_s; 243 long t_ms; 244 byte[] srcmac = new byte[6]; 245 byte[] dstmac = new byte[6]; 246 Tuple5 tuple5 = new Tuple5(); 247 char src_dep[] = new char[16]; 248 long datalen; 249 char reserve[] = new char[8]; 250 byte data[] = null; 251 252 public SoKeyRz(ByteBuf buf) { 253 soid = buf.readUnsignedInt(); 254 cap_ip = buf.readUnsignedInt(); 255 cap_port = buf.readUnsignedShort(); 256 t_s = buf.readUnsignedInt(); 257 t_ms = buf.readUnsignedInt(); 258 ///////////// buf.readCharSequence(); 259 for (int i = 0; i < 6; i++) { 260 srcmac[i] = buf.readByte(); 261 } 262 // buf.readCharSequence(6, srcmac); 263 for (int i = 0; i < 6; i++) { 264 dstmac[i] = buf.readByte(); 265 } 266 tuple5.protocol = buf.readInt(); 267 tuple5.source = buf.readUnsignedShort(); 268 tuple5.dest = buf.readUnsignedShort(); 269 tuple5.saddr = buf.readUnsignedInt(); 270 tuple5.daddr = buf.readUnsignedInt(); 271 for (int i = 0; i < 16; i++) { 272 src_dep[i] = (char) buf.readByte(); 273 } 274 datalen = buf.readUnsignedInt(); 275 for (int i = 0; i < 8; i++) { 276 reserve[i] = (char) buf.readByte(); 277 } 278 data = new byte[(int) datalen]; 279 buf.readBytes(data); 280 } 281 ; 282 283 } 284 285 class Tuple5 {//tcp五元組 286 287 int protocol; 288 int source; 289 int dest; 290 long saddr; 291 long daddr; 292 } 293 }



高性能存儲項目筆記-netty1