Ethereum Source Code In-Depth Analysis (4): Ethereum RPC Communication Examples and Source Code Walkthrough (Part 2)
阿新 • Published: 2019-01-28
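In the previous section we wrote an example RPC request and, by reading the source, traced how the RPC services are set up, how the HTTP RPC server is created, and how the HTTP RPC client sends a request.
In this section we first look at how the HTTP RPC server handles a client request, and then at how IPC RPC is processed.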
I. How the HTTP RPC server handles a client request
Back in startHTTP() from the previous section, the HTTP server is created by NewHTTPServer. The handler it installs wraps the RPC *Server, which implements http.Handler through its ServeHTTP(w http.ResponseWriter, r *http.Request) method.
func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {
    // Wrap the CORS-handler within a host-handler
    handler := newCorsHandler(srv, cors)
    handler = newVHostHandler(vhosts, handler)
    return &http.Server{Handler: handler}
}

// ServeHTTP serves JSON-RPC requests over HTTP.
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Permit dumb empty requests for remote health-checks (AWS)
    if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
        return
    }
    if code, err := validateRequest(r); err != nil {
        http.Error(w, err.Error(), code)
        return
    }
    // All checks passed, create a codec that reads direct from the request body
    // until EOF and writes the response to w and order the server to process a
    // single request.
    codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
    defer codec.Close()

    w.Header().Set("content-type", contentType)
    srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
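The body of validateRequest is not shown in this article. As a rough sketch of the kind of checks such a function might perform, the program below rejects requests by method, size and content type. The names checkRequest, statusFor and maxBody, as well as the specific limits and status codes, are assumptions made for illustration and are not taken from go-ethereum.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// maxBody is a hypothetical size limit chosen for this sketch.
const maxBody = 1 << 20

// checkRequest is a hypothetical stand-in for validateRequest. It returns an
// HTTP status code and an error when the request should be rejected.
func checkRequest(r *http.Request) (int, error) {
	if r.Method != http.MethodPost && r.Method != http.MethodPut {
		return http.StatusMethodNotAllowed, errors.New("method not allowed")
	}
	if r.ContentLength > maxBody {
		return http.StatusRequestEntityTooLarge, errors.New("request body too large")
	}
	if !strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") {
		return http.StatusUnsupportedMediaType, errors.New("unsupported content type")
	}
	return 0, nil
}

// statusFor builds a test request and reports the status checkRequest assigns
// to it (0 means the request passed every check).
func statusFor(method, contentType, body string) int {
	req := httptest.NewRequest(method, "/", strings.NewReader(body))
	if contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}
	code, _ := checkRequest(req)
	return code
}

func main() {
	fmt.Println(statusFor("POST", "application/json", `{"id":1}`))
	fmt.Println(statusFor("GET", "", ""))
	fmt.Println(statusFor("POST", "text/plain", "hello"))
}
```

Returning the status code together with the error lets the caller pass both straight to http.Error, which is how ServeHTTP uses the result of validateRequest.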
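ServeHTTP first filters out invalid requests, then wraps the body of the accepted request in a JSON codec.
The codec is then handed to srv.ServeSingleRequest(codec, OptionMethodInvocation),
which in turn calls s.serveRequest(codec, true, options).
The singleShot parameter controls whether a request is handled synchronously or asynchronously. When singleShot is true, the request is handled synchronously and serveRequest returns only after the result is ready. When singleShot is false, each request is handed to a goroutine and handled asynchronously.
HTTP RPC uses the synchronous mode.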
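To make the difference between the two modes concrete, here is a simplified model of the singleShot switch. It is only a sketch: the function serve and its handle callback are hypothetical, and plain strings stand in for decoded requests.

```go
package main

import (
	"fmt"
	"sync"
)

// serve is a simplified, hypothetical model of the singleShot switch. With
// singleShot set it handles one request inline and returns. Otherwise it hands
// every request to its own goroutine and waits for all of them at the end.
// It returns the number of requests it dispatched.
func serve(reqs []string, singleShot bool, handle func(string)) int {
	var pend sync.WaitGroup
	handled := 0
	for _, req := range reqs {
		handled++
		if singleShot {
			handle(req) // synchronous: the caller blocks until this returns
			return handled
		}
		pend.Add(1)
		go func(r string) {
			defer pend.Done()
			handle(r) // asynchronous: the loop moves on to the next request
		}(req)
	}
	pend.Wait() // do not tear down while handlers are still running
	return handled
}

func main() {
	var mu sync.Mutex
	var seen []string
	record := func(r string) {
		mu.Lock()
		defer mu.Unlock()
		seen = append(seen, r)
	}
	fmt.Println(serve([]string{"a", "b", "c"}, true, record))
	fmt.Println(serve([]string{"a", "b", "c"}, false, record))
	fmt.Println(len(seen))
}
```

The WaitGroup plays the same role as pend in serveRequest: in the asynchronous mode the function must not return while handlers are still running.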
s.run is set to 1 when the server starts and stays 1 until the server is stopped.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
    var pend sync.WaitGroup

    defer func() {
        if err := recover(); err != nil {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            log.Error(string(buf))
        }
        s.codecsMu.Lock()
        s.codecs.Remove(codec)
        s.codecsMu.Unlock()
    }()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // if the codec supports notification include a notifier that callbacks can use
    // to send notification to clients. It is tied to the codec/connection. If the
    // connection is closed the notifier will stop and cancels all active subscriptions.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }
    s.codecsMu.Lock()
    if atomic.LoadInt32(&s.run) != 1 { // server stopped
        s.codecsMu.Unlock()
        return &shutdownError{}
    }
    s.codecs.Add(codec)
    s.codecsMu.Unlock()

    // test if the server is ordered to stop
    for atomic.LoadInt32(&s.run) == 1 {
        reqs, batch, err := s.readRequest(codec)
        if err != nil {
            // If a parsing error occurred, send an error
            if err.Error() != "EOF" {
                log.Debug(fmt.Sprintf("read error %v\n", err))
                codec.Write(codec.CreateErrorResponse(nil, err))
            }
            // Error or end of stream, wait for requests and tear down
            pend.Wait()
            return nil
        }

        // check if server is ordered to shutdown and return an error
        // telling the client that his request failed.
        if atomic.LoadInt32(&s.run) != 1 {
            err = &shutdownError{}
            if batch {
                resps := make([]interface{}, len(reqs))
                for i, r := range reqs {
                    resps[i] = codec.CreateErrorResponse(&r.id, err)
                }
                codec.Write(resps)
            } else {
                codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
            }
            return nil
        }
        // If a single shot request is executing, run and return immediately
        if singleShot {
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
            return nil
        }
        // For multi-shot connections, start a goroutine to serve and loop back
        pend.Add(1)

        go func(reqs []*serverRequest, batch bool) {
            defer pend.Done()
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
        }(reqs, batch)
    }
    return nil
}
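The codec is added to s.codecs, which is a set.
When the request has been handled and serveRequest returns, the codec is removed from s.codecs again.
Both the add and the remove are done under a mutex (s.codecsMu) so that access to s.codecs is thread-safe.
s.readRequest(codec) reads the request data from the codec.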
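The locking pattern described here can be sketched as a small mutex-guarded set. The type connSet and the helper churn are hypothetical stand-ins for s.codecs and s.codecsMu, with string ids in place of real codecs.

```go
package main

import (
	"fmt"
	"sync"
)

// connSet is a hypothetical stand-in for s.codecs guarded by s.codecsMu.
type connSet struct {
	mu    sync.Mutex
	items map[string]struct{}
}

func newConnSet() *connSet {
	return &connSet{items: make(map[string]struct{})}
}

// Add inserts id into the set while holding the lock.
func (s *connSet) Add(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[id] = struct{}{}
}

// Remove deletes id from the set while holding the lock.
func (s *connSet) Remove(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, id)
}

// Len reports how many ids are currently tracked.
func (s *connSet) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.items)
}

// churn registers and unregisters n ids from n goroutines and returns how
// many entries are left once every goroutine has finished.
func churn(n int) int {
	set := newConnSet()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("codec-%d", i)
			set.Add(id)
			defer set.Remove(id) // mirrors the deferred Remove in serveRequest
		}(i)
	}
	wg.Wait()
	return set.Len()
}

func main() {
	fmt.Println(churn(100))
}
```

Without the mutex, concurrent writes to the underlying map would be a data race, since Go maps are not safe for concurrent modification.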
readRequest starts by calling codec.ReadRequestHeaders(), which parses the request data.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
    reqs, batch, err := codec.ReadRequestHeaders()
    if err != nil {
        return nil, batch, err
    }
    requests := make([]*serverRequest, len(reqs))

    // verify requests
    for i, r := range reqs {
        var ok bool
        var svc *service

        if r.err != nil {
            requests[i] = &serverRequest{id: r.id, err: r.err}
            continue
        }
        if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
            requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
            argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
            if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                requests[i].args = args
            } else {
                requests[i].err = &invalidParamsError{err.Error()}
            }
            continue
        }
        if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            continue
        }
        if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
            if callb, ok := svc.subscriptions[r.method]; ok {
                requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                if r.params != nil && len(callb.argTypes) > 0 {
                    argTypes := []reflect.Type{reflect.TypeOf("")}
                    argTypes = append(argTypes, callb.argTypes...)
                    if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                        requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
                    } else {
                        requests[i].err = &invalidParamsError{err.Error()}
                    }
                }
            } else {
                requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            }
            continue
        }
        if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
            requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
            if r.params != nil && len(callb.argTypes) > 0 {
                if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
            }
            continue
        }
        requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
    }
    return requests, batch, nil
}
func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
c.decMu.Lock()
defer c.decMu.Unlock()
var incomingMsg json.RawMessage
if err := c.d.Decode(&incomingMsg); err != nil {
return nil, false, &invalidRequestError{err.Error()}
}
if isBatch(incomingMsg) {
return parseBatchRequest(incomingMsg)
}
return parseRequest(incomingMsg)
}
If the incoming data is an array of requests it is parsed with parseBatchRequest(incomingMsg); otherwise parseRequest(incomingMsg) is used. The two work in much the same way.
func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
if err := checkReqId(in.Id); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
// subscribe are special, they will always use `subscribeMethod` as first param in the payload
if strings.HasSuffix(in.Method, subscribeMethodSuffix) {
reqs := []rpcRequest{{id: &in.Id, isPubSub: true}}
if len(in.Payload) > 0 {
// first param must be subscription name
var subscribeMethod [1]string
if err := json.Unmarshal(in.Payload, &subscribeMethod); err != nil {
log.Debug(fmt.Sprintf("Unable to parse subscription method: %v\n", err))
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}
reqs[0].service, reqs[0].method = strings.TrimSuffix(in.Method, subscribeMethodSuffix), subscribeMethod[0]
reqs[0].params = in.Payload
return reqs, false, nil
}
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}
if strings.HasSuffix(in.Method, unsubscribeMethodSuffix) {
return []rpcRequest{{id: &in.Id, isPubSub: true,
method: in.Method, params: in.Payload}}, false, nil
}
elems := strings.Split(in.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, false, &methodNotFoundError{in.Method, ""}
}
// regular RPC call
if len(in.Payload) == 0 {
return []rpcRequest{{service: elems[0], method: elems[1], id: &in.Id}}, false, nil
}
return []rpcRequest{{service: elems[0], method: elems[1], id: &in.Id, params: in.Payload}}, false, nil
}
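The service name, method name, id and request parameters are extracted and assembled into an rpcRequest, which is returned. readRequest(codec ServerCodec) then post-processes each rpcRequest (it looks up the service and callback and parses the arguments) and returns the resulting serverRequest values.
Back in serveRequest, we continue with the implementation of s.exec(ctx, codec, reqs[0]).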
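The two parsing decisions made above, single request versus batch and the split of the method name into service and method, can be tried out in isolation. The sketch below is a minimal illustration, not the go-ethereum code: the names request, looksLikeBatch, splitMethod and countRequests are hypothetical, and the separator is assumed to be an underscore, as in method names such as eth_blockNumber.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// request is a hypothetical, trimmed-down JSON-RPC request.
type request struct {
	ID     json.RawMessage `json:"id"`
	Method string          `json:"method"`
	Params json.RawMessage `json:"params"`
}

// looksLikeBatch reports whether the first significant byte opens a JSON array.
func looksLikeBatch(msg []byte) bool {
	for _, c := range msg {
		if c == ' ' || c == '\t' || c == '\n' || c == '\r' {
			continue // skip insignificant whitespace
		}
		return c == '['
	}
	return false
}

// splitMethod splits "service_method" into its two parts.
func splitMethod(name string) (service, method string, err error) {
	elems := strings.Split(name, "_")
	if len(elems) != 2 {
		return "", "", errors.New("method not found: " + name)
	}
	return elems[0], elems[1], nil
}

// countRequests decodes msg and reports how many requests it carries.
func countRequests(msg string) (int, error) {
	raw := []byte(msg)
	if looksLikeBatch(raw) {
		var batch []request
		if err := json.Unmarshal(raw, &batch); err != nil {
			return 0, err
		}
		return len(batch), nil
	}
	var single request
	if err := json.Unmarshal(raw, &single); err != nil {
		return 0, err
	}
	return 1, nil
}

func main() {
	fmt.Println(splitMethod("eth_blockNumber"))
	fmt.Println(countRequests(`[{"id":1,"method":"eth_blockNumber"},{"id":2,"method":"net_version"}]`))
}
```

Keeping the id and params as json.RawMessage defers their decoding until the target method and its argument types are known, which is the same reason the code above carries in.Payload around unparsed.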
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
var response interface{}
var callback func()
if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err)
} else {
response, callback = s.handle(ctx, codec, req)
}
if err := codec.Write(response); err != nil {
log.Error(fmt.Sprintf("%v\n", err))
codec.Close()
}
// when request was a subscribe request this allows these subscriptions to be actived
if callback != nil {
callback()
}
}
The request is passed on to s.handle(ctx, codec, req):
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
if req.err != nil {
return codec.CreateErrorResponse(&req.id, req.err), nil
}
if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
notifier, supported := NotifierFromContext(ctx)
if !supported { // interface doesn't support subscriptions (e.g. http)
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
subid := ID(req.args[0].String())
if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
return codec.CreateResponse(req.id, true), nil
}
return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
}
if req.callb.isSubscribe {
subid, err := s.createSubscription(ctx, codec, req)
if err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
notifier.activate(subid, req.svcname)
}
return codec.CreateResponse(req.id, subid), activateSub
}
// regular RPC call, prepare arguments
if len(req.args) != len(req.callb.argTypes) {
rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
req.svcname, serviceMethodSeparator, req.callb.method.Name,
len(req.callb.argTypes), len(req.args))}
return codec.CreateErrorResponse(&req.id, rpcErr), nil
}
arguments := []reflect.Value{req.callb.rcvr}
if req.callb.hasCtx {
arguments = append(arguments, reflect.ValueOf(ctx))
}
if len(req.args) > 0 {
arguments = append(arguments, req.args...)
}
// execute RPC method and return result
reply := req.callb.method.Func.Call(arguments)
if len(reply) == 0 {
return codec.CreateResponse(req.id, nil), nil
}
if req.callb.errPos >= 0 { // test if method returned an error
if !reply[req.callb.errPos].IsNil() {
e := reply[req.callb.errPos].Interface().(error)
res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
return res, nil
}
}
return codec.CreateResponse(req.id, reply[0].Interface()), nil
}
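Skipping the handling of subscribe and unsubscribe requests, reply := req.callb.method.Func.Call(arguments) invokes the RPC method through reflection and returns the result in reply.
codec.CreateResponse(req.id, reply[0].Interface()) is where rpc/json.go wraps the result in a response object.
Back in exec(ctx context.Context, codec ServerCodec, req *serverRequest), codec.Write(response) serializes the response to JSON.
If the request was a subscription, callback() is then invoked to activate it.
// Write sends the response to the client.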
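For readers unfamiliar with calling methods through reflection, the following sketch shows the same mechanism on a toy service. Calc and invoke are hypothetical names, and the sketch assumes every method returns (value, error), so the last return value plays the role that errPos has in handle.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Calc is a hypothetical service type. Its exported methods play the role of
// RPC methods registered on the server.
type Calc struct{}

// Div returns a / b, or an error when b is zero.
func (Calc) Div(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("division by zero")
	}
	return a / b, nil
}

// invoke looks up a method by name and calls it through reflection. It assumes
// the method returns (value, error).
func invoke(rcvr interface{}, name string, args ...interface{}) (interface{}, error) {
	method, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return nil, fmt.Errorf("method %s not found", name)
	}
	// Method.Func takes the receiver as its first argument.
	in := []reflect.Value{reflect.ValueOf(rcvr)}
	for _, a := range args {
		in = append(in, reflect.ValueOf(a))
	}
	if len(in) != method.Type.NumIn() {
		return nil, fmt.Errorf("%s expects %d parameters, got %d",
			name, method.Type.NumIn()-1, len(args))
	}
	out := method.Func.Call(in)
	if len(out) == 0 {
		return nil, nil
	}
	if errVal := out[len(out)-1]; !errVal.IsNil() {
		return nil, errVal.Interface().(error)
	}
	return out[0].Interface(), nil
}

func main() {
	fmt.Println(invoke(Calc{}, "Div", 6, 3))
	fmt.Println(invoke(Calc{}, "Div", 1, 0))
}
```

The sketch only checks the number of arguments; reflect's Call panics on a type mismatch, which is why the server parses the parameters against callb.argTypes before it gets this far.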
func (c *jsonCodec) Write(res interface{}) error {
c.encMu.Lock()
defer c.encMu.Unlock()
return c.e.Encode(res)
}
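c.e.Encode(res) ends up calling enc.w.Write(b), where w is the http.ResponseWriter that was passed to func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request). This writer is what carries the reply from the server back to the client.
II. How the other RPC transports dial
Dialing from an RPC client essentially establishes a read/write channel between client and server.
1. For the HTTP service, the DialHTTPWithClient() method analysed in the previous section creates an httpConn.
2. For the WebSocket service, dialing is implemented as follows: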
func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
var conn net.Conn
var err error
switch config.Location.Scheme {
case "ws":
conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
case "wss":
dialer := contextDialer(ctx)
conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
default:
err = websocket.ErrBadScheme
}
if err != nil {
return nil, err
}
ws, err := websocket.NewClient(config, conn)
if err != nil {
conn.Close()
return nil, err
}
return ws, err
}
dialContext creates the net.Conn for ws, and tls.DialWithDialer creates the net.Conn for wss.
3. For the InProc service, dialing is implemented as follows:
func DialInProc(handler *Server) *Client {
initctx := context.Background()
c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return p2, nil
})
return c
}
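The in-process transport can be imitated with a few lines of standard library code. The sketch below is only an illustration of the idea: message and roundTrip are hypothetical, and a goroutine that answers a single request stands in for handler.ServeCodec.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
)

// message is a hypothetical, trimmed-down JSON-RPC message.
type message struct {
	ID     int    `json:"id"`
	Method string `json:"method,omitempty"`
	Result string `json:"result,omitempty"`
}

// roundTrip sends one request through an in-memory pipe and returns the reply.
func roundTrip(method string) (message, error) {
	serverEnd, clientEnd := net.Pipe()
	defer clientEnd.Close()

	// "Server" goroutine: decode one request, answer it, then close its end.
	go func() {
		defer serverEnd.Close()
		var req message
		if err := json.NewDecoder(serverEnd).Decode(&req); err != nil {
			return
		}
		_ = json.NewEncoder(serverEnd).Encode(message{ID: req.ID, Result: "handled " + req.Method})
	}()

	// "Client" side: write the request, then block until the reply arrives.
	if err := json.NewEncoder(clientEnd).Encode(message{ID: 7, Method: method}); err != nil {
		return message{}, err
	}
	var resp message
	err := json.NewDecoder(clientEnd).Decode(&resp)
	return resp, err
}

func main() {
	resp, err := roundTrip("eth_blockNumber")
	fmt.Println(resp.ID, resp.Result, err)
}
```

net.Pipe is unbuffered, so each write blocks until the other end reads. That is why the server side has to run in its own goroutine, just as DialInProc starts ServeCodec with go.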
DialInProc thus connects client and server through a net.Pipe.
4. For the IPC service, dialing is implemented as follows:
func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
return newIPCConnection(ctx, endpoint)
})
}
//unix
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
return dialContext(ctx, "unix", endpoint)
}
//windows
// newIPCConnection will connect to a named pipe with the given endpoint as name.
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
timeout := defaultPipeDialTimeout
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout < 0 {
timeout = 0
}
}
return npipe.DialTimeout(endpoint, timeout)
}
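On Unix systems the connection is made with the same dialContext helper that the WebSocket dialer uses, here over a Unix domain socket. On Windows a third-party named-pipe package (npipe) is used instead. Either way the result is a net.Conn.
III. How the other RPC clients send requests
In the CallContext() method of rpc/client.go, anything that is not an HTTP request goes through c.send(ctx, op, msg). The reason is that HTTP uses short-lived connections: each request is synchronous and its result is returned directly. IPC, InProc and WebSocket use long-lived connections: requests are asynchronous, and the responses have to be picked up by a separate goroutine that listens on the connection.
// send registers op with the dispatch loop, then sends msg on the connection.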
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.requestOp <- op:
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("sending ", msg)
}})
err := c.write(ctx, msg)
c.sendDone <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.didQuit:
return ErrClientQuit
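}
}
At this point the call blocks in the select until the dispatch loop receives op from c.requestOp, ctx.Done() fires, or c.didQuit is closed. Once op has been handed over, write puts the request on the connection, and the outcome is sent on the sendDone channel, where the client's dispatch method picks it up.
func (c *Client) dispatch(conn net.Conn) {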
// Spawn the initial read loop.
go c.read(conn)
var (
lastOp *requestOp // tracks last send operation
requestOpLock = c.requestOp // nil while the send lock is held
reading = true // if true, a read loop is running
)
defer close(c.didQuit)
defer func() {
c.closeRequestOps(ErrClientQuit)
conn.Close()
if reading {
// Empty read channels until read is dead.
for {
select {
case <-c.readResp:
case <-c.readErr:
return
}
}
}
}()
for {
select {
case <-c.close:
return
// Read path.
case batch := <-c.readResp:
for _, msg := range batch {
switch {
case msg.isNotification():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: notification ", msg)
}})
c.handleNotification(msg)
case msg.isResponse():
log.Trace("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: response ", msg)
}})
c.handleResponse(msg)
default:
log.Debug("", "msg", log.Lazy{Fn: func() string {
return fmt.Sprint("<-readResp: dropping weird message", msg)
}})
// TODO: maybe close
}
}
case err := <-c.readErr:
log.Debug(fmt.Sprintf("<-readErr: %v", err))
c.closeRequestOps(err)
conn.Close()
reading = false
case newconn := <-c.reconnected:
log.Debug(fmt.Sprintf("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()))
if reading {
// Wait for the previous read loop to exit. This is a rare case.
conn.Close()
<-c.readErr
}
go c.read(newconn)
reading = true
conn = newconn
// Send path.
case op := <-requestOpLock:
// Stop listening for further send ops until the current one is done.
requestOpLock = nil
lastOp = op
for _, id := range op.ids {
c.respWait[string(id)] = op
}
case err := <-c.sendDone:
if err != nil {
// Remove response handlers for the last send. We remove those here
// because the error is already handled in Call or BatchCall. When the
// read loop goes down, it will signal all other current operations.
for _, id := range lastOp.ids {
delete(c.respWait, string(id))
}
}
// Listen for send ops again.
requestOpLock = c.requestOp
lastOp = nil
}
}
}
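The requestOpLock variable in dispatch relies on a property of Go channels: a receive from a nil channel blocks forever, so setting the variable to nil switches that select case off. The sketch below isolates this trick. The function demo and its event log are hypothetical and exist only to make the ordering visible.

```go
package main

import (
	"fmt"
	"sync"
)

// demo runs a stripped-down dispatch loop and several concurrent senders, and
// returns the sequence of events the loop observed.
func demo(names []string) []string {
	ops := make(chan string)
	done := make(chan error)
	quit := make(chan struct{})
	finished := make(chan struct{})
	var events []string // written only by the loop goroutine

	go func() {
		defer close(finished)
		opLock := ops // nil while one send is in flight
		for {
			select {
			case <-quit:
				return
			case op := <-opLock:
				// A receive from a nil channel never proceeds, so this
				// case stays disabled until the send reports completion.
				opLock = nil
				events = append(events, "begin "+op)
			case <-done:
				opLock = ops // accept the next operation again
				events = append(events, "end")
			}
		}
	}()

	var wg sync.WaitGroup
	for _, n := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			ops <- n    // blocks until the loop accepts this operation
			done <- nil // report that the (imaginary) write has finished
		}(n)
	}
	wg.Wait()
	close(quit)
	<-finished // events is safe to read once the loop has exited
	return events
}

func main() {
	for _, e := range demo([]string{"a", "b", "c"}) {
		fmt.Println(e)
	}
}
```

Whatever order the senders are scheduled in, every begin is followed by its end before the next begin, which is the send lock that the comment on requestOpLock refers to.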
This dispatch() method also exists only for non-HTTP requests. It starts the goroutine c.read(conn) to read the data that the server returns over conn.
func (c *Client) read(conn net.Conn) error {
var (
buf json.RawMessage
dec = json.NewDecoder(conn)
)
readMessage := func() (rs []*jsonrpcMessage, err error) {
buf = buf[:0]
if err = dec.Decode(&buf); err != nil {
return nil, err
}
if isBatch(buf) {
err = json.Unmarshal(buf, &rs)
} else {
rs = make([]*jsonrpcMessage, 1)
err = json.Unmarshal(buf, &rs[0])
}
return rs, err
}
for {
resp, err := readMessage()
if err != nil {
c.readErr <- err
return err
}
c.readResp <- resp
}
}
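The read loop follows a common Go pattern: a json.Decoder pulls one value after another off a stream and hands each to a channel, and the first error ends the loop. A self-contained sketch of that pattern, with hypothetical names pump and collect and a string in place of the network connection, might look like this:

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// pump decodes consecutive JSON values from r and forwards each one on msgs.
// The first decoding error (io.EOF at the end of the stream) goes to errs.
func pump(r io.Reader, msgs chan<- json.RawMessage, errs chan<- error) {
	dec := json.NewDecoder(r)
	for {
		var raw json.RawMessage
		if err := dec.Decode(&raw); err != nil {
			errs <- err
			return
		}
		msgs <- raw
	}
}

// collect plays the part of the dispatch loop: it receives messages until the
// reader reports an error, then returns what it got together with that error.
func collect(input string) ([]string, error) {
	msgs := make(chan json.RawMessage)
	errs := make(chan error)
	go pump(strings.NewReader(input), msgs, errs)

	var got []string
	for {
		select {
		case m := <-msgs:
			got = append(got, string(m))
		case err := <-errs:
			return got, err
		}
	}
}

func main() {
	got, err := collect(`{"id":1,"result":"0x1"} {"id":2,"result":"0x2"}`)
	fmt.Println(len(got), err)
}
```

Because the decoder keeps its own buffer, messages do not need to be separated by newlines or arrive in separate reads; any valid sequence of JSON values on the connection is split correctly.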
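Each decoded message is then sent to the c.readResp channel, where the case batch := <-c.readResp branch of the select in dispatch receives it. A notification is passed to c.handleNotification(msg); a response is passed to c.handleResponse(msg).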
func (c *Client) handleResponse(msg *jsonrpcMessage) {
op := c.respWait[string(msg.ID)]
if op == nil {
log.Debug(fmt.Sprintf("unsolicited response %v", msg))
return
}
delete(c.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
go op.sub.start()
c.subs[op.sub.subid] = op.sub
}
}
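The reply is forwarded with op.resp <- msg. From here on the handling is the same as for HTTP RPC: execution continues in CallContext at resp, err := op.wait(ctx).
IV. Summary
go-ethereum has four kinds of RPC: HTTP RPC, InProc RPC, IPC RPC and WS RPC. Most of their logic lives in rpc/server.go and rpc/client.go. Each transport derives its own client instance and sets up its own net.Conn. Because HTTP RPC is built on short-lived connections, its implementation differs somewhat from the others.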
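The matching of responses to waiting calls by request id can be sketched with a map of pending operations. The types reply, pendingCall and router below are hypothetical stand-ins for jsonrpcMessage, requestOp and the respWait table, and the response channel is buffered in this sketch so that delivery never blocks.

```go
package main

import (
	"errors"
	"fmt"
)

// reply is a hypothetical, trimmed-down response message.
type reply struct {
	ID     string
	Result string
}

// pendingCall is a hypothetical stand-in for requestOp.
type pendingCall struct {
	resp chan reply
}

// router keeps the table of calls that are waiting for a reply.
type router struct {
	waiting map[string]*pendingCall
}

func newRouter() *router {
	return &router{waiting: make(map[string]*pendingCall)}
}

// register records a call under its request id and returns the op to wait on.
func (r *router) register(id string) *pendingCall {
	op := &pendingCall{resp: make(chan reply, 1)} // buffered so deliver never blocks
	r.waiting[id] = op
	return op
}

// deliver forwards a reply to whoever is waiting for its id and forgets the
// entry, so a second reply with the same id counts as unsolicited.
func (r *router) deliver(msg reply) error {
	op, ok := r.waiting[msg.ID]
	if !ok {
		return errors.New("unsolicited response " + msg.ID)
	}
	delete(r.waiting, msg.ID)
	op.resp <- msg
	return nil
}

func main() {
	r := newRouter()
	op := r.register("1")
	if err := r.deliver(reply{ID: "1", Result: "0x10"}); err != nil {
		fmt.Println("deliver:", err)
		return
	}
	fmt.Println((<-op.resp).Result)
	fmt.Println(r.deliver(reply{ID: "99"}))
}
```

The sketch uses the map from a single goroutine only, so it needs no lock; the same holds for respWait as long as it is touched only inside the dispatch loop.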