1. 程式人生 > >grpc的簡單用例 (C++實現)

grpc的簡單用例 (C++實現)

ret ESS 其中 取名字 實現 ssa date min iter

這個用例的邏輯很簡單, 服務器運行一個管理個人信息的服務, 提供如下的四個服務:

(1) 添加一個個人信息  

註: 對應於Unary RPCs, 客戶端發送單一消息給服務器, 服務器返回單一消息

(2) 添加多個個人信息  

註: 對應於Client streaming RPCs, 客戶端使用提供的stream發送多個消息給服務端, 等客戶端寫完了所有的消息, 就會等待服務器讀取這些消息, 然後返回響應消息. gRPC保證在一次RPC調用中, 消息是順序的.

(3) 獲取最多N個個人信息

註: 對應於Server streaming RPCs, 客戶端發送一條消息給服務端, 然後獲取一個stream來讀取一系列的返回消息. 客戶端會一直讀取消息, 知道沒有消息可讀為止, gRPC保證在一次RPC調用中,消息是順序的.

(4) 獲取指定名字的所有個人信息

註: 對應於Bidirectional streaming RPCs, 這種rcp, 客戶端和服務端通過一個read-write stream來發送一系列的消息. 這兩個消息流可以獨立操作, 就是說, 客戶端和服務端可以以任意它們所想的順序操作這兩個消息流. 例如, 服務器可以等待接收到所有的客戶端消息時,才開始向客戶端發送消息, 或者它可以讀一條消息, 然後給客戶端發送一條消息, 或者別的想要的方式. 在兩個消息流的其中一個中, 消息是順序的.

在給出代碼之前, 先說明一件事, 在grpc中, 請求參數和返回值類型都需要是message類型, 而不能是string, int32等類型.下面給出proto文件的定義:

// [START declaration]
syntax = "proto3";
package tutorial;

import "google/protobuf/timestamp.proto";
// [END declaration]

// [START messages]
message Person {
    string name = 1;
    int32 id = 2;   // Unique ID number for this person.
    string email = 3;

    enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
    }

    message PhoneNumber {
        string number = 1;
        PhoneType type = 2;
    }

    repeated PhoneNumber phones = 4;

    google.protobuf.Timestamp last_updated = 5;
}

// Our address book file is just one of these.
message AddressBook {
    repeated Person people = 1;
}

// rpc調用的結果
message Result {
    bool success = 1;
}

// rpc請求的個數
message ReqNum {
    int32 num = 1;
}

message ReqName {
    string name = 1;
}

// [END messages]

// Interface exported by the server.
service Manage {
    // 添加一個人
    rpc AddPerson(Person) returns (Result) {}
    // 添加很多人
    rpc AddPersons(stream Person) returns (Result) {}
    // 獲取指定數目的個人列表
    rpc GetPersonsLimit(ReqNum) returns (stream Person) {}
    // 獲取名字為輸入的個人列表
    rpc GetPersons(stream ReqName) returns (stream Person) {}
}

Person的定義和之前的protobuf中一致, 新加了一些用於grpc調用的結構體, 這些結構體很簡單, 就不講了. service Manage中定義的是這個服務提供的rpc調用接口.

(1) 添加一個個人信息 對應的是 AddPerson

(2) 添加多個個人信息 對應的是 AddPersons

(3) 獲取最多N個個人信息 對應的是 GetPersonsLimit

(4) 獲取指定名字的所有個人信息 對應的是 GetPersons

rpc定義很直觀, 應該可以參照寫出需要的rpc, 按照我了解的, 每個rpc有一個輸入參數和一個輸出參數, 這個需要註意.

下面給出服務端重載proto的Manage服務的代碼:

#include <string>
#include <grpc/grpc.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <folly/concurrency/ConcurrentHashMap.h>
#include "person.grpc.pb.h"

class PersonManager {
public:
    explicit PersonManager() {
    }

    // AddPerson 用來添加一個人
    bool AddPerson(const tutorial::Person& p) {
        m_persons.insert(p.name(), p);
        return true;
    }

    // GetPerson 用來查找一個人
    tutorial::Person GetPerson(const std::string& name) const {
        return m_persons.at(name);
    }

    // GetPersons 用來獲取多個人
    std::vector<tutorial::Person> GetPersons(int num) const {
        std::vector<tutorial::Person> personList;
        auto it = m_persons.begin();
        while (it != m_persons.end()) {
            if (static_cast<int>(personList.size()) > num) {
                return personList;
            }
            personList.push_back(it->second);
            ++it;
        }
        return personList;
    }

private:
    folly::ConcurrentHashMap<std::string, tutorial::Person> m_persons;
};

class PersonService : public tutorial::Manage::Service {
public:
    explicit PersonService() {
    }

    // AddPerson 用來添加一個人
    grpc::Status AddPerson(grpc::ServerContext* context, const tutorial::Person *person, 
            tutorial::Result *res) override {
        m_mgr.AddPerson(*person);
        res->set_success(true);
        return grpc::Status::OK;
    }

    // AddPersons 用來添加多個用戶
    grpc::Status AddPersons(grpc::ServerContext* context, grpc::ServerReader<tutorial::Person>* reader,
            tutorial::Result *res) override {
        tutorial::Person person;
        while (reader->Read(&person)) {
            m_mgr.AddPerson(person);
        }
        res->set_success(true);
        return grpc::Status::OK;
    }

    // GetPersonsLimit 用來查詢一個人
    grpc::Status GetPersonsLimit(grpc::ServerContext* context, const tutorial::ReqNum *num,
            grpc::ServerWriter<tutorial::Person>* writer) override {
        auto persons = m_mgr.GetPersons(num->num());
        for (const auto& person : persons) {
            writer->Write(person);
        }
        return grpc::Status::OK;
    }

    // GetPersons 用來根據人名獲取所有的人
    grpc::Status GetPersons(grpc::ServerContext *context, 
            grpc::ServerReaderWriter<tutorial::Person, tutorial::ReqName>* stream) override {
        tutorial::ReqName name;
        while (stream->Read(&name)) {
            try {
                stream->Write(m_mgr.GetPerson(name.name()));
            } catch (const std::out_of_range& ex) {
                // 如果出現越界的問題, 則說明不存在
            }
        }
        return grpc::Status::OK;
    }

private:
    PersonManager m_mgr;
};

下面給出創建grpc服務器的代碼:

#include <grpcpp/resource_quota.h>
#include "person_manage.h"

// maxThreadNum 根據計算機硬件設置
const int maxThreadNum = 20;

void RunServer() {
    std::string server_address("localhost:50001");
    PersonService service;

    grpc::ServerBuilder builder;
    grpc::ResourceQuota quota;
    quota.SetMaxThreads(maxThreadNum);
    builder.SetResourceQuota(quota);
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    std::cout << "Server listening on " << server_address << std::endl;
    server->Wait();
}

int main(int argc, char** argv) {
    RunServer();

    return 0;
}

下面給出客戶端對proto中的Manage服務的封裝代碼:

#include <memory>
#include <vector>
#include <thread>
#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include "person.grpc.pb.h"

class PersonManip {
public:
    PersonManip(std::shared_ptr<grpc::Channel> channel)
        : m_stub(tutorial::Manage::NewStub(channel)) {
    }

    // 添加一個用戶
    bool AddPerson(const tutorial::Person& person) {
        grpc::ClientContext context;
        tutorial::Result res;
        grpc::Status status = m_stub->AddPerson(&context, person, &res);
        if (!status.ok()) {
            std::cout << "status error: " << status.error_message() << std::endl;
            return false;
        }
        return res.success();
    }

    // 添加多個用戶, 當前的服務端實現可能造成部分插入的情況
    bool AddPersons(const std::vector<tutorial::Person>& persons) {
        grpc::ClientContext context;
        tutorial::Result res;
        std::unique_ptr<grpc::ClientWriter<tutorial::Person>> writer(
                m_stub->AddPersons(&context, &res));
        for (const auto& person : persons) {
            if (!writer->Write(person)) {
                // Broken stream.
                break;
            }
        }
        writer->WritesDone();
        grpc::Status status = writer->Finish();
        if (!status.ok()) {
            std::cout << "status error: " << status.error_message() << std::endl;
            return false;
        }

        return res.success();
    }

    // 獲取限定數目的用戶
    bool GetPersonsLimit(int limitNum, std::vector<tutorial::Person>& persons) {
        grpc::ClientContext context;
        tutorial::ReqNum limit;
        limit.set_num(limitNum);
        std::unique_ptr<grpc::ClientReader<tutorial::Person>> reader(
                m_stub->GetPersonsLimit(&context, limit));
        tutorial::Person person;
        while (reader->Read(&person)) {
            persons.push_back(person);
        }
        grpc::Status status = reader->Finish();
        if (!status.ok()) {
            std::cout << "status error: " << status.error_message() << std::endl;
            return false;
        }

        return true;
    }

    // 獲取所有指定名字的用戶
    bool GetPersons(const std::vector<std::string>& personNames, std::vector<tutorial::Person>& persons) {
        grpc::ClientContext context;

        std::shared_ptr<grpc::ClientReaderWriter<tutorial::ReqName, tutorial::Person>> stream(
                m_stub->GetPersons(&context));
        std::thread writer([stream, &personNames]() {
                for (const auto& personName : personNames) {
                    tutorial::ReqName name;
                    name.set_name(personName);
                    stream->Write(name);
                }
                stream->WritesDone();
        });

        tutorial::Person person;
        while (stream->Read(&person)) {
            persons.push_back(person);
        }
        writer.join();
        grpc::Status status = stream->Finish();
        if (!status.ok()) {
            std::cout << "status error: " << status.error_message() << std::endl;
            return false;
        }

        return true;
    }

private:
    std::unique_ptr<tutorial::Manage::Stub> m_stub;
};

下面給出客戶端測試的代碼:

#include "person_manip.h"

tutorial::Person makePerson(const std::string& name, int id, 
        const std::string& email) {
    tutorial::Person person;
    person.set_name(name);
    person.set_id(id);
    person.set_email(email);
    return person;
}

void printPersons(const std::vector<tutorial::Person>& persons) {
    for (const auto& p : persons) {
        std::cout << "name: " << p.name() << " "
            << "id: " << p.id() << " "
            << "email: " << p.email() << std::endl;
    }
    std::cout << std::endl;
}

int main(int argc, char **argv) {
    PersonManip manip(
            grpc::CreateChannel("localhost:50001",
                grpc::InsecureChannelCredentials()));
    auto person = makePerson("Tom", 1, "[email protected]");
    auto suc = manip.AddPerson(person);
    if (!suc) {
        std::cout << "manip.AddPerson failed." << std::endl;
        return -1;
    }

    person = makePerson("Lilly", 2, "[email protected]");
    auto person2 = makePerson("Jim", 3, "[email protected]");
    
    std::vector<tutorial::Person> persons{person, person2};
    suc = manip.AddPersons(persons);
    if (!suc) {
        std::cout << "manip.AddPersons failed." << std::endl;
        return -1;
    }

    std::vector<tutorial::Person> resPersons;
    suc = manip.GetPersonsLimit(5, resPersons);
    if (!suc) {
        std::cout << "manip.GetPersonsLimit failed." << std::endl;
        return -1;
    }
    std::cout << "manip.GetPersonsLimit output:" << std::endl;
    printPersons(resPersons);

    resPersons.clear();
    std::vector<std::string> personNames;
    for (const auto& p : persons) {
        personNames.push_back(p.name());
    }
    suc = manip.GetPersons(personNames, resPersons);
    if (!suc) {
        std::cout << "manip.GetPersons failed." << std::endl;
        return -1;
    }
    std::cout << "manip.GetPersons output:" << std::endl;
    printPersons(resPersons);
    return 0;
}

這個我沒有使用單元測試, 可能使用單元測試會更好, 不過根據客戶端代碼和輸出, 也可以驗證服務的正確性.

完整的代碼參考: https://github.com/ss-torres/person-service.git

如果有什麽建議或者提議, 歡迎提出

grpc的簡單用例 (C++實現)