1. 程式人生 > >Socket簡易分散式的多執行緒通訊

Socket簡易分散式的多執行緒通訊

分散式計算中,processor 與 master 需要進行通訊,兩端程式採用簡單的 C/S 通訊模型:一個 job 在處理時,由 server 進行任務分配;多個 client 執行完當前任務之後,主動向 server 請求下一個任務,server 根據 schedule algorithm 為 client 返回任務號,並註明是 local task(task 需要的 data block 在記憶體中)還是 remote task(task 需要的 data block 不在記憶體中)。

Server端程式碼

//server端程式
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

/**
 * Task-assignment server: accepts client connections on a fixed port and
 * starts one handler thread per connection.
 *
 * <p>Line protocol handled on each connection (one message per text line):
 * <ul>
 *   <li>Client sends {@code Apply}, then a line holding its numeric id.</li>
 *   <li>Server answers {@code Finish} when {@link #Job} is empty. Otherwise it
 *       removes the last task from {@link #Job} and answers {@code local} when
 *       {@code ClientMemory.get(id)} contains that task, or {@code remote}
 *       followed by a line holding {@code Local.get(task)}.</li>
 *   <li>After a {@code remote} answer the client may send {@code update} plus
 *       one more line; the server then records the value it sent in
 *       {@code ClientMemory} for that client.</li>
 * </ul>
 *
 * <p>The shared static collections are only touched under an internal lock by
 * this class. Code that mutates them from outside while the server is running
 * is not covered by that lock.
 */
public class Server {
    /** TCP port the server listens on. */
    private static final int PORT = 4700;
    /** Guards Job, ClientMemory and Local, which every handler thread shares. */
    private static final Object STATE_LOCK = new Object();

    private ServerSocket serverSocket;
    /** Currently connected client sockets; always accessed while holding its own monitor. */
    private HashSet<Socket> allSockets;
    /** Pending task ids; the last element is handed out first. */
    public static List<Integer> Job = new ArrayList<>();
    /** Client id -> entries recorded as present in that client's memory. */
    public static HashMap<Integer,List<Integer>> ClientMemory = new HashMap<>();
    /** Task id -> value sent to the client after "remote" and recorded on "update". */
    public static HashMap<Integer,Integer> Local = new HashMap<>();

    /**
     * Opens the listening socket. A bind failure is printed and otherwise
     * tolerated here (as before); {@link #startService()} then reports it.
     */
    public Server(){
        try{
            serverSocket = new ServerSocket(PORT);
        }catch (IOException e){
            e.printStackTrace();
        }
        allSockets = new HashSet<Socket>();
    }

    /**
     * Accepts connections forever, starting one handler thread per client.
     *
     * @throws IOException if the listening socket could not be opened in the
     *                     constructor, or if accepting a connection fails
     */
    public void startService() throws IOException{
        if(serverSocket == null){
            // Fail with a clear message instead of a NullPointerException on accept().
            throw new IOException("server socket was not opened on port " + PORT);
        }
        while(true){
            Socket s = serverSocket.accept();
            System.out.println("已接收到Client的請求...");
            synchronized (allSockets){
                allSockets.add(s);
            }
            ServerThread ss = new ServerThread(s);
            ss.start();
            System.out.println(ss.getId());
        }
    }

    /** Handles the request/response loop of a single client connection. */
    private class ServerThread extends Thread{
        Socket socket;

        public ServerThread(Socket socket){
            this.socket = socket;
        }

        /**
         * Serves requests until the client closes the connection, then closes
         * the socket and removes it from the set of connected sockets.
         */
        @Override
        public void run(){
            try {
                BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter pw = new PrintWriter(socket.getOutputStream());
                Integer clientId = null;      // id received with the most recent "Apply"
                Integer pendingEntry = null;  // value to record once the client confirms with "update"
                String line;
                // readLine() returns null at end of stream, i.e. when the client disconnects.
                while((line = br.readLine()) != null){
                    if("Apply".equals(line)){
                        String idLine = br.readLine();
                        if(idLine == null){
                            break;
                        }
                        try{
                            clientId = Integer.valueOf(idLine.trim());
                        }catch (NumberFormatException e){
                            System.err.println("Closing connection: client id is not a number: " + idLine);
                            break;
                        }
                        String reply;
                        Integer location = null;
                        synchronized (STATE_LOCK){
                            if(Job.isEmpty()){
                                reply = "Finish";
                            }else{
                                // Remove the task so it is handed out exactly once.
                                Integer taskId = Job.remove(Job.size()-1);
                                List<Integer> memory = ClientMemory.get(clientId);
                                if(memory != null && memory.contains(taskId)){
                                    reply = "local";
                                }else{
                                    reply = "remote";
                                    // NOTE(review): a task without a Local entry is sent as the
                                    // literal "null", as in the original; confirm desired handling.
                                    location = Local.get(taskId);
                                }
                            }
                        }
                        pendingEntry = location;
                        pw.println(reply);
                        if("remote".equals(reply)){
                            // Terminated with a newline so the client's readLine() returns.
                            pw.println(location);
                        }
                        pw.flush();
                    }else if("update".equals(line)){
                        // The client sends one value line after "update"; it is consumed
                        // and the value this server sent is what gets recorded.
                        if(br.readLine() == null){
                            break;
                        }
                        if(clientId != null && pendingEntry != null){
                            synchronized (STATE_LOCK){
                                List<Integer> memory = ClientMemory.get(clientId);
                                if(memory == null){
                                    memory = new ArrayList<>();
                                    ClientMemory.put(clientId, memory);
                                }
                                // NOTE(review): the value from Local is what is recorded here,
                                // while the "local" check looks up the task id - confirm intent.
                                memory.add(pendingEntry);
                            }
                            pendingEntry = null;
                        }
                    }
                    // Any other line is ignored.
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                synchronized (allSockets){
                    allSockets.remove(socket);
                }
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Writes one line to every currently connected client.
         *
         * @param message text to send
         * @throws IOException if an output stream cannot be obtained
         */
        public void sendMessageToAllClient(String message) throws IOException{
            synchronized (allSockets){
                for(Socket s: allSockets){
                    PrintWriter pw = new PrintWriter(s.getOutputStream());
                    pw.println(message);
                    pw.flush();
                }
            }
        }

        /**
         * Writes one line to a single client.
         *
         * @param s       socket of the target client
         * @param message text to send
         * @throws IOException if the output stream cannot be obtained
         */
        public void sendMessageToClient(Socket s,String message) throws IOException{
            PrintWriter pw = new PrintWriter(s.getOutputStream());
            pw.println(message);
            pw.flush();
        }

        /**
         * Intended to build the message for a client: "local" when the task's
         * data is on that client, otherwise "remote" plus the data location.
         * Placeholder: currently always returns {@code null}.
         */
        public String assignTaskToClient(int taskId){
            String result = null;
            return result;
        }

        /** Placeholder: currently does nothing. */
        public void updateClientMemory(int ClientId,int BlockId){}

        /**
         * Intended to tell whether the data a task needs is in the client's memory.
         * Placeholder: currently always returns {@code false}.
         */
        public boolean isLocal(int taskId){
            return false;
        }

        /**
         * Intended to return which task the scheduling algorithm assigns to the client.
         * Placeholder: currently always returns 0.
         */
        public int result(){
            int result = 0;
            return result;
        }
    }

    /** Starts the server and serves until accepting a connection fails. */
    public static void main(String arg[]){
        Server ser = new Server();
        try {
            ser.startService();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client端程式碼

//client端程式
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

import static java.lang.System.in;

/**
 * Worker client: connects to the task server and repeatedly asks it for work
 * on a background thread.
 *
 * <p>Line protocol spoken on the connection (one message per text line):
 * <ul>
 *   <li>Client sends {@code Apply}, then a line holding its numeric id.</li>
 *   <li>Server answers {@code Finish} (stop), {@code local} (run the task), or
 *       {@code remote} followed by a line holding a numeric location.</li>
 *   <li>After a {@code remote} answer the client calls {@link #getData(int)},
 *       sends {@code update} plus the same number, then runs the task.</li>
 * </ul>
 */
public class Client extends Thread{
    /** Simulated duration of one task, in milliseconds. */
    private static final long TASK_COST_MILLIS = 3000L;

    private Socket socket;
    private int ClientId;
    // Which data blocks are in memory; shared by every Client in this JVM.
    static List<Integer> Memory = new ArrayList<Integer>();
    // NOTE(review): the original notes say fetching remote data should also call
    // updateMemory(); nothing calls it yet - confirm which value should be stored.

    /**
     * Connects to the server.
     *
     * @param host     server host name or address
     * @param port     server port
     * @param clientId id sent to the server with every request
     * @throws UnknownHostException if the host cannot be resolved
     * @throws IOException          if the connection cannot be opened
     */
    public Client(String host,int port,int clientId)throws UnknownHostException,IOException {
        socket = new Socket(host, port);
        ClientId = clientId;
    }

    /** Starts the background thread that talks to the server. */
    public void connectSer() throws IOException{
        new ClientThread(socket).start();
    }

    /** Runs the request loop for one connection and closes the socket when done. */
    private class ClientThread extends Thread{
        Socket socket;

        public ClientThread(Socket socket){
            this.socket = socket;
        }

        @Override
        public void run(){
            try{
                BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter pw = new PrintWriter(socket.getOutputStream());
                while(true){
                    // The client speaks first; waiting for the server here would block forever.
                    pw.println("Apply");
                    pw.println(ClientId);
                    pw.flush();

                    String message = br.readLine();
                    // null means the server closed the connection.
                    if(message == null || "Finish".equals(message)){
                        break;
                    }
                    if("local".equals(message)){
                        ProcessTask(TASK_COST_MILLIS);
                    }else if("remote".equals(message)){
                        String location = br.readLine();
                        if(location == null){
                            break;
                        }
                        int serverId;
                        try{
                            serverId = Integer.parseInt(location.trim());
                        }catch (NumberFormatException e){
                            System.err.println("Stopping: data location is not a number: " + location);
                            break;
                        }
                        getData(serverId);
                        // Each message needs its own line so the server's readLine() returns.
                        pw.println("update");
                        pw.println(serverId);
                        pw.flush();
                        ProcessTask(TASK_COST_MILLIS);
                    }else{
                        System.err.println("Stopping: unexpected reply from server: " + message);
                        break;
                    }
                }
            }catch (IOException e){
                e.printStackTrace();
            }catch (InterruptedException e){
                // Restore the flag so code further up can see the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }finally{
                try{
                    socket.close();
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }
    }

    /** Reports which location the data is fetched from; currently only prints it. */
    public void getData(int serverId){
        System.out.println("Get data from No."+ serverId);
    }

    /** Records a block id in the shared memory list. */
    public void updateMemory(int BlockId){
        // Memory is static, so lock on the class rather than on this instance.
        synchronized (Client.class){
            Memory.add(BlockId);
        }
    }

    /**
     * Simulates the time a task takes by sleeping.
     *
     * @param taskCost time to sleep, in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void ProcessTask(long taskCost) throws InterruptedException {
        Thread.sleep(taskCost);
    }

    /**
     * Intended to request a task and tell the server that a remote task's data
     * has been pulled into local memory. Placeholder: currently always returns 0.
     */
    synchronized public int ApplyTask(){
        int serId = 0;
        return serId;
    }

    /** Closes the connection, printing any I/O error. */
    public void close(){
        try{
            socket.close();
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    /** Connects six clients (ids 1-6) to 127.0.0.1:4700 and starts them. */
    public static void main(String arg[]){
        try {
            List<Client> clients = new ArrayList<Client>();
            for(int id = 1; id <= 6; id++){
                clients.add(new Client("127.0.0.1",4700,id));
            }
            for(Client c: clients){
                c.connectSer();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Socket的通訊太過麻煩,後期會用其他的框架替換掉