Skip to content

Latest commit

 

History

History
635 lines (535 loc) · 17 KB

b6cb9fd341765363f306fef479e7f2d9.md

File metadata and controls

635 lines (535 loc) · 17 KB

transport层之jsonrpc

在http中还提供了一种通过http协议以json为数据格式进行rpc调用的封装。其封装方式和之前的方式类似。

数据格式

在jsonrpc中,请求和响应的格式是预先定义好的。

请求格式:

// Request defines a JSON RPC request from the spec
// http://www.jsonrpc.org/specification#request_object
type Request struct {
	// JSONRPC carries the protocol version string sent by the caller.
	JSONRPC string          `json:"jsonrpc"`
	// Method names the remote procedure; the server uses it as the key into
	// its EndpointCodecMap.
	Method  string          `json:"method"`
	// Params is kept as raw JSON so that it can be decoded later by the
	// decoder registered for Method.
	Params  json.RawMessage `json:"params"`
	// ID identifies the request; the server copies it into the Response.
	ID      *RequestID      `json:"id"`
}

一个以POST方式发送的jsonrpc请求,其body中的格式类似于下面这样:

{
    "id": 123,
    "jsonrpc": "2.0",
    "method": "sum",
    "params": {
        "A": 2,
        "B": 2
    }
}

响应格式:

// Response defines a JSON RPC response from the spec
// http://www.jsonrpc.org/specification#response_object
type Response struct {
	// JSONRPC carries the protocol version string.
	JSONRPC string          `json:"jsonrpc"`
	// Result holds the already-encoded result; it is omitted from the JSON
	// output when empty.
	Result  json.RawMessage `json:"result,omitempty"`
	// Error describes a failed call; it is omitted from the JSON output
	// when nil.
	Error   *Error          `json:"error,omitempty"`
	// ID is always present in the JSON output (there is no omitempty), so a
	// nil ID is encoded as null.
	ID      *RequestID      `json:"id"`
}

一个响应的body类似如下:

{
    "jsonrpc": "2.0",
    "result": 4,
    "id": 123
}

Server

// Server wraps an endpoint and implements http.Handler.
//
// A single Server serves every method registered in ecm, so all of those
// methods share the same before/after funcs, error encoder, finalizer and
// logger.
type Server struct {
	// ecm maps a JSON RPC method name to its endpoint and codecs.
	ecm          EndpointCodecMap
	// before funcs run on the incoming request before the body is decoded.
	before       []httptransport.RequestFunc
	// after funcs run once the endpoint has returned successfully, before
	// the response is written.
	after        []httptransport.ServerResponseFunc
	// errorEncoder writes the response for any error hit while serving.
	errorEncoder httptransport.ErrorEncoder
	// finalizer, when non-nil, is deferred until ServeHTTP returns.
	finalizer    httptransport.ServerFinalizerFunc
	// logger receives the errors encountered while serving a request.
	logger       log.Logger
}

和之前不同的是,这里的EndpointCodecMap是一个map,保存了请求方法到EndpointCodec的映射,而EndpointCodec正是endpoint.Endpoint,DecodeRequestFunc和EncodeResponseFunc的组合。

// EndpointCodecMap maps the Request.Method to the proper EndpointCodec.
// The server looks up every incoming Request.Method in this map; a method
// that is not present is reported as a method-not-found error.
type EndpointCodecMap map[string]EndpointCodec

// EndpointCodec defines a server Endpoint and its associated codecs
type EndpointCodec struct {
	// Endpoint is invoked with the value produced by Decode.
	Endpoint endpoint.Endpoint
	// Decode turns the raw JSON "params" of a request into the endpoint's
	// request value.
	Decode   DecodeRequestFunc
	// Encode turns the endpoint's response value into the raw JSON placed
	// in Response.Result.
	Encode   EncodeResponseFunc
}

由此可以看出,这个map中注册的所有的rpc方法,共用一套before,after,finalizer,错误处理和日志处理函数。

创建server的方式和之前一样:

// NewServer builds a Server that dispatches JSON RPC calls to the endpoints
// registered in ecm. The returned *Server implements http.Handler. Unless an
// option overrides them, errors are encoded with DefaultErrorEncoder and
// logging goes to a no-op logger.
func NewServer(
	ecm EndpointCodecMap,
	options ...ServerOption,
) *Server {
	srv := new(Server)
	srv.ecm = ecm
	srv.errorEncoder = DefaultErrorEncoder
	srv.logger = log.NewNopLogger()

	// Options are applied in the order given, so a later option wins when
	// two of them set the same field.
	for i := range options {
		options[i](srv)
	}
	return srv
}

一样采用了选项模式对server属性进行设置

// ServerOption sets an optional parameter for servers. Options are passed
// to NewServer, which applies each one, in order, to the Server it builds.
type ServerOption func(*Server)

// ServerBefore registers functions that run against the incoming HTTP
// request before its body is decoded. They are appended to any functions
// registered by earlier options.
func ServerBefore(before ...httptransport.RequestFunc) ServerOption {
	return func(srv *Server) {
		srv.before = append(srv.before, before...)
	}
}

// ServerAfter registers functions that run against the HTTP response writer
// once the endpoint has been invoked, but before anything is written to the
// client. They are appended to any functions registered by earlier options.
func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption {
	return func(srv *Server) {
		srv.after = append(srv.after, after...)
	}
}

// ServerErrorEncoder replaces the function that writes errors to the
// http.ResponseWriter when request processing fails. Supply one to customise
// error formatting or response codes. Without this option NewServer installs
// DefaultErrorEncoder.
func ServerErrorEncoder(ee httptransport.ErrorEncoder) ServerOption {
	return func(srv *Server) {
		srv.errorEncoder = ee
	}
}

// ServerErrorLogger sets the logger that receives non-terminal errors.
// Without this option NewServer installs a no-op logger, so nothing is
// logged. It is meant as a diagnostic aid; finer-grained error handling,
// including more detailed logging, belongs in a custom ServerErrorEncoder or
// ServerFinalizer, both of which have access to the context.
func ServerErrorLogger(logger log.Logger) ServerOption {
	return func(srv *Server) {
		srv.logger = logger
	}
}

// ServerFinalizer sets the function executed at the end of every HTTP
// request. No finalizer is registered unless this option is used.
func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption {
	return func(srv *Server) {
		srv.finalizer = f
	}
}

具体的处理流程:

// ServeHTTP implements http.Handler.
//
// Only POST is accepted; any other verb is answered with 405 and an Allow
// header naming POST. For a POST the handler:
//
//  1. defers the finalizer (if one is configured) so it runs on return,
//  2. runs the before funcs,
//  3. decodes the body into a Request and looks up the EndpointCodec
//     registered for Request.Method,
//  4. decodes the params, invokes the endpoint and runs the after funcs,
//  5. encodes the endpoint's result and writes the Response.
//
// A failure in steps 3-5 is logged and handed to the configured error
// encoder, which is responsible for writing the error response.
func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		// A 405 response should tell the client which methods are allowed.
		w.Header().Set("Allow", http.MethodPost)
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		w.WriteHeader(http.StatusMethodNotAllowed)
		// Best effort: nothing useful can be done if the client is gone.
		_, _ = io.WriteString(w, "405 must POST\n")
		return
	}
	ctx := r.Context()

	if s.finalizer != nil {
		// Wrap the writer so the finalizer can be told the status code that
		// was actually written; it defaults to 200 when WriteHeader is never
		// called explicitly.
		iw := &interceptingWriter{w, http.StatusOK}
		defer func() { s.finalizer(ctx, iw.code, r) }()
		w = iw
	}

	for _, f := range s.before {
		ctx = f(ctx, r)
	}

	// fail logs err and delegates the response to the error encoder. It
	// closes over ctx and w, so it always sees their current values.
	fail := func(err error) {
		s.logger.Log("err", err)
		s.errorEncoder(ctx, err, w)
	}

	// Decode the body into a Request object.
	var req Request
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		fail(parseError("JSON could not be decoded: " + err.Error()))
		return
	}

	// Pick the endpoint and codecs registered for the requested method.
	codec, ok := s.ecm[req.Method]
	if !ok {
		fail(methodNotFoundError(fmt.Sprintf("Method %s was not found.", req.Method)))
		return
	}

	// Decode the JSON "params".
	reqParams, err := codec.Decode(ctx, req.Params)
	if err != nil {
		fail(err)
		return
	}

	// Call the endpoint with the decoded params.
	response, err := codec.Endpoint(ctx, reqParams)
	if err != nil {
		fail(err)
		return
	}

	for _, f := range s.after {
		ctx = f(ctx, w)
	}

	// Encode the endpoint's response into the JSON "result".
	resParams, err := codec.Encode(ctx, response)
	if err != nil {
		fail(err)
		return
	}

	res := Response{
		ID:      req.ID,
		JSONRPC: Version,
		Result:  resParams,
	}

	w.Header().Set("Content-Type", ContentType)
	if err := json.NewEncoder(w).Encode(res); err != nil {
		// Part of the response may already be on the wire, so it is too late
		// for the error encoder; record the failure instead of dropping it.
		s.logger.Log("err", err)
	}
}

首先,采用jsonrpc的方式调用只支持http的POST方式,第一步对此进行了判断。然后(仅当设置了finalizer时)基于http.ResponseWriter封装了一个interceptingWriter,用于记录最终写出的响应状态码,并在请求结束时传入finalizer。

// interceptingWriter wraps an http.ResponseWriter and remembers the status
// code passed to WriteHeader, so that a finalizer can be given the code that
// was actually sent.
type interceptingWriter struct {
	http.ResponseWriter
	code int
}

// WriteHeader records code and then forwards it to the wrapped writer.
// Because WriteHeader may never be called explicitly, whoever constructs an
// interceptingWriter must initialise code to http.StatusOK.
func (iw *interceptingWriter) WriteHeader(code int) {
	iw.code = code
	iw.ResponseWriter.WriteHeader(code)
}

接下来进入正式的请求处理流程:

before -> json_decode(获取method) -> dec(获取params) -> endpoint -> after -> enc(写入body)

默认帮我们实现了一个错误处理函数

// DefaultErrorEncoder writes the error to the ResponseWriter as a JSON RPC
// error response. The HTTP status is always 200 OK; the failure is reported
// in the body's error object.
//
// The Error() string of err becomes the error message. The error code is
// InternalError unless err implements ErrorCoder, in which case its
// ErrorCode() is used. If err implements Headerer, every header it provides
// is copied onto the response, replacing any value already set for that
// key (including the default Content-Type).
func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) {
	w.Header().Set("Content-Type", ContentType)
	if headerer, ok := err.(httptransport.Headerer); ok {
		// Fetch the headers once and copy every value: Set(k, Get(k)) would
		// keep only the first value of a multi-valued header and would write
		// an empty value for keys that are not in canonical form.
		for k, values := range headerer.Headers() {
			w.Header().Del(k)
			for _, v := range values {
				w.Header().Add(k, v)
			}
		}
	}

	e := Error{
		Code:    InternalError,
		Message: err.Error(),
	}
	if sc, ok := err.(ErrorCoder); ok {
		e.Code = sc.ErrorCode()
	}

	w.WriteHeader(http.StatusOK)
	// Best effort: there is no logger here and the header is already sent,
	// so an encode failure cannot be reported any further.
	_ = json.NewEncoder(w).Encode(Response{
		JSONRPC: Version,
		Error:   &e,
	})
}

这其中会对err进行ErrorCoder类型断言,如果实现了ErrorCoder,将调用其ErrorCode()方法对错误码进行设置

// ErrorCoder is checked by DefaultErrorEncoder. If an error value implements
// ErrorCoder, the value returned by ErrorCode is used as the code of the
// JSON RPC error response.
// By default, InternalError (-32603) is used.
type ErrorCoder interface {
	ErrorCode() int
}

Client

// Client wraps a JSON RPC method and provides a method that implements endpoint.Endpoint.
type Client struct {
	// client performs the HTTP round trip.
	client httptransport.HTTPClient

	// JSON RPC endpoint URL
	tgt *url.URL

	// JSON RPC method name.
	method string

	// enc turns the caller's request value into the raw JSON "params".
	enc            EncodeRequestFunc
	// dec turns the decoded Response into the caller's response value.
	dec            DecodeResponseFunc
	// before funcs are applied to the outgoing HTTP request before it is sent.
	before         []httptransport.RequestFunc
	// after funcs are applied to the HTTP response before dec is called.
	after          []httptransport.ClientResponseFunc
	// finalizer, when non-nil, is deferred until the endpoint call returns.
	finalizer      httptransport.ClientFinalizerFunc
	// requestID produces the ID placed in each outgoing request.
	requestID      RequestIDGenerator
	// bufferedStream, when true, leaves the response body open after the
	// endpoint returns; when false the body is closed on return.
	bufferedStream bool
}

这里的封装和http部分的Client封装几乎一致,requestID是一个ID生成器,用于为每次请求生成唯一标识。

// RequestIDGenerator returns an ID for the request. The client calls
// Generate once for every outgoing request and places the result in the
// request's ID field.
type RequestIDGenerator interface {
	Generate() interface{}
}

这里默认帮我们实现了一个自增整数ID生成器(注意并不是UUID):

// autoIncrementID is a RequestIDGenerator that hands out consecutive
// unsigned integers.
type autoIncrementID struct {
	v *uint64
}

// NewAutoIncrementID creates a RequestIDGenerator whose first generated ID
// is init; each later ID is one greater than the previous.
func NewAutoIncrementID(init uint64) RequestIDGenerator {
	// Generate increments before returning, so start one below init. For
	// init == 0 the subtraction wraps, and the first increment wraps back
	// to 0.
	counter := new(uint64)
	*counter = init - 1
	return &autoIncrementID{v: counter}
}

// Generate satisfies RequestIDGenerator. It atomically increments the
// counter and returns the new value as a uint64.
func (i *autoIncrementID) Generate() interface{} {
	return atomic.AddUint64(i.v, 1)
}

客户端的构造方法:

// NewClient builds a Client bound to one remote JSON RPC method at tgt.
// Unless options override them, the client uses http.DefaultClient,
// DefaultRequestEncoder, DefaultResponseDecoder, an auto-incrementing
// request ID starting at 0, and closes the response body on return.
func NewClient(
	tgt *url.URL,
	method string,
	options ...ClientOption,
) *Client {
	cl := new(Client)
	cl.client = http.DefaultClient
	cl.tgt = tgt
	cl.method = method
	cl.enc = DefaultRequestEncoder
	cl.dec = DefaultResponseDecoder
	cl.before = []httptransport.RequestFunc{}
	cl.after = []httptransport.ClientResponseFunc{}
	cl.requestID = NewAutoIncrementID(0)
	cl.bufferedStream = false

	// Options are applied in the order given, so a later option wins when
	// two of them set the same field.
	for i := range options {
		options[i](cl)
	}
	return cl
}

由于是json方式的rpc,所以这里用内置的json包帮我们实现了json的序列化和反序列化

// DefaultRequestEncoder serialises req with encoding/json and returns the
// bytes as the raw JSON params of the call. The context is not used.
func DefaultRequestEncoder(_ context.Context, req interface{}) (json.RawMessage, error) {
	encoded, err := json.Marshal(req)
	return encoded, err
}

// DefaultResponseDecoder turns a Response into a Go value. When the response
// carries an error object, that Error is returned as the error. Otherwise
// the result is unmarshalled into an interface{}; an unmarshalling failure
// is returned as the error.
func DefaultResponseDecoder(_ context.Context, res Response) (interface{}, error) {
	if rpcErr := res.Error; rpcErr != nil {
		return nil, *rpcErr
	}

	var decoded interface{}
	if err := json.Unmarshal(res.Result, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

同样使用选项模式设置属性:

// ClientOption sets an optional parameter for clients. Options are passed
// to NewClient, which applies each one, in order, to the Client it builds.
type ClientOption func(*Client)

// SetClient overrides the HTTP client used to send requests. Without this
// option NewClient uses http.DefaultClient.
func SetClient(client httptransport.HTTPClient) ClientOption {
	return func(cl *Client) {
		cl.client = client
	}
}

// ClientBefore registers RequestFuncs that are applied to the outgoing HTTP
// request before it is sent. They are appended to any functions registered
// by earlier options.
func ClientBefore(before ...httptransport.RequestFunc) ClientOption {
	return func(cl *Client) {
		cl.before = append(cl.before, before...)
	}
}

// ClientAfter registers ClientResponseFuncs that are applied to the server's
// HTTP response before the response decoder runs. This is the place to pull
// information out of the response and attach it to the context ahead of
// decoding. They are appended to any functions registered by earlier
// options.
func ClientAfter(after ...httptransport.ClientResponseFunc) ClientOption {
	return func(cl *Client) {
		cl.after = append(cl.after, after...)
	}
}

// ClientFinalizer sets the function executed at the end of every HTTP
// request. No finalizer is registered unless this option is used.
func ClientFinalizer(f httptransport.ClientFinalizerFunc) ClientOption {
	return func(cl *Client) {
		cl.finalizer = f
	}
}

// ClientRequestEncoder sets the function that encodes the request params to
// JSON. Without this option NewClient uses DefaultRequestEncoder.
func ClientRequestEncoder(enc EncodeRequestFunc) ClientOption {
	return func(cl *Client) {
		cl.enc = enc
	}
}

// ClientResponseDecoder sets the function that decodes the JSON RPC
// response into the caller's value. Without this option NewClient uses
// DefaultResponseDecoder.
func ClientResponseDecoder(dec DecodeResponseFunc) ClientOption {
	return func(cl *Client) {
		cl.dec = dec
	}
}

// ClientRequestIDGenerator sets the generator that is asked for an ID
// before each request. Without this option NewClient uses
// NewAutoIncrementID(0).
func ClientRequestIDGenerator(g RequestIDGenerator) ClientOption {
	return func(cl *Client) {
		cl.requestID = g
	}
}

// BufferedStream controls whether the Response.Body is left open after the
// endpoint returns, so that it can be read from later. This is useful when
// transporting a file as a buffered stream. The default set by NewClient is
// false, in which case the body is closed when the endpoint returns.
func BufferedStream(buffered bool) ClientOption {
	return func(cl *Client) {
		cl.bufferedStream = buffered
	}
}

进入正式的处理流程:

// Endpoint returns a usable endpoint that invokes the remote endpoint.
//
// Each call encodes the request value into JSON params, wraps them in a
// JSON RPC request with a freshly generated ID, POSTs it to the target URL,
// decodes the JSON RPC response and passes it to the response decoder. The
// before funcs run just before the HTTP request is sent; the after funcs
// run once the response body has been decoded.
//
// When a finalizer is configured it runs as the call returns. It receives
// the error the call failed with (including one returned by the response
// decoder) and, if an HTTP response was obtained, a context carrying the
// response headers and size.
func (c Client) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		var (
			resp *http.Response
			err  error
		)
		if c.finalizer != nil {
			// The closure reads resp and err when the call returns, so every
			// failure below must be stored in err rather than a shadow.
			defer func() {
				if resp != nil {
					ctx = context.WithValue(ctx, httptransport.ContextKeyResponseHeaders, resp.Header)
					ctx = context.WithValue(ctx, httptransport.ContextKeyResponseSize, resp.ContentLength)
				}
				c.finalizer(ctx, err)
			}()
		}

		var params json.RawMessage
		if params, err = c.enc(ctx, request); err != nil {
			return nil, err
		}
		rpcReq := clientRequest{
			// The JSON-RPC 2.0 specification requires this member to be
			// exactly "2.0"; an empty string is rejected by strict servers.
			JSONRPC: "2.0",
			Method:  c.method,
			Params:  params,
			ID:      c.requestID.Generate(),
		}

		// Encode the body first and hand the buffer to NewRequest, which
		// then sets ContentLength and GetBody for us.
		var b bytes.Buffer
		if err = json.NewEncoder(&b).Encode(rpcReq); err != nil {
			return nil, err
		}

		var req *http.Request
		req, err = http.NewRequest("POST", c.tgt.String(), &b)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/json; charset=utf-8")

		for _, f := range c.before {
			ctx = f(ctx, req)
		}

		resp, err = c.client.Do(req.WithContext(ctx))
		if err != nil {
			return nil, err
		}

		if !c.bufferedStream {
			defer resp.Body.Close()
		}

		// Decode the body into a Response object.
		var rpcRes Response
		err = json.NewDecoder(resp.Body).Decode(&rpcRes)
		if err != nil {
			return nil, err
		}

		for _, f := range c.after {
			ctx = f(ctx, resp)
		}

		// Assign to err (instead of returning the call directly) so that the
		// finalizer also observes a failure reported by the decoder.
		var response interface{}
		response, err = c.dec(ctx, rpcRes)
		return response, err
	}
}

首先整合响应信息,以ctx传入finalizer中,待请求处理结束时调用,常用于日志记录,链路追踪。

接着根据请求信息,构造标准的请求结构,设置请求头,请求体,之后流程如下:

enc(编码params) -> before -> client.Do -> json_decode(解析响应body) -> after -> dec

Error

error.go中封装了常见的错误,并进行了分类

// Error is the error object carried in a JSON RPC Response, as defined by
// the spec: http://www.jsonrpc.org/specification#error_object
type Error struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}

// Error implements error. It returns Message when one is set and otherwise
// falls back to the default message registered for Code.
func (e Error) Error() string {
	if e.Message == "" {
		return errorMessage[e.Code]
	}
	return e.Message
}

// ErrorCode returns the JSON RPC error code carried by the error.
func (e Error) ErrorCode() int {
	code := e.Code
	return code
}

对jsonrpc进行错误分类:

// JSON RPC error codes. Each code has a default message in errorMessage.
const (
	// ParseError indicates that invalid JSON was received by the server:
	// an error occurred on the server while parsing the JSON text.
	ParseError int = -32700

	// InvalidRequestError indicates that the JSON sent is not a valid
	// Request object.
	InvalidRequestError int = -32600

	// MethodNotFoundError indicates that the method does not exist or is
	// not available.
	MethodNotFoundError int = -32601

	// InvalidParamsError indicates invalid method parameter(s).
	InvalidParamsError int = -32602

	// InternalError indicates a server error. It is the code used by
	// DefaultErrorEncoder when an error does not supply its own.
	InternalError int = -32603
)

// errorMessage holds the default human-readable message for each JSON RPC
// error code. It is consulted by ErrorMessage and by Error.Error when an
// Error carries no message of its own.
var errorMessage = map[int]string{
	ParseError:          "An error occurred on the server while parsing the JSON text.",
	InvalidRequestError: "The JSON sent is not a valid Request object.",
	MethodNotFoundError: "The method does not exist / is not available.",
	InvalidParamsError:  "Invalid method parameter(s).",
	InternalError:       "Internal JSON-RPC error.",
}

// ErrorMessage looks up the default message for a JSON RPC error code. An
// unknown code yields the empty string.
func ErrorMessage(code int) string {
	msg := errorMessage[code] // "" when code has no entry
	return msg
}

// parseError is an error message whose JSON RPC code is always ParseError.
type parseError string

// Error returns the message text.
func (pe parseError) Error() string { return string(pe) }

// ErrorCode reports ParseError.
func (pe parseError) ErrorCode() int { return ParseError }

// invalidRequestError is an error message whose JSON RPC code is always
// InvalidRequestError.
type invalidRequestError string

// Error returns the message text.
func (ire invalidRequestError) Error() string { return string(ire) }

// ErrorCode reports InvalidRequestError.
func (ire invalidRequestError) ErrorCode() int { return InvalidRequestError }

// methodNotFoundError is an error message whose JSON RPC code is always
// MethodNotFoundError.
type methodNotFoundError string

// Error returns the message text.
func (mnf methodNotFoundError) Error() string { return string(mnf) }

// ErrorCode reports MethodNotFoundError.
func (mnf methodNotFoundError) ErrorCode() int { return MethodNotFoundError }

// invalidParamsError is an error message whose JSON RPC code is always
// InvalidParamsError.
type invalidParamsError string

// Error returns the message text.
func (ipe invalidParamsError) Error() string { return string(ipe) }

// ErrorCode reports InvalidParamsError.
func (ipe invalidParamsError) ErrorCode() int { return InvalidParamsError }

// internalError is an error message whose JSON RPC code is always
// InternalError.
type internalError string

// Error returns the message text.
func (ie internalError) Error() string { return string(ie) }

// ErrorCode reports InternalError.
func (ie internalError) ErrorCode() int { return InternalError }