分散式多執行緒同步實現
阿新 • • 發佈:2018-12-18
簡介:多執行緒請求同一個資源,導致併發問題,在不使用第三方外掛的情況下,用程式碼實現同步,初步程式碼如下,如果有什麼建議和意見,請留言,大家一起學習!
原理:在多個伺服器中選一臺作為中介;當各個伺服器同時爭搶同一個資源時,都先連線到中介伺服器,由中介伺服器統一加鎖、同步、排隊。
用到的技術是:socket ,多執行緒 ,lock
貼程式碼:
DisCur.json
{
"IP":"10.118.14.23",
"port":2000
}
package sf.ibu.eric.core;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

import sf.ibu.eric.common.MySocketServer;
import sf.ibu.eric.common.Util;

/**
 * Client-side entry point of a socket-based cross-process lock.
 *
 * <p>{@link #lock(String)} opens a connection to the host/port returned by
 * {@link Conf}, sends the key and blocks until a reply arrives.
 * {@link #unlock(String)} sends an empty message on that same connection.
 * The connection's reader and writer are kept per thread, so a lock must be
 * released by the thread that acquired it.
 */
public class DisCur {
    /** Guarded by the class monitor (see {@link #init()}). */
    private static boolean inited = false;
    private static ThreadLocal<BufferedReader> reader = new ThreadLocal<BufferedReader>();
    private static ThreadLocal<PrintWriter> writer = new ThreadLocal<PrintWriter>();

    /**
     * Starts the embedded server if required. Must be called before the first
     * {@link #lock(String)} so that the server is ready.
     *
     * <p>Synchronized so that concurrent callers cannot start the server
     * twice. The flag is only set after a successful start, so a failed
     * attempt can be retried.
     *
     * @throws Exception if the embedded server cannot be started
     */
    public static synchronized void init() throws Exception {
        if (!inited) {
            startInnerServer();
            inited = true;
        }
    }

    /**
     * Acquires the lock identified by {@code key}, blocking until the server
     * replies.
     *
     * @param key lock name; every process using the same key contends for the
     *            same lock
     * @throws Exception if the connection or the handshake fails; in that case
     *                   the socket is closed and no per-thread state is left
     *                   behind
     */
    public static void lock(String key) throws Exception {
        Socket socket = new Socket(Conf.getIns().getIP(), Conf.getIns().getPort());
        try {
            OutputStream os = socket.getOutputStream();
            PrintWriter pw = new PrintWriter(os);
            BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            writer.set(pw);
            reader.set(br);
            Util.write(pw, key);
            // Blocks until the server answers, i.e. until the lock is granted.
            Util.read(br);
        } catch (Exception e) {
            // The lock was not obtained: drop the half-initialised state so a
            // later unlock() cannot touch a dead connection, and free the socket.
            writer.remove();
            reader.remove();
            try {
                socket.close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Releases the lock held by the calling thread. Call this from a
     * {@code finally} block. It is a no-op when the calling thread holds no
     * lock.
     *
     * @param key lock name; currently unused because the release is sent on
     *            the connection opened by {@link #lock(String)}
     */
    public static void unlock(String key) {
        PrintWriter pw = writer.get();
        // Always clear the per-thread state so pooled threads do not retain
        // references to a finished connection.
        writer.remove();
        reader.remove();
        if (pw == null) {
            return;
        }
        // NOTE(review): writeAndClose is presumed to close the writer (and with
        // it the socket) - confirm against Util.
        Util.writeAndClose(pw, "");
    }

    /**
     * Starts the lock server in this process when the local IP equals the IP
     * configured in {@link Conf}; otherwise does nothing.
     *
     * @throws Exception if the configuration cannot be read or the server
     *                   socket cannot be opened
     */
    public static void startInnerServer() throws Exception {
        if (Util.getLocalIP().equals(Conf.getIns().getIP())) {
            MySocketServer mySocketServer = new MySocketServer(Conf.getIns().getPort(), "DisCur server");
            mySocketServer.startListener();
        }
    }
}
package sf.ibu.eric.common;

import java.net.ServerSocket;
import java.net.Socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sf.ibu.eric.core.Conf;

/**
 * Socket server that serialises clients per key.
 *
 * <p>For every accepted connection the first message is read as the key. A
 * worker thread then enters a monitor on the interned key, sends an empty
 * message to the client (the grant), and stays inside the monitor until the
 * client sends its next message or the stream ends (the release).
 *
 * @author 01373525
 */
public class MySocketServer {
    private ServerSocket server;
    private int port;
    private String name;
    /** Running counter used only to build worker thread names. */
    public static long id;
    private static final Logger logger = LoggerFactory.getLogger(MySocketServer.class);

    /**
     * Binds the server socket immediately.
     *
     * @param port TCP port to listen on
     * @param name label used in the listener thread name
     * @throws Exception if the port cannot be bound
     */
    public MySocketServer(int port, String name) throws Exception {
        this.name = name;
        this.port = port;
        init();
    }

    private void init() throws Exception {
        server = new ServerSocket(this.port);
    }

    /**
     * Starts the accept loop on a new thread named {@code "<port> at <name>"}.
     * The loop never terminates on its own; failures are logged and the loop
     * continues.
     */
    public void startListener() {
        Thread thread = new Thread() {
            @Override
            public void run() {
                while (true) {
                    Socket socket = null;
                    try {
                        socket = server.accept();
                        initClient(socket);
                    } catch (Exception e) {
                        logger.error("Discur", e);
                        // initClient failed before its worker thread was started,
                        // so nobody else owns this socket: release it here.
                        closeQuietly(socket);
                    }
                }
            }
        };
        thread.setName(port + " at " + name);
        thread.start();
    }

    /**
     * Reads the key from a freshly accepted connection and hands the
     * connection to a worker thread that performs the grant/release exchange.
     *
     * <p>NOTE(review): the key is read on the accept thread, so a client that
     * connects but does not send its key delays every other client.
     *
     * @param socket accepted client connection; closed by the worker thread
     *               when the exchange ends
     * @throws Exception if the key cannot be read
     */
    private void initClient(final Socket socket) throws Exception {
        // Interning makes equal keys share one String instance, which is what
        // the monitor below relies on.
        final String msgStr = Util.read(socket).intern();
        final String address = socket.getRemoteSocketAddress().toString();
        Thread thread = new Thread() {
            @Override
            public void run() {
                try {
                    synchronized (msgStr) {
                        // Grant: unblock the client waiting in its lock call.
                        Util.write(socket, "");
                        // Hold the monitor until the client releases or disconnects.
                        Util.read(socket);
                    }
                } catch (Exception e) {
                    try {
                        Util.write(socket, "");
                    } catch (Exception e1) {
                        logger.error("Discur", e1);
                    }
                    logger.error("Discur", e);
                } finally {
                    // Without this every lock/unlock cycle leaves one server-side
                    // socket open until it is garbage collected.
                    closeQuietly(socket);
                }
            }
        };
        thread.setName(String.format("%s:%s:%d", msgStr, address, id++));
        thread.start();
    }

    /** Closes {@code socket} if it is non-null, logging instead of throwing. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (Exception e) {
            logger.error("Discur", e);
        }
    }
}
package sf.ibu.eric.common; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; public class Util { public static String read(Socket socket) throws Exception { BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); String temp = null; StringBuilder sb = new StringBuilder(); while ((temp = br.readLine()) != null && !temp.equals("<<eos>>")) { sb.append(temp); } return sb.toString(); } public static void write(Socket socket, String content) throws Exception { OutputStream os = socket.getOutputStream(); PrintWriter pw = new PrintWriter(os); pw.write(content + "\n"); // <<eos>> means this transfer is done pw.write("<<eos>>\n"); pw.flush(); } public static void write(PrintWriter pw, String v) { pw.write(v + "\n"); pw.write("<<eos>>\n"); pw.flush(); } }
package sf.ibu.eric.core;
import java.util.ArrayList;
import org.json.eric.JSONArray;
import org.json.eric.JSONException;
import org.json.eric.JSONObject;
import sf.ibu.eric.common.Util;
/**
 * Lazily loaded singleton exposing the settings stored in the classpath
 * resource {@code /DisCur.json}.
 */
public class Conf {
    private JSONObject confJson;
    private static Conf ins;

    /**
     * Returns the shared instance, loading the configuration on first use.
     *
     * <p>Synchronized because it is reached from arbitrary threads through
     * {@code DisCur.lock}; without it two threads could both observe
     * {@code ins == null} and load the file twice.
     *
     * @return the shared configuration
     * @throws Exception if the configuration cannot be located or parsed
     */
    public static synchronized Conf getIns() throws Exception {
        if (ins == null) {
            ins = new Conf();
        }
        return ins;
    }

    private Conf() throws JSONException, Exception {
        refresh();
    }

    /**
     * Loads {@code /DisCur.json} from the classpath into {@link #confJson}.
     *
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private void refresh() throws JSONException, Exception {
        java.net.URL resource = Conf.class.getResource("/DisCur.json");
        if (resource == null) {
            // Fail with a message instead of an anonymous NullPointerException.
            throw new IllegalStateException("Configuration resource /DisCur.json not found on the classpath");
        }
        // NOTE(review): a file-system path is required here, so this is not
        // expected to work when the resource is packaged inside a jar - confirm.
        String configureFile = resource.toURI().getPath();
        confJson = new JSONObject(Util.file2String(configureFile));
    }

    /** @return the value of the {@code "IP"} entry */
    public String getIP() {
        return confJson.optString("IP");
    }

    /** @return the value of the {@code "port"} entry */
    public int getPort() {
        return confJson.optInt("port");
    }

    /**
     * @return the strings of the {@code "filePaths"} array, or an empty list
     *         when that entry is absent
     */
    public ArrayList<String> getPaths() {
        ArrayList<String> rList = new ArrayList<String>();
        JSONArray jsonArray = confJson.optJSONArray("filePaths");
        if (jsonArray != null) {
            for (int i = 0; i < jsonArray.length(); i++) {
                rList.add(jsonArray.getString(i));
            }
        }
        return rList;
    }

    /** @return the parsed configuration object itself (not a copy) */
    public JSONObject getConfJson() {
        return confJson;
    }
}
main方法
static int num=0;
/**
 * Demo: one worker thread increments {@code num} 1000 times, taking the
 * distributed lock "a" around each increment, then the final value is printed.
 */
public static void main(String[] a) throws Exception {
    DisCur.init();
    Thread t1 = new Thread() {
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                // Acquire outside the try/finally: if the lock was never
                // obtained there is nothing to release, and calling unlock
                // would act on a missing or stale connection.
                try {
                    DisCur.lock("a");
                } catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }
                try {
                    num++;
                    //write file
                } finally {
                    DisCur.unlock("a");
                }
            }
        }
    };
    t1.start();
    t1.join();
    System.out.println(num);
}