JAVA NIO SOCKET大檔案上傳伺服器
阿新 • • 發佈:2019-01-08
當前很多手機應用或者是網路應用都需要支援大檔案上傳功能,有些用FTP來實現上傳,但是FTP存在許多的問題,比如FTP的安全問題,還有不支援GZIP壓縮等問題。採用SOCKET來實現檔案上傳,很輕鬆就可以實現斷點再續和負載均衡;將上傳後的檔案直接儲存到APACHE等WEB伺服器的指定路徑下,便可以輕鬆地擁有一臺檔案伺服器。至於檔案的同步問題,這是另一個話題了,不在此討論。本次上傳的程式碼去掉了負載均衡方面的功能,只是本程式的V0.1版本;真正部署到應用中的版本支援負載均衡,並改為了多執行緒下載。
簡單地說明一下:每次上傳時首先是傳送一個自定義格式的檔案頭,檔案頭包含檔名、檔案大小、檔案型別。伺服器在接收到後會生成一個檔案標識,該檔案標識是在伺服器資料夾裡生成的真實檔案的檔名;伺服器返回資料庫中的ID欄位和檔案當前位置給客戶端。如果是斷點再續,那麼客戶端會將ID與當前位置上傳上來,這時通過ID來得到檔案的資訊。這裡一定不能用檔案在客戶端的位置來斷點定位,一定要到伺服器端的資料庫查詢;返回位置其實比較多餘,但是客戶端工程師要求這樣。
主程式
import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; public class NIOServer { /* 緩衝區大小 */ private int BLOCK = 4096; /* 接受資料緩衝區 */ private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK); /* 傳送資料緩衝區 */ private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK); private Selector selector; private RandomAccessFile raf = null; private Map<SocketChannel, Handler> channelMap = new HashMap<SocketChannel, Handler>(); private FileDBAdapter fileDB; public NIOServer(int port) throws IOException { // 開啟伺服器套接字通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 伺服器配置為非阻塞 serverSocketChannel.configureBlocking(false); // 檢索與此通道關聯的伺服器套接字 ServerSocket serverSocket = serverSocketChannel.socket(); // 進行服務的繫結 serverSocket.bind(new InetSocketAddress(port)); // 通過open()方法找到Selector selector = Selector.open(); // 註冊到selector,等待連線 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); fileDB = FileDBAdapter.getInstance(); System.out.println("Server Start----8888:"); } // 監聽 private void listen() throws IOException { while (true) { // 選擇一組鍵,並且相應的通道已經開啟 selector.select(); // 返回此選擇器的已選擇鍵集。 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); handleKey(selectionKey); } } } // 處理請求 private void handleKey(SelectionKey selectionKey) throws IOException { // 接受請求 ServerSocketChannel server = null; SocketChannel client = null; // 測試此鍵的通道是否已準備好接受新的套接字連線。 if (selectionKey.isAcceptable()) { // 返回為之建立此鍵的通道。 server = 
(ServerSocketChannel) selectionKey.channel(); // 接受到此通道套接字的連線。 // 此方法返回的套接字通道(如果有)將處於阻塞模式。 client = server.accept(); // 配置為非阻塞 client.configureBlocking(false); // 註冊到selector,等待連線 client.register(selector, SelectionKey.OP_READ); channelMap.put(client, new HandlerImpl()); } else if (selectionKey.isReadable() || selectionKey.isWritable()) { SocketChannel socketChannel = (SocketChannel) selectionKey .channel(); Handler handle = (HandlerImpl) channelMap.get(socketChannel); try { if(!handle.isHeader()) { handle.header(selectionKey); } else { handle.content(selectionKey); } } catch (Exception e) { e.printStackTrace(); } } } class HandlerImpl implements Handler { private SocketChannel client = null; private FileChannel fileChannel = null; private boolean isHeader = false; private long position = 0; private int id = 0; public void content(SelectionKey selectionKey) throws Exception { if(selectionKey.isReadable()) { // 返回為之建立此鍵的通道。 client = (SocketChannel) selectionKey.channel(); // 將緩衝區清空以備下次讀取 receivebuffer.clear(); // 讀取伺服器傳送來的資料到緩衝區中 int end = 0; end = client.read(receivebuffer); if(end == -1) { client.close(); selectionKey.cancel(); fileDB.updateSuccess(id); return; } receivebuffer.flip(); raf.write(receivebuffer.array()); position += end; fileDB.updatePosition(position, id); client.register(selector, SelectionKey.OP_READ); } else if(selectionKey.isWritable()) { } } @Override public void header(SelectionKey selectionKey) throws Exception { // 返回為之建立此鍵的通道。 client = (SocketChannel) selectionKey.channel(); // 將緩衝區清空以備下次讀取 receivebuffer.clear(); // 讀取伺服器傳送來的資料到緩衝區中 int end = 0; System.out.println("handler header"); end = client.read(receivebuffer); receivebuffer.flip(); int size = receivebuffer.getInt(); String header = new String(receivebuffer.array(), 4, size); System.out.println(header); String[] headers = header.split(":"); id = Integer.parseInt(headers[0]); String filename = headers[1]; String type = headers[2]; String fileMark = System.currentTimeMillis() + "." 
+ type; long fileLength = Long.parseLong(headers[3]); position = Long.parseLong(headers[4]); FileEntity entity = null; if(!fileDB.isFile(id)) { entity = fileDB.queryByID(id); position = entity.getPosition(); } else { entity = new FileEntity(); entity.setMark(fileMark); entity.setFilename(filename); entity.setFileType(type); entity.setFileSize(fileLength); entity.setIsSuccess(0); entity.setDownURL("http://127.0.0.1/move/" + filename); id = fileDB.insert(entity); } raf = new RandomAccessFile("c:/" + entity.getMark(), "rw"); raf.seek(position); System.out.println(id); sendbuffer.clear(); sendbuffer.putInt(id); sendbuffer.putLong(position); sendbuffer.flip(); client.write(sendbuffer); fileChannel = raf.getChannel(); client.register(selector, SelectionKey.OP_READ); isHeader = true; } public boolean isHeader() { return isHeader; } } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub int port = 8888; NIOServer server = new NIOServer(port); server.listen(); } }
資料庫訪問
import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class FileDBAdapter { private static FileDBAdapter db = null; private Connection conn = null; private FileDBAdapter() { try { Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); conn = DriverManager.getConnection( "jdbc:sqlserver://localhost:1433;databaseName=FILE", "sa", "123456"); conn.setAutoCommit(false); } catch (Exception e) { e.printStackTrace(); } } public static FileDBAdapter getInstance() { if (db == null) { db = new FileDBAdapter(); } return db; } @SuppressWarnings("finally") public int insert(FileEntity entity) { int id = 0; try { PreparedStatement ps = conn .prepareStatement("insert into FILEINFO values(?,?,?,?,?,?,?)", PreparedStatement.RETURN_GENERATED_KEYS); ps.setString(1, entity.getMark()); ps.setString(2, entity.getFilename()); ps.setString(3, entity.getFileType()); ps.setLong(4, entity.getFileSize()); ps.setInt(5, entity.getIsSuccess()); ps.setLong(6, entity.getPosition()); ps.setString(7, entity.getDownURL()); int flag = ps.executeUpdate(); if (flag == 1) { ResultSet rs = ps.getGeneratedKeys(); while(rs.next()) { id = rs.getInt(1); } } conn.commit(); } catch (SQLException e) { e.printStackTrace(); try { conn.rollback(); } catch (SQLException e1) { // e1.printStackTrace(); } } finally { return id; } } public void updatePosition(long position, int id) { try { System.out.println("update:" + position + ":" + id); PreparedStatement ps = conn .prepareStatement("update FILEINFO set _POSITION = ? 
where _ID = ?"); ps.setLong(1, position); ps.setInt(2, id); System.out.println("update"); ps.executeUpdate(); conn.commit(); } catch (SQLException e) { e.printStackTrace(); try { conn.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } } public void updateSuccess(int id) { try { PreparedStatement ps = conn .prepareStatement("update FILEINFO set _ISSUCCESS = 1 where _ID = ?"); ps.setInt(1, id); System.out.println("update"); ps.executeUpdate(); conn.commit(); } catch (SQLException e) { e.printStackTrace(); try { conn.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } } public FileEntity queryByID(int id) { ResultSet rs = null; FileEntity entity = new FileEntity(); try { PreparedStatement ps = conn .prepareStatement("select _ID, _FILE_MARK, _POSITION from FILEINFO where _ID = ?"); ps.setInt(1, id); rs = ps.executeQuery(); while (rs.next()) { entity.setId(rs.getInt("_ID")); entity.setMark(rs.getString("_FILE_MARK")); entity.setPosition(rs.getLong("_POSITION")); } rs.close(); } catch (SQLException e) { e.printStackTrace(); } return entity; } public boolean isFile(int id) { ResultSet rs = null; int count = 0; try { PreparedStatement ps = conn .prepareStatement("select count(_ID) from FILEINFO where _ID = ? and _ISSUCCESS != 1"); ps.setInt(1, id); rs = ps.executeQuery(); while (rs.next()) { count = rs.getInt(1); } rs.close(); } catch (SQLException e) { e.printStackTrace(); } if(count == 0) { return true; } else { return false; } } }
檔案實體
/**
 * Plain mutable bean holding the values of one file record. Every property
 * has a getter and a setter and no validation is performed.
 */
public class FileEntity {

    private int id;
    private String mark;
    private String filename;
    private String fileType;
    private long fileSize;
    private int isSuccess;
    private long position;
    private String downURL;

    /** @return the record id */
    public int getId() {
        return this.id;
    }

    /** @param id the record id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the file mark */
    public String getMark() {
        return this.mark;
    }

    /** @param mark the file mark */
    public void setMark(String mark) {
        this.mark = mark;
    }

    /** @return the file name */
    public String getFilename() {
        return this.filename;
    }

    /** @param filename the file name */
    public void setFilename(String filename) {
        this.filename = filename;
    }

    /** @return the file type */
    public String getFileType() {
        return this.fileType;
    }

    /** @param fileType the file type */
    public void setFileType(String fileType) {
        this.fileType = fileType;
    }

    /** @return the file size */
    public long getFileSize() {
        return this.fileSize;
    }

    /** @param fileSize the file size */
    public void setFileSize(long fileSize) {
        this.fileSize = fileSize;
    }

    /** @return the success flag */
    public int getIsSuccess() {
        return this.isSuccess;
    }

    /** @param isSuccess the success flag */
    public void setIsSuccess(int isSuccess) {
        this.isSuccess = isSuccess;
    }

    /** @return the position */
    public long getPosition() {
        return this.position;
    }

    /** @param position the position */
    public void setPosition(long position) {
        this.position = position;
    }

    /** @return the download URL */
    public String getDownURL() {
        return this.downURL;
    }

    /** @param downURL the download URL */
    public void setDownURL(String downURL) {
        this.downURL = downURL;
    }
}