1. 程式人生 > >thrift學習第二步,一個簡單的雙向通訊demo以及原理講解,以及注意事項

thrift學習第二步,一個簡單的雙向通訊demo以及原理講解,以及注意事項

經過一番研究後,摸清楚了thrift的基本使用方法和其工作原理.thrift的確是封裝了java原生Socket,所以從根本上來講是可以實現雙向通訊的,正如上一篇文章中翻譯的外文中所說的,無需建立雙連線和輪詢,只需要做很少量的工作這個目的就可以達到先談談thrift的使用步驟,首先我們需要根據thrift規定的語法來書寫一個.thrift文件,比如:

namespace java com.zyt.student

struct Student { 
   1: string name;
} 


service Zthrift { 
   oneway void send(Student msg)
   }

這個文件將來會被用來生成用來呼叫的.java檔案.正如官網上所說,我們需要下載一個thrift編譯器,用來將這個文件編譯成我們能看懂的.java檔案.具體怎麼做就不贅述了,官網都有,屬於基礎知識.編譯後我們將得到一個ZThrift.java檔案.
然後就是呼叫過程,首先在客戶端,我們需要一個TSocket(使用起來和原生Socket很類似),只是thrift隱藏了許多細節,引入了thrift獨有的資料傳輸協議(或者理解為格式).然後使用上面生成的檔案中的一個叫做Client的內部類來作為我們傳送資料的類,這個類中的send方法對應上面我們寫的(帶有返回值的方法在雙向通訊時會報錯,具體原因可能是由於記憶體溢位造成的,畢竟雙方都變成了回覆方,無休止的來回傳送對虛擬機器來說是致命的).通常我們在使用thrift的時候,一般步驟是先初始化協議,然後建立ZThrift.Client的物件,然後開啟連線:TSocket.open(),然後使用client的send方法去傳送資料.但是我們要實現雙向通訊,就必須既有傳送程式,又有接收程式,這就要求必須再寫一段程式來接收服務端返回的資料.幸運的是,在我們上面的ZThrift類中,有個叫做Processor的類,它繼承自TBaseProcessor類,這個類有一個”阻塞式”方法可以用來監控服務端是否發回資料:process(TProticol in,TProtocol out).只要將我們客戶端的TSocket傳入TProtocol的建構函式裡面,就可以起到監控作用.而且由於這個方法是阻塞式的,我們有必要新開啟一個執行緒來執行這個工作.
客戶端的工作量其實是很少的,基本上就是在原有的基礎上新加了一個執行緒來接收伺服器返回的資料而已.而服務端就有點坑爹了,為什麼呢?因為一臺電腦要與伺服器通訊,只需要知道伺服器的ip即可,但是反過來就沒那麼容易了,我們的個人電腦和各種終端往往是置於內網當中,這使得通訊變得艱難起來(這就是NAT埠對映技術出現的原因).但是會編寫程式的人都知道,使用Socket就可以實現雙向通訊,即只需要一個來自客戶端的Socket,我們就可以開啟通往客戶端的道路.因此我們要做的,就是在服務端攔截到這個來自客戶端的Socket.通過研究發現,TSocket底層是封裝了Socket的,因此要攔截Socket就可以從TSocket來下手,從上一期的文章中已經得知,Thrift提供了一個叫做TProcessorFactory的類,這個類可以用來處理客戶端發來的資料,其中有個方法叫做getProcessor(TTransport),它給我們提供了攔截Socket的機會,為什麼呢,因為TSocket的父類的父類就是這個TTransport,也就是說,這個TTransport是來自於客戶端的.那麼接下來,我們可以將這個TTransport攔截下來,用來構建一個ZThrift.Client的物件,然後我們就可以使用這個Client來給客戶端回訊息了.接下來怎麼做,相信大家也明白了吧,下面貼上原始碼:
客戶端:

package test;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import com.thrift.common.ZThrift;
import
com.thrift.common.ZThrift.Iface; import com.thrift.common.Student; public class ZClient { public static void main(String[]args){ TSocket tSocket=new TSocket("192.168.1.173",9999); ZThrift.Client client=new ZThrift.Client(new TBinaryProtocol(tSocket)); try { tSocket.open(); new Thread(new Runnable() { @Override public void run() { ZThrift.Processor<Iface> mp=new ZThrift.Processor<Iface>(new Iface() { @Override public void send(Student msg) throws TException { System.out.println("收到訊息:"+msg); } }); try { while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket) )){ //阻塞式方法,不需要內容 } } catch (TException e) { System.out.println("連線已斷開..."); e.printStackTrace(); } } }).start(); client.send(new Student("小明")); } catch (TTransportException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (TException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

然後是服務端:

package test;

import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;

import com.thrift.common.ZThrift;
import com.thrift.common.ZThrift.Iface;
import com.thrift.common.Student;

public class ZServer {

    public static void main(String[]args){
        try {
            TServerSocket tServerSocket=new TServerSocket(9999);
            TThreadPoolServer.Args a=new TThreadPoolServer.Args(tServerSocket);
            TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory();
            TProcessorFactory tProcessorFactory=new TProcessorFactory(null){
                public TProcessor getProcessor(org.apache.thrift.transport.TTransport tTransport) {
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                Thread.sleep(5000);//延時五秒回覆
                            } catch (InterruptedException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                            }
                            ZThrift.Client client=new ZThrift.Client(new TBinaryProtocol(tTransport));//這裡可以把client提取作為成員變數來多次使用
                            try {
                                client.send(new Student("小紅"));
                            } catch (TException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }).start();


                    return new ZThrift.Processor<Iface>(new Iface() {


                        @Override
                        public void send(Student msg)
                                throws TException {
                            System.out.println(msg.toString());
                        }

                    });
                };
            };
            a.protocolFactory(factory);
            a.processorFactory(tProcessorFactory);
            TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(a);
            System.out.println("start server...");
            tTheadPoolServer.serve();
        } catch (TTransportException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

以上程式碼供大家參考.