四. 优化与源码 1. 优化 1.1 扩展序列化算法 序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[]) 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 byte [] body = new byte [bodyLength];byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body)); Message message = (Message) in.readObject(); message.setSequenceId(sequenceId); ByteArrayOutputStream out = new ByteArrayOutputStream(); new ObjectOutputStream(out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
1 2 3 4 5 6 7 8 9 public interface Serializer { <T> T deserialize (Class<T> clazz, byte [] bytes) ; <T> byte [] serialize(T object); }
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 enum SerializerAlgorithm implements Serializer { Java { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { try { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); Object object = in.readObject(); return (T) object; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误" , e); } } @Override public <T> byte [] serialize(T object) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); new ObjectOutputStream(out).writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException("SerializerAlgorithm.Java 序列化错误" , e); } } }, Json { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz); } @Override public <T> byte [] serialize(T object) { return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8); } }; public static SerializerAlgorithm getByInt (int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1 ) { throw new IllegalArgumentException("超过 SerializerAlgorithm 范围" ); } return array[type]; } }
增加配置类和配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public abstract class Config { static Properties properties; static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties(); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError(e); } } public static int getServerPort () { String value = properties.getProperty("server.port" ); if (value == null ) { return 8080 ; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm () { String value = properties.getProperty("serializer.algorithm" ); if (value == null ) { return Serializer.Algorithm.Java; } else { return Serializer.Algorithm.valueOf(value); } } }
配置文件
1 serializer.algorithm =Json
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class MessageCodecSharable extends MessageToMessageCodec <ByteBuf , Message > { @Override public void encode (ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(Config.getSerializerAlgorithm().ordinal()); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); byte [] bytes = Config.getSerializerAlgorithm().serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); outList.add(out); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerAlgorithm = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm]; Class<? extends Message> messageClass = Message.getMessageClass(messageType); Message message = algorithm.deserialize(messageClass, bytes); out.add(message); } }
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Data public abstract class Message implements Serializable { public static Class<? extends Message> getMessageClass(int messageType) { return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType () ; public static final int LoginRequestMessage = 0 ; public static final int LoginResponseMessage = 1 ; public static final int ChatRequestMessage = 2 ; public static final int ChatResponseMessage = 3 ; public static final int GroupCreateRequestMessage = 4 ; public static final int GroupCreateResponseMessage = 5 ; public static final int GroupJoinRequestMessage = 6 ; public static final int GroupJoinResponseMessage = 7 ; public static final int GroupQuitRequestMessage = 8 ; public static final int GroupQuitResponseMessage = 9 ; public static final int GroupChatRequestMessage = 10 ; public static final int GroupChatResponseMessage = 11 ; public static final int GroupMembersRequestMessage = 12 ; public static final int GroupMembersResponseMessage = 13 ; public static final int PingMessage = 14 ; public static final int PongMessage = 15 ; private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>(); static { messageClasses.put(LoginRequestMessage, LoginRequestMessage.class); messageClasses.put(LoginResponseMessage, LoginResponseMessage.class); messageClasses.put(ChatRequestMessage, ChatRequestMessage.class); messageClasses.put(ChatResponseMessage, ChatResponseMessage.class); messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class); messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class); messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class); messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class); messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class); messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class); messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class); messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class); messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class); messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class); } }
1.2 参数调优 1)CONNECT_TIMEOUT_MILLIS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class TestConnectionTimeout { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300 ) .channel(NioSocketChannel.class) .handler(new LoggingHandler()); ChannelFuture future = bootstrap.connect("127.0.0.1" , 8080 ); future.sync().channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.debug("timeout" ); } finally { group.shutdownGracefully(); } } }
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } }
2)SO_BACKLOG 属于 ServerSocketChannal 参数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue 其中
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
1 2 3 4 5 6 public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private volatile int backlog = NetUtil.SOMAXCONN; }
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
1 2 3 4 5 6 7 8 public class Server { public static void main (String[] args) throws IOException { ServerSocket ss = new ServerSocket(8888 , 2 ); Socket accept = ss.accept(); System.out.println(accept); System.in.read(); } }
客户端启动 4 个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Client { public static void main (String[] args) throws IOException { try { Socket s = new Socket(); System.out.println(new Date()+" connecting..." ); s.connect(new InetSocketAddress("localhost" , 8888 ),1000 ); System.out.println(new Date()+" connected..." ); s.getOutputStream().write(1 ); System.in.read(); } catch (IOException e) { System.out.println(new Date()+" connecting timeout..." ); e.printStackTrace(); } } }
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
1 2 Tue Apr 21 20 :30 :28 CST 2020 connecting... Tue Apr 21 20 :30 :28 CST 2020 connected...
第 4 个客户端连接时
1 2 3 Tue Apr 21 20:53:58 CST 2020 connecting... Tue Apr 21 20:53:59 CST 2020 connecting timeout... java.net.SocketTimeoutException: connect timed out
3)ulimit -n 4)TCP_NODELAY 5)SO_SNDBUF & SO_RCVBUF SO_SNDBUF 属于 SocketChannal 参数 SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上) 6)ALLOCATOR 属于 SocketChannal 参数 用来分配 ByteBuf, ctx.alloc() 7)RCVBUF_ALLOCATOR 属于 SocketChannal 参数 控制 netty 接收缓冲区大小 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定 1.3 RPC 框架 1)准备工作 这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Data public abstract class Message implements Serializable { public static final int RPC_MESSAGE_TYPE_REQUEST = 101 ; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102 ; static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } }
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @ToString(callSuper = true) public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } }
服务器架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class RpcServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
客户端架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
服务器端的 service 获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ServicesFactory { static Properties properties; static Map<Class<?>, Object> map = new ConcurrentHashMap<>(); static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties(); properties.load(in); Set<String> names = properties.stringPropertyNames(); for (String name : names) { if (name.endsWith("Service" )) { Class<?> interfaceClass = Class.forName(name); Class<?> instanceClass = Class.forName(properties.getProperty(name)); map.put(interfaceClass, instanceClass.newInstance()); } } } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new ExceptionInInitializerError(e); } } public static <T> T getService (Class<T> interfaceClass) { return (T) map.get(interfaceClass); } }
相关配置 application.properties
1 2 serializer.algorithm=Json cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
2)服务器 handler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j @ChannelHandler .Sharablepublic class RpcRequestMessageHandler extends SimpleChannelInboundHandler <RpcRequestMessage > { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage message) { RpcResponseMessage response = new RpcResponseMessage(); response.setSequenceId(message.getSequenceId()); try { HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName())); Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes()); Object invoke = method.invoke(service, message.getParameterValue()); response.setReturnValue(invoke); } catch (Exception e) { e.printStackTrace(); response.setExceptionValue(e); } ctx.writeAndFlush(response); } }
3)客户端代码第一版 只发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage( 1 , "cn.itcast.server.service.HelloService" , "sayHello" , String.class, new Class[]{String.class}, new Object[]{"张三" } )).addListener(promise -> { if (!promise.isSuccess()) { Throwable cause = promise.cause(); log.error("error" , cause); } }); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
4)客户端 handler 第一版 1 2 3 4 5 6 7 8 @Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage > { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); } }
5)客户端代码 第二版 包括 channel 管理,代理,接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 @Slf4j public class RpcClientManager { public static void main (String[] args) { HelloService service = getProxyService(HelloService.class); System.out.println(service.sayHello("zhangsan" )); } public static <T> T getProxyService (Class<T> serviceClass) { ClassLoader loader = serviceClass.getClassLoader(); Class<?>[] interfaces = new Class[]{serviceClass}; Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> { int sequenceId = SequenceIdGenerator.nextId(); RpcRequestMessage msg = new RpcRequestMessage( sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), args ); getChannel().writeAndFlush(msg); DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop()); RpcResponseMessageHandler.PROMISES.put(sequenceId, promise); promise.await(); if (promise.isSuccess()) { return promise.getNow(); } else { throw new RuntimeException(promise.cause()); } }); return (T) o; } private static Channel channel = null ; private static final Object LOCK = new Object(); public static Channel getChannel () { if (channel != null ) { return channel; } synchronized (LOCK) { if (channel != null ) { return channel; } initChannel(); return channel; } } private static void initChannel () { NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); try { channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); } catch (Exception e) { log.error("client error" , e); } } }
6)客户端 handler 第二版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage > { public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>(); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); Promise<Object> promise = PROMISES.remove(msg.getSequenceId()); if (promise != null ) { Object returnValue = msg.getReturnValue(); Exception exceptionValue = msg.getExceptionValue(); if (exceptionValue != null ) { promise.setFailure(exceptionValue); } else { promise.setSuccess(returnValue); } } } }
2. 源码分析 2.1 启动剖析 我们就来看看 netty 中对下面的代码是怎样进行处理的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Selector selector = Selector.open(); NioServerSocketChannel attachment = new NioServerSocketChannel(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); SelectionKey selectionKey = serverSocketChannel.register(selector, 0 , attachment); serverSocketChannel.bind(new InetSocketAddress(8080 )); selectionKey.interestOps(SelectionKey.OP_ACCEPT);
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { } return regFuture; }
关键代码 io.netty.bootstrap.ServerBootstrap#init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void init (Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0 )); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0 )); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
关键代码 io.netty.channel.ChannelInitializer#initChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private boolean initChannel (ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this ) != null ) { pipeline.remove(this ); } } return true ; } return false ; }
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 2 3 4 5 6 7 protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
2.2 NioEventLoop 剖析 NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void execute (Runnable task) { if (task == null ) { throw new NullPointerException("task" ); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
1 2 3 4 5 6 @Override protected void wakeup (boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false , true )) { selector.wakeup(); } }
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doStartThread () { assert thread == null ; executor.execute(new Runnable() { @Override public void run () { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: " , t); } finally { } } }); }
io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 protected void run () { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: boolean oldWakenUp = wakenUp.getAndSet(false ); select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } default : } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue ; } cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return ; } } } catch (Throwable t) { handleLoopException(t); } } }
⚠️ 注意 这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程 由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作 参考下图
io.netty.channel.nio.NioEventLoop#select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private void select (boolean oldWakenUp) throws IOException { Selector selector = this .selector; try { int selectCnt = 0 ; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ; if (timeoutMillis <= 0 ) { if (selectCnt == 0 ) { selector.selectNow(); selectCnt = 1 ; } break ; } if (hasTasks() && wakenUp.compareAndSet(false , true )) { selector.selectNow(); selectCnt = 1 ; break ; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break ; } if (Thread.interrupted()) { selectCnt = 1 ; break ; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1 ; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { selector = selectRebuildSelector(selectCnt); selectCnt = 1 ; break ; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { } } catch (CancelledKeyException e) { } }
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
1 2 3 4 5 6 7 8 9 private void processSelectedKeys () { if (selectedKeys != null ) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
io.netty.channel.nio.NioEventLoop#processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { return ; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
2.3 accept 剖析 nio 中如下代码,在 netty 中的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false ); channel.register(selector, SelectionKey.OP_READ); } }
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void read () { assert eventLoop () .inEventLoop () ; final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false ; Throwable exception = null ; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0 ) { break ; } if (localRead < 0 ) { closed = true ; break ; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0 ; i < size; i ++) { readPending = false ; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null ) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true ; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
2.4 read 剖析 再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public final void read () { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return ; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null ; boolean close = false ; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0 ) { byteBuf.release(); byteBuf = null ; close = allocHandle.lastBytesRead() < 0 ; if (close) { readPending = false ; } break ; } allocHandle.incMessagesRead(1 ); readPending = false ; pipeline.fireChannelRead(byteBuf); byteBuf = null ; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1 2 3 4 5 6 7 8 9 10 11 12 public boolean continueReading (UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0 ; }