castile-rpc框架:通信协议的设计和编解码器的实现
现在需要建立客户端和服务端之间的通信机制了,主要内容有:
- 服务消费者实现协议编码,向服务提供者发送调用数据。
- 服务提供者收到数据后解码,然后向服务消费者发送响应数据,暂时忽略 RPC 请求是如何被调用的。
- 服务消费者收到响应数据后成功返回。
1、RPC 通信方案设计
2、自定义RPC通信协议
协议是服务消费者和服务提供者之间通信的基础,主流的 RPC 框架都会自定义通信协议,相比于 HTTP、HTTPS、JSON 等通用的协议,自定义协议可以实现更好的性能、扩展性以及安全性。
自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 状态: 失败还是成功? 可选
- 消息类型,是请求、响应?这个 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
1 2 3 4 5 6 7 8 9 10 11 12 13
| +---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte |
+---------------------------------------------------------------+
| 状态 1byte | 消息 ID 8byte | 数据长度 4byte |
+---------------------------------------------------------------+
| 数据内容 (长度不定) |
+---------------------------------------------------------------+
|
我们把协议分为协议头 Header 和协议体 Body 两个部分。协议头 Header 包含魔数、协议版本号、序列化算法、报文类型、状态、消息 ID、数据长度,协议体 Body 只包含数据内容部分,数据内容的长度是不固定的。RPC 请求和响应都可以使用该协议进行通信,对应协议实体类的定义如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Data public class CastileRpcProtocol<T> implements Serializable {
private MessageHeader messageHeader;
private T body; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Data public class MessageHeader implements Serializable {
private short magic; private byte version; private byte serialization; private byte msgType; private byte status; private long requestId; private int msgLen; }
|
3、序列化算法选型
目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等,这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。 我们设计了一个 RPC 序列化顶层接口, 所有的序列化算法都需要实现这个接口;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public interface RpcSerialization {
<T> byte[] serialize(T obj) throws IOException;
<T> T deserialize(byte[] buf, Class<T> clazz) throws IOException;
}
|
我们为 RpcSerialization 提供了 HessianSerialization 和 JsonSerialization 两种类型的实现,为此,可以提供一个序列化工厂来切换不同的序列化算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class SerializationFactory { public static RpcSerialization getRpcSerialization(byte type){
SerializationTypeEnum typeEnum = SerializationTypeEnum.findSerializationType(type);
switch (typeEnum){ case HESSIAN: return new HessianSerialization(); case JSON: return new JsonSerialization(); default: throw new IllegalArgumentException("serialization type is illegal, " + type); } } }
|
4、通信协议的编码器
Netty 提供了两个最为常用的编解码抽象基类 MessageToByteEncoder 和 ByteToMessageDecoder,帮助我们很方便地扩展实现自定义协议。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class MessageEncoder extends MessageToByteEncoder<CastileRpcProtocol> {
@Override protected void encode(ChannelHandlerContext channelHandlerContext, CastileRpcProtocol message, ByteBuf byteBuf) throws Exception { MessageHeader messageHeader = message.getMessageHeader(); byteBuf.writeShort(messageHeader.getMagic()); byteBuf.writeByte(messageHeader.getVersion()); byteBuf.writeByte(messageHeader.getSerialization()); byteBuf.writeByte(messageHeader.getMsgType()); byteBuf.writeByte(messageHeader.getStatus()); byteBuf.writeLong(messageHeader.getRequestId());
RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(messageHeader.getSerialization()); byte[] body = rpcSerialization.serialize(message.getBody()); byteBuf.writeInt(body.length); byteBuf.writeBytes(body); } }
|
在服务消费者或者服务提供者调用 writeAndFlush() 将数据写给对方前,都已经封装成 RpcRequest 或者 RpcResponse,所以可以采用 CastileRpcProtocol作为 RPC Encoder 编码器能够支持的编码类型。
5、 通信协议的解码器
解码器 相比于编码器 要复杂很多,解码器的目标是将字节流数据解码为消息对象,并传递给下一个 Inbound 处理器。整个解码过程有几个要点要特别注意:
只有当 ByteBuf 中内容大于协议头 Header 的固定的 18 字节时,才开始读取数据。
即使已经可以完整读取出协议头 Header,但是协议体 Body 有可能还未就绪。所以在刚开始读取数据时,需要使用 markReaderIndex() 方法标记读指针位置,当 ByteBuf 中可读字节长度小于协议体 Body 的长度时,再使用 resetReaderIndex() 还原读指针位置,说明现在 ByteBuf 中可读字节还不够一个完整的数据包。
这个其实也可以使用LengthFieldBasedFrameDecoder来处理粘包和半包问题
根据不同的报文类型 MsgType,需要反序列化出不同的协议体对象。在 RPC 请求调用的场景下,服务提供者需要将协议体内容反序列化成 MiniRpcRequest 对象;在 RPC 结果响应的场景下,服务消费者需要将协议体内容反序列化成 MiniRpcResponse 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @Slf4j public class MessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() < ProtocolConstants.HEADER_TOTAL_LEN) { log.error("message length valid failed! please check request data"); return; } byteBuf.markReaderIndex(); short magic = byteBuf.readShort(); if (magic != ProtocolConstants.MAGIC) { throw new IllegalArgumentException("magic number is illegal, " + magic); } byte version = byteBuf.readByte(); byte serializeType = byteBuf.readByte(); byte msgType = byteBuf.readByte(); byte status = byteBuf.readByte(); long requestId = byteBuf.readLong(); int dataLength = byteBuf.readInt(); if (byteBuf.readableBytes() < dataLength) { log.error("data readableBytes less than data length!"); byteBuf.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; byteBuf.readBytes(data); MsgType byTpye = MsgType.findByType(msgType); if (byTpye == null) { throw new IllegalArgumentException("msgType number is illegal, " + msgType); } MessageHeader header = new MessageHeader(); header.setMagic(magic); header.setVersion(version); header.setSerialization(serializeType); header.setStatus(status); header.setRequestId(requestId); header.setMsgType(msgType); header.setMsgLen(dataLength);
RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(serializeType); switch (byTpye) { case REQUEST: RpcRequest rpcRequest = rpcSerialization.deserialize(data, RpcRequest.class); if (rpcRequest != null) { CastileRpcProtocol<RpcRequest> castileRpcProtocol = new CastileRpcProtocol<>(); castileRpcProtocol.setMessageHeader(header); castileRpcProtocol.setBody(rpcRequest); list.add(castileRpcProtocol); } break; case RESPONSE: RpcResponse rpcResponse = rpcSerialization.deserialize(data, RpcResponse.class); if (rpcResponse != null) { CastileRpcProtocol<RpcResponse> castileRpcProtocol = new CastileRpcProtocol<>(); castileRpcProtocol.setMessageHeader(header); castileRpcProtocol.setBody(rpcResponse); list.add(castileRpcProtocol); } case HEARTBEAT: break; } } }
|
6、请求和响应处理
消费者调用RPC请求后,服务端通过解码器将二进制的数据解码成CastileRpcProtocol对象,再传递给RpcRequestHandler处理器执行rpc调用。 RpcRequestHandler 也是一个 Inbound 处理器,它并不需要承担解码工作,所以 RpcRequestHandler 直接继承 SimpleChannelInboundHandler 即可,然后重写 channelRead0() 方法,具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @Slf4j public class RpcRequestHandler extends SimpleChannelInboundHandler<CastileRpcProtocol<RpcRequest>> { private final Map<String, Object> rpcServiceMap;
public RpcRequestHandler(Map<String, Object> rpcServiceMap) { this.rpcServiceMap = rpcServiceMap; }
@Override protected void channelRead0(ChannelHandlerContext ctx, CastileRpcProtocol<RpcRequest> msg) throws Exception { RpcRequestProcessor.submitRequest(() -> { CastileRpcProtocol<RpcResponse> rpcProtocol = new CastileRpcProtocol<>(); RpcResponse rpcResponse = new RpcResponse(); MessageHeader messageHeader = msg.getMessageHeader(); messageHeader.setMsgType((byte) MsgType.RESPONSE.getType()); try { RpcRequest request = msg.getBody(); String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
Object serviceBean = rpcServiceMap.get(serviceKey); if (serviceBean == null) { throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName())); } Class<?> serviceClazz = serviceBean.getClass(); String methodName = request.getMethodName(); Object[] params = request.getParams(); Class<?>[] parameterTypes = request.getParameterTypes(); FastClass fastClass = FastClass.create(serviceClazz); int index = fastClass.getIndex(methodName, parameterTypes); Object result = fastClass.invoke(index, serviceBean, params);
rpcResponse.setData(result); messageHeader.setStatus((byte) MsgStatus.SUCCESS.getCode()); rpcProtocol.setBody(rpcResponse); rpcProtocol.setMessageHeader(messageHeader); } catch (Throwable throwable) { messageHeader.setStatus((byte) MsgStatus.FAIL.getCode()); rpcResponse.setMessage(throwable.toString()); log.error("process request {} error", messageHeader.getRequestId(), throwable); }
ctx.writeAndFlush(rpcProtocol); });
} }
|
服务消费者在发起调用时,维护了请求 requestId 和 RpcFuture的映射关系,RpcResponseHandler 会根据请求的 requestId 找到对应发起调用的 RpcFuture,然后为 RpcFuture 设置响应结果。
1 2 3 4 5 6 7 8 9
| public class RpcResponseHandler extends SimpleChannelInboundHandler<CastileRpcProtocol<RpcResponse>> { @Override protected void channelRead0(ChannelHandlerContext ctx, CastileRpcProtocol<RpcResponse> msg) throws Exception { long requestId = msg.getMessageHeader().getRequestId(); RpcFuture<RpcResponse> responseRpcFuture = RpcRequestHolder.REQUEST_MAP.remove(requestId); responseRpcFuture.getPromise().setSuccess(msg.getBody());
} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Data public class RpcFuture<T> {
private Promise<T> promise;
private long timeout;
public RpcFuture(Promise<T> promise, long timeout) { this.promise = promise; this.timeout = timeout; } }
|