RpcProject
Netty
概述?
Netty是一个异步的、基于事件驱动的网络应用框架
引入Netty
1 | dependency> |
五大部件
- EventLoop & EventLoopGroup
- Channel & ChannelFuture
- Future & Promise
- Handler & Pipeline
- ByteBuf
EventLoop & EventLoopGroup
EventLoop(事件循环),相当于一个单线程执行器,维护了一个selector
作用:内部有run方法处理channel上源源不断的IO事件
基本API
EventLoop的继承
- 继承EventExcutor,内部有方法可以查看EventLoop属于的组,以及判断一个线程属不属于此EventLoop
核心API
- 可以通过EventLoop的next方法获得
- excute(Runnable)执行任务
- submit(Runnable)执行任务,返回一个Future对象
- scheduleAtFixedRate(Runnable, initialDelay, period, TimeUnit)执行定时任务
EventLoopGroup(事件循环组):内部包含一组EventLoop
常用的实现类:
NioEventLoopGroup:负责处理 IO 事件、普通任务、定时任务DefaultEventLoopGroup:负责处理普通任务、定时任务
核心 API:
new NioEventLoopGroup(int):创建一个 NioEventLoopGroup,参数是 int 类型的值,如果不传,按默认的值来设置- next():获得下一个EventLoop对象
1 | EventLoopGroup group1 = new NioEventLoopGroup(5); //此对象可以处理: io事件、普通任务、定时任务 |
执行IO请求
NioEventLoopGroup可以处理三种任务:IO事件、普通任务、定时任务
1 | new ServerBootstrap() |
任务分工与细化
分工
boss负责ACCEPT请求,worker负责写请求,在Netty的任务可以细分
1 | new ServerBootStrap() |
细化
如果有比较重量级的操作,可以单独分配一个EventLoopGroup来专门执行这种重量级操作,以免阻塞其他任务
1 | //【细化】让一个group专门去做耗时的工作 |
Channel & ChannelFuture
channel:一个到实体的开放连接
实体包括一个硬件设备、一个文件、一个Socket等等内容
类似于NIO通道,Netty的Channel本身也是对Channel的封装
核心API
close()可以用来关闭 channelcloseFuture():用来处理 channel 的关闭,可以附加其他操作sync方法作用是同步等待 channel 关闭addListener异步等待 channel 关闭
pipeline():方法添加处理器write()方法将数据写入(写入发送缓冲区,但不一定立即发送,可能达到一定大小,才会发送出去)writeAndFlush():将数据立刻写入并刷出(写入缓冲区并且立即发送)- 这个方法相当于调用
write()与flush()两个方法
- 这个方法相当于调用
ChannelFuture对象
ChannelFuture:异步 IO 操作的返回结果(成功、失败、或是取消)
由于Netty中所有IO操作都是异步的,这意味着任何IO调用都将立即返回,但不保证请求的IO操作已经在调用结束时完成
所以就有了channelFuture这个对象,用于在某个时间点确定结果
比如:客户端连接操作
1 | ChannelFuture channelFuture = new Bootstrap() |
执行会发现,不会给服务器发送hello world的信息
?为什么
因为Netty是异步非阻塞的,main线程发起调用,但其实是创建了一个新的NIO线程来执行connect()操作,这个连接操作很费时
而main线程调用完后,会立即向下执行(由于非阻塞),因此获得的channel对象不是成功建立连接后的对象,发消息也就发不出去了
如何解决?
- 【法一】调用
sync方法,这个方法会让 main 线程与运行 connect 的线程同步(即阻塞main线程直到channel创建完成) - 【法二】调用
addListener(回调对象),传入一个GenericFutureListener接口,我们可以传入其子接口ChannelFutureListener
1 | channelFuture.addListener(new ChannelFutureListener() { |
注意:
Channel对象的关闭与连接一样,都是由另外的线程真正执行的关闭操作,Channel对象可以调用sync同步或者addListener异步来执行通道关闭后的操作
Future&Promise
Future是另一种在操作完成时通知APP的方式
(还有一种是回调方法,比如新的连接建立触发;channelActive())
JUC 也有 Future 对象,Netty 的 Future 继承了 JUC 的 Future,Promise 是对 Netty Future 的进一步扩展
一句话:
Promise继承 nettyFuture;nettyFuture继承 JUCFuture
区别:
- JDK
Future只能同步等待任务结束,才能得到结果 - Netty
Future可以同步 / 异步等待任务结束,然后获得结果 Promise不仅有 Future 的功能,而且脱离了任务独立存在,作为两个线程间传递结果的容器
核心API对比
| 功能 | JDK Future | Netty Future | Promise |
|---|---|---|---|
cancel |
取消任务 | 继承 | 继承 |
isCanceled |
判断任务是否取消 | 继承 | 继承 |
isDone |
判断任务是否结束(成功 / 失败) | 继承 | 继承 |
get |
获得结果(阻塞等待) | 继承 | 继承 |
getNow |
无 | 获取任务结果,非阻塞,会先立即返回 null | 继承 |
await |
无 | 等待任务结束(任务失败不会抛出异常) | 继承 |
sync |
无 | 等待任务结束(任务失败抛出异常) | 继承 |
isSuccess |
无 | 判断任务是否成功 | 继承 |
cause |
无 | 获取失败信息(非阻塞),如果没有失败返回 null | 继承 |
addListener |
无 | 添加回调,异步接收结果 | 继承 |
setSuccess |
无 | 无 | 设置成功返回的结果 |
setFailure |
无 | 无 | 设置失败返回的结果 |
事件&ChannelHandler&ChannelPipeLine
事件:某些动作完成时发送的消息
入站端的事件比如有:连接已激活或失活、数据读取、用户事件、错误事件
出站事件:打开关闭远程节点的连接、数据写到或冲刷到套接字
ChannelHandler:处理 IO 事件或拦截 IO 操作,并将其转发到其 PipeLine 中的下一个 handler(可以理解为一道工序)
简单的我们可以理解为:是一个可以响应不同事件的回调方法
这里有个图,很好的表示了事件和 ChannelHandler 的关系

如图所示,在一个事件发生后,会经过一系列的处理器(即 Handler)最后成功入站或出站
注意:Netty 服务器至少需要两部分:
- 至少一个
ChannelHandler- 引导:客户端 Bootstrap、服务端 ServerBootstrap
引导的作用:为了配置服务器,比如连接服务器到指定的端口
ChannelPipeLine:一个 Handler 的列表(可以理解为一条流水线)或者说是 ChannelHander 的容器
InboundHandler
有两种 ChannelHandler,分为入站、出站两种(注意是站,不是栈)
ChannelInboundHandlerAdapter
ChannelHandler是一个父接口,使用需要重写很多方法,所以实现时,我们可以使用他们的适配器Adapter的子类,来很快的达到目的
入站处理器通常是 ChannelInboundHandlerAdapter 的子类,下面是它的相关事件的回调
(所谓回调:当触发对应事件的时候,会调用相关方法)
channelRead():每个消息传入都会调用此方法channelReadComplete():当前批量读取的最后一条消息会触发此方法exceptionCaught():读取操作发生异常会调用此方法
SimpleChannelInboundHandler
此类是 ChannelInboundHandlerAdapter 的子类,它的相关回调有:
channelActive():当连接建立后将被调用channelRead0():收到一条消息就被调用exceptionCaught():读取操作发生异常会调用此方法
注意:
channelRead0可能会执行很多次,因为由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收
那么 SimpleChannelInboundHandler 和他的父类 ChannelInboundHandlerAdapter 有什么区别呢?
前者会帮我们释放关于 ByteBuf 的内存引用,而后者没有实现。
OutboundHandler
出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要用来对写回数据进行加工
通过通道获取 PipeLine,通过 PipeLine 的 addLast 方法添加处理器
ChannelPipeline 与 Handler 的执行流程
PipeLine(或者说 Handler 的执行流程)的结构就是一个双向链表
两个特殊的 Handler:一个 head,一个 tail
注意:
pipeline.addLast(xxx):会将 handler 加到 tail 之前,并不是真正的最后Inbound负责执行读操作,是从head开始的OutBound负责执行写操作,是从tail开始的
Inboundhandler内一个handler如何传递对象给下一个handler?
可以通过调用 super.channelRead(ctx,msg); 方法,将想要传递的对象作为 msg 传递给下一个 handler,
其内部其实调用的是 fireChannelRead 方法
1 | @Skip |
如果某一个 handler 没有调用此方法,那么传递链就会断开
channel的write与ctx的write的区别!(重要)
这两个容易搞混,如图示:
channel的write是从 tail 开始向前找到第一个Outboundhandler(注意:是从 tail 开始!找的OutBoundHandler,不找InboundHandler)ctx的write是从当前handler开始向前找Outboundhandler(注意:是从当前handler开始,也是找的outboundhandler)
ByteBuf
ByteBuf 是 netty 对 NIO ByteBuffer 的一个增强
创建
创建一个 ByteBuf 不能直接 new,需要 ByteBufAllocator 来创建
- 使用
ByteBufAllocator.DEFAULT.buffer() - 默认字节数组长度 256 字节,可以自己指定
- 如果存满,会自动扩容(NIO 会报错)
1 | // 可以传入一个Capacity,默认是256字节,而且可以扩容 |
- 默认创建的就是直接内存,也可以创建堆内存
1 | ByteBufAllocator.DEFAULT.buffer();// 默认就创建直接内存 |
直接内存与堆内存的区别:
1、管理机制不同:直接内存由 OS 管理,堆内存由 JVM 管理
2、创建速度:直接内存创建比较麻烦,堆内存创建十分迅速
综上:
直接内存的优点是读写性能好(会少一次数据 Copy 的过程),但是创建比较慢,而且需要自己进行释放
而堆内存的优点是创建速度快,但是读写性能会低一点
池化思想
为什么ByteBuf 引入池化思想?
Netty采用了直接内存,但是直接内存的缺点是创建比较慢,索引引入了池化思想,优化池功能在4.1前的不成熟,默认是非池化的
4.1之后,默认是池化创建(非Android平台)
可以配置参数设置是否开启池化
组成
四部分:废弃部分、可读部分、可写部分、可扩容部分

- 有读指针和写指针
- 最大容量代表int类型最大值
相比NIO Buffer, ByteBuffer的优势
- 可以扩容(有扩容)
- 不必来回切换读写模式(有读指针和写指针)
扩容规则
写入后容量大小是否超过512
超过:选择下一个2^n
未超过:选择下一个16的整数倍
(扩容不能超过最大容量)
核心API
写操作
ByteBuf 给每一个基本数据类型都提供了写方法,此处捡重点说一下
writeBoolean(boolean):ByteBuf 内部存储时,使用 0 表示 false,1 表示 truewriteInt(int) & writeIntLE(int):这两个方法有什么区别呢?
writeInt(int)就是常用的方法,他是大端存储的writeIntLE(int)是小端存储的
大端存储 6,其结构就是 0000 0110,而小端存储为 0110 0000
(一般内存设计都是大端存储的,极个别的内存厂商使用小端存储)
writeBytes(xxx):这个方法可传入的参数非常多,甚至包括 NIO 的 ByteBufwriteCharSequence(CharSequence子类, 字符集):这个方法可以传入字符串等等 CharSequence 的子类
读操作
读操作主要的方法有三个
readByte():读取 1 个字节readInt():读取 4 个字节markReaderIndex():标记当前的读位置resetReaderIndex():重置读位置到标记的位置
RPCProject
什么是RPC? 有一篇RPC简单讲述了RPC
角色
RPC是构建现代分布式系统和微服务架构的关键技术之一,使得构建跨网络和跨平台的应用程序变得更加容易和高效。
构成
- Netty搭建的服务端
- Netty搭建的客户端
- 一些通用类
服务启动流程
创建RpcNettyServer
启动需要两个地址,两个地址用于创建父类NettyServer
- 服务IP地址
- Zookeeper服务器地址(服务注册地址) 2181
实现了ApplicationContextAware
重写setApplicationContext方法扫描所有带有注解@RpcService的方法
实现了InitializingBean,afterPropertiesSet方法中启动父类NettyServer
实现了DisposableBean 在服务关闭的时候,关闭NettyServer
NettyServer启动
- RpcNettyServer调用了父类NettyServer的start方法,启动Netty服务器
- 创建NettyServerInitializer
- Netty启动ServerBootStrap
- 相关配置
- Netty容器
- boosGroup 处理Accept请求
- workerGroup 负责READ、WRITE请求
- 处理器
- 入站处理器
- IdleStateHandler 心跳状态感知
- LengthFieldBasedFrameDecoder 解决TCP存在的粘包半粘包问题(负责处理基于长度字段的数据帧,RpcRequest制定了数据帧的格式)
- RpcMsgDecoder 反序列化,可以接入不同的序列化工具本项目使用的是Protostuff
- RpcServerHandler 负责执行客户端传来的请求,执行RPC调用
- 出站处理器
- RpcMsgEncoder 序列化
- 入站处理器
- Netty容器
- 相关配置
服务注册流程
扫描@RpcService
扫描所有的@RpcService服务,将它们放入一个名叫serviceMap的服务,并将他们放入一个名叫serviceMap的HashMap
- key 是接口名 +版本
- value是对应的实现类
遍历serviceMap
- 将serviceMap里的数据拆开,封装为RpcServiceInfo对象(RpcServiceInfo对象的属性有serviceName, version)
- 将所有的RpcServiceInfo封装到一个RpcProtocol对象内(RpcProtocol的属性
host,port,List<RpcServiceInfo>) - 调用JsonUtil的序列化方法,使用Jackson将RpcProtocol转换为一个Json字符串
- 使用Zookeeper的连接curatorClient
- 创建对应的Zookeeper路径节点创建临时顺序节点
- 给这个节点加一个监听器,如果连接状态变为RECONNECTED,那么重新进行服务注册
客户端启动
使用一个IP启动RpcClient,会启动服务发现ServiceDiscovery,开始服务发现,服务发现结束客户端启动过程就结束
服务发现
客户端启动时,会根据IP创建一个服务发现类
服务发现类创建后进行服务发现操作:
- 通过curatorClient,获取Zookeeper路径上的所有临时节点,读到的是一个List<String>
- 遍历这个List,将所有的String序列化为RpcProtocol
- 转换后的RpcProtocol需要存放在本地,这里涉及到一个新的类ConnectionManager
- ConnectionManager是单例的,每个客户端只有一个
- ConnectionManager关键使用到了两个Map来保存数据
- 类型为CopyOnWriteArraySet的rpcProtocolSet的rpcProtocolSet
- 采用cow的思想,适合用在读多写少的情况
- 存放当前所有能连接到Server的RpcProtocol
- 类型为concurrenthashMap的connectedServerNodeMap
- 使用了concurrent的hasMap
- key存放这个RpcProtocol
- value存放连接中获取到的RpcClienthandler
- 类型为CopyOnWriteArraySet的rpcProtocolSet的rpcProtocolSet
- 将RpcProtocol作为参数,调用ConnectionManager的方法,获取与更新服务
- 将RpcProtocol中的List<RpcServiceInfo>,取出来,放入set去重,然后遍历这个set
判断rpcProtocolSet中是否存在,如果不存在,就尝试连接这个Server节点,并将与Server的连接存放在connectedServerNodeMap,将这个RpcProtocol存放在rpcProtocolSet- 启动Netty客户端,根据RpcProtocol中的host与IP创建与Server的连接
- 客户端的Netty的处理器配置为
![image-20240912151349099]()
- 使用Netty的异步ChannelFuture,连接如果完成,将可以获得到此次连接用到的处理器RpcClientHandler
- 将RpcProtocol和获取到的RpcClientHandler存放到connectedServerNodeMap
- 遍历rpcProtocolSet,找出set中不存在的那些RpcProtocol,说明这些服务已经停止提供,删除他们的连接
- connectionServerNodeMap删除对应信息
- rpcProtocolSet删除对应信息
- 将RpcProtocol中的List<RpcServiceInfo>,取出来,放入set去重,然后遍历这个set
- 通过curatorClient添加对应的监听器,获取事件的类型与数据,分情况进行处理
- CONNECTION_RECONNECTION 重新进行服务发现
- CHILD_ADDED 存储这些新添加的
- CHILD_UPDATED
- CHILD_REMOVED
- 服务发现结束
Rpc过程
client过程
客户端提供了异步通信的方式,假如客户现在要进行一个远程方法调用(即客户只知道接口,但是没有实现类)
需要传输
- 要使用接口的class
- 版本信息
使用动态代理ObjectProxy提交信息
- 类ObjectProxy是一个实现了InvocationHandler的类(JDK动态代理)
- 实现了InvocationHandler,需要重写Invoke方法,三个参数,proxy,方法,参数
- Invoke 方法会在代理对象的其他方法执行时自动执行
- invoke具体过程
- 封装一个RpcRequest
- String requestId:UUID生成
- String className: 当前执行方法所属class名子,通过反射的getDeclaringCalss获得
- String methodName: 当前方法名
- Class<?>[] parameterTypes: 当前的参数类型
- Object version: 版本号
- 使用类名和版本号这两个信息从ConnectionManager的connectedServerNodeMap中通过RpcProtocol找到RpcClientHandler (找的过程使用了负载均衡)
- 调用RpcClientHandler的方法发送第一步生成的RpcRequest,客户端会得到一个RpcFuture,这是一个Futrue对象,可以异步获取结果
- 封装一个RpcRequest
- 类ObjectProxy是一个实现了InvocationHandler的类(JDK动态代理)
Server过程
- client发送过来的RpcRequest,将进入server端的入站处理器
- server端过滤掉心跳信息
- 通过传过来的rpcRequest,从serviceMap中找到执行这一个方法的class对象
- 通过Class对象获取到Method对象
- 利用反射机制执行method.invoke(serviceBean, parameters)
- 将执行完成的结果封装到RpcResponse内,返回给客户端
客户端收到RpcFuture对象后,执行get(3000, TimeUnit.MILLSECONDS)获得执行的结果
两个Handler
RpcClientHandler
- 发送RpcFuture时,会将RpcFuture异步对象放入一个ConcurrentMap的pendingRpcMap内保存
保存在pendingRpcMap,key为RpcRequest的ID,value是RpcFuture - 收到服务器返回的结果时,通过response获得request的id,然后查pendingRpcMap,并将执行结果存放在RpcFuture内
RpcServerHandler
根据RpcRequest,利用反射执行,然后返回执行的结果RpcResponse

