1. 程式人生 > >利用Thrift和zk簡單實現服務治理框架中的訂閱釋出機制

利用Thrift和zk簡單實現服務治理框架中的訂閱釋出機制

本文簡單介紹下利用Thrift和zk簡單實現服務治理框架服務的訂閱釋出機制,類似於Dubbo的服務治理。這個只是簡單版本,只供學習和理解用。


全部程式碼下載:Github連結:github連結,點選驚喜;寫文章不易,歡迎大家採我的文章,以及給出有用的評論,當然大家也可以關注一下我的github;多謝;

1.什麼是服務治理:

1.1微服務簡單介紹:

微服務已經成為當下最熱門的話題之一。它是一種新的架構風格,涉及組織架構、設計、交付、運維等方面的變革,核心目標是為了解決系統的交付週期,並降低維護成本和研發成本。相比傳統的SOA架構或者單塊架構,微服務有很多的優勢,比如技術的多樣性、模組化、獨立部署等,但也帶來了相應的成本,比如運維成本、服務管理成本等。

1.2服務治理的出現

在微服務盛行下,利用RMI或Hessian等工具,簡單的暴露和引用遠端服務,通過配置服務的URL地址進行呼叫已經變得越來越不能滿足需求。
1.服務越來越多時,服務URL配置管理變得非常困難。
2.服務間依賴關係變得錯蹤複雜
3.服務的呼叫量越來越大,服務的容量問題就暴露出來,這個服務需要多少機器支撐?什麼時候該加機器?
4…….等等
為了滿足服務線下管控、保障線上高效執行,需要有一個統一的服務治理框架對服務進行統一、有效管控,保障服務的高效、健康執行。服務治理是分散式服務框架的一個可選特性,儘管從服務開發和執行角度看它不是必須的,但是如果沒有服務治理功能,分散式服務框架的服務SLA很難得到保障,服務化也很難真正實施成功。
基於以上原因,需要對各個服務做治理,這也是就為什麼有了dubbo這類服務治理框架,它與其他RPC框架相比(例如thrift,avro),不僅僅提供了透明的服務呼叫,而且還提供了服務治理,比如上述的呼叫統計管理、負載均衡,這樣每個業務模組只需專注於自己的內部業務邏輯即可。

1.3服務治理的幾個要素:

服務管理元件:這個元件是“服務治理”的核心元件,您的服務治理框架有多強大,主要取決於您的服務管理元件功能有多強大。它至少具有的功能包括:服務註冊管理、訪問路由;另外,它還可以具有:服務版本管理、服務優先順序管理、訪問許可權管理、請求數量限制、連通性管理、註冊服務叢集、節點容錯、事件訂閱-釋出、狀態監控,等等功能。

服務提供者(服務生產者):即服務的具體實現,然後按照服務治理框架特定的規範釋出到服務管理元件中。這意味著什麼呢?這意味著,服務提供者不一定按照RPC呼叫的方式釋出服務,而是按照整個服務治理框架所規定的方式進行釋出(如果服務治理框架要求服務提供者以RPC呼叫的形式進行釋出,那麼服務提供者就必須以RPC呼叫的形式進行釋出;如果服務治理框架要求服務提供者以Http介面的形式進行釋出,那麼服務提供者就必須以Http介面的形式進行釋出,但後者這種情況一般不會出現)。

服務使用者(服務消費者):即呼叫這個服務的使用者,呼叫者首先到服務管理元件中查詢具體的服務所在的位置;服務管理元件收到查詢請求後,將向它返回具體的服務所在位置(視服務管理元件功能的不同,還有可能進行這些計算:判斷服務呼叫者是否有許可權進行呼叫、是否需要生成認證標記、是否需要重新檢查服務提供者的狀態、讓呼叫者使用哪一個服務版本等等)。服務呼叫者在收到具體的服務位置後,向服務提供者發起正式請求,並且返回相應的結果。第二次呼叫時,服務請求者就可以像服務提供者直接發起呼叫請求了(當然,您可以有一個服務提供期限的設定,使用租約協議就可以很好的實現)。

參考於: http://blog.csdn
.net/yinwenjie/article/details/49869535

簡單畫了如下圖:
00

1.4服務的訂閱釋出機制

它的核心理念是實現服務消費者和服務提供者的解耦,讓服務消費者能夠像使用本地介面一樣消費遠端的服務提供者,而不需要關心服務提供者的位置資訊,實現透明化呼叫。常用的服務註冊中心有Zookeeper、ETCD,以及基於資料庫的配置中心。

2.設計一個服務治理框架中的訂閱釋出機制

2.1使用的技術:

2.2設計思路

1.利用Zookeeper建立/Service根目錄,在該目錄下建立相應的服務介面子目錄存放該介面的IP地址和埠號—註冊服務
2.利用Thrift建立服務和啟動服務
3.利用Zookeeper去對應目錄/Service訂閱相應服務獲得介面的IP地址和埠號,並註冊監聽事件,當目錄改變時更新介面的IP地址和埠號—訂閱服務

3.實現訂閱釋出機制

3.1實現步驟:

1.編寫Thrift的IDL並編譯出相應的介面類。
2.實現相應的介面。
3.編寫服務啟動和註冊服務類。
4.編寫相應的客戶端訂閱服務。

3.2程式碼實現

工程為maven工程,假如不建立maven工程,請下載對應的lib包。
具體實現原理,見註釋:
1.IDL檔案和編譯:

//名稱空間定義:java包
namespace java cn.wpeace.thrift
//結構體定義:轉化java中的實體類
struct Request{
      1:required string userName;
      2:required string password;
}
//定義返回型別
struct Student{
        1:required string naem;
        2:required i32 age;
}
struct People{
        1:required string naem;
        2:required i32 age;
        3:required string sex;
}
//異常描述定義
exception HelloException{
       1:required string msg;
}
//服務定義,生成介面用
service StudentService{
            list<Student> getAllStudent(1:Request request)throws (1:HelloException e);
}
//服務定義,生成介面用
service PeopleService{
            list<People> getAllPeople(1:Request request)throws (1:HelloException e);
}
//thrift -gen java ./zk.thrift

2.實現相應介面:

public class StudentServiceImpl implements Iface {// 實現的是StudentService類下面的介面
    @Override
    public List<Student> getAllStudent(Request request) throws HelloException, TException {
        System.out.println("呼叫studentService");
        System.out.println(request.getUserName());
        System.out.println(request.getPassword());
        List<Student> students = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Student student = new Student();
            student.setNaem("peace" + i);
            student.setAge(22 + i);
            students.add(student);
        }
        return students;
    }
} 
public class PeopleServiceImpl implements Iface{
    @Override
    public List<People> getAllPeople(Request request) throws HelloException, TException {
        System.out.println("呼叫PeopleService");
        System.out.println(request.getUserName());
        System.out.println(request.getPassword());
        List<People>peoples=new ArrayList<>();
        for(int i=0;i<5;i++)
        {
            People people=new People("wpeace", 22+i, "男");
            peoples.add(people);
        }
        return peoples;
    }
} 

3.實現服務啟動和註冊類

package cn.wpeace.thriftService;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.StudentService;
import net.sf.json.JSONObject;
public class ServiceSatrt implements Watcher{
    //初始化log4j
    static{
        BasicConfigurator.configure();
    }
    private static final Log LOGGER=LogFactory.getLog(ServiceSatrt.class);
    private static final Integer[] PORTS={8081,8082};
    public static final String serviceNames[]={"studentService","peopleService"}; 
    private static final String SERVICE_IP="192.168.1.118";
    private CountDownLatch connectedSignal=new CountDownLatch(1);//用於建立連線
    private ZooKeeper zk ;
    /**
     * thrift服務啟動標記
     */
    private Integer isThriftStart=0;

    /**
     * 啟動所有服務
     */
    private void startServer(){
        ServiceSatrt.LOGGER.info("啟動Thrift執行緒");
        // 建立啟動執行緒:
        StartServerThread studenThread = new StartServerThread(PORTS[0],
                new StudentService.Processor<StudentService.Iface>(new StudentServiceImpl()));
        StartServerThread peopleThread = new StartServerThread(PORTS[1],
                new PeopleService.Processor<PeopleService.Iface>(new PeopleServiceImpl()));
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(studenThread);
        pool.submit(peopleThread);
        //關閉執行緒池:執行緒仍然在執行
        pool.shutdown();
    }
    private class StartServerThread implements Runnable{
        private Integer port;
        private TProcessor processor;
        public StartServerThread(Integer port,TProcessor processor) {
            this.port=port;
            this.processor=processor;
        }
        @Override
        public void run() {
            ServiceSatrt.LOGGER.info("thrift服務正在準備啟動");
            try {
                // 非阻塞式
                TNonblockingServerSocket serverSocket=new TNonblockingServerSocket(port);
                // 為伺服器設定對應的IO網路模型
                TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
                // 設定控制器
                tArgs.processor(processor);
                // 設定訊息封裝格式
                tArgs.protocolFactory(new TBinaryProtocol.Factory());//Thrift特有的一種二進位制描述格式
                // 啟動Thrift服務
                TNonblockingServer server = new TNonblockingServer(tArgs);
                server.setServerEventHandler(new StartServerEventHander());
                server.serve();//啟動後,程式就停在這裡了。
            } catch (TTransportException e) {
                e.printStackTrace();
            }

        }

    }
    private class StartServerEventHander implements TServerEventHandler{

        @Override
        public void preServe() {
            synchronized (isThriftStart) {
                isThriftStart++;//當全部服務啟動成功才連線zk
                if(isThriftStart==2){
                    synchronized (ServiceSatrt.this) {
                        ServiceSatrt.LOGGER.info("thrift服務啟動完成");
                        ServiceSatrt.this.notify();
                    }
                }
            }
        }
        @Override
        public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
            return null;
        }
        @Override
        public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
        }
        @Override
        public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
        }
    }
    private void connectZk() throws KeeperException, InterruptedException, IOException{
        // 連線到zk伺服器叢集,新增預設的watcher監聽
        zk= new ZooKeeper("192.168.1.127:2181", 120000, this);
        connectedSignal.await();
        // 建立一個父級節點Service
        Stat pathStat = null;
        try {
            pathStat = zk.exists("/Service", false);
            // 如果條件成立,說明節點不存在(只需要判斷一個節點的存在性即可)
            // 建立的這個節點是一個“永久狀態”的節點
            if (pathStat == null) {
                zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.exit(-1);
        }
        // 開始新增子級節點,每一個子級節點都表示一個這個服務提供者提供的業務服務
        for (int i = 0; i < 2; i++) {
            JSONObject nodeData = new JSONObject();
            nodeData.put("ip", SERVICE_IP);
            nodeData.put("port", PORTS[i]);
            zk.create("/Service/" + serviceNames[i], nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
        }
        // 執行到這裡,說明所有的service都啟動完成了
        ServiceSatrt.LOGGER.info("===================所有service都啟動完成了,主執行緒開始啟動===================");
    }
    @Override
    public void process(WatchedEvent event) {
        //建立連線用
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //暫在這裡不做處理,正常情況下需要處理。

    }
    public static void main(String[] args) {
        //啟動服務
        ServiceSatrt serviceSatrt=new ServiceSatrt();
        serviceSatrt.startServer();
        //等待服務啟動完成
        synchronized (serviceSatrt) {
            try {
                while (serviceSatrt.isThriftStart<2) {
                    serviceSatrt.wait();
                }
            } catch (Exception e) {
                ServiceSatrt.LOGGER.error(e);
                System.out.println(-1);
            }
        }
        //啟動連線
        try {
            serviceSatrt.connectZk();
        } catch (Exception e) {
            ServiceSatrt.LOGGER.error(e);
            System.out.println(-1);
        }    
    }
}

4.編寫客戶端類:

package cn.wpeace.thriftClinet;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.sound.midi.VoiceStatus;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

import cn.wpeace.thrift.People;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.Request;
import cn.wpeace.thrift.Student;
import cn.wpeace.thrift.StudentService;
import cn.wpeace.thriftService.ServiceSatrt;
import net.sf.json.JSONObject;

public class ThriftClinet implements Watcher{
    static{
        BasicConfigurator.configure();
    }
    private static final Log LOGGER=LogFactory.getLog(ThriftClinet.class);
    private String serverIp;
    private String serverPort;
    private String servername;
    private CountDownLatch connectedSignal=new CountDownLatch(1);//用於建立連線
    private ZooKeeper zk;
    private  void init(String servername) throws IOException, KeeperException, InterruptedException{
        // 連線到zk伺服器叢集,新增預設的watcher監聽
        this.zk = new ZooKeeper("192.168.1.127:2181", 120000, this);
        connectedSignal.await();
        this.servername=servername;
        updateServer();
        ThriftClinet.LOGGER.info("初始化完成");
    }
    /**
     * 從zk上獲取Service中的節點資料:包括IP和埠
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void updateServer() throws KeeperException, InterruptedException {
        this.serverIp=null;
        this.serverPort=null;
        /*
         * 
         * 判斷服務根節點是否存在
         */
        Stat pathStat = null;
        try {
            pathStat = this.zk.exists("/Service", false);
            // 如果條件成立,說明節點不存在
            // 建立的這個節點是一個“永久狀態”的節點
            if (pathStat == null) {
                ThriftClinet.LOGGER.info("客戶端創立Service");
                this.zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return;
            }
        } catch (Exception e) {
            ThriftClinet.LOGGER.error(e);
            System.exit(-1);
        }
        // 獲取服務列表
        List<String> serviceList = this.zk.getChildren("/Service", false);
        if (serviceList == null || serviceList.isEmpty()) {
            ThriftClinet.LOGGER.info("未發現相關服務,客戶端退出");
            return;
        }
        // 查詢所需的服務是否存在
        boolean isFound = false;
        byte[] data;// 獲取節點資料
        for (String name : serviceList) {
            if (StringUtils.equals(name, this.servername)) {
                isFound = true;
                break;// 找到一個就退出
            }
        }
        // 獲得資料
        if (isFound) {
            data = this.zk.getData("/Service/" + this.servername, false, null);
        } else {
            ThriftClinet.LOGGER.info("未發現相關服務,客戶端退出");
            return;
        }
        if (data == null || data.length == 0) {
            ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
            return;
        }
        JSONObject fromObject = JSONObject.fromObject(new String(data));
        this.serverIp = fromObject.getString("ip");
        this.serverPort = fromObject.getString("port");
    }

    @Override
    public void process(WatchedEvent event) {
        //建立連線用
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //如果發生 Service下的節點變換,就更新ip和埠
        if (event.getType() == EventType.NodeChildrenChanged   
                 && "/Service".equals(event.getPath())) {  
           try {
            updateServer();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
         }
    }
    public static void main(String[] args) {
        ThriftClinet studentClinet=new ThriftClinet();
        ThriftClinet peopleClinet=new ThriftClinet();
        /**
         * studnetService 測試
         */
        try {
            studentClinet.init(ServiceSatrt.serviceNames[0]);
            if(studentClinet.serverIp==null||studentClinet.serverPort==null){
                ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
            }
            //如果是非阻塞型  需要使用
            TTransport tSocket = new TFramedTransport(new TSocket(studentClinet.serverIp,
                    Integer.parseInt(studentClinet.serverPort),  30000));  
            //設定封裝協議
            TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
            //建立呼叫client
            StudentService.Client client=new StudentService.Client(protocol);
            //設定呼叫引數:
            Request request=new Request().setUserName("peace").setPassword("123456");
            //準備傳輸
            tSocket.open();
            //正式呼叫介面
            List<Student> allStudent = client.getAllStudent(request);
            //請求結束,斷開連線
            tSocket.close();
            for(Student student:allStudent)
            {
                System.out.println(student.getNaem()+":"+student.getAge());
            }
        } catch (Exception e) {
            ThriftClinet.LOGGER.info("出現異常,客戶端退出");
        }

        /**
         * PeopleService測試
         */
        try {
            peopleClinet.init(ServiceSatrt.serviceNames[1]);
            if(peopleClinet.serverIp==null||peopleClinet.serverPort==null){
                ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
            }
            //如果是非阻塞型  需要使用
            TTransport tSocket = new TFramedTransport(new TSocket(peopleClinet.serverIp,
                    Integer.parseInt(peopleClinet.serverPort),  30000));  
            //設定封裝協議
            TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
            //建立呼叫client
            PeopleService.Client client=new PeopleService.Client(protocol);
            //設定呼叫引數:
            Request request=new Request().setUserName("peace").setPassword("123456");
            //準備傳輸
            tSocket.open();
            //正式呼叫介面
            List<People> allPeople = client.getAllPeople(request);
            //請求結束,斷開連線
            tSocket.close();
            for(People people:allPeople)
            {
                System.out.println(people.getNaem()+":"+people.getAge()+"性別"+people.getSex());
            }
        } catch (Exception e) {
            ThriftClinet.LOGGER.info("出現異常,客戶端退出");
        }
    }
}

所有程式碼下載請見github,上面的連結。

3.3測試步驟:

1.啟動ServiceSatrt類
2.啟動ThriftClinet類
3.測試結果:
服務端:
01
02
客戶端:
03
04
本文來自伊豚(blog.wpeace.cn)