沈阳网站建站,想要做网站,软件外包合同模板,长沙人才招聘网最新招聘2024前言
这是系列最后一篇文章了#xff0c;最后我们来为我们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发#xff0c;基于这种方式实现一个简单的类似service mesh中的sidecar。
原理
http gateway可以接收来自客户端的http请求并将其转换为rpc请求然后交…前言
这是系列最后一篇文章了最后我们来为我们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发基于这种方式实现一个简单的类似service mesh中的sidecar。
原理
http gateway可以接收来自客户端的http请求并将其转换为rpc请求然后交给服务端处理再将服务端处理过后的结果通过http响应返回给客户端。 http gateway的大致原理就是将我们的RPC协议中header部分放到http header中然后RPC协议中的body部分放到http body即可。
实现
首先我们需要定义http header中各个字段的名称
const (HEADER_SEQ rpc-header-seq //序号, 用来唯一标识请求或响应HEADER_MESSAGE_TYPE rpc-header-message_type //消息类型用来标识一个消息是请求还是响应HEADER_COMPRESS_TYPE rpc-header-compress_type //压缩类型用来标识一个消息的压缩方式HEADER_SERIALIZE_TYPE rpc-header-serialize_type //序列化类型用来标识消息体采用的编码方式HEADER_STATUS_CODE rpc-header-status_code //状态类型用来标识一个请求是正常还是异常HEADER_SERVICE_NAME rpc-header-service_name //服务名HEADER_METHOD_NAME rpc-header-method_name //方法名HEADER_ERROR rpc-header-error //方法调用发生的异常HEADER_META_DATA rpc-header-meta_data //其他元数据
)然后我们需要启动一个http server用来接收http请求。这里我们使用go自带的api默认使用5080端口如果发现端口已经被占用了就递增端口。
func (s *SGServer) startGateway() {port : 5080ln, err : net.Listen(tcp, : strconv.Itoa(port))for err ! nil strings.Contains(err.Error(), address already in use) {portln, err net.Listen(tcp, : strconv.Itoa(port))}if err ! nil {log.Printf(error listening gateway: %s, err.Error())}log.Printf(gateway listenning on strconv.Itoa(port))//避免阻塞使用新的goroutine来执行http servergo func() {err : http.Serve(ln, s)if err ! nil {log.Printf(error serving http %s, err.Error())}}()
}接下来我们需要实现ServeHTTP函数
func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {//如果url不对则直接返回if r.URL.Path ! /invoke { rw.WriteHeader(404)return}//如果method不对则直接返回if r.Method ! POST {rw.WriteHeader(405)return}//构造新的请求request : protocol.NewMessage(s.Option.ProtocolType)//根据http header填充request的headerrequest, err : parseHeader(request, r)if err ! nil {rw.WriteHeader(400)}//根据http body填充request的datarequest, err parseBody(request, r)if err ! nil {rw.WriteHeader(400)}//构造contextctx : metadata.WithMeta(context.Background(), request.MetaData)response : request.Clone()response.MessageType protocol.MessageTypeResponse//处理请求response s.process(ctx, request, response)//返回相应s.writeHttpResponse(response, rw, r)
}func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {data, err : ioutil.ReadAll(request.Body)if err ! nil {return nil, err}message.Data datareturn message, nil
}func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {headerSeq : request.Header.Get(HEADER_SEQ)seq, err : strconv.ParseUint(headerSeq, 10, 64)if err ! nil {return nil, err}message.Seq seqheaderMsgType : request.Header.Get(HEADER_MESSAGE_TYPE)msgType, err : protocol.ParseMessageType(headerMsgType)if err ! nil {return nil, err}message.MessageType msgTypeheaderCompressType : request.Header.Get(HEADER_COMPRESS_TYPE)compressType, err : protocol.ParseCompressType(headerCompressType)if err ! nil {return nil, err}message.CompressType compressTypeheaderSerializeType : request.Header.Get(HEADER_SERIALIZE_TYPE)serializeType, err : codec.ParseSerializeType(headerSerializeType)if err ! nil {return nil, err}message.SerializeType serializeTypeheaderStatusCode : request.Header.Get(HEADER_STATUS_CODE)statusCode, err : protocol.ParseStatusCode(headerStatusCode)if err ! nil {return nil, err}message.StatusCode statusCodeserviceName : request.Header.Get(HEADER_SERVICE_NAME)message.ServiceName serviceNamemethodName : request.Header.Get(HEADER_METHOD_NAME)message.MethodName methodNameerrorMsg : request.Header.Get(HEADER_ERROR)message.Error errorMsgheaderMeta : request.Header.Get(HEADER_META_DATA)meta : make(map[string]interface{})err json.Unmarshal([]byte(headerMeta), meta)if err ! nil {return nil, err}message.MetaData metareturn message, nil
}func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {header : rw.Header()header.Set(HEADER_SEQ, string(message.Seq))header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())header.Set(HEADER_STATUS_CODE, message.StatusCode.String())header.Set(HEADER_SERVICE_NAME, message.ServiceName)header.Set(HEADER_METHOD_NAME, message.MethodName)header.Set(HEADER_ERROR, message.Error)metaDataJson, _ : json.Marshal(message.MetaData)header.Set(HEADER_META_DATA, string(metaDataJson))_, _ rw.Write(message.Data)
}
最后我们只需要在wrapper中启动http server即可。
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {return func(network string, addr string, meta map[string]interface{}) error {//省略前面的部分...//启动gateways.startGateway()return serveFunc(network, addr, meta)}
}客户端测试代码
func MakeHttpCall() {//声明参数并序列化放到http请求的body中arg : service.Args{A: rand.Intn(200), B: rand.Intn(100)}data, _ : msgpack.Marshal(arg)body : bytes.NewBuffer(data)req, err : http.NewRequest(POST, http://localhost:5080/invoke, body)if err ! nil {log.Println(err)return}req.Header.Set(server.HEADER_SEQ, 1)req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())req.Header.Set(server.HEADER_SERVICE_NAME,Arith)req.Header.Set(server.HEADER_METHOD_NAME,Add)req.Header.Set(server.HEADER_ERROR,)meta : map[string]interface{}{key:value}metaJson, _ : json.Marshal(meta)req.Header.Set(server.HEADER_META_DATA,string(metaJson))response, err : http.DefaultClient.Do(req)if err ! nil {log.Println(err)return}if response.StatusCode ! 200 {log.Println(response)} else if response.Header.Get(server.HEADER_ERROR) ! {log.Println(response.Header.Get(server.HEADER_ERROR))} else {data, err ioutil.ReadAll(response.Body)result : service.Reply{}msgpack.Unmarshal(data, result)fmt.Println(result.C)}
}结语
这个系列到此就告一段落了但是还有很多需要改进和丰富的地方甚至是错误后续再以单独文章的形式更新。