1. 程式人生 > gRPC 實現多執行緒非同步服務端

gRPC 實現多執行緒非同步服務端

// AsyncServer_Demo.cpp : 定義控制檯應用程式的入口點。
//

#include "stdafx.h"

#include <process.h>

#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <grpc++/grpc++.h>
#include <grpc/support/log.h>

#include "asyncservice.grpc.pb.h"


using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using cecily::Student;
using cecily::Teacher;
using cecily::School;




class ServerImpl final{
public:
	~ServerImpl() {
		server_->Shutdown();   //停止服務
		cq_->Shutdown();
	}

	void Run() {
		std::string server_address("0.0.0.0:66666");
		ServerBuilder builder;
		builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
		builder.RegisterService(&service_);
		cq_ = builder.AddCompletionQueue();
		server_ = builder.BuildAndStart();
		std::cout << "Server listening on " << server_address << std::endl;
		//註冊服務 通過CallData物件的狀態,在初始狀態下,將對應型別的CallData物件傳遞進入ServerCompletionQueue  的 cq_物件中
		//當有服務請求過來的時候 呼叫next 能得到對應的請求, 請求型別通過CallData物件的s_type_值區分,用來區分不同的服務,做不同的處理
		new CallData(&service_, cq_.get(), ServerImpl::CallData::SS_HelloStudent);
		new CallData(&service_, cq_.get(), ServerImpl::CallData::SS_HelloTeacher);
		//註冊服務  應該有多少個服務有要註冊多少個服務, 例子只有兩個服務
		//開啟多執行緒處理rpc呼叫請求
		for ( int i = 0 ; i < 8; i++ )
		{
			_beginthreadex(NULL,
				0,
				ServerImpl::ThreadHandlerRPC,
				(void*)this,
				0,
				0);
		}
		
	}

private:
	class CallData {
	public:
		enum ServiceType {
			SS_HelloStudent = 0,
			SS_HelloTeacher
		};
	public:
		CallData(School::AsyncService* service, ServerCompletionQueue* cq, ServiceType s_type)
		:service_(service),cq_(cq),s_type_(s_type), student_response(&ctx_), teacher_response(&ctx_), status_(CREATE){
			Process();
		}

		void Process() {
			if ( status_ == CREATE )
			{
				status_ = PROCESS;
				switch (s_type_)
				{
					//根據不同的服務 註冊不同的 服務型別到 ServerCompletionQueue 佇列, 看名字和使用有點像完成埠  (沒有去驗證研究)
				case ServerImpl::CallData::SS_HelloStudent:
					service_->RequestHelloStudent(&ctx_, &student_request, &student_response, cq_, cq_, this);
					break;
				case ServerImpl::CallData::SS_HelloTeacher:
					service_->RequestHelloTeacher(&ctx_, &teacher_request, &teacher_response, cq_, cq_, this);
					break;
				default:
					break;
				}
			}
			else if (status_ == PROCESS) {
				status_ = FINISH;
				new CallData(service_, cq_, this->s_type_);
				switch (s_type_)
				{
				case ServerImpl::CallData::SS_HelloStudent: {
					std::string name = student_request.name();
					int age = student_request.age();
					std::string prefix("Cecily HelloStudent: ");
					int nThreadID = GetCurrentThreadId();
					char szTmp[20] = { 0 };
					sprintf_s(szTmp,20, "執行緒:%d", nThreadID);
					std::string end(szTmp);
					teacher_request.set_name(prefix + name + end);
					status_ = FINISH;
					
					student_response.Finish(teacher_request, Status::OK, this);
				}
					break;
				case ServerImpl::CallData::SS_HelloTeacher: {
					std::string schoolname = teacher_request.school();
					std::string prefix("Cecily HelloTeacher: ");
					int nThreadID = GetCurrentThreadId();
					char szTmp[20] = { 0 };
					sprintf_s(szTmp, 20, "執行緒:%d", nThreadID);
					std::string end(szTmp);
					student_request.set_name(prefix + schoolname +end);
					status_ = FINISH;
					teacher_response.Finish(student_request, Status::OK, this);
				}
					break;
				default:
					break;
				}
			}
			else {
				GPR_ASSERT(status_ == FINISH);
				delete this;
			}
			
		}

	

	private:
		School::AsyncService* service_;
		ServerCompletionQueue* cq_;
		ServerContext ctx_;
		ServiceType s_type_;
		Student student_request;
		::grpc::ServerAsyncResponseWriter< ::cecily::Teacher> student_response;
		Teacher teacher_request;
		::grpc::ServerAsyncResponseWriter< ::cecily::Student> teacher_response;

		enum CallStatus { CREATE, PROCESS, FINISH };
		CallStatus status_; 
	};

private:

	static unsigned __stdcall ThreadHandlerRPC(void* lparam) {
		ServerImpl* impl = (ServerImpl*)lparam;
		impl->HandleRPCS();
		return 1;
	}
	void HandleRPCS() {
		void* tag; 
		bool ok;
		while (true) {
			GPR_ASSERT(cq_->Next(&tag, &ok));//從ServerCompletionQueue 佇列拿到獲取請求任務,根據請求任務處理 邏輯
			GPR_ASSERT(ok);
			static_cast<CallData*>(tag)->Process();
		}
	}


private:
	std::shared_ptr<ServerCompletionQueue> cq_;
	School::AsyncService service_;
	std::shared_ptr<Server> server_;

};


int main()
{
	ServerImpl server;
	server.Run();
	char c;
	std::cin >> c;
	system("pause");
    return 0;
}

以上內容屬於學習記錄隨筆。因為從接觸 gRPC 到寫這篇部落格只有兩天時間,中間難免會有不正確的地方,後續發現再更改。