1. 程式人生 > >藉助shared_ptr實現copy-on-write以提高多執行緒併發效能

藉助shared_ptr實現copy-on-write以提高多執行緒併發效能

        鎖競爭是伺服器效能四大殺手之一(其他三個是:資料拷貝、環境切換、動態資源申請),本文將基於之前釋出的kimgbo網路I/O庫,以一個多執行緒群發聊天伺服器的實現為例,介紹如何藉助shared_ptr提高多執行緒併發的效能。

        多執行緒群發聊天伺服器實現的功能是,客戶端連線伺服器後,可以向伺服器傳送訊息(訊息=訊息頭+訊息體),伺服器負責將訊息轉發給其他正處於連線狀態的客戶端(包括髮送訊息的客戶端)。示意圖如下:

                   

       傳統的基於Reactor模式的伺服器,使用工作執行緒池來處理連線請求,並通過在操作之前加鎖的方式來保護連線佇列的資料安全。多執行緒對於請求佇列的取出和插入操作實際上是序列的,整個伺服器的併發效能較差。如果能讓插入和取出處理轉發任務的兩個操作實現並行,則能夠大大提升伺服器的效能。

        shared_ptr是採用引用計數方式的智慧指標,如果當前只有一個觀察者,則其引用計數為1,可以通過shared_ptr::unique()判斷,通過shared_ptr實現copy-on-write的原理如下:

1、read端在讀之前將引用計數加1,讀完將引用計數減1,這樣可以保證在讀期間其引用計數大於1,可以阻止併發寫。

大致的流程如下:

    ConnectionListPtr connections = getConnectionList(); /*重新讓一個shared_ptr指向連線佇列,引用計數加1*/
    for (ConnectionList::iterator it = connections->begin(); it != connections->end(); ++it) /*讀資料*/
    {
      m_codec.send((*it).get(), message); /*業務處理*/
    }

2、write端在寫之前先檢查引用計數是否為1,如果為1則直接修改。

3、write端寫時如果發現引用計數大於1,則說明此時資料正在被read,則不能再原來的資料上併發寫,應該建立一個副本,並在副本上修改,然後用副本替換以前的資料。

大致的流程如下:

    MutexLockGuard lock(m_mutex); /*此處需要加一下鎖,但是僅僅是在寫入是加鎖,減少了讀時鎖的使用*/
    if(!m_connections.unique())
    {
    m_connections.reset(new ConnectionList(*m_connections)); /*如果引用計數大於1,則建立一個副本並在副本上修改,shared_ptr通過reset操作後會使引用計數減1,原先的資料在read結束後引用計數會減為0,進而被系統釋放*/
    }
    assert(m_connections.unique()); /*斷言新建立的副本引用計數一定為1*/
    

    /*下面執行相關業務處理*/
    if (conn->connected())
    { 
        m_connections->insert(conn);
    }
    else
    {
      m_connections->erase(conn);
    }

以上就是copy-on-write的大致實現流程,下面給出伺服器程式的核心程式碼,全部程式碼參見kimgbo網路庫的example/chat目錄下https://github.com/kimg-bo/kimgbo ,kimgbo網路庫的使用方式和muduo基本類似。核心程式碼如下:

#include <stdio.h>
#include <set>
#include <functional>
#include <memory>
#include "Logging.h"
#include "Mutex.h"
#include "EventLoop.h"
#include "TcpServer.h"
#include "codec.h"

using namespace kimgbo;
using namespace kimgbo::net;
	
class ChatServer
{
public:
  ChatServer(EventLoop* loop,
             const InetAddress& listenAddr)
  : m_loop(loop),
    m_server(m_loop, listenAddr, "ChatServer"),
    m_codec(std::bind(&ChatServer::onStringMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
    m_connections(new ConnectionList)
  {
    m_server.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, std::placeholders::_1));
    m_server.setMessageCallback(
        std::bind(&LengthHeaderCodec::onMessage, &m_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  }

  void setThreadNum(int numThreads)
  {
    m_server.setThreadNum(numThreads);
  }

  void start()
  {
    m_server.start();
  }

private:
	typedef std::set<TcpConnectionPtr> ConnectionList; //存放連結的集合
	typedef std::shared_ptr<ConnectionList> ConnectionListPtr; //指向連線集合的shared_ptr
	
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->localAddress().toIpPort() << " -> "
        << conn->peerAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");

    MutexLockGuard lock(m_mutex); //write加鎖
    LOG_INFO << "lock(m_mutex) ok"; 
    if(!m_connections.unique()) //檢查引用計數
    {
    	LOG_INFO << "m_connections.unique()."; 
    	m_connections.reset(new ConnectionList(*m_connections)); //如果大於1,建立副本
    }
    assert(m_connections.unique());
    
    if (conn->connected())
    {
    	LOG_INFO << "insert before."; 
      m_connections->insert(conn);
      LOG_INFO << "insert ok"; 
    }
    else
    {
      m_connections->erase(conn);
    }
  }
  
  ConnectionListPtr getConnectionList() //獲取連結集合
  {
  	MutexLockGuard lock(m_mutex);
  	return m_connections;
  }

  void onStringMessage(const TcpConnectionPtr&, const kimgbo::string& message, Timestamp)
  {
    ConnectionListPtr connections = getConnectionList(); //read操作直接讀
    for (ConnectionList::iterator it = connections->begin(); it != connections->end(); ++it)
    {
      m_codec.send((*it).get(), message);
    }
  }
  
private:
  EventLoop* m_loop;
  TcpServer m_server;
  LengthHeaderCodec m_codec;
  MutexLock m_mutex;
  ConnectionListPtr m_connections;
};

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    ChatServer server(&loop, serverAddr);
    if (argc > 2)
    {
      server.setThreadNum(atoi(argv[2]));
    }
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port [thread_num]\n", argv[0]);
  }
}

kimgbo開源網路I/O庫見:https://github.com/kimg-bo/kimgbo

相關推薦

藉助shared_ptr實現copy-on-write提高執行併發效能

        鎖競爭是伺服器效能四大殺手之一(其他三個是:資料拷貝、環境切換、動態資源申請),本文將基於之前釋出的kimgbo網路I/O庫,以一個多執行緒群發聊天伺服器的實現為例,介紹如何藉助shared_ptr提高多執行緒併發的效能。         多執行緒群發聊天伺

shared_ptr實現copy-on-write

最近再看陳碩的《Linux多執行緒服務端程式設計使用muduoC++網路庫》,2.8節看到這個內容:使用shared_ptr實現copy-on-write的手法降低鎖競爭。 目的: 利用普通mutex替換讀寫鎖 shared_ptr是引用技術型智慧指標,當只有一個觀察者時

理解 shared_ptr實現copy-on-write(COW)

shared_ptr實現COW(Copy-On-Write) 前不久在《Linux多執行緒服務端程式設計使用muduoC++網路庫》2.8節看到這個內容,一直沒有真正理解,後來在書中7.3中再次提到使用shared_ptr實現copy-on-write的手法降

shared_ptr實現copy-on-write (1)

    在《Linux多執行緒服務端程式設計使用muduoC++網路庫》2.8節說“借shared_ptr實現copy-on-write”。那麼copy-on-write是怎樣的技術?    COW(Copy-On-Write)通過淺拷貝(shallow copy)只複製引

OpenStack 整合使用Ceph實現(Copy-On-Write)克隆 (筆記)

作者:zhangshen發表於:2014-08-28 本文使用的系統環境: CentOS6.5 四臺機器 規劃如下: HostName IP        安裝服務 c01 192.168.40.101 mon mds osd c02 192.168.40.10

SpringBoot實戰實現分散式鎖一之重現執行併發場景

實戰前言:上篇博文我總體介紹了我這套視訊課程:“SpringBoot實戰實現分散式鎖” 總體涉及的內容,從本篇文章開始,我將開始介紹其中涉及到的相關知識要點,感興趣的小夥伴可以關注關注學習學習!!工欲善其事,必先利其器,介紹分散式鎖使用的前因後果之前,得先想辦法說清楚為啥需要分散式鎖以及

乾貨!執行池+CountDownLatch,實現 執行併發計算、彙總

目錄結構 抽象類:求和器 單執行緒 求和器 VS 多執行緒 求和器 1)執行緒池 多個執行緒 一起併發執行,效能很生猛 2)CountDownLatch 主執行緒 使用 latch.await() 阻塞住,直到所有 子任務 都執行完畢了

QT實現哈夫曼壓縮(執行

本人菜雞程式設計師一枚,最近剛剛學習的資料結構中的哈夫曼樹,就用QT寫了一個哈夫曼壓縮,話不多說先上步驟,再上程式碼。(如果有更好的想法,歡迎指點) 1.先寫出建最小堆和建哈夫曼樹程式碼(建最小堆的程式碼可以通過STL中的堆代替) 2.寫出壓縮類的程式碼,類中

3種方式實現python執行併發處理

標籤: python奇淫技巧 最優執行緒數 Ncpu=CPU的數量 Ucpu=目標CPU使用率 W/C=等待時間與計算時間的比率 為保持處理器達到期望的使用率,最優的執行緒池的大小等於$$Nthreads=Ncpu*Ucpu*(1+W/C$$ cpu密集型任務,即$W<

OkHttp實現執行併發下載的筆記

         打個廣告,不瞭解OkHttp的話可以先看下  http://blog.csdn.net/brycegao321/article/details/51830525             需求:  手機拍攝若干張照片, 在wifi連線下上傳到伺服器。  

Callable+ThreadPoolExecutor實現執行併發並獲得返回值

前言 經常會遇到一些效能問題,比如呼叫某個介面,可能要迴圈呼叫100次,並且需要拿到每一次呼叫的返回結果,通常我們都是放在for迴圈中一次次的序列呼叫,這種方式可想而知道有多慢,那怎麼解決這個問題呢? 多執行緒 為了解決以上問題,我使用的方式是多執行緒。多

Java執行/併發20、Future實現類:FutureTask

FutureTask是future的實現類,它同時實現了兩個介面:Runnable和Future,所以它既可以作為Runnable被執行緒執行,又可以作為Future得到Callable的返回值。 因此我們可以: - 呼叫FutureTask物件的ru

python 實現執行併發執行 【join函式】

主執行緒啟動一個子執行緒t並等到t執行緒結束後才執行: import threading import time def reading(): for i in range(5): print("reading", i) time.

執行併發下的單例模式實現

1.1 天生執行緒安全的餓漢式單例 1.2 懶漢式單例 1.2.1 執行緒不安全的懶漢式單例 1.2.2 執行緒安全的懶漢式單例

怎麼實現springMVC 執行併發

我們知道Spring通過各種DAO模板類降低了開發者使用各種資料持久技術的難度。這些模板類都是執行緒安全的,也就是說,多個DAO可以複用同一個模板例項而不會發生衝突。 我們使用模板類訪問底層資料,根據持久化技術的不同,模板類需要繫結資料連線或會話的資源。但這些資源本身是非執

Windows下基於socket執行併發通訊的實現

    本文介紹了在Windows 作業系統下基於TCP/IP 協議Socket 套介面的通訊機制以及多執行緒程式設計知識與技巧,並給出多執行緒方式實現多使用者與服務端(C/S)併發通訊模型的詳細演算法,最後展現了用C++編寫的多使用者與伺服器通訊的應用例項並附有程式。 關

Java提高——執行(二)join、sleep、yield

join、sleep、yield都是Thread類的方法join執行緒join()方法:讓“主執行緒”執行緒等待“子執行緒”執行完之後再執行。//子執行緒 public class son extends Thread(){ void run(){

Java提高——執行(五)生產消費者問題

生產者/消費者問題是個典型的多執行緒問題,類似於hello world對於一門程式語言而言,涉及的物件包括“生產者”、“消費者”、“倉庫”和“產品”。該模型需要注意以下幾點:1、生產者只有在倉庫未滿的時候生產,倉滿則停止生產。2、消費者只有在倉庫有產品的情況下才能消費,空倉則

如何提高執行程式的cpu利用率

  正如大家所知道的那樣,多核多cpu越來越普遍了,而且編寫多執行緒程式也是件很簡單的事情。在Windows下面,呼叫CreateThread函式一次就能夠以你想要的函式地址新建一個子執行緒執行。然後,事情確實你發現建立多執行緒根本沒有讓程式快多少,也沒有提高多少cpu利用率

java 執行併發系列之 生產者消費者模式的兩種實現

生產者消費者模式是併發、多執行緒程式設計中經典的設計模式,生產者和消費者通過分離的執行工作解耦,簡化了開發模式,生產者和消費者可以以不同的速度生產和消費資料。真實世界中的生產者消費者模式生產者和消費者模式在生活當中隨處可見,它描述的是協調與協作的關係。比如一個人正在準備食物(