1. 程式人生 > >分散式多執行緒同步實現

分散式多執行緒同步實現

簡介:多執行緒請求同一個資源,導致併發問題,在不使用第三方外掛的情況下,用程式碼實現同步,初步程式碼如下,如果有什麼建議和意見,請留言,大家一起學習!

原理:在多臺伺服器中選一臺作為中介;當各個伺服器同時爭搶同一個資源時,請求都會轉發到中介伺服器,由中介伺服器負責加鎖、同步、排隊。

用到的技術是:socket ,多執行緒 ,lock

貼程式碼:

DisCur.json

{
	"IP":"10.118.14.23",
	"port":2000
}
package sf.ibu.eric.core;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

import sf.ibu.eric.common.MySocketServer;
import sf.ibu.eric.common.Util;

/**
 * Client-side API of a socket based lock shared between servers.
 * <p>
 * {@link #lock(String)} connects to the host/port configured in {@link Conf},
 * sends the key and blocks until the server answers. {@link #unlock(String)}
 * sends an empty message on the same connection and closes it. The connection
 * of the calling thread is kept in thread-local storage between the two calls.
 */
public class DisCur {
	private static boolean inited = false;
	private static ThreadLocal<BufferedReader> reader = new ThreadLocal<BufferedReader>();
	private static ThreadLocal<PrintWriter> writer = new ThreadLocal<PrintWriter>();

	/**
	 * This method must run earlier to make sure server ready!
	 * <p>
	 * Synchronized so that concurrent callers cannot start the inner server
	 * twice. The flag is only set after a successful start, so a failed start
	 * can be retried by calling this method again.
	 *
	 * @throws Exception if the inner server cannot be started
	 */
	public static synchronized void init() throws Exception {
		if (!inited) {
			startInnerServer();
			inited = true;
		}
	}

	/**
	 * Acquires the lock identified by {@code key}; blocks until the server
	 * replies.
	 *
	 * @param key
	 *    Any server runs with same key consider same lock
	 * @throws Exception if the connection or the handshake fails; in that case
	 *    the socket is closed and nothing is left for {@link #unlock(String)}
	 *    to release
	 */
	public static void lock(String key) throws Exception {
		Socket socket = new Socket(Conf.getIns().getIP(), Conf.getIns().getPort());
		try {
			OutputStream os = socket.getOutputStream();
			PrintWriter pw = new PrintWriter(os);
			BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			Util.write(pw, key);
			Util.read(br);
			// Publish the streams only after the handshake succeeded, so that
			// unlock() never sees a half-initialised connection.
			writer.set(pw);
			reader.set(br);
		} catch (Exception e) {
			try {
				socket.close();
			} catch (Exception closeError) {
				e.addSuppressed(closeError);
			}
			throw e;
		}
	}

	/**
	 * Releases the lock held by the calling thread. Please call in a finally
	 * block. Does nothing when the calling thread holds no connection (for
	 * example because {@link #lock(String)} failed).
	 *
	 * @param key the key passed to {@link #lock(String)}; not used by the release itself
	 */
	public static void unlock(String key) {
		PrintWriter pw = writer.get();
		BufferedReader br = reader.get();
		// Clear the slots first so a later failed lock() cannot reuse stale streams.
		writer.remove();
		reader.remove();
		if (pw == null) {
			return;
		}
		try {
			Util.writeAndClose(pw, "");
		} finally {
			if (br != null) {
				try {
					br.close();
				} catch (Exception ignored) {
					// Best effort: the release message has already been sent and
					// the connection is being discarded anyway.
				}
			}
		}
	}

	/**
	 * Starts the lock server in this JVM, but only when the local IP equals
	 * the IP configured in {@link Conf}.
	 *
	 * @throws Exception if the configuration cannot be read or the server
	 *    cannot be created
	 */
	public static void startInnerServer() throws Exception {
		if (Util.getLocalIP().equals(Conf.getIns().getIP())) {
			MySocketServer mySocketServer = new MySocketServer(Conf.getIns().getPort(),"DisCur server");
			mySocketServer.startListener();
		}
	}

}
package sf.ibu.eric.common;

import java.net.ServerSocket;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sf.ibu.eric.core.Conf;

/**
 * Socket server that serialises clients by key.
 * <p>
 * For every accepted connection the server reads one message (the key),
 * starts a worker thread, and inside a monitor on the interned key sends an
 * empty reply and then waits for the client's next message. Connections
 * presenting the same key therefore pass through that section one at a time.
 *
 * @author 01373525
 *
 */
public class MySocketServer {
	private ServerSocket server;
	private int port;
	private String name;
	/** Running counter, only used to make worker thread names distinct. */
	public static long id;
	private static final Logger logger = LoggerFactory.getLogger(MySocketServer.class);

	/**
	 * Creates the server and binds it to {@code port}.
	 *
	 * @param port TCP port to listen on
	 * @param name label used in the listener thread name
	 * @throws Exception if the server socket cannot be opened
	 */
	public MySocketServer(int port, String name) throws Exception {
		this.name = name;
		this.port = port;
		init();
	}

	private void init() throws Exception {
		server = new ServerSocket(this.port);
	}

	/**
	 * Starts the accept loop on its own thread and returns immediately.
	 */
	public void startListener() {
		Thread thread = new Thread() {
			@Override
			public void run() {
				while (true) {
					Socket socket = null;
					try {
						socket = server.accept();
						initClient(socket);
					} catch (Exception e) {
						logger.error("Discur", e);
						// initClient failed before a worker took ownership of the
						// socket, so it has to be released here.
						closeQuietly(socket);
					}
				}
			}
		};
		thread.setName(port + " at " + name);
		thread.start();
	}

	/**
	 * Reads the key sent by the client and starts the worker thread that
	 * holds the per-key monitor for this connection.
	 *
	 * @param socket the accepted client connection
	 * @throws Exception if the key cannot be read
	 */
	private void initClient(final Socket socket) throws Exception {
		String msg = Util.read(socket);
		// Interning yields one shared String instance per distinct key, which
		// is what the workers synchronise on.
		final String msgStr = msg.intern();
		String address = socket.getRemoteSocketAddress().toString();
		Thread thread = new Thread() {
			@Override
			public void run() {
				try {
					synchronized (msgStr) {
						// Empty reply: tells the client it may proceed.
						Util.write(socket, "");
						// Wait for the client's next message (or end of stream)
						// before leaving the monitor.
						Util.read(socket);
					}
				} catch (Exception e) {
					try {
						Util.write(socket, "");
					} catch (Exception e1) {
						logger.error("Discur", e1);
					}
					logger.error("Discur", e);
				} finally {
					// The exchange is over; without this every request would
					// leave a server-side socket open.
					closeQuietly(socket);
				}
			}
		};
		thread.setName(String.format("%s:%s:%d", msgStr, address, id++));
		thread.start();
	}

	/** Closes {@code socket} if it is non-null; a failure is logged, not thrown. */
	private static void closeQuietly(Socket socket) {
		if (socket == null) {
			return;
		}
		try {
			socket.close();
		} catch (java.io.IOException e) {
			logger.error("Discur", e);
		}
	}

}
package sf.ibu.eric.common;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

public class Util {

	/** Line that marks the end of one message on the wire. */
	private static final String END_OF_MESSAGE = "<<eos>>";

	/**
	 * Reads one message from the socket: all lines up to (not including) the
	 * end-of-message line, or up to end of stream. Lines are concatenated
	 * without separators.
	 *
	 * @param socket connection to read from
	 * @return the concatenated lines, possibly empty
	 * @throws Exception if the input stream cannot be obtained or read
	 */
	public static String read(Socket socket) throws Exception {
		BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
		StringBuilder message = new StringBuilder();
		for (String line = in.readLine(); line != null; line = in.readLine()) {
			if (END_OF_MESSAGE.equals(line)) {
				break;
			}
			message.append(line);
		}
		return message.toString();
	}

	/**
	 * Writes one message to the socket and flushes it.
	 *
	 * @param socket connection to write to
	 * @param content message body
	 * @throws Exception if the output stream cannot be obtained
	 */
	public static void write(Socket socket, String content) throws Exception {
		write(new PrintWriter(socket.getOutputStream()), content);
	}

	/**
	 * Writes one message to the writer: the body on its own line, followed by
	 * the end-of-message line, then flushes.
	 *
	 * @param pw writer to send through
	 * @param v message body
	 */
	public static void write(PrintWriter pw, String v) {
		pw.write(v + "\n");
		// the marker line tells the reader this transfer is done
		pw.write(END_OF_MESSAGE + "\n");
		pw.flush();
	}

}
package sf.ibu.eric.core;

import java.util.ArrayList;

import org.json.eric.JSONArray;
import org.json.eric.JSONException;
import org.json.eric.JSONObject;

import sf.ibu.eric.common.Util;

/**
 * Lazily created singleton holding the settings parsed from the classpath
 * resource {@code /DisCur.json}.
 */
public class Conf {
	private JSONObject confJson;
	private static Conf ins;

	/**
	 * Returns the shared instance, loading the configuration on first use.
	 * Synchronized because it is reached from every thread that takes a lock;
	 * without it concurrent first calls could each build their own instance.
	 *
	 * @return the shared configuration
	 * @throws Exception if the configuration cannot be located or parsed
	 */
	public static synchronized Conf getIns() throws Exception {
		if(ins==null) {
			ins=new Conf();
		}
		return ins;
	}

	private Conf() throws JSONException, Exception {
		refresh();
	}

	/**
	 * Loads and parses {@code /DisCur.json} from the classpath.
	 *
	 * @throws IllegalStateException if the resource is missing or has no file path
	 */
	private void refresh() throws JSONException, Exception {
		java.net.URL resource=Conf.class.getResource("/DisCur.json");
		if(resource==null) {
			throw new IllegalStateException("DisCur.json not found on the classpath");
		}
		String configureFile=resource.toURI().getPath();
		if(configureFile==null) {
			// URI.getPath() is null for opaque URIs such as resources inside a jar.
			throw new IllegalStateException("DisCur.json is not a plain file: "+resource);
		}
		confJson=new JSONObject(Util.file2String(configureFile));
	}

	/** @return the value of the "IP" entry */
	public String getIP() {
		return confJson.optString("IP");
	}

	/** @return the value of the "port" entry */
	public int getPort() {
		return confJson.optInt("port");
	}

	/**
	 * @return the strings of the "filePaths" array, or an empty list when the
	 *    entry is absent
	 */
	public ArrayList<String> getPaths(){
		ArrayList<String> rList=new ArrayList<String>();
		JSONArray jsonArray=confJson.optJSONArray("filePaths");
		if(jsonArray!=null) {
			for(int i=0;i<jsonArray.length();i++) {
				rList.add(jsonArray.getString(i));
			}
		}
		return rList;
	}

	/** @return the parsed configuration object itself (not a copy) */
	public JSONObject getConfJson() {
		return confJson;
	}

}
main方法
static int num = 0;

	/**
	 * Demo: initialises DisCur, then increments {@code num} 1000 times on a
	 * worker thread, taking and releasing the lock "a" around each increment,
	 * and prints the final value.
	 */
	public static void main(String[] a) throws Exception {
		DisCur.init();
		Runnable worker = () -> {
			int round = 0;
			while (round < 1000) {
				try {
					DisCur.lock("a");
					num++;
					// write file
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					DisCur.unlock("a");
				}
				round++;
			}
		};
		Thread t1 = new Thread(worker);
		t1.start();
		t1.join();
		System.out.println(num);
	}