區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端一
阿新 • • 發佈:2018-10-31
區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端一。2018年下半年,區塊鏈行業正逐漸褪去發展之初的浮躁、回歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們把更多的關注點放在了區塊鏈真正的技術之上。
Fabric 1.0源代碼筆記 之 gossip(流言算法) #GossipServer(Gossip服務端)
1、GossipServer概述
GossipServer相關代碼,分布在protos/gossip、gossip/comm目錄下。目錄結構如下:
- protos/gossip目錄:
  * message.pb.go,GossipClient接口定義及實現,GossipServer接口定義。
- gossip/comm目錄:
  * comm.go,Comm接口定義。
  * conn.go,connFactory接口定義,以及connectionStore結構體及方法。
  * comm_impl.go,commImpl結構體及方法(同時實現GossipServer接口/Comm接口/connFactory接口)。
  * demux.go,ChannelDeMultiplexer結構體及方法。
2、GossipClient接口定義及實現
2.1、GossipClient接口定義
type GossipClient interface { ????// GossipStream is the gRPC stream used for sending and receiving messages ????GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) ????// Ping is used to probe a remote peer‘s aliveness ????Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) } //代碼在protos/gossip/message.pb.go
2.2、GossipClient接口實現
type gossipClient struct { ????cc *grpc.ClientConn } func NewGossipClient(cc *grpc.ClientConn) GossipClient { ????return &gossipClient{cc} } func (c *gossipClient) GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error) { ????stream, err := grpc.NewClientStream(ctx, &_Gossip_serviceDesc.Streams[0], c.cc, "/gossip.Gossip/GossipStream", opts...) ????if err != nil { ????????return nil, err ????} ????x := &gossipGossipStreamClient{stream} ????return x, nil } func (c *gossipClient) Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { ????out := new(Empty) ????err := grpc.Invoke(ctx, "/gossip.Gossip/Ping", in, out, c.cc, opts...) ????if err != nil { ????????return nil, err ????} ????return out, nil } //代碼在protos/gossip/message.pb.go
2.3、Gossip_GossipStreamClient接口定義及實現
type Gossip_GossipStreamClient interface {
????Send(*Envelope) error
????Recv() (*Envelope, error)
????grpc.ClientStream
}
type gossipGossipStreamClient struct {
????grpc.ClientStream
}
func (x *gossipGossipStreamClient) Send(m *Envelope) error {
????return x.ClientStream.SendMsg(m)
}
func (x *gossipGossipStreamClient) Recv() (*Envelope, error) {
????m := new(Envelope)
????if err := x.ClientStream.RecvMsg(m); err != nil {
????????return nil, err
????}
????return m, nil
}
//代碼在protos/gossip/message.pb.go
3、GossipServer接口定義
3.1、GossipServer接口定義
type GossipServer interface {
????// GossipStream is the gRPC stream used for sending and receiving messages
????GossipStream(Gossip_GossipStreamServer) error
????// Ping is used to probe a remote peer‘s aliveness
????Ping(context.Context, *Empty) (*Empty, error)
}
func RegisterGossipServer(s *grpc.Server, srv GossipServer) {
????s.RegisterService(&_Gossip_serviceDesc, srv)
}
func _Gossip_GossipStream_Handler(srv interface{}, stream grpc.ServerStream) error {
????return srv.(GossipServer).GossipStream(&gossipGossipStreamServer{stream})
}
func _Gossip_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
????in := new(Empty)
????if err := dec(in); err != nil {
????????return nil, err
????}
????if interceptor == nil {
????????return srv.(GossipServer).Ping(ctx, in)
????}
????info := &grpc.UnaryServerInfo{
????????Server: srv,
????????FullMethod: "/gossip.Gossip/Ping",
????}
????handler := func(ctx context.Context, req interface{}) (interface{}, error) {
????????return srv.(GossipServer).Ping(ctx, req.(*Empty))
????}
????return interceptor(ctx, in, info, handler)
}
var _Gossip_serviceDesc = grpc.ServiceDesc{
????ServiceName: "gossip.Gossip",
????HandlerType: (*GossipServer)(nil),
????Methods: []grpc.MethodDesc{
????????{
????????????MethodName: "Ping",
????????????Handler: _Gossip_Ping_Handler,
????????},
????},
????Streams: []grpc.StreamDesc{
????????{
????????????StreamName: "GossipStream",
????????????Handler: _Gossip_GossipStream_Handler,
????????????ServerStreams: true,
????????????ClientStreams: true,
????????},
????},
????Metadata: "gossip/message.proto",
}
//代碼在protos/gossip/message.pb.go
3.2、Gossip_GossipStreamServer接口定義及實現
type Gossip_GossipStreamServer interface {
????Send(*Envelope) error
????Recv() (*Envelope, error)
????grpc.ServerStream
}
type gossipGossipStreamServer struct {
????grpc.ServerStream
}
func (x *gossipGossipStreamServer) Send(m *Envelope) error {
????return x.ServerStream.SendMsg(m)
}
func (x *gossipGossipStreamServer) Recv() (*Envelope, error) {
????m := new(Envelope)
????if err := x.ServerStream.RecvMsg(m); err != nil {
????????return nil, err
????}
????return m, nil
}
//代碼在protos/gossip/message.pb.go
4、Comm接口/connFactory接口定義
4.1、Comm接口定義
type Comm interface {
????//返回此實例的 PKI id
????GetPKIid() common.PKIidType
????//向節點發送消息
????Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
????//探測遠程節點是否有響應
????Probe(peer *RemotePeer) error
????//握手驗證遠程節點
????Handshake(peer *RemotePeer) (api.PeerIdentityType, error)
????Accept(common.MessageAcceptor) <-chan proto.ReceivedMessage
????//獲取懷疑脫機節點的只讀通道
????PresumedDead() <-chan common.PKIidType
????//關閉到某個節點的連接
????CloseConn(peer *RemotePeer)
????//關閉
????Stop()
}
//代碼在gossip/comm/comm.go
4.2、connFactory接口定義
type connFactory interface {
????createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
}
//代碼在gossip/comm/conn.go
5、commImpl結構體及方法(同時實現GossipServer接口/Comm接口/connFactory接口)
5.1、commImpl結構體定義
type commImpl struct {
????selfCertHash []byte
????peerIdentity api.PeerIdentityType
????idMapper identity.Mapper
????logger *logging.Logger
????opts []grpc.DialOption
????secureDialOpts func() []grpc.DialOption
????connStore *connectionStore
????PKIID []byte
????deadEndpoints chan common.PKIidType
????msgPublisher *ChannelDeMultiplexer
????lock *sync.RWMutex
????lsnr net.Listener
????gSrv *grpc.Server
????exitChan chan struct{}
????stopWG sync.WaitGroup
????subscriptions []chan proto.ReceivedMessage
????port int
????stopping int32
}
//代碼在gossip/comm/comm_impl.go
區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端一