Thrift筆記(六)--單端口 多服務
阿新 • • 發佈:2018-07-31
oid con uri try process else ssis int 序列化
多個服務共用(監聽)同一個端口。先上一個demo
Test.thrift
// Generated Java classes are placed in this package.
namespace java com.gxf.thrift

// Kind of request a client can send to HelloWordService.
enum RequestType {
    SAY_HELLO,   // greeting
    QUERY_TIME,  // ask for the current time
}

// Payload passed to HelloWordService.doAction.
struct Request {
    1: required RequestType type;  // type of the request, required
    2: required string name;       // name of the person sending the request, required
    3: optional i32 age;           // age of the person sending the request, optional
}

// Error raised by HelloWordService.doAction.
exception RequestException {
    1: required i32 code;
    2: optional string reason;
}

// Service name
service HelloWordService {
    string doAction(1: Request request) throws (1:RequestException qe);  // may throw an exception
}

// Multiplication
service MultiSerivce{
    i32 multi(1:i32 n1, 2:i32 n2);
}
定義了兩個服務,一個是hello,一個是multi。第一個輸出hello信息,第二個做乘法運算。使用thrift命令,生成java代碼
thrift -gen java Test.thrift
Server端,兩個服務實現類
import org.apache.commons.lang3.StringUtils; import java.util.Date; public class HelloWordServiceImpl implements HelloWordService.Iface { // 實現這個方法完成具體的邏輯。 public String doAction(Request request) throws RequestException, org.apache.thrift.TException { System.out.println("Get request: " + request); if (StringUtils.isBlank(request.getName()) || request.getType() == null) { throw new com.gxf.thrift.RequestException(); } String result = "Hello, " + request.getName(); if (request.getType() == com.gxf.thrift.RequestType.SAY_HELLO) { result += ", Welcome!"; } else { result += ", Now is " + new Date().toLocaleString(); } return result; } }
import org.apache.thrift.TException;

/**
 * Server-side implementation of the multiplication service {@code MultiSerivce}.
 */
public class MultiServiceImpl implements MultiSerivce.Iface {

    /**
     * Multiplies the two operands.
     *
     * @param n1 first factor
     * @param n2 second factor
     * @return {@code n1 * n2}; the product wraps around on 32-bit overflow like any Java {@code int} multiplication
     * @throws TException declared by the generated interface; not thrown by this implementation
     */
    @Override
    public int multi(int n1, int n2) throws TException {
        // The service is defined as multiplication; the previous body added the operands.
        return n1 * n2;
    }
}
Server端服務啟動類
import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import java.net.ServerSocket; public class HelloWordServer { public static void main(String[] args) throws Exception { ServerSocket socket = new ServerSocket(7912); TServerSocket serverTransport = new TServerSocket(socket); TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory(); TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor(); multiplexedProcessor.registerProcessor("helloService", new HelloWordService.Processor<>( new HelloWordServiceImpl())); multiplexedProcessor.registerProcessor("multiService", new MultiSerivce.Processor<>( new MultiServiceImpl() )); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); serverArgs.processor(multiplexedProcessor); serverArgs.protocolFactory(proFactory); TServer server = new TThreadPoolServer(serverArgs); System.out.println("Start server on port 7912..."); server.serve(); } }
Client端測試類
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TMultiplexedProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; public class HelloWordClient { public static void main(String[] args) throws Exception { TTransport transport = new TSocket("127.0.0.1", 7912); TProtocol protocol = new TBinaryProtocol(transport); TMultiplexedProtocol tMultiplexedProtocol = new TMultiplexedProtocol(protocol, "multiService"); transport.open(); MultiSerivce.Client multiClient = new MultiSerivce.Client(tMultiplexedProtocol); int mulRes = multiClient.multi(1, 3); System.out.println("mulRes = " + mulRes); TMultiplexedProtocol helloProtocol = new TMultiplexedProtocol(protocol, "helloService"); HelloWordService.Client helloClient = new HelloWordService.Client(helloProtocol); Request helloRequest = new Request(); helloRequest.setAge(28); helloRequest.setName("guanxiangfei"); helloRequest.setType(RequestType.QUERY_TIME); String helloRes = helloClient.doAction(helloRequest); System.out.println("helloRes: " + helloRes); transport.close(); // 請求結束,斷開連接 } }
下面主要分析Server端源碼
跟進TMultiplexedProcessor類
// Excerpt only: the class body continues beyond this snippet.
public class TMultiplexedProcessor implements TProcessor {
  // Service name -> processor lookup table; registerProcessor() adds entries to it.
  private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP = new HashMap<String,TProcessor>();
  // NOTE(review): its use is not visible in this excerpt — presumably a fallback processor; confirm in the full source.
  private TProcessor defaultProcessor;
這裏有個map存放 servicename --> processor,接著看註冊源碼
/**
 * Stores {@code processor} in {@code SERVICE_PROCESSOR_MAP} under {@code serviceName}.
 * As with any {@code Map.put}, a second registration under the same name replaces the first.
 *
 * @param serviceName key under which the processor is stored
 * @param processor   processor to store for that service
 */
public void registerProcessor(String serviceName, TProcessor processor) {
  SERVICE_PROCESSOR_MAP.put(serviceName, processor);
}
直接在map中放了servicename --> processor。我們跟進serve方法
/**
 * Accept loop: starts listening, then repeatedly accepts a client, builds its
 * processor/transports/protocols and processes requests on that connection
 * until {@code process} returns false or an exception ends it. Runs until
 * {@code stopped_} is set.
 *
 * NOTE(review): this loop handles each accepted client inline on the calling
 * thread, which resembles a single-threaded server's serve(); the demo above
 * starts a TThreadPoolServer — confirm which class this excerpt was taken from.
 */
public void serve() {
  try {
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    // Listening failed: log it and give up without entering the accept loop.
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }

  // Run the preServe event
  if (eventHandler_ != null) {
    eventHandler_.preServe();
  }

  setServing(true);

  while (!stopped_) {
    // Per-connection state, reset for every accepted client so cleanup below can null-check it.
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    ServerContext connectionContext = null;
    try {
      client = serverTransport_.accept();
      if (client != null) {
        // Build the processing pipeline for this client from the configured factories.
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        if (eventHandler_ != null) {
          connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
        }
        // Keep handling requests on this connection until the processor reports false
        // or an exception is thrown.
        while (true) {
          if (eventHandler_ != null) {
            eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
          }
          if(!processor.process(inputProtocol, outputProtocol)) {
            break;
          }
        }
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      // Errors are only logged while the server is still meant to be running.
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    // Per-connection cleanup, executed whether the loop ended normally or via a caught exception.
    if (eventHandler_ != null) {
      eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false);
}
有個accept接收到客戶端連接,重點看
// Handles one request; a false return value ends the per-connection request loop.
if(!processor.process(inputProtocol, outputProtocol)) { break; }
這裏是處理客戶端傳來的請求,繼續跟進
/**
 * Reads one message header, looks up the handler registered for its method
 * name in {@code processMap} and delegates the call to it.
 *
 * <p>If no handler is registered for the name, the request body is skipped and
 * an {@code UNKNOWN_METHOD} exception message is written back with the
 * request's sequence id.
 *
 * NOTE(review): the lookup here is by method name ({@code msg.name}); any
 * service-name routing done by TMultiplexedProcessor is not part of this
 * excerpt — confirm against the full source.
 *
 * @param in  protocol the request is read from
 * @param out protocol the response is written to
 * @return always {@code true}
 * @throws TException if reading the request or writing the response fails
 */
@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
  TMessage msg = in.readMessageBegin();
  ProcessFunction fn = processMap.get(msg.name);
  if (fn == null) {
    // Unknown method: consume the unread argument struct so the stream stays aligned,
    // then answer with an exception message instead of a reply.
    TProtocolUtil.skip(in, TType.STRUCT);
    in.readMessageEnd();
    // Plain ASCII apostrophes around the method name (the typographic quotes were an encoding artefact).
    TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
    out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
    x.write(out);
    out.writeMessageEnd();
    out.getTransport().flush();
    return true;
  }
  fn.process(msg.seqid, in, out, iface);
  return true;
}
獲取客戶端的請求類型和方法,獲取服務端註冊的service。跟進fn.process()看服務端處理過程
/**
 * Executes one call: reads the arguments, invokes {@code getResult} on the
 * handler and, unless the function is one-way, writes the outcome back.
 *
 * @param seqid sequence id of the request, echoed in the response message
 * @param iprot protocol the arguments are read from
 * @param oprot protocol the response is written to
 * @param iface handler instance passed through to {@code getResult}
 * @throws TException if a transport error occurs while computing the result,
 *         or if reading/writing on the protocols fails
 */
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
  T args = getEmptyArgsInstance();
  try {
    args.read(iprot);
  } catch (TProtocolException e) {
    // Arguments could not be decoded: report PROTOCOL_ERROR to the caller and stop.
    iprot.readMessageEnd();
    TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
    oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
    x.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
    return;
  }
  iprot.readMessageEnd();
  TSerializable result = null;
  byte msgType = TMessageType.REPLY;

  try {
    result = getResult(iface, args);
  } catch (TTransportException ex) {
    // Transport failures are logged and rethrown; no response is written for them here.
    LOGGER.error("Transport error while processing " + getMethodName(), ex);
    throw ex;
  } catch (TApplicationException ex) {
    // The application exception itself becomes the response payload.
    LOGGER.error("Internal application error processing " + getMethodName(), ex);
    result = ex;
    msgType = TMessageType.EXCEPTION;
  } catch (Exception ex) {
    // Any other failure is replaced by a generic INTERNAL_ERROR, but only when a response is expected.
    LOGGER.error("Internal error processing " + getMethodName(), ex);
    if(!isOneway()) {
      result = new TApplicationException(TApplicationException.INTERNAL_ERROR,
          "Internal error processing " + getMethodName());
      msgType = TMessageType.EXCEPTION;
    }
  }

  // One-way functions never send anything back.
  if(!isOneway()) {
    oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid));
    result.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
  }
}
接著跟進getResult方法,看下服務端如何計算結果
/**
 * Calls {@code iface.multi} with the two decoded arguments and wraps the
 * return value in a {@code multi_result}.
 *
 * @param iface handler whose {@code multi} method is invoked
 * @param args  decoded call arguments supplying {@code n1} and {@code n2}
 * @return a result object whose {@code success} field holds the value returned by the handler
 * @throws org.apache.thrift.TException propagated from the handler call
 */
public multi_result getResult(I iface, multi_args args) throws org.apache.thrift.TException {
  multi_result result = new multi_result();
  result.success = iface.multi(args.n1, args.n2);
  // Mark the success field as set explicitly after assigning it.
  result.setSuccessIsSet(true);
  return result;
}
這裏可以看出,調用了服務端實現類對象對應的方法,返回值保存在multi_result對象中,再序列化發給客戶端。
總結:
單端口,多服務,主要就是用一個map放service-->processor映射。客戶端傳servicename給服務端
Thrift筆記(六)--單端口 多服務