1. 程式人生 > >實現PHP伺服器+Android客戶端(Retrofit+RxJava)第六天推送的實現

實現PHP伺服器+Android客戶端(Retrofit+RxJava)第六天推送的實現

廢話不多說,今天來說說近幾天的成果。

  1. 如何實現推送
  2. android客戶端nio的使用
  3. 為啥要使用長連線
  4. 長連線的建立
  5. php伺服器如何實現長連線
  6. 後期需要優化的部分

如何實現推送

推送的原理其實也很簡單,伺服器和客戶端實現長連線,實現了長連線之後就想幹嘛幹嘛了。

android客戶端nio的使用

如果對nio不瞭解,大家可以自己再去了解,我這裡簡單說下:你就是非阻塞的io,相對於我們一般使用的bio來說使用可以更加的靈活吧,nio是事件驅動的,也就是可以讀取的時候讀取的事件響應,可寫的時候寫的事件響應,在讀寫的時候是非阻塞的,也就是如果有100個位元組要讀取,它不會等到你全部讀完再執行下一句,他會立馬返回,這個時候你可能只讀了50個位元組,我們要做的事情就是在下一次讀事件來的時候把另50個位元組讀出來。nio一般是用在伺服器這邊的,它對高併發的支援比較好,客戶端的話就沒有那麼大的意義,不過我這裡還是在客戶端用了(具體是因為我還不知道怎麼再php伺服器上用nio,只能在客戶端這裡先練練手),要把nio講清楚的話可以講一整篇,大家自己去慢慢理解,坑還是有的。

為啥要使用長連線

看過我這個系列之前的幾篇部落格的人應該都知道我這個app使用的基本都是http協議,也就是短連線去請求資料,其實看我這裡用的retrofit就知道是http協議,其實我的app寫下來基本http協議就已經可以滿足我的要求,那為什麼還要長連線呢,老實說,我只是看了這篇部落格攜程App的網路效能優化實踐,有兩點收穫,一個就是網路服務多使用tcp連線,其次就是在業務邏輯多的時候用hybrid(web 和native混合),web就是直接用webview請求html5頁面,native就是使用android原生控制元件寫的介面,hybrid好處很多,但是web部分的效果不是那麼理想,畢竟只是一個頁面,無法達到android原生控制元件的效能。扯了那麼多其他的,進入正題,其實我自己現在總結實現長連線的好處就是伺服器可以推動資訊給客戶端,而且http協議也是tcp連線,而一次連線的建立需要三次握手,消耗還是比較大的,長連線就可以減少http的這種建立連線和斷開連線的操作。

長連線的建立

上面已經說了使用nio,另外還有一點需要注意的是需要一個心跳去維護這個長連線,心跳就是每個一段時間傳送訊息給伺服器,伺服器返回資訊,確保這個長連線沒有中斷。其實最好還得有一個長連線池,進一步減少建立連線的消耗,我這裡只實現瞭如何用nio實現長連線,還沒有實現長連線池,心跳的話明白什麼意思之後還是挺簡單的,就自己去實現,下面的主要程式碼:

package com.sqq.tcpdemo.nioconnect;

import android.util.Log;
import com.sqq.tcpdemo.rxbus.ConnectedEvent;
import com.sqq.tcpdemo.rxbus.DisConnectEvent;
import
com.sqq.tcpdemo.rxbus.GetResponseEvent; import com.sqq.tcpdemo.rxbus.RxBus; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.AccessControlException; import java.util.Iterator; /** * Created by sqq on 2016/8/2. * rxbus+nio */ public class Client implements Runnable { static { try { // Needed for NIO selectors on Android 2.2. // 其實就是防止讀取到ipv6的地址 System.setProperty("java.net.preferIPv6Addresses", "false"); } catch (AccessControlException ignored) { } } final int connectTimeout; final InetAddress connectHost; final int connectTcpPort; final ByteBuffer readBuffer, writeBuffer; private Selector selector; private SelectionKey selectionKey; private SocketChannel clientChannel; private int emptySelects; // private boolean isClientClose = false; private boolean isConnected; public Client(Client.Builder builder) { connectTimeout = builder.timeout; connectHost = builder.host; connectTcpPort = builder.tcpPort; writeBuffer = ByteBuffer.allocate(builder.writeBufferSize); readBuffer = ByteBuffer.allocate(builder.objectBufferSize); isConnected = false; //isClientClose = false; } public void Connect() { Thread clientThread = new Thread(this); clientThread.setDaemon(true); clientThread.start(); } @Override public void run() { try { clientChannel = SocketChannel.open(); Socket socket = clientChannel.socket(); //確保資料及時發出 socket.setTcpNoDelay(true); //表示傳送請求之後10s內沒有接受到資料就斷開客戶端 //socket.setSoTimeout(10000); //下面兩句不能改變順序,為了方便就直接是阻塞模式連線,連線之後再設定非阻塞模式 socket.connect(new InetSocketAddress(connectHost, connectTcpPort), connectTimeout); //boolean connect = clientChannel.connect(new InetSocketAddress(connectHost, connectTcpPort)); clientChannel.configureBlocking(false); try { selector = Selector.open(); } catch (IOException ex) { throw new RuntimeException("Error opening selector.", ex); } if (socket.isConnected()) { //已經連線 selectionKey = clientChannel.register(selector, SelectionKey.OP_READ); isConnected = true; RxBus.getRxBus().send(new ConnectedEvent()); } else { selectionKey = clientChannel.register(selector, SelectionKey.OP_CONNECT); } } catch (IOException e) { e.printStackTrace(); Log.d("Client", e.toString()); //建立連線失敗,這裡應該返回,或者重試之類的 return; } try { checkSelector(); } catch (IOException e) { e.printStackTrace(); Log.d("Client", e.toString()); return; } } public void checkSelector() throws IOException { while (isConnected) { selector.select(); // 獲得selector中選中的項的迭代器 Iterator ite = selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 刪除已選的key,以防重複處理 ite.remove(); // 連線事件發生 if (key.isConnectable()) { //這裡其實不會走到這一步,前面的連線時阻塞的 connected(key); } else if (key.isReadable()) { canRead(key); } else if (key.isWritable()) { canWrite(key); } } } } public void send(String message) { Log.d("Client", message); try { writeBuffer.put(message.getBytes("UTF-8")); writeBuffer.flip(); selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE); selector.wakeup(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); Log.d("Client","解析失敗"); } } private void connected(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key .channel(); // 如果正在連線,則完成連線 if (channel.isConnectionPending()) { channel.finishConnect(); } // 設定成非阻塞 channel.configureBlocking(false); } private void canRead(SelectionKey key) throws IOException{ Log.d("Client", "read"); SocketChannel channel = (SocketChannel) key .channel(); int read = channel.read(readBuffer); if (read == -1) { Log.d("Client", "客戶端已經關閉"); key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); RxBus.getRxBus().send(new DisConnectEvent()); return; } byte[] data = readBuffer.array(); String msg = new String(data).trim(); RxBus.getRxBus().send(new GetResponseEvent(msg)); Log.d("Client", "服務端收到資訊:" + msg); } private void canWrite(SelectionKey key) throws IOException{ Log.d("Client", "write"); SocketChannel channel = (SocketChannel) key .channel(); //channel.write(ByteBuffer.wrap(new String("客戶端:1").getBytes())); //判斷當前位置到上界(limit)是否還有資料沒有寫到通道 while(writeBuffer.hasRemaining()){ channel.write(writeBuffer); writeBuffer.compact(); writeBuffer.flip(); } writeBuffer.clear(); selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); } public void close() { isConnected = false; try { if(selector!=null) selector.close(); if(clientChannel!=null) clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public static final class Builder { int writeBufferSize; int objectBufferSize; /** * 預設5s連線超時 */ int timeout; InetAddress host; int tcpPort; public Builder() { writeBufferSize = 8192; objectBufferSize = 2048; timeout = 5000; host = null; tcpPort = -1; } public Client.Builder setHost(String host) { try { this.host = InetAddress.getByName(host); } catch (UnknownHostException e) { e.printStackTrace(); } return this; } public Client.Builder setTcpPort(int tcpPort) { this.tcpPort = tcpPort; return this; } public Client build() { return new Client(this); } } }

demo地址:demo地址
大家把demo下載下來之後,可以先寫個java伺服器先測試一下,程式碼如下:

`package cn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO服務端
*/
public class NIOServer {
//通道管理器
private Selector selector;

/**
 * 獲得一個ServerSocket通道,並對該通道做一些初始化的工作
 * @param port  繫結的埠號
 * @throws IOException
 */
public void initServer(int port) throws IOException {
    // 獲得一個ServerSocket通道
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    // 設定通道為非阻塞
    serverChannel.configureBlocking(false);
    // 將該通道對應的ServerSocket繫結到port埠
    serverChannel.socket().bind(new InetSocketAddress(port));
    // 獲得一個通道管理器
    this.selector = Selector.open();
    //將通道管理器和該通道繫結,併為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
    //當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

/**
 * 採用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理
 * @throws IOException
 */
@SuppressWarnings("unchecked")
public void listen() throws IOException {
    System.out.println("服務端啟動成功!");
    // 輪詢訪問selector
    while (true) {
        //當註冊的事件到達時,方法返回;否則,該方法會一直阻塞
        selector.select();
        // 獲得selector中選中的項的迭代器,選中的項為註冊的事件
        Iterator ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = (SelectionKey) ite.next();
            // 刪除已選的key,以防重複處理
            ite.remove();
            // 客戶端請求連線事件
            if (key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key
                        .channel();
                // 獲得和客戶端連線的通道
                SocketChannel channel = server.accept();
                // 設定成非阻塞
                channel.configureBlocking(false);

                //在這裡可以給客戶端傳送資訊哦
                channel.write(ByteBuffer.wrap(new String("向客戶端傳送了一條資訊").getBytes()));
                //在和客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權。
                channel.register(this.selector, SelectionKey.OP_READ);

                // 獲得了可讀的事件
            } else if (key.isReadable()) {
                    read(key);
            }

        }

    }
}
/**
 * 處理讀取客戶端發來的資訊 的事件
 * @param key
 * @throws IOException 
 */
public void read(SelectionKey key) throws IOException{
    // 伺服器可讀取訊息:得到事件發生的Socket通道
    SocketChannel channel = (SocketChannel) key.channel();
    // 建立讀取的緩衝區
    ByteBuffer buffer = ByteBuffer.allocate(10);
    channel.read(buffer);
    byte[] data = buffer.array();
    String msg = new String(data).trim();
    System.out.println("服務端收到資訊:"+msg);
    ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
    channel.write(outBuffer);// 將訊息回送給客戶端
}

/**
 * 啟動服務端測試
 * @throws IOException 
 */
public static void main(String[] args) throws IOException {
    NIOServer server = new NIOServer();
    server.initServer(8000);
    server.listen();
}

}
`
上面的程式碼摘自Java NIO原理圖文分析及程式碼實現 ,寫完之後自己再去配置一下埠號、ip之類的就可以執行

php伺服器如何實現長連線

上面的例子執行成功之後最後再來說說怎麼用php實現長連線,php建立長連線的程式碼網上有一堆,這裡我也貼一個:

 <?php   
    //確保在連線客戶端時不會超時   
    set_time_limit(0);   

    //設定IP和埠號   
    $address='10.0.3.114';   
    $port=13448;    //除錯的時候,可以多換埠來測試程式!   

    //建立一個SOCKET   
    if(($sock=socket_create(AF_INET,SOCK_STREAM,SOL_TCP))<0)   
    {   
        echo "socket_create() failed reason:".socket_strerror($sock)."<br>";   
    }   

    //繫結到socket埠   
    if(($ret=socket_bind($sock,$address,$port))<0)   
    {   
        echo "socket_bind() failed reason:".socket_strerror($ret)."<br>";   
    }   

    //開始監聽   
    if(($ret=socket_listen($sock,4))<0)   
    {   
    echo "socket_listen() failed reason:".socket_strerror($ret)."<br>";   
    }   

    do {   
         if (($msgsock = socket_accept($sock)) < 0)   
         {   
             echo "socket_accept() failed: reason: " . socket_strerror($msgsock) . "/n"; 
             echo "/nThe Server is Stop……/n";
             break;   
         }   

         //發到客戶端   
         $msg ="Welcome To Server!";   
         socket_write($msgsock, $msg, strlen($msg));
         ob_flush();
         flush();
         socket_close($msgsock);   

         echo "/nThe Server is running……/n";
         printf("/nThe Server is running……/n");
    } while (true);   

    socket_close($sock);

上面的程式碼只是簡單的例子,並沒有建立長連線,傳送了訊息之後就關閉了連線。
其次需要注意的就兩點,一點是需要先在php.ini這個配置檔案中開啟socket的支援,具體做法:在php.ini找到extension=php_sockets.dll一句將前面的;去掉,還有一點要注意的就是ip地址,如果你是在本機上做的測試只需要寫127.0.0.1就可以,如果是和android客戶端(不是模擬器)建立連線就需要寫本機的ip。
最後要說的是,我自己伺服器端的實現是用了workerman框架,還是比較好用的

後期需要優化的部分

大家下載我的demo之後,主要看Client.java就可以,裡面已經實現長連線池,但總覺得有問題,還需要測試,心跳的話已經在伺服器端做了(框架中自帶了)。其次就是資料解析,我一直在考慮使用json還是用其他的,目前我用的是json解析,解析之後再做二次判斷,有些麻煩。目前整個專案我也已經開源。地址:我的專案