輕量級分散式RPC框架實現(續)
最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是麻雀雖小,五臟俱全,有興趣的可以學習一下。
本人在這個簡易版的RPC上添加了如下特性:
* 服務非同步呼叫的支援,回撥函式callback的支援
* 客戶端使用長連線(在多次呼叫共享連線)
* 服務端非同步多執行緒處理RPC請求
2、簡介
RPC,即 Remote Procedure Call(遠端過程呼叫),呼叫遠端計算機上的服務,就像呼叫本地服務一樣。RPC可以很好的解耦系統,如WebService就是一種基於Http協議的RPC。
這個RPC整體框架如下:
這個RPC框架使用的一些技術所解決的問題:
服務釋出與訂閱:服務端使用Zookeeper註冊服務地址,客戶端從Zookeeper獲取可用的服務地址。
通訊:使用Netty作為通訊框架。
Spring:使用Spring配置服務,載入Bean,掃描註解。
動態代理:客戶端使用代理模式透明化服務呼叫。
訊息編解碼:使用Protostuff序列化和反序列化訊息。
3、服務端釋出服務
使用註解標註要釋出的服務
服務註解
[java] view plain copy print?- @Target({ElementType.TYPE})
- @Retention
- @Component
- public@interface RpcService {
- Class<?> value();
- }
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class<?> value();
}
一個服務介面:
[java] view plain copy print?
- publicinterface HelloService {
- String hello(String name);
- String hello(Person person);
- }
public interface HelloService {
String hello(String name);
String hello(Person person);
}
一個服務實現:使用註解標註
[java]
view plain
copy
print?
- @RpcService(HelloService.class)
- publicclass HelloServiceImpl implements HelloService {
- @Override
- public String hello(String name) {
- return"Hello! " + name;
- }
- @Override
- public String hello(Person person) {
- return"Hello! " + person.getFirstName() + " " + person.getLastName();
- }
- }
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "Hello! " + name;
}
@Override
public String hello(Person person) {
return "Hello! " + person.getFirstName() + " " + person.getLastName();
}
}
服務在啟動的時候掃描得到所有的服務介面及其實現:
[java] view plain copy print?
- @Override
- publicvoid setApplicationContext(ApplicationContext ctx) throws BeansException {
- Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
- if (MapUtils.isNotEmpty(serviceBeanMap)) {
- for (Object serviceBean : serviceBeanMap.values()) {
- String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
- handlerMap.put(interfaceName, serviceBean);
- }
- }
- }
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
在Zookeeper叢集上註冊服務地址:
[java] view plain copy print?
- publicclass ServiceRegistry {
- privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
- private CountDownLatch latch = new CountDownLatch(1);
- private String registryAddress;
- public ServiceRegistry(String registryAddress) {
- this.registryAddress = registryAddress;
- }
- publicvoid register(String data) {
- if (data != null) {
- ZooKeeper zk = connectServer();
- if (zk != null) {
- AddRootNode(zk); // Add root node if not exist
- createNode(zk, data);
- }
- }
- }
- private ZooKeeper connectServer() {
- ZooKeeper zk = null;
- try {
- zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
- @Override
- publicvoid process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- latch.await();
- } catch (IOException e) {
- LOGGER.error("", e);
- }
- catch (InterruptedException ex){
- LOGGER.error("", ex);
- }
- return zk;
- }
- privatevoid AddRootNode(ZooKeeper zk){
- try {
- Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
- if (s == null) {
- zk.create(Constant.ZK_REGISTRY_PATH, newbyte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (KeeperException e) {
- LOGGER.error(e.toString());
- } catch (InterruptedException e) {
- LOGGER.error(e.toString());
- }
- }
- privatevoid createNode(ZooKeeper zk, String data) {
- try {
- byte[] bytes = data.getBytes();
- String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- LOGGER.debug("create zookeeper node ({} => {})", path, data);
- } catch (KeeperException e) {
- LOGGER.error("", e);
- }
- catch (InterruptedException ex){
- LOGGER.error("", ex);
- }
- }
- }
public class ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
public ServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
AddRootNode(zk); // Add root node if not exist
createNode(zk, data);
}
}
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException e) {
LOGGER.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
}
return zk;
}
private void AddRootNode(ZooKeeper zk){
try {
Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
if (s == null) {
zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
LOGGER.error(e.toString());
} catch (InterruptedException e) {
LOGGER.error(e.toString());
}
}
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException e) {
LOGGER.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
}
}
}
這裡在原文的基礎上加了AddRootNode()判斷服務父節點是否存在,如果不存在則新增一個PERSISTENT的服務父節點,這樣雖然啟動服務時多了點判斷,但是不需要手動命令新增服務父節點了。
關於Zookeeper的使用原理,可以看這裡《ZooKeeper基本原理》。
4、客戶端呼叫服務
使用代理模式呼叫服務:
[java] view plain copy print?- publicclass RpcProxy {
- private String serverAddress;
- private ServiceDiscovery serviceDiscovery;
- public RpcProxy(String serverAddress) {
- this.serverAddress = serverAddress;
- }
- public RpcProxy(ServiceDiscovery serviceDiscovery) {
- this.serviceDiscovery = serviceDiscovery;
- }
- @SuppressWarnings("unchecked")
- public <T> T create(Class<?> interfaceClass) {
- return (T) Proxy.newProxyInstance(
- interfaceClass.getClassLoader(),
- new Class<?>[]{interfaceClass},
- new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- RpcRequest request = new RpcRequest();
- request.setRequestId(UUID.randomUUID().toString());
- request.setClassName(method.getDeclaringClass().getName());
- request.setMethodName(method.getName());
- request.setParameterTypes(method.getParameterTypes());
- request.setParameters(args);
- if (serviceDiscovery != null) {
- serverAddress = serviceDiscovery.discover();
- }
- if(serverAddress != null){
- String[] array = serverAddress.split(":");
- String host = array[0];
- int port = Integer.parseInt(array[1]);
- RpcClient client = new RpcClient(host, port);
- RpcResponse response = client.send(request);
- if (response.isError()) {
- thrownew RuntimeException("Response error.",new Throwable(response.getError()));
- } else {
- return response.getResult();
- }
- }
- else{
- thrownew RuntimeException("No server address found!");
- }
- }
- }
- );
- }
- }
public class RpcProxy {
private String serverAddress;
private ServiceDiscovery serviceDiscovery;
public RpcProxy(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
if(serverAddress != null){
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
RpcClient client = new RpcClient(host, port);
RpcResponse response = client.send(request);
if (response.isError()) {
throw new RuntimeException("Response error.",new Throwable(response.getError()));
} else {
return response.getResult();
}
}
else{
throw new RuntimeException("No server address found!");
}
}
}
);
}
}
這裡每次使用代理遠端呼叫服務,從Zookeeper上獲取可用的服務地址,通過RpcClient send一個Request,等待該Request的Response返回。這裡原文有個比較嚴重的bug,在原文給出的簡單的Test中是很難測出來的,原文使用了obj的wait和notifyAll來等待Response返回,會出現“假死等待”的情況:一個Request傳送出去後,在obj.wait()呼叫之前可能Response就返回了,這時候在channelRead0裡已經拿到了Response並且obj.notifyAll()已經在obj.wait()之前呼叫了,這時候send後再obj.wait()就出現了假死等待,客戶端就一直等待在這裡。使用CountDownLatch可以解決這個問題。
注意:這裡每次呼叫的send時候才去和服務端建立連線,使用的是短連線,這種短連線在高併發時會有連線數問題,也會影響效能。
從Zookeeper上獲取服務地址:
[java] view plain copy print?
- publicclass ServiceDiscovery {
- privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
- private CountDownLatch latch = new CountDownLatch(1);
- privatevolatile List<String> dataList = new ArrayList<>();
- private String registryAddress;
- public ServiceDiscovery(String registryAddress) {
- this.registryAddress = registryAddress;
- ZooKeeper zk = connectServer();
- if (zk != null) {
- watchNode(zk);
- }
- }
- public String discover() {
- String data = null;
- int size = dataList.size();
- if (size > 0) {
- if (size == 1) {
- data = dataList.get(0);
- LOGGER.debug("using only data: {}", data);
- } else {
- data = dataList.get(ThreadLocalRandom.current().nextInt(size));
- LOGGER.debug("using random data: {}", data);
- }
- }
- return data;
- }
- private ZooKeeper connectServer() {
- ZooKeeper zk = null;
- try {
- zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
- @Override
- publicvoid process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- latch.await();
- } catch (IOException | InterruptedException e) {
- LOGGER.error("", e);
- }
- return zk;
- }
- privatevoid watchNode(final ZooKeeper zk) {
- try {
- List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
- @Override
- publicvoid process(WatchedEvent event) {
- if (event.getType() == Event.EventType.NodeChildrenChanged) {
- watchNode(zk);
- }
- }
- });
- List<String> dataList = new ArrayList<>();
- for (String node : nodeList) {
- byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
- dataList.add(new String(bytes));
- }
- LOGGER.debug("node data: {}", dataList);
- this.dataList = dataList;
- } catch (KeeperException | InterruptedException e) {
- LOGGER.error("", e);
- }
- }
- }
public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> dataList = new ArrayList<>();
private String registryAddress;
public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;
ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}
public String discover() {
String data = null;
int size = dataList.size();
if (size > 0) {
if (size == 1) {
data = dataList.get(0);
LOGGER.debug("using only data: {}", data);
} else {
data = dataList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random data: {}", data);
}
}
return data;
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}
private void watchNode(final ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
LOGGER.debug("node data: {}", dataList);
this.dataList = dataList;
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
}
}
}
每次服務地址節點發生變化,都需要再次watchNode,獲取新的服務地址列表。
5、訊息編碼
請求訊息:
[java] view plain copy print?- publicclass RpcRequest {
- private String requestId;
- private String className;
- private String methodName;
- private Class<?>[] parameterTypes;
- private Object[] parameters;
- public String getRequestId() {
- return requestId;
- }
- publicvoid setRequestId(String requestId) {
- this.requestId = requestId;
- }
- public String getClassName() {
- return className;
- }
- publicvoid setClassName(String className) {
- this.className = className;
- }
- public String getMethodName() {
- return methodName;
- }
- publicvoid setMethodName(String methodName) {
- this.methodName = methodName;
- }
- public Class<?>[] getParameterTypes() {
- return parameterTypes;
- }
- publicvoid setParameterTypes(Class<?>[] parameterTypes) {
- this.parameterTypes = parameterTypes;
- }
- public Object[] getParameters() {
- return parameters;
- }
- publicvoid setParameters(Object[] parameters) {
- this.parameters = parameters;
- }
- }
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
響應訊息:
[java]
view plain
copy
print?
- publicclass RpcResponse {
- private String requestId;
- private String error;
- private Object result;
- publicboolean isError() {
- return error != null;
- }
- public String getRequestId() {
- return requestId;
- }
- publicvoid setRequestId(String requestId) {
- this.requestId = requestId;
- }
- public String getError() {
- return error;
- }
- publicvoid setError(String error) {
- this.error = error;
- }
- public Object getResult() {
- return result;
- }
- publicvoid setResult(Object result) {
- this.result = result;
- }
- }
public class RpcResponse {
private String requestId;
private String error;
private Object result;
public boolean isError() {
return error != null;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
訊息序列化和反序列化工具:(基於 Protostuff 實現) [java] view plain copy print?
- publicclass SerializationUtil {
- privatestatic Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
- privatestatic Objenesis objenesis = new ObjenesisStd(true);
- private SerializationUtil() {
- }
- @SuppressWarnings("unchecked")
- privatestatic <T> Schema<T> getSchema(Class<T> cls) {
- Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
- if (schema == null) {
- schema = RuntimeSchema.createFrom(cls);
- if (schema != null) {
- cachedSchema.put(cls, schema);
- }
- }
- return schema;
- }
- /**
- * 序列化(物件 -> 位元組陣列)
- */
- @SuppressWarnings("unchecked")
- publicstatic <T> byte[] serialize(T obj) {
- Class<T> cls = (Class<T>) obj.getClass();
- LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
- try {
- Sc