1. 程式人生 > >Ceph 學習——OSD讀寫流程與原始碼分析(一)

Ceph 學習——OSD讀寫流程與原始碼分析(一)

直接上圖:


這裡寫圖片描述

同樣當前最新的版本,和之前的版本有所不同,有一些模組簡化了,類的名字也改了。先介紹圖中涉及的相關的類,然後在對類中具體函式主要呼叫流程進行分析。

OSD 模組主要的類


這裡寫圖片描述

盜圖:其中ReplicatedPG 在最新的版本中去掉了,更改為PrimaryLogPG類

OSD類

OSD和OSDService是核心類,他們直接在頂層負責一個OSD節點的工作,從客戶端的得到的訊息,就是先到達OSD類中,通過OSD類的處理,在呼叫PrimaryLogPG(之前為ReplicatedPG 類)類進行處理。該類中,在讀寫流程中的主要工作是訊息(Message)封裝為 RequestOp,檢查epoch (版本)是否需要更新,並獲取PG控制代碼,並做PG相關的檢查,最後將請求加入佇列。

PrimaryLogPG類

該類繼承自PG類,PGBackend::Listener(該類是一個抽象類)類PG類處理相關狀態的維護,以及實現PG層面的功能,核心功能是用boost庫的statechart狀態機來做PG狀態轉換。它實現了PG內的資料讀寫等功能。

PGBackend類

該類主要功能是將請求資料通過事務的形式同步到一個PG的其它從OSD上(注意:主OSD的操作PrimaryLogPG來完成)。
他有兩個子類,分別是 ReplicatedBackend和ECBackend,對應著PG的的兩種型別的實現。

OSD讀寫函式呼叫流程


這裡寫圖片描述

1)OSD::ms_fast_dispatch 函式是接收訊息Message的入口函式,他被網路模組的接收執行緒呼叫。主要工作是 檢查service服務 、把Message封裝為OpRequest型別、獲取session、獲取最新的OSdMap,最後dispatch_session_waiting,進入下一步。

void OSD::ms_fast_dispatch(Message *m)
{
  FUNCTRACE();
  if (service.is_stopping()) {//檢查service,如果停止了直接返回
    m->put();
    return;
  }
  OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);//把Message封裝為OpRequest型別
...
...

  if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
      m->get_type() != CEPH_MSG_OSD_OP) {
    // queue it directly直接呼叫enqueue_op處理
enqueue_op( static_cast<MOSDFastDispatchOp*>(m)->get_spg(), op, static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch()); } else { Session *session = static_cast<Session*>(m->get_connection()->get_priv());//獲取 session 其中包含了一個Connection的相關資訊 if (session) { { Mutex::Locker l(session->session_dispatch_lock); op->get(); session->waiting_on_map.push_back(*op);//將請求加如waiting_on_map的列表裡 OSDMapRef nextmap = service.get_nextmap_reserved();//獲取最新的OSDMAP dispatch_session_waiting(session, nextmap);//該函式中 迴圈處理請求 service.release_map(nextmap); } session->put(); } } OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); }

2)OSD::dispatch_session_waiting 主要工作是迴圈處理佇列waiting_on_map中的元素,對比OSDmap,以及獲取他們的pgid,最後呼叫enqueue_op處理。

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
  assert(session->session_dispatch_lock.is_locked());

  auto i = session->waiting_on_map.begin();
  while (i != session->waiting_on_map.end()) {//迴圈處理waiting_on_map中的元素
    OpRequestRef op = &(*i);
    assert(ms_can_fast_dispatch(op->get_req()));
    const MOSDFastDispatchOp *m = static_cast<const MOSDFastDispatchOp*>(
      op->get_req());
    if (m->get_min_epoch() > osdmap->get_epoch()) {//osdmap版本不對應
      break;
    }
    session->waiting_on_map.erase(i++);
    op->put();

    spg_t pgid;
    if (m->get_type() == CEPH_MSG_OSD_OP) {
      pg_t actual_pgid = osdmap->raw_pg_to_pg(
    static_cast<const MOSDOp*>(m)->get_pg());
      //osdmap->get_primary_shard(actual_pgid, &pgid)獲取 pgid  該PG的主OSD
      if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
    continue;
      }
    } else {
      pgid = m->get_spg();
    }
    enqueue_op(pgid, op, m->get_map_epoch());//獲取成功則呼叫enqueue_op處理
  }

  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session);
  }
}

3)OSD::enqueue_op 的主要工作是將求情加入到op_shardedwq佇列中

void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
...
  op->osd_trace.event("enqueue op");
  op->osd_trace.keyval("priority", op->get_req()->get_priority());
  op->osd_trace.keyval("cost", op->get_req()->get_cost());
  op->mark_queued_for_pg();
  logger->tinc(l_osd_op_before_queue_op_lat, latency);
  //加入op_shardedwq佇列中
  op_shardedwq.queue(
    OpQueueItem(
      unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, op)),
      op->get_req()->get_cost(),
      op->get_req()->get_priority(),
      op->get_req()->get_recv_stamp(),
      op->get_req()->get_source().num(),
      epoch));
}

4)OSD::dequeue_op 呼叫函式進行osdmap的更新,呼叫do_request進入PG處理流程

void OSD::dequeue_op(
  PGRef pg, OpRequestRef op,
  ThreadPool::TPHandle &handle)
{
...
...
  logger->tinc(l_osd_op_before_dequeue_op_lat, latency);

  Session *session = static_cast<Session *>(
    op->get_req()->get_connection()->get_priv());
  if (session) {
      //呼叫該函式進行 osdmap的更新
    maybe_share_map(session, op, pg->get_osdmap());
    session->put();
  }
  //正在是刪除、直接返回
  if (pg->is_deleting())
    return;

  op->mark_reached_pg();
  op->osd_trace.event("dequeue_op");
  //呼叫pg的do_request處理
  pg->do_request(op, handle);

  // finish
  dout(10) << "dequeue_op " << op << " finish" << dendl;
  OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_END", false);
}

5)PrimaryLogPG::do_request該函式 主要你檢查PG的狀態,以及根據訊息型別進行不同處理

void PrimaryLogPG::do_request(
  OpRequestRef& op,
  ThreadPool::TPHandle &handle)
{
...
  // make sure we have a new enough map
  //檢查 osdmap
  auto p = waiting_for_map.find(op->get_source());
...

  //是否可以丟棄
  if (can_discard_request(op)) {
    return;
  }
...
...
  //PG還沒有peered
  if (!is_peered()) {
    // Delay unless PGBackend says it's ok
      //檢查pgbackend是否可以處理這個請求
    if (pgbackend->can_handle_while_inactive(op)) {
      bool handled = pgbackend->handle_message(op);//可以處理,則呼叫該函式處理
      assert(handled);
      return;
    } else {
      waiting_for_peered.push_back(op);//不可以則加入waiting_for_peered佇列
      op->mark_delayed("waiting for peered");
      return;
    }
  }
  ...
  ...
  //PG處於Peered 並且flushes_in_progress為0的狀態下
  assert(is_peered() && flushes_in_progress == 0);
  if (pgbackend->handle_message(op))
    return;

// 根據不同的訊息請求型別,進行相應的處理
  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
  case CEPH_MSG_OSD_BACKOFF:
    if (!is_active()) {//該PG狀態 為非active狀態
      dout(20) << " peered, not active, waiting for active on " << op << dendl;
      waiting_for_active.push_back(op);//加入佇列
      op->mark_delayed("waiting for active");
      return;
    }
    switch (op->get_req()->get_type()) {
    case CEPH_MSG_OSD_OP:
      // verify client features 如果是cache pool ,操作沒有帶CEPH_FEATURE_OSD_CACHEPOOL的feature標誌,返回錯誤資訊
      if ((pool.info.has_tiers() || pool.info.is_tier()) &&
      !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) {
    osd->reply_op_error(op, -EOPNOTSUPP);
    return;
      }
      do_op(op);//呼叫do_op 處理
      break;
    case CEPH_MSG_OSD_BACKOFF:
      // object-level backoff acks handled in osdop context
      handle_backoff(op);
      break;
    }
    break;

...
//各種訊息型別
...

  default:
    assert(0 == "bad message type in do_request");
  }
}

6)PrimaryLogPG::do_op 函式很長很負責,這裡著看相關呼叫流程好了,主要功能是解析出操作來,然後對操作的箇中引數進行檢查,檢查相關物件的狀態,以及該物件的head、snap、clone物件的狀態等,並呼叫函式獲取物件的上下文、操作的上下文(ObjectContext、OPContext)


void PrimaryLogPG::do_op(OpRequestRef& op)
{
  FUNCTRACE();
  // NOTE: take a non-const pointer here; we must be careful not to
  // change anything that will break other reads on m (operator<<).
  MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
  assert(m->get_type() == CEPH_MSG_OSD_OP);
  //解析欄位,從bufferlist解析資料
  if (m->finish_decode()) {
    op->reset_desc();   // for TrackedOp
    m->clear_payload();
  }
...
...
  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
             CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      op->may_read() &&
      !(op->may_write() || op->may_cache())) {
    // balanced reads; any replica will do  平衡讀,則主從OSD都可以讀取
    if (!(is_primary() || is_replica())) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  } else {
    // normal case; must be primary  否則只能讀取主OSD
    if (!is_primary()) {
      osd->handle_misdirected_op(this, op);
      return;
    }
  }

  if (!op_has_sufficient_caps(op)) {
    osd->reply_op_error(op, -EPERM);
    return;
  }
  //op中包含includes_pg_op該操作,則呼叫 do_pg_op(op)處理
  if (op->includes_pg_op()) {
    return do_pg_op(op);
  }

  // object name too long?
  //檢查名字是否太長
  if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
    dout(4) << "do_op name is longer than "
        << cct->_conf->osd_max_object_name_len
        << " bytes" << dendl;
    osd->reply_op_error(op, -ENAMETOOLONG);
    return;
  }
...
...
  // blacklisted?
  //傳送請求的客戶端是黑名單中的一個
  if (get_osdmap()->is_blacklisted(m->get_source_addr())) {
    dout(10) << "do_op " << m->get_source_addr() << " is blacklisted" << dendl;
    osd->reply_op_error(op, -EBLACKLISTED);
    return;
  }
...
...
  // missing object?
  //head物件是否處於缺失狀態
  if (is_unreadable_object(head)) {
    if (!is_primary()) {
      osd->reply_op_error(op, -EAGAIN);
      return;
    }
    if (can_backoff &&
    (g_conf->osd_backoff_on_degraded ||
     (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
      add_backoff(session, head, head);
      maybe_kick_recovery(head);
    } else {
      wait_for_unreadable_object(head, op);//加入佇列,等待恢復完成
    }
    return;
  }

  // degraded object?
  //順序寫 且head物件正在恢復狀態
  if (write_ordered && is_degraded_or_backfilling_object(head)) {
    if (can_backoff && g_conf->osd_backoff_on_degraded) {
      add_backoff(session, head, head);
      maybe_kick_recovery(head);
    } else {
      wait_for_degraded_object(head, op);//加入佇列,等待
    }
    return;
  }
  //順序寫,切處於資料一致性檢查 scrub時期
  if (write_ordered && scrubber.is_chunky_scrub_active() &&
      scrubber.write_blocked_by_scrub(head)) {
    dout(20) << __func__ << ": waiting for scrub" << dendl;
    waiting_for_scrub.push_back(op);
    op->mark_delayed("waiting for scrub");
    return;
  }
 ...
 ...
  //若果是順序寫,並且該物件在該佇列中
  if (write_ordered && objects_blocked_on_cache_full.count(head)) {
    block_write_on_full_cache(head, op);
    return;
  }
  ...
  ...
  // io blocked on obc?
  //檢查物件是否被blocked
  if (!m->has_flag(CEPH_OSD_FLAG_FLUSH) &&
      maybe_await_blocked_head(oid, op)) {
    return;
  }

  //呼叫find_object_context獲取object_context
  int r = find_object_context(
    oid, &obc, can_create,
    m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
    &missing_oid);


// hit.set 不為空 則設定
  bool in_hit_set = false;
  if (hit_set) {
    if (obc.get()) {
      if (obc->obs.oi.soid != hobject_t() && hit_set->contains(obc->obs.oi.soid))
    in_hit_set = true;
    } else {
      if (missing_oid != hobject_t() && hit_set->contains(missing_oid))
        in_hit_set = true;
    }
    if (!op->hitset_inserted) {
      hit_set->insert(oid);
      op->hitset_inserted = true;
      if (hit_set->is_full() ||
          hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) {
        hit_set_persist();
      }
    }
  }
  //agent_state 不為空
  if (agent_state) {
    if (agent_choose_mode(false, op))// 呼叫該函式進行選擇agent的狀態
      return;
  }
...
...
...
  op->mark_started();

  execute_ctx(ctx);//呼叫該函式,執行相關操作
  utime_t prepare_latency = ceph_clock_now();
  prepare_latency -= op->get_dequeued_time();
  osd->logger->tinc(l_osd_op_prepare_lat, prepare_latency);
  if (op->may_read() && op->may_write()) {
    osd->logger->tinc(l_osd_op_rw_prepare_lat, prepare_latency);
  } else if (op->may_read()) {
    osd->logger->tinc(l_osd_op_r_prepare_lat, prepare_latency);
  } else if (op->may_write() || op->may_cache()) {
    osd->logger->tinc(l_osd_op_w_prepare_lat, prepare_latency);
  }

  // force recovery of the oldest missing object if too many logs
  maybe_force_recovery();
}

7) PrimaryLogPG::find_object_context 函式主要根據 不同發情況 通過呼叫 PrimaryLogPG::get_object_context函式獲取 物件上下文。

/*
 * If we return an error, and set *pmissing, then promoting that
 * object may help.
 *
 * If we return -EAGAIN, we will always set *pmissing to the missing
 * object to wait for.
 *
 * If we return an error but do not set *pmissing, then we know the
 * object does not exist.
 */
//獲取一個物件的ObjectContext
int PrimaryLogPG::find_object_context(const hobject_t& oid,
                      ObjectContextRef *pobc,
                      bool can_create,
                      bool map_snapid_to_clone,
                      hobject_t *pmissing)
{
  FUNCTRACE();
  assert(oid.pool == static_cast<int64_t>(info.pgid.pool()));
  // want the head?
  if (oid.snap == CEPH_NOSNAP) {
    ObjectContextRef obc = get_object_context(oid, can_create);//如果是想要原始物件(head)直接呼叫
    if (!obc) {
      if (pmissing)
        *pmissing = oid;
      return -ENOENT;
    }
    dout(10) << "find_object_context " << oid
       << " @" << oid.snap
       << " oi=" << obc->obs.oi
       << dendl;
    *pobc = obc;

    return 0;
  }

  hobject_t head = oid.get_head();

  // we want a snap
  //不是map_snapid_to_clone物件且,該snap快照已經被刪除,直接返回-ENOENT
  if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
    dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
    return -ENOENT;
  }

  SnapSetContext *ssc = get_snapset_context(oid, can_create);//呼叫get_snapset_context物件來獲取SnapSetContext物件。
  if (!ssc || !(ssc->exists || can_create)) {
    dout(20) << __func__ << " " << oid << " no snapset" << dendl;
    if (pmissing)
      *pmissing = head;  // start by getting the head
    if (ssc)
      put_snapset_context(ssc);
    return -ENOENT;
  }
//如果是map_snapid_to_clone
  if (map_snapid_to_clone) {
    dout(10) << "find_object_context " << oid << " @" << oid.snap
         << " snapset " << ssc->snapset
         << " map_snapid_to_clone=true" << dendl;
    if (oid.snap > ssc->snapset.seq) {//大於說明 該快照最新,osd還沒完成相關資訊的更新,直接返回head物件的上下文
      // already must be readable
      ObjectContextRef obc = get_object_context(head, false);//直接返回head物件的上下文
      dout(10) << "find_object_context " << oid << " @" << oid.snap
           << " snapset " << ssc->snapset
           << " maps to head" << dendl;
      *pobc = obc;
      put_snapset_context(ssc);
      return (obc && obc->obs.exists) ? 0 : -ENOENT;
    } else {
      vector<snapid_t>::const_iterator citer = std::find(//否則檢查snapset的克隆列表
    ssc->snapset.clones.begin(),
    ssc->snapset.clones.end(),
    oid.snap);
      if (citer == ssc->snapset.clones.end()) {
    dout(10) << "find_object_context " << oid << " @" << oid.snap
         << " snapset " << ssc->snapset
         << " maps to nothing" << dendl;
    put_snapset_context(ssc);
    return -ENOENT;
      }
      ...
      ...
      //找到,但處於缺失狀態
      if (pg_log.get_missing().is_missing(oid)) {
    dout(10) << "find_object_context " << oid << " @" << oid.snap
         << " snapset " << ssc->snapset
         << " " << oid << " is missing" << dendl;
    if (pmissing)
      *pmissing = oid;
    put_snapset_context(ssc);
    return -EAGAIN;
      }
    ...
    ...//各種情況下的find_object_context
}

8)get_object_context 實際去獲取上下文,先在快取裡面找,如果沒有在呼叫函式去獲取。另外在呼叫get_snapset_context獲取SnapSetContext。

ObjectContextRef PrimaryLogPG::get_object_context(
  const hobject_t& soid,
  bool can_create,
  const map<string, bufferlist> *attrs)
{
...
//先在快取裡面找
  ObjectContextRef obc = object_contexts.lookup(soid);
  osd->logger->inc(l_osd_object_ctx_cache_total);
  if (obc) {
    osd->logger->inc(l_osd_object_ctx_cache_hit);
    dout(10) << __func__ << ": found obc in cache: " << obc
         << dendl;
  } else {
    dout(10) << __func__ << ": obc NOT found in cache: " << soid << dendl;
    // check disk
    bufferlist bv;
    if (attrs) {
      assert(attrs->count(OI_ATTR));
      bv = attrs->find(OI_ATTR)->second;
    } else {
      int r = pgbackend->objects_get_attr(soid, OI_ATTR, &bv);//快取沒有就呼叫函式去獲取
      if (r < 0) {
    if (!can_create) {
      dout(10) << __func__ << ": no obc for soid "
           << soid << " and !can_create"
           << dendl;
      return ObjectContextRef();   // -ENOENT!
    }

    dout(10) << __func__ << ": no obc for soid "
         << soid << " but can_create"
         << dendl;
    // new object.
    object_info_t oi(soid);
    //呼叫get_snapset_context獲取 SnapSetContext
    SnapSetContext *ssc = get_snapset_context(
      soid, true, 0, false);
        assert(ssc);
    obc = create_object_context(oi, ssc);
    dout(10) << __func__ << ": " << obc << " " << soid
         << " " << obc->rwstate
         << " oi: " << obc->obs.oi
         << " ssc: " << obc->ssc
         << " snapset: " << obc->ssc->snapset << dendl;
    return obc;
      }
    }
 ...
 ...

  }
}

9)

SnapSetContext *PrimaryLogPG::get_snapset_context(
  const hobject_t& oid,
  bool can_create,
  const map<string, bufferlist> *attrs,
  bool oid_existed)
{
  Mutex::Locker l(snapset_contexts_lock);
  SnapSetContext *ssc;
  map<hobject_t, SnapSetContext*>::iterator p = snapset_contexts.find(
    oid.get_snapdir());
  if (p != snapset_contexts.end()) {
    if (can_create || p->second->exists) {
      ssc = p->second;
    } else {
      return NULL;
    }
  } else {
    bufferlist bv;
    if (!attrs) {
      int r = -ENOENT;
      if (!(oid.is_head() && !oid_existed)) {
    r = pgbackend->objects_get_attr(oid.get_head(), SS_ATTR, &bv);
      }
      if (r < 0 && !can_create)
    return NULL;
    } else {
      assert(attrs->count(SS_ATTR));
      bv = attrs->find(SS_ATTR)->second;
    }
    ssc = new SnapSetContext(oid.get_snapdir());
    _register_snapset_context(ssc);
    if (bv.length()) {
      bufferlist::iterator bvp = bv.begin();
      try {
    ssc->snapset.decode(bvp);
      } catch (buffer::error& e) {
        dout(0) << __func__ << " Can't decode snapset: " << e << dendl;
    return NULL;
      }
      ssc->exists = true;
    } else {
      ssc->exists = false;
    }
  }
  assert(ssc);
  ssc->ref++;
  return ssc;
}

10)該函式是由do_op呼叫的, 主要工作是檢查物件狀態和上下文相關資訊的獲取,並呼叫函式prepare _transactions 把操作封裝到事務中。如果是讀取操作,則呼叫相關讀取函式(同步、非同步)。如果是寫操作,則 呼叫calc_trim_to計算是否將舊的PG log日誌進行trim操作、 issue_repop(repop, ctx)向各個副本傳送同步操作請求、eval_repop(repop)檢查發向各個副本的同步操作請求是否已經reply成功

void PrimaryLogPG::execute_ctx(OpContext *ctx)
{
  FUNCTRACE();
  dout(10) << __func__ << " " << ctx << dendl;
  ctx->reset_obs(ctx->obc);
  ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
  OpRequestRef op = ctx->op;
  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
  ObjectContextRef obc = ctx->obc;
  const hobject_t& soid = obc->obs.oi.soid;

  // this method must be idempotent since we may call it several times
  // before we finally apply the resulting transaction.
  ctx->op_t.reset(new PGTransaction);

  if (op->may_write() || op->may_cache()) {
    // snap
    if (!(m->has_flag(CEPH_OSD_FLAG_ENFORCE_SNAPC)) &&//如果是對整個pool的快照操作
    pool.info.is_pool_snaps_mode()) {
      // use pool's snapc
      ctx->snapc = pool.snapc;//設定為該值  pool的資訊
    } else {//如果是使用者特定的快照   如RBD
      // client specified snapc
      ctx->snapc.seq = m->get_snap_seq();//設定為資訊帶的相關資訊
      ctx->snapc.snaps = m->get_snaps();
      filter_snapc(ctx->snapc.snaps);
    }
    if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
    ctx->snapc.seq < obc->ssc->snapset.seq) {//客戶端的 snap序號小於服務端的 返回錯誤
      dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
           << " < snapset seq " << obc->ssc->snapset.seq
           << " on " << obc->obs.oi.soid << dendl;
      reply_ctx(ctx, -EOLDSNAPC);
      return;
    }
...

  if (!ctx->user_at_version)
    ctx->user_at_version = obc->obs.oi.user_version;
  dout(30) << __func__ << " user_at_version " << ctx->user_at_version << dendl;
//若是讀操作,給objectContext加上ondisk_read_lock鎖
  if (op->may_read()) {
    dout(10) << " taking ondisk_read_lock" << dendl;
    obc->ondisk_read_lock();
  }

  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_enter, reqid.name._type,
        reqid.name._num, reqid.tid, reqid.inc);
  }

  int result = prepare_transaction(ctx);//將相關的操作封裝到 ctx->op_t中 封裝成事務
  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = ctx->op->get_reqid();
#endif
    tracepoint(osd, prepare_tx_exit, reqid.name._type,
        reqid.name._num, reqid.tid, reqid.inc);
  }

  if (op->may_read()) {
    dout(10) << " dropping ondisk_read_lock" << dendl;
    obc->ondisk_read_unlock();
  }

  bool pending_async_reads = !ctx->pending_async_reobc->ondisk_read_lock();ads.empty();
  if (result == -EINPROGRESS || pending_async_reads) {
    // come back later.
    if (pending_async_reads) {
      assert(pool.info.is_erasure());
      in_progress_async_reads.push_back(make_pair(op, ctx));
      ctx->start_async_reads(this);//如果是,則呼叫該函式 非同步讀取
    }
    return;
  }

  if (result == -EAGAIN) {
    // clean up after the ctx
    close_op_ctx(ctx);
    return;
  }

  bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
  // prepare the reply
  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
                   successful_write, op->qos_resp);

  // read or error?
  if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
    // finish side-effects
    if (result >= 0)
      do_osd_op_effects(ctx, m->get_connection());

    complete_read_ctx(result, ctx);//同步讀取,
    return;
  }

  ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);

  assert(op->may_write() || op->may_cache());

  // trim log?
  calc_trim_to();//呼叫函式 計算是否將舊的PG log日誌進行trim操作
  ...
  ...
  issue_repop(repop, ctx);//向各個副本傳送同步操作請求
  eval_repop(repop);//檢查發向各個副本的同步操作請求是否已經reply成功
  repop->put();
}

11)PrimaryLogPG::issue_repop函式 主要是把講求傳送到 副本OSD上進行處理

void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
  FUNCTRACE();
  const hobject_t& soid = ctx->obs->oi.soid;
  dout(7) << "issue_repop rep_tid " << repop->rep_tid
          << " o " << soid
          << dendl;

  repop->v = ctx->at_version;
  if (ctx->at_version > eversion_t()) {
    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
     i != actingbackfill.end();
     ++i) {
      if (*i == get_primary()) continue;
      pg_info_t &pinfo = peer_info[*i];
      // keep peer_info up to date
      if (pinfo.last_complete == pinfo.last_update)
    pinfo.last_complete = ctx->at_version;
      pinfo.last_update = ctx->at_version;
    }
  }
 //為寫做準備 給相關物件加ondisk_write_lock鎖
  ctx->obc->ondisk_write_lock();

  ctx->op_t->add_obc(ctx->obc);
  if (ctx->clone_obc) {
    ctx->clone_obc->ondisk_write_lock();
    ctx->op_t->add_obc(ctx->clone_obc);
  }
  if (ctx->head_obc) {
    ctx->head_obc->ondisk_write_lock();
    ctx->op_t->add_obc(ctx->head_obc);
  }

  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
    ctx->obc,
    ctx->clone_obc,
    ctx->head_obc);
  if (!(ctx->log.empty())) {
    assert(ctx->at_version >= projected_last_update);
    projected_last_update = ctx->at_version;
  }
  for (auto &&entry: ctx->log) {
    projected_log.add(entry);
  }
  //將事務傳送到OSD處理,對於不同的PG實現,呼叫不同的類,PGBackend有兩個子類,ReplicatedBackend 和 ECBackend 兩個類對應不同的實現
  pgbackend->submit_transaction(
    soid,
    ctx->delta_stats,
    ctx->at_version,
    std::move(ctx->op_t),
    pg_trim_to,
    min_last_complete_ondisk,
    ctx->log,
    ctx->updated_hset_history,
    onapplied_sync,
    on_all_applied,
    on_all_commit,
    repop->rep_tid,
    ctx->reqid,
    ctx->op);
}

12)該函式用於最終呼叫網路介面,把更新請求傳送給從OSD,並呼叫queue_transactions 函式對該PG的主OSD上的實現更改。

void ReplicatedBackend::submit_transaction(
  const hobject_t &soid,
  const object_stat_sum_t &delta_stats,
  const eversion_t &at_version,
  PGTransactionUPtr &&_t,
  const eversion_t &trim_to,
  const eversion_t &roll_forward_to,
  const vector<pg_log_entry_t> &_log_entries,
  boost::optional<pg_hit_set_history_t> &hset_history,
  Context *on_local_applied_sync,
  Context *on_all_acked,
  Context *on_all_commit,
  ceph_tid_t tid,
  osd_reqid_t reqid,
  OpRequestRef orig_op)
{
  parent->apply_stats(
    soid,
    delta_stats);

  vector<pg_log_entry_t> log_entries(_log_entries);
  ObjectStore::Transaction op_t;
  PGTransactionUPtr t(std::move(_t));
  set<hobject_t> added, removed;
  generate_transaction(
    t,
    coll,
    log_entries,
    &op_t,
    &added,
    &removed);
  assert(added.size() <= 1);
  assert(removed.size() <= 1);

  auto insert_res = in_progress_ops.insert(
    make_pair(
      tid,
      InProgressOp(
    tid, on_all_commit, on_all_acked,
    orig_op, at_version)
      )
    );
  assert(insert_res.second);
  //構件InProgressOp請求記錄
  InProgressOp &op = insert_res.first->second;

  op.waiting_for_applied.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());
  op.waiting_for_commit.insert(
    parent->get_actingbackfill_shards().begin(),
    parent->get_actingbackfill_shards().end());

  //呼叫該函式,把請求傳送出去,傳送到從OSD
  issue_op(
    soid,
    at_version,
    tid,
    reqid,
    trim_to,
    at_version,
    added.size() ? *(added.begin()) : hobject_t(),
    removed.size() ? *(removed.begin()) : hobject_t(),
    log_entries,
    hset_history,
    &op,
    op_t);

  add_temp_objs(added);
  clear_temp_objs(removed);

  parent->log_operation(
    log_entries,
    hset_history,
    trim_to,
    at_version,
    true,
    op_t);

  op_t.register_on_applied_sync(on_local_applied_sync);
  op_t.register_on_applied(
    parent->bless_context(
      new C_OSD_OnOpApplied(this, &op)));
  op_t.register_on_commit(
    parent->bless_context(
      new C_OSD_OnOpCommit(this, &op)));

  vector<ObjectStore::Transaction> tls;
  tls.push_back(std::move(op_t));

  parent->queue_transactions(tls, op.op);//呼叫該函式完成最後的操作,對該PG的主OSD上的本地物件完成操作
}

13) 呼叫的queue_transactions函式,會呼叫到os層。
呼叫的函式位於 PrinaryLogPG.h

  void queue_transactions(vector<ObjectStore::Transaction>& tls,
              OpRequestRef op) override {
    osd->store->queue_transactions(osr.get(), tls, 0, 0, 0, op, NULL);//最終呼叫到os層
  }

其中 osd->store 定義為
ObjectStore *store;

相關推薦

Ceph 學習——OSD流程原始碼分析

直接上圖: 同樣當前最新的版本,和之前的版本有所不同,有一些模組簡化了,類的名字也改了。先介紹圖中涉及的相關的類,然後在對類中具體函式主要呼叫流程進行分析。 OSD 模組主要的類 盜圖:其中ReplicatedPG 在最新的版本中

flannel 實戰原始碼分析

Flannel 是由 CoreOS 維護的一個虛擬網路方案。目前是kubernetes預設的網路,它有golang編寫,在原始碼分析之前還是先看看怎樣使用。這裡不得不提一下kubernetes網路約束: 1. 所有容器之間都可以無須SNAT即可相互直接以IP

OpenCV學習筆記31KAZE 演算法原理原始碼分析KAZE的原始碼優化及SIFT的比較

  KAZE系列筆記: 1.  OpenCV學習筆記(27)KAZE 演算法原理與原始碼分析(一)非線性擴散濾波 2.  OpenCV學習筆記(28)KAZE 演算法原理與原始碼分析(二)非線性尺度空間構建 3.  Op

OpenCV學習筆記30KAZE 演算法原理原始碼分析KAZE特徵的效能分析比較

      KAZE系列筆記: 1.  OpenCV學習筆記(27)KAZE 演算法原理與原始碼分析(一)非線性擴散濾波 2.  OpenCV學習筆記(28)KAZE 演算法原理與原始碼分析(二)非線性尺度空間構

JVM學習記錄-線程安全鎖優化

多線程 image @param decimal 屬於 資源分配 try 可能 例如 前言 線程:程序流執行的最小單元。線程是比進程更輕量級的調度執行單位,線程的引入,可以把一個進程的資源分配和執行調度分開,各個線程既可以共享進程資源(內存地址、文件I/O等),又可以獨立

GCC原始碼分析——介紹安裝

原文連結:http://blog.csdn.net/sonicling/article/details/6702031     上半年一直在做有關GCC和LD的專案,到現在還沒做完。最近幾天程式設計的那臺電腦壞了,所以趁此間隙寫一點相關的分析和

MySQL 之 MHA + ProxySQL + keepalived 實現分離,高可用

don replica ifconfig soft 似的 恢復 select 8.0 ext 準備服務器: docker network create --subnet=192.168.0.0/16 staticnetdocker run -d --privileg

okhttp原始碼分析——基本流程超詳細

1.okhttp原始碼分析(一)——基本流程(超詳細) 2.okhttp原始碼分析(二)——RetryAndFollowUpInterceptor過濾器 3.okhttp原始碼分析(三)——CacheInterceptor過濾器 4.okhttp原始碼分析(四)——Conn

spring事務管理原始碼分析配置和事務增強代理的生成流程

在本篇文章中,將會介紹如何在spring中進行事務管理,之後對其內部原理進行分析。主要涉及 @EnableTransactionManagement註解為我們做了什麼? 為什麼標註了@Transactional註解的方法就可以具有事務的特性,保持了資料的ACID特性?spring到底是如何具有這樣

SpringIOC原始碼分析spring ioc 容器的載入流程

轉載自:https://blog.csdn.net/a724888/article/details/72716632 一:spring ioc 容器的載入流程 1.目標:熟練使用spring,並分析其原始碼,瞭解其中的思想。這篇主要介紹spring ioc 容器的載入2.前提條件:會使用de

Android系統原理原始碼分析1:利用Java反射技術阻止通過按鈕關閉對話方塊

本文為原創,如需轉載,請註明作者和出處,謝謝!     眾所周知,AlertDialog類用於顯示對話方塊。關於AlertDialog的基本用法在這裡就不詳細介紹了,網上有很多,讀者可以自己搜尋。那

Go學習之go-ethereum【以太坊】原始碼分析

關於Go語言環境的安裝與配置,我在《入門篇》進行了詳細講解,有需要的朋友可以前往閱讀,本文進入當下比較火熱的區塊鏈專案 - 以太坊(go-ethereum)進行原始碼解讀。本文內容純屬個人見解,有錯誤理解或者不足之處還請見諒,歡迎一起交流學習。    - 環境準備    -

RxJava2原始碼分析:基本流程分析

前言:到現在這個階段,網上關於RxJava2原始碼分析的文章已經滿天飛了,我寫這篇文章的目的並不是說我會分析的比他們好,比他們透徹,這篇文章的目的只是單純的記錄自己分析RxJava2原始碼的成功及收穫。 概述   對於一個程式設計人的技術成長,一般會經歷三個階段,首先是學會使用開源庫,然後是知道

Spring基於註解形式的 AOP的原理流程原始碼解析

在Spring的配置類上添加註解@EnableAspectJAutoProxy: @Configuration @EnableAspectJAutoProxy(proxyTargetClass = true) public class MvcContextCo

串列埠的實現

Windows開啟串列埠,讀寫串列埠,自動識別串列埠 該串列埠讀寫是採用非同步方式,即非阻塞模式進行讀寫串列埠 串列埠名形如: "COM3", "COM4", "COM22"等 其中COM1至COM9能成功開啟,但是COM10及以上開啟都是失敗的,需要特殊處理 及COM10

Android7.0去電流程原始碼分析

2.去電從撥號盤介面有關撥號的部分由DialpadFragment.java實現,無論是單卡還是雙卡,當點選撥號按鍵時,最後都會呼叫handleDialButtonPressed方法進行處理,DialogFragmentCall_Action的活動Call_Ac

Tomcat原始碼分析 ----- 手一個web伺服器

作為後端開發人員,在實際的工作中我們會非常高頻地使用到web伺服器。而tomcat作為web伺服器領域中舉足輕重的一個web框架,又是不能不學習和了解的。 tomcat其實是一個web框架,那麼其內部是怎麼實現的呢?如果不用tomcat我們能自己實現一個web伺服器嗎? 首先,tomcat內部的實現是

Spark Streaming執行流程原始碼解析

本系列主要描述Spark Streaming的執行流程,然後對每個流程的原始碼分別進行解析 之前總聽同事說Spark原始碼有多麼棒,咱也不知道,就是瘋狂點頭。今天也來擼一下Spark原始碼。 對Spark的使用也就是Spark Streaming使用的多一點,所以就拿Spark Streaming開涮。

RecyclerView 原始碼分析 —— 繪製流程解析

概述 對於 RecyclerView 是那麼熟悉又那麼陌生。熟悉是因為作為一名 Android 開發者,RecyclerView 是經常會在專案裡面用到的,陌生是因為只是知道怎麼用,但是卻不知道 RecyclerView 的內部實現機制。 但凡是一位有所追求的開發者,都不會只讓自己停留在只會使用上,

Flume NG原始碼分析基於靜態properties檔案的配置模組

日誌收集是網際網路公司的一個重要服務,Flume NG是Apache的頂級專案,是分散式日誌收集服務的一個開源實現,具有良好的擴充套件性,與其他很多開源元件可以無縫整合。搜了一圈發現介紹Flume NG的文章有不少,但是深入分析Flume NG原始碼的卻沒有。準備寫一個系列分析一下Flume NG的