1. 程式人生 > >JAVA NIO SOCKET大檔案上傳伺服器

JAVA NIO SOCKET大檔案上傳伺服器

        當前很多手機應用或者是網路應用都需要支援大檔案上傳功能,有些用FTP來實現上傳,但是FTP存在許多問題,比如安全問題以及不支援GZIP壓縮等。採用SOCKET來實現檔案上傳,很輕鬆就可以實現斷點續傳和負載均衡;將上傳後的檔案直接儲存到APACHE等WEB伺服器的指定路徑下,便可以輕鬆地擁有一臺檔案伺服器。至於檔案的同步問題是另一個話題,不在此討論。本次貼出的程式碼去掉了負載均衡方面的功能,只是本程式的V0.1版本;真正部署到應用中的版本支援負載均衡,並改為了多執行緒下載。

        簡單說明一下:每次上傳時,客戶端首先傳送一個自定義格式的檔案頭,檔案頭包含檔名、檔案大小、檔案型別。伺服器接收到後會生成一個檔案標識,該檔案標識是在伺服器資料夾裡生成的真實檔案的檔名;伺服器再將資料庫中的ID欄位和檔案當前位置返回給客戶端。如果是斷點續傳,客戶端會將ID與當前位置上傳上來,這時伺服器通過ID來取得檔案的資訊。這裡一定不能用檔案在客戶端的位置來做斷點定位,一定要到伺服器端的資料庫查詢;返回位置其實比較多餘,但是客戶端工程師要求這樣。

主程式

import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * Single-threaded, selector-driven upload server.
 *
 * <p>Protocol as implemented below: the first read on a connection must
 * contain a header made of a 4-byte length (read with
 * {@link ByteBuffer#getInt()}) followed by that many bytes of text in the form
 * {@code id:filename:type:fileLength:position}. The server answers with a
 * 4-byte id and an 8-byte position, then appends every further byte it
 * receives to the target file until the client closes the connection.
 */
public class NIOServer {

	/* Size in bytes of each I/O buffer. */
	private static final int BLOCK = 4096;
	/* Number of ':'-separated fields every header must carry. */
	private static final int HEADER_FIELDS = 5;
	/* Buffer used for the reply sent to a client. */
	private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
	/* Buffer used for bytes received from a client. */
	private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);

	private Selector selector;

	/* Per-connection state; entries are removed when the connection is closed. */
	private Map<SocketChannel, HandlerImpl> channelMap = new HashMap<SocketChannel, HandlerImpl>();

	private FileDBAdapter fileDB;

	/**
	 * Opens a non-blocking server socket on the given port and registers it
	 * with a new selector for accept events.
	 *
	 * @param port TCP port to bind
	 * @throws IOException if the channel or selector cannot be opened or bound
	 */
	public NIOServer(int port) throws IOException {
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.configureBlocking(false);
		ServerSocket serverSocket = serverSocketChannel.socket();
		serverSocket.bind(new InetSocketAddress(port));
		selector = Selector.open();
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		fileDB = FileDBAdapter.getInstance();

		// Report the port actually bound instead of a hard-coded 8888.
		System.out.println("Server Start----" + port + ":");
	}

	/**
	 * Runs the selector loop forever, dispatching every ready key to
	 * {@link #handleKey(SelectionKey)}.
	 *
	 * @throws IOException if selecting or accepting a connection fails
	 */
	private void listen() throws IOException {
		while (true) {
			selector.select();

			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while (iterator.hasNext()) {
				SelectionKey selectionKey = iterator.next();
				// Selected keys must be removed by hand or they are reported again.
				iterator.remove();
				handleKey(selectionKey);
			}
		}
	}

	/**
	 * Handles one ready key: accepts a new connection, or feeds the
	 * connection's handler with the header or content that became readable.
	 *
	 * @param selectionKey the ready key
	 * @throws IOException if accepting or configuring a new connection fails
	 */
	private void handleKey(SelectionKey selectionKey) throws IOException {
		// A key cancelled earlier in this pass would throw on the isXxx() calls.
		if (!selectionKey.isValid()) {
			return;
		}

		if (selectionKey.isAcceptable()) {
			ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
			SocketChannel client = server.accept();
			// In non-blocking mode accept() returns null when nothing is pending.
			if (client == null) {
				return;
			}
			client.configureBlocking(false);
			client.register(selector, SelectionKey.OP_READ);
			channelMap.put(client, new HandlerImpl());
		} else if (selectionKey.isReadable() || selectionKey.isWritable()) {
			SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
			Handler handle = channelMap.get(socketChannel);
			if (handle == null) {
				// No state for this channel: nothing sensible can be done with it.
				closeClient(selectionKey);
				return;
			}
			try {
				if (!handle.isHeader()) {
					handle.header(selectionKey);
				} else {
					handle.content(selectionKey);
				}
			} catch (Exception e) {
				e.printStackTrace();
				// Drop the broken connection so its socket, file and map entry
				// are released instead of lingering in a half-initialised state.
				closeClient(selectionKey);
			}
		}
	}

	/**
	 * Cancels the key, closes the client channel, closes the file the
	 * connection was writing (if any) and forgets the per-connection state.
	 *
	 * @param selectionKey key of the connection to tear down
	 */
	private void closeClient(SelectionKey selectionKey) {
		selectionKey.cancel();
		SocketChannel channel = (SocketChannel) selectionKey.channel();
		HandlerImpl handler = channelMap.remove(channel);
		if (handler != null) {
			handler.closeFile();
		}
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * State and protocol logic for a single client connection. Each instance
	 * owns its own target file, so simultaneous uploads do not interfere.
	 */
	class HandlerImpl implements Handler {

		private SocketChannel client = null;
		/* Target file of this connection (one per handler, not per server). */
		private RandomAccessFile raf = null;
		private FileChannel fileChannel = null;
		private boolean isHeader = false;
		/* Offset in the target file of the next byte to be written. */
		private long position = 0;
		/* Total size announced by the client in the header. */
		private long fileLength = 0;
		private int id = 0;

		/**
		 * Reads the next chunk of file content from the client and appends it
		 * to the target file, recording the new position in the database.
		 * When the client closes the connection the upload is marked
		 * successful only if the announced file length has been reached.
		 *
		 * @param selectionKey key of the readable client channel
		 * @throws Exception if reading from the socket or writing the file fails
		 */
		@Override
		public void content(SelectionKey selectionKey) throws Exception {
			// Only OP_READ is ever registered, so writable keys need no handling.
			if (!selectionKey.isReadable()) {
				return;
			}
			client = (SocketChannel) selectionKey.channel();
			receivebuffer.clear();

			int end = client.read(receivebuffer);
			if (end == -1) {
				closeClient(selectionKey);
				// A disconnect in the middle of a transfer must stay resumable,
				// so only flag success once all announced bytes have arrived.
				if (position >= fileLength) {
					fileDB.updateSuccess(id);
				}
				return;
			}
			if (end == 0) {
				return;
			}

			receivebuffer.flip();
			// Write only the bytes actually read, never the whole backing array.
			while (receivebuffer.hasRemaining()) {
				fileChannel.write(receivebuffer);
			}
			position += end;

			fileDB.updatePosition(position, id);
		}

		/**
		 * Reads and parses the header, opens the target file (resuming an
		 * unfinished upload when the database knows the id, otherwise creating
		 * a new record) and replies with the id and the start position.
		 *
		 * <p>The whole header is expected to arrive in a single read; bytes
		 * following the header in that read are discarded.
		 * NOTE(review): presumably the client waits for the reply before
		 * sending content - confirm against the client implementation.
		 *
		 * @param selectionKey key of the readable client channel
		 * @throws Exception if the header is incomplete or malformed, or if
		 *                   socket or file I/O fails
		 */
		@Override
		public void header(SelectionKey selectionKey) throws Exception {
			client = (SocketChannel) selectionKey.channel();
			receivebuffer.clear();
			System.out.println("handler header");

			int end = client.read(receivebuffer);
			if (end == -1) {
				// Client went away before sending anything useful.
				closeClient(selectionKey);
				return;
			}
			if (end == 0) {
				return;
			}
			receivebuffer.flip();

			if (receivebuffer.remaining() < 4) {
				throw new IOException("incomplete header: " + end + " byte(s) received");
			}
			int size = receivebuffer.getInt();
			if (size < 0 || size > receivebuffer.remaining()) {
				throw new IOException("invalid header length: " + size);
			}

			// NOTE(review): decoded with the platform default charset, as before;
			// confirm which encoding the client uses for file names.
			String header = new String(receivebuffer.array(), receivebuffer.position(), size);
			System.out.println(header);

			String[] headers = header.split(":");
			if (headers.length < HEADER_FIELDS) {
				throw new IOException("malformed header: " + header);
			}
			id = Integer.parseInt(headers[0]);
			String filename = headers[1];
			String type = headers[2];
			fileLength = Long.parseLong(headers[3]);

			// The type becomes part of the on-disk file name, so it must not be
			// able to navigate out of the storage directory.
			if (type.indexOf('/') >= 0 || type.indexOf('\\') >= 0 || type.indexOf("..") >= 0) {
				throw new IOException("illegal file type in header: " + type);
			}

			FileEntity entity = null;

			// isFile() is true when NO unfinished upload exists for the id.
			if (!fileDB.isFile(id)) {
				// Resume: the server-side record decides where writing continues.
				entity = fileDB.queryByID(id);
				position = entity.getPosition();
			} else {
				String fileMark = System.currentTimeMillis() + "." + type;
				entity = new FileEntity();
				entity.setMark(fileMark);
				entity.setFilename(filename);
				entity.setFileType(type);
				entity.setFileSize(fileLength);
				entity.setIsSuccess(0);
				entity.setDownURL("http://127.0.0.1/move/" + filename);

				id = fileDB.insert(entity);
				// A brand-new file always starts at offset 0; the position sent
				// by the client is not trusted.
				position = 0;
			}

			raf = new RandomAccessFile("c:/" + entity.getMark(), "rw");
			raf.seek(position);
			// The channel shares its file position with raf, so it starts at 'position'.
			fileChannel = raf.getChannel();
			System.out.println(id);

			sendbuffer.clear();
			sendbuffer.putInt(id);
			sendbuffer.putLong(position);
			sendbuffer.flip();
			// A non-blocking write may be partial; keep going until the reply is out.
			while (sendbuffer.hasRemaining()) {
				client.write(sendbuffer);
			}
			isHeader = true;
		}

		/**
		 * @return {@code true} once the header has been processed successfully
		 */
		@Override
		public boolean isHeader() {
			return isHeader;
		}

		/**
		 * Closes the target file if one is open. Safe to call repeatedly.
		 */
		void closeFile() {
			if (raf == null) {
				return;
			}
			try {
				raf.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			raf = null;
			fileChannel = null;
		}

	}

	/**
	 * Starts the server on port 8888 and serves until the process is killed.
	 *
	 * @param args unused
	 * @throws IOException if the server cannot be started or the selector loop fails
	 */
	public static void main(String[] args) throws IOException {
		int port = 8888;
		NIOServer server = new NIOServer(port);
		server.listen();
	}
}

資料庫訪問

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Lazily created singleton giving access to the {@code FILEINFO} table over a
 * single JDBC connection with auto-commit disabled; every write method commits
 * (or rolls back) its own statement.
 *
 * <p>Neither {@link #getInstance()} nor the shared connection is synchronised.
 */
public class FileDBAdapter {

	private static FileDBAdapter db = null;

	private Connection conn = null;

	/**
	 * Loads the SQL Server driver and opens the connection.
	 *
	 * @throws IllegalStateException if the driver is missing or the connection
	 *                               cannot be opened; without a connection
	 *                               every other method would fail anyway
	 */
	private FileDBAdapter() {
		try {
			Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
			conn = DriverManager.getConnection(
					"jdbc:sqlserver://localhost:1433;databaseName=FILE", "sa",
					"123456");
			conn.setAutoCommit(false);
		} catch (Exception e) {
			// Do not keep a half-configured connection around.
			if (conn != null) {
				try {
					conn.close();
				} catch (SQLException closeFailure) {
					closeFailure.printStackTrace();
				}
				conn = null;
			}
			throw new IllegalStateException("Unable to open the FILE database connection", e);
		}
	}

	/**
	 * @return the shared adapter, created on first use
	 * @throws IllegalStateException if the database connection cannot be opened
	 */
	public static FileDBAdapter getInstance() {
		if (db == null) {
			db = new FileDBAdapter();
		}
		return db;
	}

	/**
	 * Inserts a new row for the given entity and commits it.
	 *
	 * @param entity values for the new row
	 * @return the generated key of the new row, or 0 if the insert failed and
	 *         was rolled back
	 */
	public int insert(FileEntity entity) {
		int id = 0;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = conn.prepareStatement("insert into FILEINFO values(?,?,?,?,?,?,?)",
					PreparedStatement.RETURN_GENERATED_KEYS);
			ps.setString(1, entity.getMark());
			ps.setString(2, entity.getFilename());
			ps.setString(3, entity.getFileType());
			ps.setLong(4, entity.getFileSize());
			ps.setInt(5, entity.getIsSuccess());
			ps.setLong(6, entity.getPosition());
			ps.setString(7, entity.getDownURL());
			int flag = ps.executeUpdate();

			if (flag == 1) {
				rs = ps.getGeneratedKeys();
				while (rs.next()) {
					id = rs.getInt(1);
				}
			}
			conn.commit();
		} catch (SQLException e) {
			e.printStackTrace();
			rollback();
			// Never hand out the key of a row that was rolled back.
			id = 0;
		} finally {
			close(rs);
			close(ps);
		}
		return id;
	}

	/**
	 * Stores the current write position of an upload and commits.
	 *
	 * @param position new value for {@code _POSITION}
	 * @param id       {@code _ID} of the row to update
	 */
	public void updatePosition(long position, int id) {
		PreparedStatement ps = null;
		try {
			System.out.println("update:" + position + ":" + id);
			ps = conn.prepareStatement("update FILEINFO set _POSITION = ? where _ID = ?");
			ps.setLong(1, position);
			ps.setInt(2, id);
			System.out.println("update");
			ps.executeUpdate();
			conn.commit();
		} catch (SQLException e) {
			e.printStackTrace();
			rollback();
		} finally {
			close(ps);
		}
	}

	/**
	 * Flags an upload as finished ({@code _ISSUCCESS = 1}) and commits.
	 *
	 * @param id {@code _ID} of the row to update
	 */
	public void updateSuccess(int id) {
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement("update FILEINFO set _ISSUCCESS = 1 where _ID = ?");
			ps.setInt(1, id);
			System.out.println("update");
			ps.executeUpdate();
			conn.commit();
		} catch (SQLException e) {
			e.printStackTrace();
			rollback();
		} finally {
			close(ps);
		}
	}

	/**
	 * Loads id, file mark and position of the row with the given id.
	 *
	 * @param id {@code _ID} to look up
	 * @return an entity holding the three columns; if no row matches or the
	 *         query fails, the entity is returned with its default values
	 */
	public FileEntity queryByID(int id) {
		FileEntity entity = new FileEntity();
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = conn.prepareStatement("select _ID, _FILE_MARK, _POSITION from FILEINFO where _ID = ?");
			ps.setInt(1, id);
			rs = ps.executeQuery();
			while (rs.next()) {
				entity.setId(rs.getInt("_ID"));
				entity.setMark(rs.getString("_FILE_MARK"));
				entity.setPosition(rs.getLong("_POSITION"));
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			close(rs);
			close(ps);
		}
		return entity;
	}

	/**
	 * Tells whether there is NO unfinished upload stored under the given id.
	 *
	 * <p>Despite the name, {@code true} means "no row with this id and
	 * {@code _ISSUCCESS != 1} exists" (which is also the result when the query
	 * fails); {@code false} means such a row exists.
	 *
	 * @param id {@code _ID} to look up
	 * @return {@code true} if no unfinished row with that id was found
	 */
	public boolean isFile(int id) {
		int count = 0;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = conn.prepareStatement("select count(_ID) from FILEINFO where _ID = ? and _ISSUCCESS != 1");
			ps.setInt(1, id);
			rs = ps.executeQuery();
			while (rs.next()) {
				count = rs.getInt(1);
			}
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			close(rs);
			close(ps);
		}
		return count == 0;
	}

	/* Rolls back the current transaction, reporting (not hiding) a failure. */
	private void rollback() {
		try {
			conn.rollback();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

	/* Closes a statement if it was created; a close failure is only reported. */
	private static void close(PreparedStatement ps) {
		if (ps == null) {
			return;
		}
		try {
			ps.close();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

	/* Closes a result set if it was created; a close failure is only reported. */
	private static void close(ResultSet rs) {
		if (rs == null) {
			return;
		}
		try {
			rs.close();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}

}

檔案實體
/**
 * Mutable bean carrying the values of one uploaded-file record. All fields
 * start at their Java defaults (0 / {@code null}) and are accessed only
 * through the getter/setter pairs below.
 */
public class FileEntity {

	// --- identity ---
	private int id;
	private String mark;

	// --- description of the file ---
	private String filename;
	private String fileType;
	private long fileSize;
	private String downURL;

	// --- upload progress ---
	private long position;
	private int isSuccess;

	/** @return the record id */
	public int getId() {
		return this.id;
	}

	/** @param id the record id */
	public void setId(int id) {
		this.id = id;
	}

	/** @return the file mark */
	public String getMark() {
		return this.mark;
	}

	/** @param mark the file mark */
	public void setMark(String mark) {
		this.mark = mark;
	}

	/** @return the file name */
	public String getFilename() {
		return this.filename;
	}

	/** @param filename the file name */
	public void setFilename(String filename) {
		this.filename = filename;
	}

	/** @return the file type */
	public String getFileType() {
		return this.fileType;
	}

	/** @param fileType the file type */
	public void setFileType(String fileType) {
		this.fileType = fileType;
	}

	/** @return the file size */
	public long getFileSize() {
		return this.fileSize;
	}

	/** @param fileSize the file size */
	public void setFileSize(long fileSize) {
		this.fileSize = fileSize;
	}

	/** @return the download URL */
	public String getDownURL() {
		return this.downURL;
	}

	/** @param downURL the download URL */
	public void setDownURL(String downURL) {
		this.downURL = downURL;
	}

	/** @return the stored position */
	public long getPosition() {
		return this.position;
	}

	/** @param position the position to store */
	public void setPosition(long position) {
		this.position = position;
	}

	/** @return the success flag */
	public int getIsSuccess() {
		return this.isSuccess;
	}

	/** @param isSuccess the success flag */
	public void setIsSuccess(int isSuccess) {
		this.isSuccess = isSuccess;
	}

}