1. 程式人生 > >如何解決本地大批量資料的更新,和後臺的同步,講解socket的IPC和socket的通訊

如何解決本地大批量資料的更新,和後臺的同步,講解socket的IPC和socket的通訊

說這個問題首先我先說下這個業務的使用場景。隨著網際網路的發展進入了下半場,有以前的app大而且多的局面滿滿的走向精而細的劃分,每一個app的如何基於大資料統計使用者行為是衡量一款產品的優劣標準之一,因為這些資料驅動老闆、產品、市場、運營的業務決策,深度瞭解你的使用者行為,評估營銷效果,優化產品體驗,提升運營效率,在探索不同業務的關鍵行為中,洞察指標背後掩藏的故事。對產品的定位和改進是非常重要的因素,接下來我就圍繞我們開發無埋點統計中遇到的問題跟大家交流產品的統計資料遇到的解決方案:

  • 本部落格不介紹如何實現無埋點統計,這裡我講的是資料的處理,至於無埋點如何插樁,這個點不是今天我們要討論的,以後有機會可以跟大家詳細的分享安卓AOP的相關知識點
  • 如何實現後臺與前端的介面統一,可以再web端動態的控制選擇埋點,這個在以前的博文稍微有介紹:就是頻繁的截圖
  • 基於第二個問題,頻繁的截圖必然產生大量的資料,資料如何進行通訊

今天的文章就是圍繞第三個問題進行的拓展和延伸

本地截圖的大量資料如何進行傳輸:

思路:

  1. 因為產品在開發完版本以後讓老闆、產品、市場、運營可以再後端web端動態控制,他們不懂程式,只知道圖形化的操作,那麼我們需要把這個app介面的圖形化操作傳到伺服器,讓其進行選擇,可以通過本地的adb命令Socket的IPC程序通訊完成但是這裡的侷限性是手機和伺服器是不同的機器,如何通訊?
  2. 可能有人說socket可以支援TCP/IP通訊,是的,沒問題,如果僅僅是這個問題那麼也沒有必要寫部落格了,今天主要是要根據這個我們能不能做成socket如何通過IPC完成資料的上傳工作

圍繞這個問題之前我們先回顧下一些知識點:

首先說下執行緒間通訊的幾種方式:

1:使用管道流Pipes

“管道”是java.io包的一部分。它是Java的特性,而不是Android特有的。一條“管道”為兩個執行緒建立一個單向的通道。生產者負責寫資料,消費者負責讀取資料。

public class SecAct extends Activity {

    private static final String TAG = "PipeExampleActivity";
    private EditText editText;

    PipedReader r;
    PipedWriter w;

    private Thread workerThread;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_sec);

        r = new PipedReader();
        w = new PipedWriter();

        try {
            w.connect(r);
        } catch (IOException e) {
            e.printStackTrace();
        }
        editText = (EditText) findViewById(R.id.edit_text);
        editText.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence charSequence, int start, int count, int after) {
            }

            @Override
            public void onTextChanged(CharSequence charSequence, int start, int before, int count) {
                try {
                    if (count > before) {
                        w.write(charSequence.subSequence(before, count).toString());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void afterTextChanged(Editable editable) {
            }
        });

        workerThread = new Thread(new TextHandlerTask(r));
        workerThread.start();
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        workerThread.interrupt();
        try {
            r.close();
            w.close();
        } catch (IOException e) {
        }
    }

    private static class TextHandlerTask implements Runnable {
        private final PipedReader reader;

        public TextHandlerTask(PipedReader reader) {
            this.reader = reader;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    int i;
                    while ((i = reader.read()) != -1) {
                        char c = (char) i;

                        Log.d(TAG, "char = " + c);
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

2:共享記憶體

說到這個我們就得聊到java的記憶體模型了。

java記憶體模型是什麼呢?它規範了java虛擬機器如何與計算機記憶體的協同工作

shixinzhang

  • 堆為JVM內所有的執行緒共享,存在記憶體中所有的物件和陣列資料
  • 棧為每個執行緒所有,棧中存放了當前方法的呼叫資訊以及基本資料型別和引用資料型別的資料

shixinzhang

java中的堆,堆在虛擬機器啟動的時候建立。堆佔用的記憶體是垃圾回收器回收,不用我們手動回收。

JVM沒有規定死使用哪種回收機制,不同的虛擬機器可以實現不同的回收演算法。

堆中包含了java程式建立的所有的物件, 不論是哪個執行緒。

一個物件的成員變數隨著這個物件自身存放在堆上。不管這個成員變數是基本型別還是引用型別。

java中的棧

棧線上程建立的時候建立,它和C語言的棧類似,在一個方法中,你建立的區域性變數和部分結果都會儲存在棧中,並在方法呼叫和返回中起作用。當前的棧只對當前的執行緒可見,即便兩個執行緒執行同樣的程式碼,這兩個執行緒仍然會在自己的執行緒棧中建立一個本地副本。

因此每一個執行緒擁有每個本地變數的獨有版本。

棧中儲存方法呼叫棧、基本型別的資料、以及物件的引用。

計算機中的記憶體、暫存器、快取

一個現代計算機通常由兩個或者多個 CPU,每個 CPU 都包含一系列的暫存器,CPU 在暫存器上執行操作的速度遠大於在主存上執行的速度。

每個 CPU 可能還有一個 CPU 快取層。CPU 訪問快取層的速度快於訪問主存的速度,但通常比訪問內部暫存器的速度還要慢一點。

通常情況下,當一個 CPU 需要讀取主存時,它會將主存的部分讀到 CPU 快取中。它甚至可能將快取中的部分內容讀到它的內部暫存器中,然後在暫存器中執行操作。

當 CPU 需要將結果寫回到主存中去時,它會將內部暫存器的值重新整理到快取中,然後在某個時間點將值重新整理回主存。  

多執行緒可能出現的問題 通過上述介紹,我們可以知道,如果多個執行緒共享一個物件,每個執行緒在自己的棧中會有物件的副本。

如果執行緒 A 對物件中的某個變數進行修改後還沒來得及寫回主存,執行緒 B 也對該變數進行了修改,那最後重新整理回主記憶體後的值一定和期望的值不一致。

就好比我和你同時開發同一模組程式碼,拭心下筆如有神不一會兒搞定了註冊登入並且提交,小翔沒有從伺服器拉程式碼就矇頭狂寫,最後一 pull 程式碼,就會發現自己寫的好多都跟伺服器上的衝突了!

競態條件與臨界區 當多個執行緒操作同一資源時,如果對資源的訪問順序敏感,就稱存在競態條件。導致競態條件發生的程式碼區稱作臨界區。

在臨界區中使用適當的同步就可以避免競態條件,比如 synchronized, 顯式鎖和原子操作類等。

記憶體可見性 我寫的程式碼你無法立即看到,這就是所謂的“記憶體可見性”問題。

為了讓執行緒 A 對變數做的修改執行緒 B 立即可以看到,我們可以使用 volatile 修飾變數或者對修改操作使用同步。  

當執行緒訪問某一個物件時候值的時候:  首先通過物件的引用找到對應在堆記憶體的變數的值;  然後把堆記憶體變數的具體值 load 到執行緒工作記憶體中,建立一個變數副本;  之後執行緒就不再和物件在堆記憶體變數值有任何關係,而是直接修改副本變數的值,在修改完之後也不會立即同步修改共享堆記憶體中該變數的值;  直到某一個時刻(執行緒退出之前),自動把執行緒變數副本的值回寫到物件在堆中變數。這樣在堆中的物件的值就產生變化了。  

多個執行緒共享同一份記憶體,就是說,一個變數可以同時被多個執行緒所訪問。這裡要特別注意同步和原子操作的問題。

synchronized(this) {
    while(isConditionFullfilled == false) {
        wait();
    }
    notify();
}

因為如果不加同步或者原子性可能會出現,這個執行緒修改了,另外一個執行緒沒法讀到。

3:使用Hander和Message

上面講的是執行緒間通訊的問題,接下來再來說說程序間通訊的問題。而且今天主要給大家講解android有哪些可以跨程序通訊的機制,android系統歸根到底還是一個Linux系統,Linux系統有著非常成熟完善的跨程序通訊的機制,比如:管道,System V,Socket等

linux下的程序通訊手段基本上是從Unix平臺上的程序通訊手段繼承而來的

下面分別講解這幾個,以及為什麼有了這些還要共有跨程序機制還有哪些特有的機制.

Socket是目前用於最廣泛的程序間通訊的機制,他與其他Linux通訊機制不同的地方在於除了可以用於單機內的程序間通訊以外,還可以用於不同機器的程序間通訊,但是Socket本身不支援同時等待或者超時處理,所以他不能直接用來多程序之間的相互實時通訊。在我們開發的專案中使用的Socket的程序通訊方法是,建立一個程序專門用於通訊伺服器(Server)來中轉各個程序間的通訊,它首先啟動一個用來監視連線要求的listening Socket,並把它的描述(Descriptor)號加入到一個事先定義好的fd_set的集合中,這個fd_set的集合用來存放listening Socket和後來生成的通訊Socket的描述號。Server運用system call select來實時檢查是否有資料到達這個集合中的任何一個socket,如果有資料到達listening Socket,則這一定是客戶端發起的連線請求,於是生成一個新的通訊Socket與該客戶端連線,將生成的Socket描述號加入到fd_set的集合中,將客戶端的ID號和與之對應的Socket的描述號記錄在ID登記表中。如果有資料到達某個通訊Socket,則這一定是某個客戶端發起的通訊請求,讀出資料並取出收信客戶端ID號,在ID登記表中找到與之對應的Socket描述號,將資料通過對應Socket傳送到收信客戶端。

其他各個程序作為作為客戶端,(client)。客戶端的動作是首先建立通訊Socket連線伺服器端,然後通過通訊Socket進行送信和收信。

首先給出Server端的程式,在這裡假設有兩個客戶端要進行實時通訊,ClientA向ClientB傳送字元1,ClientB向ClientA傳送字元2。

#include  <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <netinet/in.h>
int main()
{
  int        rcd ;
  struct sockaddr_un  server_sockaddr ;
  int        backlog ;
  ushort        ci ;
  int        watch_fd_list[3] ;
  fd_set        catch_fd_set ;
  fd_set        watchset ;
  int    new_cli_fd ;
  int   maxfd;
  int    socklen ,server_len;
  struct sockaddr_un  cli_sockaddr ;
  struct {
    char  module_id ;  /* Module ID    */
    int  cli_sock_fd ;  /* Socket ID    */
  } cli_info_t[2] ;
   
  for (ci=0;ci<=1;ci++)
    cli_info_t[ci].cli_sock_fd=-1;
     
  for (ci=0;ci<=2;ci++)
    watch_fd_list[ci]=-1;  
     
  int server_sockfd,client_sockfd;
   
  server_sockfd = socket( AF_UNIX, SOCK_STREAM, 0 ) ;
  server_sockaddr.sun_family = AF_UNIX ;
  strcpy( server_sockaddr.sun_path, "server_socket" ) ;
  server_len=sizeof(server_sockaddr);
  rcd = bind( server_sockfd, ( struct sockaddr * )&server_sockaddr, server_len ) ;
   
  backlog = 5 ;
  rcd = listen( server_sockfd, backlog ) ;
  printf("SERVER::Server is  waitting on socket=%d \n",server_sockfd);
  watch_fd_list[0]=server_sockfd;
  FD_ZERO( &watchset ) ;
  FD_SET( server_sockfd, &watchset ) ;
  maxfd=watch_fd_list[0];

在上面的程式中,Server生成listening Socket(server_sockfd),初始化Socket監視集合(watchset),並將listening Socket放入Socket監視集合中。

while (1){
char ch;
int fd;
int nread;
 
catch_fd_set=watchset;
rcd = select( maxfd+1, &catch_fd_set, NULL, NULL, (struct timeval *)0 ) ;

在上面的程式中,Server運用系統呼叫函式 select來實時檢查是否有資料到達Socket監視集合中的任何一個socket。

  if ( rcd < 0 ) {
  printf("SERVER::Server 5 \n");
  exit(1);
}
if ( FD_ISSET( server_sockfd, &catch_fd_set ) ) {
  socklen = sizeof( cli_sockaddr ) ;
  new_cli_fd = accept( server_sockfd, ( struct sockaddr * )
    &( cli_sockaddr ), &socklen ) ;
  printf(" SERVER::open communication with  Client %s on socket %d\n", 
       cli_sockaddr.sun_path,new_cli_fd);  
 
  for (ci=1;ci<=2;ci++){
    if(watch_fd_list[ci] != -1) continue;
    else{  
      watch_fd_list[ci] = new_cli_fd;
      break;
    }  
  }  
  FD_SET(new_cli_fd , &watchset ) ;
  if ( maxfd < new_cli_fd ) {
    maxfd = new_cli_fd ;
  }
   
  for ( ci=0;ci<=1;ci++){
    if(cli_info_t[ci].cli_sock_fd == -1) {
      cli_info_t[ci].module_id=cli_sockaddr.sun_path[0];
      cli_info_t[ci].cli_sock_fd=new_cli_fd;
      break;
    }    
  }
     
  continue;  
}

在上面的程式中,Server運用系統呼叫函式FD_ISSET來檢查是否有客戶端的連線請求到達Listening Socket, 如果返回值大於0,Server生成一個新的通訊Socket (new_cli_fd)與客戶端連線。將新生成的通訊Socket放入Socket監視集合中(FD_SET)。將客戶端的資訊(ID號和Socket描述號)儲存在登錄檔cli_info_t中

    for ( ci = 1; ci<=2 ; ci++ ) {
      int      dst_fd = -1 ;
      char      dst_module_id;
      char       src_module_id;
      int    i;
      if (watch_fd_list[ ci ]==-1) continue;
      if ( !FD_ISSET( watch_fd_list[ ci ], &catch_fd_set ) ) {
        continue ;
      }
      ioctl(watch_fd_list[ ci ],FIONREAD,&nread);
      if (nread==0){
        continue;
      }  
      read( watch_fd_list[ ci ], &dst_module_id, 1 ) ;
      for (i=0;i<=1;i++){
        if(cli_info_t[i].module_id == dst_module_id) 
          dst_fd=  cli_info_t[i].cli_sock_fd;  
         if(cli_info_t[i].cli_sock_fd==watch_fd_list[ ci ]) 
        src_module_id=  cli_info_t[i].module_id;    
      }
      read( watch_fd_list[ ci ], &ch, 1 ) ;
      printf("SERVER::char=%c to  Client %c on socket%d\n",ch, dst_module_id,dst_fd);  
      write(dst_fd,&src_module_id, 1 ) ;
      write(dst_fd,&ch, 1 ) ;
    }
  }  
}

在上面的程式中,如果有資料到達某個通訊Socket,Server則讀出資料並取出收信客戶端ID號。在ID登記表中找到收信客戶端對應的Socket描述號。並將資料通過對應Socket傳送到收信客戶端

給出客戶端 ClientA的程式

ClientB的程式只需將 char dst_module_id='B'; 改為char dst_module_id='A'; char ch='1'; 改為char char ch='2';既可。

#include  <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <sys/un.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
int main(){
     
    int client_sockfd;
    int len;
    struct sockaddr_un server_sockaddr,cli_sockaddr;
    int result;
    char dst_module_id='B';
    char ch='1';
    char src_module_id;
     
     
    client_sockfd= socket(AF_UNIX,SOCK_STREAM,0);
     
    cli_sockaddr.sun_family = AF_UNIX ;
    strcpy( cli_sockaddr.sun_path, "A" ) ;
    bind(client_sockfd,(struct sockaddr * )&cli_sockaddr, sizeof( cli_sockaddr ) ) ;
    server_sockaddr.sun_family=AF_UNIX;
    strcpy( server_sockaddr.sun_path, "server_socket" ) ;
    len=sizeof(server_sockaddr);
     
    result = connect(client_sockfd,( struct sockaddr * )&server_sockaddr,len);
    if (result <0){
        printf("ClientA::error on connecting \n");  
        exit(1);
    }
     
    printf("ClientA::succeed in connecting with server\n");
    sleep(10);
    write(client_sockfd,&dst_module_id,1);
    write(client_sockfd,&ch,1); 
    read (client_sockfd,&src_module_id,1); 
    read (client_sockfd,&ch,1); 
    printf("ClientA::char from  Client %c =%c\n", src_module_id,ch);    
    close (client_sockfd);
     
}

下面是樣本程式的執行結果

[[email protected] test]# ./server &
[3] 4301
[[email protected] test]# SERVER::Server is  waitting on socket=3
./clientA & ./clientB &
[4] 4302
[5] 4303
ClientA::succeed in connecting with server
 SERVER::open communication with  Client A on socket 4
[[email protected] test]#  SERVER::open communication with  Client B on socket 5
ClientB::succeed in connecting with server
SERVER::char=1 to  Client B on socket5
ClientB::char from  Client A =1
SERVER::char=2 to  Client A on socket4
ClientA::char from  Client B =2

為什麼使用

AF_UNIX Socket伺服器呢?

  1. 它更容易,因為埠不能被其他任何東西使用
  2. 減少開銷
  3. 偵聽埠往往會使USB繫結在某些裝置上無法使用
  4. 更好的隔離; 在裝置上執行的常規應用程式無法連線到抽象Socket,也無法通過網路連線
  5. 使所有埠免費供其他程式使用
  6. 某些裝置(例如三星)不允許您在/ data / local / tmp中建立常規Socket檔案,因此無法使用Socket檔案

為什麼不用AF_INET而使用AF_UNIX?

AF_INETåéä¿¡è¿ç¨

TCP/IP四層模型的通訊原理

傳送方和依賴方依賴IP:port來標識,將本地的socket繫結到對應的IP埠上,傳送資料時,指定IP埠,經過Internet,可以通過此IP埠最終找到接收方,接收資料時,可以從對方的資料包中找到對方的ip,

傳送方通過系統呼叫send()將原始資料傳送到作業系統核心緩衝區中。核心緩衝區從上到下依次經過TCP層、IP層、鏈路層的編碼,分別新增對應的頭部資訊,經過網絡卡將一個數據包傳送到網路中。經過網路路由到接收方的網絡卡。網絡卡通過系統中斷將資料包通知到接收方的作業系統,再沿著傳送方編碼的反方向進行解碼,即依次經過鏈路層、IP層、TCP層去除頭部、檢查校驗等,最終將原始資料上報到接收方程序。  

AF_UNIX域socket的通訊過程。

典型的本地ipc,類似於管道,依賴路徑名標識傳送方和接收方,即傳送資料時,指定接收方繫結的路徑名,作業系統根據該路徑名直接找到對應的接收方,並將原始資料直接拷貝到接受方的核心緩衝區,並上報給接受方的程序進行處理,同樣接收方可以從收到的資料包獲取傳送方的路徑名,並通過此路徑向其傳送資料。

AF_UNIXåéä¿¡è¿ç¨

相同點:

作業系統提供的介面socket(),bind(),connect(),accept(),send(),recv(),以及用來對其進行多路複用事件檢測的select(),poll(),epoll()都是完全相同的。收發資料的過程中,上層應用感知不到底層的差別。

不同點:

1:建立的socket傳遞的地址域不同,以及bind()的地址結構稍有區別

socket傳遞不同的地址域AF_INET和AF_UNIX

bind的地址結構分別為sockaddr_in(指定IP埠)和sockaddr_un(指定的路徑名)

2:AF_INET需要經過多個協議的編解碼,消耗系統的cpu,並且資料傳輸需要經過網絡卡,收到網絡卡寬頻的限制。

AF_UNIX資料到達核心緩衝區,由核心根據指定路徑名找到接收方socket對應的核心緩衝區,直接將資料拷貝過去,不經過協議層的編解碼,節省cpu,並且不經過網絡卡,因此不受網絡卡寬頻的限制。

3:AF_UNIX的傳輸速率遠遠的大於AF_INET

4:AF_INET不可以作為本機的跨程序通訊,同樣的可以用於不同機器的通訊,其就是為了在不同機器間進行網路互聯傳遞資料而生,而AF_UNIX僅僅可以用於本機內程序間的通訊。

使用場景:

AF_UNIX由於其對系統cpu的較少消耗,不受限於網絡卡頻寬,及高效的傳遞速率,本機通訊則首選AF_UNIX域。

AF_INET則用於跨機器之間的通訊。

這裡其實我有個拓展就是我們安卓的裝置也可以變相的完成不同機器的通訊?如果去操作呢?

我們可以通過埠的轉發完成本地pc機器連結到手機裝置,完成資料傳輸,但是這個必須得通過usb介面

然後在把這個pc的埠開發出去和伺服器連結可以完成這個變相的程序間通訊

使用TCP埠進行forward之外,我們還可以使用unix domain socket進行forward:

$ adb forward localfilesystem:socket dev:/dev/block/mmcblk0p6

好啦,基本上完成了我們本地的資料傳輸。完成了這些以後產品交接完以後我們可以讓老闆、產品、市場、運營在區域網內開放本地埠就可以做資料的傳輸,當然這裡至於如何開放埠等一些命令,我們會在手機端的連結的時候通過指令碼自動完成,無需擔心: