1. 程式人生 > >grpc源碼分析之域名解析

grpc源碼分析之域名解析

spl 取ip地址 read fun 一個 通過 base 設置 sched

環境:

  win7_x64,VS2015、grpc_1.3.1

場景:

  在客戶端中使用grpc連接服務器,在多次輸入非法的地址後,再次輸入正確的地址連出現連接超時的現象。侯捷先生說過“源碼面前,了無秘密”,所以開始分析grpc源碼

使用GRPC進行連接的例子:

///< 創建通道
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel("127.0.0.1:8080", grpc::InsecureChannelCredentials());
///< 3秒超時
gpr_timespec tm_out{3, 0, GPR_TIMESPAN};
///< 等待連接 bool connected = channel->WaitForConnected<gpr_timespec>(tm_out); if(connected) { std::cout << "connect success!" << std::endl; } else { std::cout << "connect fail!" << std::endl; }

分析GRPC域名解析過程:

一、創建通道

1.1 創建通道證書

  有非安全的InsecureChannelCredentials,還有一個安全的SecureChannelCredentials,這裏我們使用非安全的通道

grpc::InsecureChannelCredentials()

1.2 創建通道,在grpc_channel_create_with_builder(channel.c)函數中

grpc_channel *grpc_channel_create_with_builder(
    grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
    grpc_channel_stack_type channel_stack_type) {
  
  ......
  channel->target = target;
  channel->is_client =
grpc_channel_stack_type_is_client(channel_stack_type); ...... grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { if (args->args[i].type != GRPC_ARG_STRING) { gpr_log(GPR_ERROR, "%s ignored: it must be a string", GRPC_ARG_DEFAULT_AUTHORITY); } else { channel->default_authority = grpc_mdelem_from_slices( exec_ctx, GRPC_MDSTR_AUTHORITY, grpc_slice_intern( grpc_slice_from_static_string(args->args[i].value.string))); } } ...... } done: grpc_channel_args_destroy(exec_ctx, args); return channel; }

  1.2.1 設置通道連接的目標(即服務器地址)

  1.2.2 設置通道類型為客戶端通道(GRPC_CLIENT_CHANNEL),有6種通道類型,有一些是負載均衡使用的,在channel_stack_type.h文件中定義。

  1.2.3 設置默認的權限

二、域名解析

2.1 開始解析(dns_resolver.c)

static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
                                       dns_resolver *r) {
  GRPC_RESOLVER_REF(&r->base, "dns-resolving");
  GPR_ASSERT(!r->resolving);
  r->resolving = true;
  r->addresses = NULL;
  grpc_resolve_address(
      exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
      grpc_closure_create(dns_on_resolved_locked, r,
                          grpc_combiner_scheduler(r->base.combiner, false)),
      &r->addresses);
}

  grpc_resolve_address就是一個函數指針,在不同的平臺下,指向不同的函數;在windows平臺下則指向resolve_address_impl函數(resolve_address_windows.c)

2.2 設置解析名稱並在另一線程進行請求

static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
                                 const char *default_port,
                                 grpc_pollset_set *interested_parties,
                                 grpc_closure *on_done,
                                 grpc_resolved_addresses **addresses) {
  request *r = gpr_malloc(sizeof(request));
  grpc_closure_init(&r->request_closure, do_request_thread, r,
                    grpc_executor_scheduler);
  r->name = gpr_strdup(name);
  r->default_port = gpr_strdup(default_port);
  r->on_done = on_done;
  r->addresses = addresses;
  grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}

  解析過程是通過do_request_thread函數完成的,這裏進行打包,在另一個地方進行調用

static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                          grpc_error *error) {
  gpr_mu_lock(&g_executor.mu);
  if (g_executor.shutting_down == 0) {
    grpc_closure_list_append(&g_executor.closures, closure, error);
    maybe_spawn_locked();
  }
  gpr_mu_unlock(&g_executor.mu);
}

  添加解析操作到全局變量g_executor.closures中,然後調用maybe_spawn_locked函數。  

static void maybe_spawn_locked() {
  if (grpc_closure_list_empty(g_executor.closures) == 1) {
    return;
  }
  if (g_executor.shutting_down == 1) {
    return;
  }

  if (g_executor.busy != 0) {
    /* Thread still working. New work will be picked up by already running
     * thread. Not spawning anything. */
    return;
  } else if (g_executor.pending_join != 0) {
    /* Pickup the remains of the previous incarnations of the thread. */
    gpr_thd_join(g_executor.tid);
    g_executor.pending_join = 0;
  }

  /* All previous instances of the thread should have been joined at this point.
   * Spawn time! */
  g_executor.busy = 1;
  GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
                         &g_executor.options));
  g_executor.pending_join = 1;
}

  如果隊列g_executor.closures為空或者是已經關閉,直接返回。

  如果g_executor.busy !=0 表示解析線程已經被創建,直接返回,解析線程會依次從隊列中取出待解析的名稱,進行解析。

  如果上面都沒有執行,則調用gpr_thd_new函數創建解析線程。

2.3 在線程函數中,依次從隊列中取出待解析的對象調用do_request_thread函數

static void closure_exec_thread_func(void *ignored) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  while (1) {
    gpr_mu_lock(&g_executor.mu);
    if (g_executor.shutting_down != 0) {
      gpr_mu_unlock(&g_executor.mu);
      break;
    }
    if (grpc_closure_list_empty(g_executor.closures)) {
      /* no more work, time to die */
      GPR_ASSERT(g_executor.busy == 1);
      g_executor.busy = 0;
      gpr_mu_unlock(&g_executor.mu);
      break;
    } else {
      grpc_closure *c = g_executor.closures.head;
      grpc_closure_list_init(&g_executor.closures);
      gpr_mu_unlock(&g_executor.mu);
      while (c != NULL) {
        grpc_closure *next = c->next_data.next;
        grpc_error *error = c->error_data.error;
#ifndef NDEBUG
        c->scheduled = false;
#endif
        c->cb(&exec_ctx, c->cb_arg, error);
        GRPC_ERROR_UNREF(error);
        c = next;
      }
      grpc_exec_ctx_flush(&exec_ctx);
    }
  }
  grpc_exec_ctx_finish(&exec_ctx);
}

  cb函數是在打包時設置的,其實就是do_request_thread函數,詳看2.2

2.4 do_request_thread函數直接調用blocking_resolve_address_impl函數進行處理

static grpc_error *blocking_resolve_address_impl(    const char *name, const char *default_port,    grpc_resolved_addresses **addresses) {

struct addrinfo hints; struct addrinfo *result = NULL, *resp; char *host; char *port; int s; size_t i; grpc_error *error = GRPC_ERROR_NONE; /* parse name, splitting it into host and port parts */ gpr_split_host_port(name, &host, &port); ......if (port == NULL) { ...... port = gpr_strdup(default_port); } /* Call getaddrinfo */ memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result);
......
}

  首先,從名稱中獲取主機名和端口

    如果名稱中沒有端口,則使用默認的端口代替

  然後,調用getaddrinfo函數獲取IP地址

分析連接超時的過程:

  1. 設置非法(不存在)的名稱

  2. 調用getaddrinfo函數進行解析

    因為名稱不存在,但是域名解析服務器可能認為名稱是"存在"的,只是沒有找到而已,所以需要更多的時間來進行查找。

  3. getaddrinfo函數是阻塞的,在函數返回之前其他的名稱只是簡單添加到了隊列之中,並沒有立即進行解析。

  4. 多次輸入非法的名稱之後,導致隊列過長,即使設置了合法的名稱,但是還沒有對它進行解析,最後3秒超時。

解決方案:

  1. 只允許輸入合法IP地址,這樣不會進行名稱解析或解析立即返回,可以利用正則表達式對輸入IP進行合法性校驗。

  2. 域名解析交給其他的第三方庫完成,將獲取的合法IP設置給GRPC使用

grpc源碼分析之域名解析