1. 程式人生 > >ceph原始碼分析--一個entry函式的呼叫

ceph原始碼分析--一個entry函式的呼叫

ceph中有很多的entry函式,往往是由各執行緒來呼叫處理的,本篇文章以bluestore中的MempoolThread為例來講講該函式的呼叫。

 struct MempoolThread : public Thread {
    BlueStore *store;
    Cond cond;
    Mutex lock;
    bool stop = false;
  public:
    explicit MempoolThread(BlueStore *s)
      : store(s),
    lock("BlueStore::MempoolThread::lock
") {} void *entry() override; void init() { assert(stop == false); create("bstore_mempool"); } void shutdown() { lock.Lock(); stop = true; cond.Signal(); lock.Unlock(); join(); } } ;

首先檢視其init函式,呼叫了父類Thread的create(“bstore_mempool”)

void
init() { assert(stop == false); create("bstore_mempool"); }

void Thread::create(const char *name, size_t stacksize),呼叫了try_create(stacksize)

void Thread::create(const char *name, size_t stacksize)
{
  assert(strlen(name) < 16);
  thread_name = name;

  int ret = try_create(stacksize);
  if
(ret != 0) { char buf[256]; snprintf(buf, sizeof(buf), "Thread::try_create(): pthread_create " "failed with error %d", ret); dout_emergency(buf); assert(ret == 0); } }

int Thread::try_create(size_t stacksize)呼叫了pthread_create(&thread_id, thread_attr, _entry_func, (void*)this),這是執行緒建立函式,關注_entry_func

int Thread::try_create(size_t stacksize)
{
  pthread_attr_t *thread_attr = NULL;
  pthread_attr_t thread_attr_loc;

  stacksize &= CEPH_PAGE_MASK;  // must be multiple of page
  if (stacksize) {
    thread_attr = &thread_attr_loc;
    pthread_attr_init(thread_attr);
    pthread_attr_setstacksize(thread_attr, stacksize);
  }

  int r;

  // The child thread will inherit our signal mask.  Set our signal mask to
  // the set of signals we want to block.  (It's ok to block signals more
  // signals than usual for a little while-- they will just be delivered to
  // another thread or delieverd to this thread later.)
  sigset_t old_sigset;
  if (g_code_env == CODE_ENVIRONMENT_LIBRARY) {
    block_signals(NULL, &old_sigset);
  }
  else {
    int to_block[] = { SIGPIPE , 0 };
    block_signals(to_block, &old_sigset);
  }
  r = pthread_create(&thread_id, thread_attr, _entry_func, (void*)this);
  restore_sigset(&old_sigset);

  if (thread_attr) {
    pthread_attr_destroy(thread_attr);  
  }

  return r;
}

void Thread::_entry_func(void *arg) 呼叫了((Thread)arg)->entry_wrapper()

void *Thread::_entry_func(void *arg) {
  void *r = ((Thread*)arg)->entry_wrapper();
  return r;
}

void *Thread::entry_wrapper()呼叫到entry~~~~旅行結束

void *Thread::entry_wrapper()
{
  int p = ceph_gettid(); // may return -ENOSYS on other platforms
  if (p > 0)
    pid = p;
  if (pid &&
      ioprio_class >= 0 &&
      ioprio_priority >= 0) {
    ceph_ioprio_set(IOPRIO_WHO_PROCESS,
            pid,
            IOPRIO_PRIO_VALUE(ioprio_class, ioprio_priority));
  }
  if (pid && cpuid >= 0)
    _set_affinity(cpuid);

  ceph_pthread_setname(pthread_self(), thread_name);
  return entry();
}