Netty接收HTTP檔案上傳及檔案下載
這個處理器的原理是接收HttpObject物件,按照HttpRequest,HttpContent來做處理,檔案內容是在HttpContent訊息帶來的。
然後在HttpContent中一個chunk一個chunk讀,chunk大小可以在初始化HttpServerCodec時設定。將每個chunk交個httpDecoder複製一份,當讀到LastHttpContent物件時,表明上傳結束,可以將httpDecoder中快取的檔案通過HttpDataFactory寫到磁碟上,然後在刪除快取的HttpContent物件。
@Slf4j public class HttUploadHandler extends SimpleChannelInboundHandler<HttpObject> { public HttUploadHandler() { super(false); } private static final HttpDataFactory factory = new DefaultHttpDataFactory(true); private static final String FILE_UPLOAD = "/data/"; private static final String URI = "/upload"; private HttpPostRequestDecoder httpDecoder; HttpRequest request; @Override protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject httpObject) throws Exception { if (httpObject instanceof HttpRequest) { request = (HttpRequest) httpObject; if (request.uri().startsWith(URI) && request.method().equals(HttpMethod.POST)) { httpDecoder = new HttpPostRequestDecoder(factory, request); httpDecoder.setDiscardThreshold(0); } else { //傳遞給下一個Handler ctx.fireChannelRead(httpObject); } } if (httpObject instanceof HttpContent) { if (httpDecoder != null) { final HttpContent chunk = (HttpContent) httpObject; httpDecoder.offer(chunk); if (chunk instanceof LastHttpContent) { writeChunk(ctx); //關閉httpDecoder httpDecoder.destroy(); httpDecoder = null; } ReferenceCountUtil.release(httpObject); } else { ctx.fireChannelRead(httpObject); } } } private void writeChunk(ChannelHandlerContext ctx) throws IOException { while (httpDecoder.hasNext()) { InterfaceHttpData data = httpDecoder.next(); if (data != null && HttpDataType.FileUpload.equals(data.getHttpDataType())) { final FileUpload fileUpload = (FileUpload) data; final File file = new File(FILE_UPLOAD + fileUpload.getFilename()); log.info("upload file: {}", file); try (FileChannel inputChannel = new FileInputStream(fileUpload.getFile()).getChannel(); FileChannel outputChannel = new FileOutputStream(file).getChannel()) { outputChannel.transferFrom(inputChannel, 0, inputChannel.size()); ResponseUtil.response(ctx, request, new GeneralResponse(HttpResponseStatus.OK, "SUCCESS", null)); } } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.warn("{}", cause); ctx.channel().close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (httpDecoder != null) { httpDecoder.cleanFiles(); } } } 複製程式碼
檔案下載
參考官方Demo:ofollow,noindex">github.com/netty/netty…
做了改動:
- 為了更高效的傳輸大資料,例項中用到了ChunkedWriteHandler編碼器,它提供了以zero-memory-copy方式寫檔案。
- 通過ChannelProgressiveFutureListener對檔案下載過程進行監聽。
// 新增ChunkedHandler,主要作用是支援非同步傳送大的碼流(例如大檔案傳輸),但是不佔用過多的記憶體,防止發生java記憶體溢位錯誤 ch.pipeline().addLast(new ChunkedWriteHandler()); // 用於下載檔案 ch.pipeline().addLast(new HttpDownloadHandler()); 複製程式碼
@Slf4j public class HttpDownloadHandler extends SimpleChannelInboundHandler<FullHttpRequest> { public HttpDownloadHandler() { super(false); } private String filePath = "/data/body.csv"; @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { String uri = request.uri(); if (uri.startsWith("/download") && request.method().equals(HttpMethod.GET)) { GeneralResponse generalResponse = null; File file = new File(filePath); try { final RandomAccessFile raf = new RandomAccessFile(file, "r"); long fileLength = raf.length(); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream"); response.headers().add(HttpHeaderNames.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", file.getName())); ctx.write(response); ChannelFuture sendFileFuture = ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); sendFileFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { log.info("file {} transfer complete.", file.getName()); raf.close(); } @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception { if (total < 0) { log.warn("file {} transfer progress: {}", file.getName(), progress); } else { log.debug("file {} transfer progress: {}/{}", file.getName(), progress, total); } } }); ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } catch (FileNotFoundException e) { log.warn("file {} not found", file.getPath()); generalResponse = new GeneralResponse(HttpResponseStatus.NOT_FOUND, String.format("file %s not found", file.getPath()), null); ResponseUtil.response(ctx, request, generalResponse); } catch (IOException e) { log.warn("file {} has a IOException: {}", file.getName(), e.getMessage()); generalResponse = new GeneralResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, String.format("讀取 file %s 發生異常", filePath), null); ResponseUtil.response(ctx, request, generalResponse); } } else { ctx.fireChannelRead(request); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { log.warn("{}", e); ctx.close(); } } 複製程式碼