Tigase 傳送訊息的流程原始碼分析
public void setProperties(Map<String, Object> props){ for (String name : msgrcv_names) { mr = conf.getMsgRcvInstance(name); if (mr instanceof MessageReceiver) { ((MessageReceiver) mr).setParent(this); ((MessageReceiver) mr).start(); } } }
1、當客戶端傳送的message訊息到tigase服務端,每個一SOCKET連線都會被包裝成IOService物件,IOService包含一系列操作socket的方法(接收發送資料等),processSocketData()接收網路資料,由tigase.net處理解析成xml物件,並將packet放到接收佇列receivedPackets中再呼叫serviceListener.packetsReady(this)。由於ConnectionManager實現IOServiceListener介面,實現上呼叫的的是ConnectionManager中的packetsReady()來開始處理資料
ClientConnectionManager.processSocketData(XMPPIOService<Object>serv) JID id = serv.getConnectionId(); //[email protected]//packet 被設定上一些源資訊,和目的地資訊,接下來,這個資料包將會委託給父 MessageRouter 幫忙路由到 SessionManager元件中進行處理packet = (tigase.server.Message) [email protected]/192.168.0.33_5222_192.168.0.33_38624, [email protected], DATA=<message xmlns="jabber:client" id="44grM-176" type="chat" to="[email protected]"><thread>SWjZv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=170, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat packet = [email protected]/192.168.0.33_5222_192.168.0.33_38624, [email protected], DATA=<message to="[email protected]" type="chat" id="2jePE-253" xmlns="jabber:client"><thread>7VKMRq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=168, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat 2、MessageRouter.processPacket(Packet packet)部分程式碼如下:/192.168.0.33_5222_192.168.0.33_38624 p.setPacketFrom(id); //packetFrom 設定為onnectionId p.setPacketTo(serv.getDataReceiver()); //packetTo 設定為sess-man --> SessionManager addOutPacket(p);//將會委託給父 MessageRouter 路由 }
//我們不會處理沒有目標地址的資料包,只是丟棄它們並寫一個日誌訊息 if (packet.getTo() == null) { log.log(Level.WARNING, "Packet with TO attribute set to NULL: {0}", packet); return; } //它不是一個服務發現包,我們必須找到一個處理元件 //下面的程式碼塊是“快速”找到一個元件if //這個包TO 元件ID,格式在以下一項: // 1。元件名+“@”+預設域名 // 2。元件名+“@”+任何虛擬主機名 // 3。元件名+ "."+預設域名 // 4。元件名+ "."+任何虛擬主機名 ServerComponent comp = getLocalComponent(packet.getTo()); //SessionManager comp.processPacket(packet, results);
3、SessionManager.processPacket(final Packet packet)處理,有要程式碼如下。 例如A->B,這樣做的目的是為了首先確定使用者A有許可權傳送packet,然後是確定使用者B有許可權接收資料。如果使用者B不線上,那麼離線訊息處理器會把packet儲存到資料庫當中。
//XMPPResourceConnection session——使用者會話儲存所有使用者會話資料,並提供對使用者資料儲存庫的訪問。它只允許在會話的生命週期內將資訊儲存在永久儲存或記憶體中。如果在分組處理時沒有聯機使用者會話,則此引數可以為空。 XMPPResourceConnection conn = getXMPPResourceConnection(packet); //現在要走SessionManager的處理函式,主要是走外掛流程,外掛在Tigase中也是一個重要的組成,入口就是在這裡,SM plugin processPacket(packet, conn);
插入下SM plugin 流程說明 :
這個設計有一個驚人的結果。如果你看下面的圖片,顯示了兩個使用者之間的通訊,你可以看到資料包被複制了兩次才送到最終目的地: 會話管理器(SessionManager)必須對資料包進行兩次處理。第一次以使用者A的名義將其作為傳出包進行處理,第二次以使用者B的名義將其作為傳入包進行處理。這是為了確保使用者A有許可權傳送一個包,所有的processor都應用到packet上,也為了確保使用者B有許可權接收packet,所有的processor都應用到packet了。例如,如果使用者B是離線的,那麼有一個離線訊息processor應該將包傳送到資料庫,而不是使用者B。protected XMPPResourceConnection getXMPPResourceConnection(Packet p) { XMPPResourceConnection conn = null; //首先根據這個包的發起者,來查詢他的連線資源類,找不到則找接收者的資源類 JID from = p.getPacketFrom(); if (from != null) { conn = connectionsByFrom.get(from); if (conn != null) { return conn; } } //這個接收者它可能是這個伺服器上某個使用者的訊息,讓我們為這個使用者查詢已建立的會話 JID to = p.getStanzaTo(); if (to != null) { if (log.isLoggable(Level.FINEST)) { log.finest("Searching for resource connection for: " + to); } conn = getResourceConnection(to); } else { // Hm, not sure what should I do now.... // Maybe I should treat it as message to admin.... log.log(Level.INFO, "Message without TO attribute set, don''t know what to do wih this: {0}", p); } // end of else return conn; } protected void processPacket(Packet packet, XMPPResourceConnection conn) { ... packet.setPacketTo(getComponentId()); //[email protected] ... if (!stop) { //授權匹配的processor處理packet walk(packet, conn); try { if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) { handleLocalPacket(packet, conn); } } catch (NoConnectionIdException ex) { ... } } ... }
packetTo被設定為元件ID([email protected]),其值原先也是這個。其中walk(packet, conn)方法,匹配處理器(授權)。對於message,此處匹配到的processor是amp和message-carbons,message-carbons沒有怎麼處理,主要是amp在處理,packet被塞amp的佇列中等待處理。
private void walk(final Packet packet, final XMPPResourceConnection connection) { for (XMPPProcessorIfc proc_t : processors.values()) { XMPPProcessorIfc processor = proc_t; //根據element和xmlns,授權匹配成功的processor Authorization result = processor.canHandle(packet, connection); if (result == Authorization.AUTHORIZED) { .... ProcessingThreads pt = workerThreads.get(processor.id()); if (pt == null) { pt = workerThreads.get(defPluginsThreadsPool); } //packet 放到(addItem)授權了的processor的佇列 if (pt.addItem(processor, packet, connection)) { packet.processedBy(processor.id()); } else { ... } } else { ... } } }WorkerThread.run() 從佇列中取出packet,由SessionManager.process(QueueItem item)給amp處理。 SessionManager.pocess(QueueItem item) 如下:
@Override public void process(QueueItem item) { XMPPProcessorIfc processor = item.getProcessor(); try { //由授權的 processor 處理 packet processor.process(item.getPacket(), item.getConn(), naUserRepository,local_results, plugin_config.get(processor.id())); if (item.getConn() != null) { setPermissions(item.getConn(), local_results); } addOutPackets(item.getPacket(), item.getConn(), local_results); } catch (PacketErrorTypeException e) { ... } catch (XMPPException e) { ... } } //其中processor.process()------> MessageAmp.process(),如下: @Override public void process(Packet packet, XMPPResourceConnection session, NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException { if (packet.getElemName() == "presence") { ... } else { Element amp = packet.getElement().getChild("amp", XMLNS); if ((amp == null) || (amp.getAttributeStaticStr("status") != null)) { messageProcessor.process(packet, session, repo, results, settings); } else { ... } } // 其中messageProcessor.process() --------> Message.process(),如下 @Override public void process(Packet packet, XMPPResourceConnection session, NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException { ... try { ... // 在比較JIDs之前,記住要去除資源部分 id = (packet.getStanzaFrom() != null) ? packet.getStanzaFrom().getBareJID() : null; // 檢查這是否是來自客戶端的資料包 if (session.isUserId(id)) { // 這是來自這個客戶端的資料包,最簡單的操作是轉發到它的目的地: // Simple clone the XML element and.... // ... putting it to results queue is enough results.offer(packet.copyElementOnly()); return; } } catch (NotAuthorizedException e) { ... } // end of try-catch }檢查stanzaFfrom與session匹配通過後,將packet.copyElementOnly()放到results中,作後續投遞,原來的packet 就丟棄了。此時投遞的packet :packetFrom=null,packetTo=null。 packet在SessionManager.addOutPacket(Packet packet)中判斷packetFrom是否為空,為空則將其設定為ComponentId(此處為[email protected]),然後呼叫父類(AbstractMessageReceiver.java) 的addOutPacket(packet)方法塞到out_queue 佇列中。此時packet::[email protected],packetTo=null。
4、上層元件MessageRouter處理,把packet塞到in_queues. 又回到了MessageRouter.processPacket(Packet packet)處理:
不同的是 PacketTo為空,packet.getTo()的返回值是stanzaTo。getLocalComponent(packet.getTo());方法根據stanzaTo與compId、comp name、Component都匹配不到。此時packet會給元件SessionManager處理,Packet will be processed by: [email protected],由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packet packet)加入到in_queues。 5、第二次來到SessionManager.processPacket(final Packet packet)處理。不同的是在getXMPPResourceConnection(packet)方法中,conn = connectionsByFrom.get(from)返回值是null,所以是根據stanzaTo取獲取接收方的session,返回接收方連線的Connection。protected XMPPResourceConnection getXMPPResourceConnection(Packet p) { XMPPResourceConnection conn = null; JID from = p.getPacketFrom(); if (from != null) { conn = connectionsByFrom.get(from); if (conn != null) { return conn; } } // It might be a message _to_ some user on this server // so let's look for established session for this user... JID to = p.getStanzaTo(); if (to != null) { ... conn = getResourceConnection(to); } else { ... } // end of else return conn; }
6、如同步驟3,此時packet作為一個以使用者B的名義將其作為傳入包進行處理。
然後packetTo被設定為元件ID([email protected])
此時packet: packetFrom = [email protected],packetTo [email protected]。
之後packet又經walk(packet, conn)方法,匹配處理器(授權),扔給amp處理。
如同前面: 直到Message.process(),如下:@Override public void process(Packet packet, XMPPResourceConnection session, NonAuthUserRepository repo, Queue<Packet> results, Map<String, Object> settings) throws XMPPException { // For performance reasons it is better to do the check // before calling logging method. if (log.isLoggable(Level.FINEST)) { log.log(Level.FINEST, "Processing packet: {0}, for session: {1}", new Object[] { packet, session }); } // You may want to skip processing completely if the user is offline. if (session == null) { processOfflineUser( packet, results ); return; } // end of if (session == null) try { // Remember to cut the resource part off before comparing JIDs BareJID id = (packet.getStanzaTo() != null) ? packet.getStanzaTo().getBareJID() : null; // Checking if this is a packet TO the owner of the session if (session.isUserId(id)) { if (log.isLoggable(Level.FINEST)) { log.log(Level.FINEST, "Message 'to' this user, packet: {0}, for session: {1}", new Object[] { packet, session }); } if (packet.getStanzaFrom() != null && session.isUserId(packet.getStanzaFrom().getBareJID())) { JID connectionId = session.getConnectionId(); if (connectionId.equals(packet.getPacketFrom())) { results.offer(packet.copyElementOnly()); // this would cause message packet to be stored in offline storage and will not // send recipient-unavailable error but it will behave the same as a message to // unavailable resources from other sessions or servers return; } } // Yes this is message to 'this' client List<XMPPResourceConnection> conns = new ArrayList<XMPPResourceConnection>(5); // This is where and how we set the address of the component // which should rceive the result packet for the final delivery // to the end-user. In most cases this is a c2s or Bosh component // which keep the user connection. String resource = packet.getStanzaTo().getResource(); if (resource == null) { // If the message is sent to BareJID then the message is delivered to // all resources conns.addAll(getConnectionsForMessageDelivery(session)); } else { // Otherwise only to the given resource or sent back as error. XMPPResourceConnection con = session.getParentSession().getResourceForResource( resource); if (con != null) { conns.add(con); } } // MessageCarbons: message cloned to all resources? why? it should be copied only // to resources with non negative priority!! if (conns.size() > 0) { for (XMPPResourceConnection con : conns) { Packet result = packet.copyElementOnly(); result.setPacketTo(con.getConnectionId()); // In most cases this might be skept, however if there is a // problem during packet delivery an error might be sent back result.setPacketFrom(packet.getTo()); // Don't forget to add the packet to the results queue or it // will be lost. results.offer(result); if (log.isLoggable(Level.FINEST)) { log.log(Level.FINEST, "Delivering message, packet: {0}, to session: {1}", new Object[] { packet, con }); } } } else { // if there are no user connections we should process packet // the same as with missing session (i.e. should be stored if // has type 'chat' processOfflineUser( packet, results ); } return; } // end of else // Remember to cut the resource part off before comparing JIDs id = (packet.getStanzaFrom() != null) ? packet.getStanzaFrom().getBareJID() : null; // Checking if this is maybe packet FROM the client if (session.isUserId(id)) { // This is a packet FROM this client, the simplest action is // to forward it to is't destination: // Simple clone the XML element and.... // ... putting it to results queue is enough results.offer(packet.copyElementOnly()); return; } // Can we really reach this place here? // Yes, some packets don't even have from or to address. // The best example is IQ packet which is usually a request to // the server for some data. Such packets may not have any addresses // And they usually require more complex processing // This is how you check whether this is a packet FROM the user // who is owner of the session: JID jid = packet.getFrom(); // This test is in most cases equal to checking getElemFrom() if (session.getConnectionId().equals(jid)) { // Do some packet specific processing here, but we are dealing // with messages here which normally need just forwarding Element el_result = packet.getElement().clone(); // If we are here it means FROM address was missing from the // packet, it is a place to set it here: el_result.setAttribute("from", session.getJID().toString()); Packet result = Packet.packetInstance(el_result, session.getJID(), packet .getStanzaTo()); // ... putting it to results queue is enough results.offer(result); } } catch (NotAuthorizedException e) { log.log(Level.FINE, "NotAuthorizedException for packet: " + packet + " for session: " + session, e); results.offer(Authorization.NOT_AUTHORIZED.getResponseMessage(packet, "You must authorize session first.", true)); } // end of try-catch }
檢查stanzaTo與session匹配通過後,根據session拿到接收方所有的連線(可能多端登陸),然後Packet result = packet.copyElementOnly()生成新的packet(原packet丟棄了),並將packetTo設定為接收方連線的ConnectionId(例如:[email protected]/192.168.0.33_5222_192.168.0.33_38624),通過addOutPacket()方法塞到out_queue佇列。此時packet:packetFrom = [email protected],packetTo [email protected]/192.168.0.33_5222_192.168.0.33_38624。
7、 如同前面步驟2,不同的是根據packetTo匹配到元件 [email protected]8、 元件 [email protected] 從queue中取出packet,分發到目的地
public void processPacket(final Packet packet) { ... if (packet.isCommand() && (packet.getCommand() != Command.OTHER)) { ... } else { // 把packet 傳送給客戶端 if (!writePacketToSocket(packet)) { ... } } // end of else }
後續有時間會不斷更新,歡迎加入QQ群 310790965 更多的交流