1. 程式人生 > >分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型

分散式Web應用----基於Socket+動態代理實現簡單RPC 生產者消費者模型

寫在前面

前面一文主要簡單介紹了JAVA動態代理基本原理,這也是實現RPC的基本知識,這裡我們運用Socket簡單實現一個遠端過程呼叫,方便大家理解RPC的基本原理,希望對大家有所幫助。

新建People介面類與Man實現類

介面類

public interface People {
    public  String sayHello(String name);
}

實現類

public class Man implements People {
    public String sayHello(String name) {
        System.out.println("call Man sayHello "
); return "Hello "+name; } }

這兩個類都比較簡單,沒什麼好說的了。

新建生產者執行緒類ProducerTask 和處理Socket服務類RPCProducer

生產者處理連線執行緒類

public class ProducerTask implements  Runnable {

    Socket client=null;
    public  ProducerTask(Socket client){
        this.client=client;
    }
    public void run() {
        ObjectInputStream input=null
; ObjectOutputStream output=null; try { input=new ObjectInputStream(client.getInputStream()); String interFaceName=input.readUTF(); Class<?> service=Class.forName(interFaceName); String methodName=input.readUTF(); Class<?> [] paramterTypes= (Class<?>[]) input.readObject(); Object [] arguments= (Object[]) input.readObject(); Method method=service.getMethod(methodName,paramterTypes); Object result=method.invoke(service.newInstance(),arguments); System.out.println("遠端呼叫物件名為:"
+service.getName()); System.out.println("遠端呼叫方法名為:"+methodName); for (Object obj:arguments){ System.out.println("遠端呼叫引數為:"+obj); } output=new ObjectOutputStream(client.getOutputStream()); output.writeObject(result); } catch (Exception e) { e.printStackTrace(); }finally { try { output.close(); input.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } } }

ProducerTask 類主要是處理請求子執行緒,在該類中通過Socket連接獲取消費者傳送過來的 物件名、方法名、方法引數、引數列表,再利用反射機制,完成方法的呼叫,並向消費者返回呼叫的結果。

處理Socket服務類

public class RPCProducer {
    static Executor executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    public static void exproter(String hostName,int port) throws Exception {
        ServerSocket server=new ServerSocket();
        server.bind(new InetSocketAddress(hostName,port));
        try {
            while (true){
                System.out.println("get client call");
                executor.execute(new ProducerTask(server.accept()));
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            server.close();
        }
    }
}

該類是生產者監聽連線類,這裡面主要使用了執行緒池,每當一個連線過來時,將執行緒池中增加一個執行緒,並讓子執行緒去處理連線請求。

新建消費者動態代理類和消費者類

消費者動態代理類

public class ConsumerHandler<T> implements InvocationHandler {
    private Class<?> object;
    private InetSocketAddress address;
    public ConsumerHandler(Class<?> object, InetSocketAddress address){
        this.object=object;
        this.address=address;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println(" call peopleProxyHandler invoke ");
        System.out.println("呼叫物件為:"+object.getClass().getName());
        System.out.println("呼叫方法為:"+method.getName());
        for (Object obj:args){
            System.out.println("引數為:"+obj.toString());
        }

        Class cl=Class.forName(object.getClass().getName());

        Socket socket=null;
        ObjectOutputStream output=null;
        ObjectInputStream  input=null;

        socket=new Socket();
        socket.connect(address);
        output=new ObjectOutputStream(socket.getOutputStream());
        output.writeUTF(object.getName());
        output.writeUTF(method.getName());
        output.writeObject(method.getParameterTypes());
        output.writeObject(args);
        input =new ObjectInputStream(socket.getInputStream());

        return  input.readObject();
    }
}

該類實現了InvocationHandler 介面,主要將對本地方法的呼叫,代理到此類中,在invoke方法中,獲取呼叫方法的物件名、方法名、引數型別及引數列表,再通過連線生產者的Socket服務,將這些引數傳送給生產者,並返回生產者返回的結果。

消費者類

public class RPCConsumer<T> {
    public T improter(Class<?> serviceClass, InetSocketAddress address){
        ConsumerHandler<T> consumerHandler=new ConsumerHandler<T>(serviceClass,address);
        Class<?> intf=serviceClass.getInterfaces()[0];
        Class<?> [] interfaces=new Class<?>[]{intf};
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),interfaces,consumerHandler);
    }
}

該類主要是返回一個代理物件,完成動態代理功能

新建生產者和服務者測試類

生產者測試類

public class ProducerTest {
    public  static  void  main(String [] rags){
        new Thread(new Runnable() {
            public void run() {
                try {
                    RPCProducer.exproter("localhost",8080);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

這是生產者測試類,主要在開啟生產者服務,監聽埠為8080

消費者測試類

    public static void main(String [] rags){
        RPCConsumer<People> peopleRPCConsumer=new RPCConsumer<People>();
        People people=peopleRPCConsumer.improter(Man.class,new InetSocketAddress("localhost",8080));
        System.out.println(people.sayHello("zhaochao"));

    }

消費者測試類,通過peopleRPCConsumer 生成代理物件people,呼叫people的方法,完成遠端呼叫

結果

生產者輸出

get client call
get client call
call Man sayHello 
遠端呼叫物件名為:com.zhaocaho.proxy.Man
遠端呼叫方法名為:sayHello
遠端呼叫引數為:zhaochao

消費者輸出

 call peopleProxyHandler invoke 
呼叫物件為:java.lang.Class
呼叫方法為:sayHello
引數為:zhaochao
Hello zhaochao