RpcProject

Netty

概述?

Netty是一个异步的、基于事件驱动的网络应用框架

引入Netty

1
2
3
4
5
dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>

五大部件

  1. EventLoop & EventLoopGroup
  2. Channel & ChannelFuture
  3. Future & Promise
  4. Handler & Pipeline
  5. ByteBuf

EventLoop & EventLoopGroup

EventLoop(事件循环),相当于一个单线程执行器,维护了一个selector

作用:内部有run方法处理channel上源源不断的IO事件

基本API

EventLoop的继承

  • 继承EventExcutor,内部有方法可以查看EventLoop属于的组,以及判断一个线程属不属于此EventLoop

核心API

  • 可以通过EventLoop的next方法获得
  • excute(Runnable)执行任务
  • submit(Runnable)执行任务,返回一个Future对象
  • scheduleAtFixedRate(Runnable, initialDelay, period, TimeUnit)执行定时任务

EventLoopGroup(事件循环组):内部包含一组EventLoop

常用的实现类:

  • NioEventLoopGroup:负责处理 IO 事件、普通任务、定时任务
  • DefaultEventLoopGroup:负责处理普通任务、定时任务

核心 API:

  • new NioEventLoopGroup(int):创建一个 NioEventLoopGroup,参数是 int 类型的值,如果不传,按默认的值来设置
  • next():获得下一个EventLoop对象
1
2
3
4
5
6
7
8
EventLoopGroup group1 = new NioEventLoopGroup(5); //此对象可以处理: io事件、普通任务、定时任务
EventLoopGroup group2 = new DefaultEventLoopGroup(); //此对象可以处理: 普通任务、定时任务
group1.execute(() -> {
System.out.println("执行普通任务");
});
group1.next().scheduleAtFixedRate(() -> {
System.out.println("执行定时任务");
}, 0, 1, TimeUnit.SECONDS);

执行IO请求

NioEventLoopGroup可以处理三种任务:IO事件、普通任务、定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
new ServerBootstrap()
.group(new NioEventLoopGroup())
// 这一步就是创建EventLoopGroup
.channel(NioServerSocketChannel.class)
// 设置通道
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 进行处理
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug(byteBuf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);

任务分工与细化

分工

boss负责ACCEPT请求,worker负责写请求,在Netty的任务可以细分

1
2
3
4
new ServerBootStrap()
//这里可以指定两个EventLoopGroup
//第一个就是boss,第二个就是worker
.group(new NioEventLoopGroup(), new NioEventLoopGroup())

细化

如果有比较重量级的操作,可以单独分配一个EventLoopGroup来专门执行这种重量级操作,以免阻塞其他任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//【细化】让一个group专门去做耗时的工作
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
// 【分工】:这里可以指定两个EventLoopGroup
// 第一个就是boss,第二个就是worker
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.debug(byteBuf.toString(Charset.defaultCharset()));
System.out.println("handler1");
ctx.fireChannelRead(msg);// 这个方法可以传msg到下一个handler
}
}).addLast(group,"handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler2");
System.out.println(msg);
}
});
}
})
.bind(8080);

Channel & ChannelFuture

channel:一个到实体的开放连接

实体包括一个硬件设备、一个文件、一个Socket等等内容

类似于NIO通道,Netty的Channel本身也是对Channel的封装

核心API

  • close() 可以用来关闭 channel
  • closeFuture():用来处理 channel 的关闭,可以附加其他操作
    • sync 方法作用是同步等待 channel 关闭
    • addListener异步等待 channel 关闭
  • pipeline():方法添加处理器
  • write() 方法将数据写入(写入发送缓冲区,但不一定立即发送,可能达到一定大小,才会发送出去)
  • writeAndFlush():将数据立刻写入并刷出(写入缓冲区并且立即发送)
    • 这个方法相当于调用 write()flush() 两个方法

ChannelFuture对象

ChannelFuture:异步 IO 操作的返回结果(成功、失败、或是取消)

由于Netty中所有IO操作都是异步的,这意味着任何IO调用都将立即返回,但不保证请求的IO操作已经在调用结束时完成

所以就有了channelFuture这个对象,用于在某个时间点确定结果

比如:客户端连接操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
// 获得ChannelFuture对象
channelFuture
//.sync() // 我们现在不去调用同步方法
.channel()
.writeAndFlush("hello world");

执行会发现,不会给服务器发送hello world的信息

?为什么

因为Netty是异步非阻塞的,main线程发起调用,但其实是创建了一个新的NIO线程来执行connect()操作,这个连接操作很费时

而main线程调用完后,会立即向下执行(由于非阻塞),因此获得的channel对象不是成功建立连接后的对象,发消息也就发不出去了

如何解决?

  • 【法一】调用 sync 方法,这个方法会让 main 线程与运行 connect 的线程同步(即阻塞 main 线程直到 channel 创建完成)
  • 【法二】调用 addListener(回调对象),传入一个 GenericFutureListener 接口,我们可以传入其子接口 ChannelFutureListener
1
2
3
4
5
6
7
8
channelFuture.addListener(new ChannelFutureListener() {
@Override
// operationComplete方法会由执行NIO的线程执行完成后调用
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("hello");
}
});

注意:

Channel 对象的关闭与连接一样,都是由另外的线程真正执行的关闭操作,Channel 对象可以调用 sync 同步或者 addListener 异步来执行通道关闭后的操作

Future&Promise

Future是另一种在操作完成时通知APP的方式

(还有一种是回调方法,比如新的连接建立触发;channelActive())

JUC 也有 Future 对象,Netty 的 Future 继承了 JUC 的 Future,Promise 是对 Netty Future 的进一步扩展

一句话:Promise 继承 nettyFuture;netty Future 继承 JUC Future

区别:

  • JDK Future 只能同步等待任务结束,才能得到结果
  • Netty Future 可以同步 / 异步等待任务结束,然后获得结果
  • Promise 不仅有 Future 的功能,而且脱离了任务独立存在,作为两个线程间传递结果的容器

核心API对比

功能 JDK Future Netty Future Promise
cancel 取消任务 继承 继承
isCanceled 判断任务是否取消 继承 继承
isDone 判断任务是否结束(成功 / 失败) 继承 继承
get 获得结果(阻塞等待) 继承 继承
getNow 获取任务结果,非阻塞,会先立即返回 null 继承
await 等待任务结束(任务失败不会抛出异常 继承
sync 等待任务结束(任务失败抛出异常 继承
isSuccess 判断任务是否成功 继承
cause 获取失败信息(非阻塞),如果没有失败返回 null 继承
addListener 添加回调,异步接收结果 继承
setSuccess 设置成功返回的结果
setFailure 设置失败返回的结果

事件&ChannelHandler&ChannelPipeLine

事件:某些动作完成时发送的消息

入站端的事件比如有:连接已激活或失活、数据读取、用户事件、错误事件

出站事件:打开关闭远程节点的连接、数据写到或冲刷到套接字

ChannelHandler:处理 IO 事件或拦截 IO 操作,并将其转发到其 PipeLine 中的下一个 handler(可以理解为一道工序)

简单的我们可以理解为:是一个可以响应不同事件的回调方法

这里有个图,很好的表示了事件和 ChannelHandler 的关系

img

如图所示,在一个事件发生后,会经过一系列的处理器(即 Handler)最后成功入站或出站

注意:Netty 服务器至少需要两部分:

  • 至少一个 ChannelHandler
  • 引导:客户端 Bootstrap、服务端 ServerBootstrap

引导的作用:为了配置服务器,比如连接服务器到指定的端口

ChannelPipeLine:一个 Handler 的列表(可以理解为一条流水线)或者说是 ChannelHander 的容器

InboundHandler

有两种 ChannelHandler,分为入站、出站两种(注意是站,不是栈)

ChannelInboundHandlerAdapter

ChannelHandler 是一个父接口,使用需要重写很多方法,所以实现时,我们可以使用他们的适配器 Adapter 的子类,来很快的达到目的

入站处理器通常是 ChannelInboundHandlerAdapter 的子类,下面是它的相关事件的回调

(所谓回调:当触发对应事件的时候,会调用相关方法)

  • channelRead():每个消息传入都会调用此方法
  • channelReadComplete():当前批量读取的最后一条消息会触发此方法
  • exceptionCaught():读取操作发生异常会调用此方法

SimpleChannelInboundHandler

此类是 ChannelInboundHandlerAdapter 的子类,它的相关回调有:

  • channelActive():当连接建立后将被调用
  • channelRead0():收到一条消息就被调用
  • exceptionCaught():读取操作发生异常会调用此方法

注意:channelRead0 可能会执行很多次,因为由服务器发送的消息可能会被分块接收。

也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收

那么 SimpleChannelInboundHandler 和他的父类 ChannelInboundHandlerAdapter 有什么区别呢?

前者会帮我们释放关于 ByteBuf 的内存引用,而后者没有实现。

OutboundHandler

出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要用来对写回数据进行加工

通过通道获取 PipeLine,通过 PipeLine 的 addLast 方法添加处理器

ChannelPipeline 与 Handler 的执行流程

PipeLine(或者说 Handler 的执行流程)的结构就是一个双向链表

两个特殊的 Handler:一个 head,一个 tail

img

注意:

  • pipeline.addLast(xxx):会将 handler 加到 tail 之前,并不是真正的最后
  • Inbound 负责执行读操作,是从 head 开始的
  • OutBound 负责执行写操作,是从 tail 开始的

Inboundhandler 内一个 handler 如何传递对象给下一个 handler

可以通过调用 super.channelRead(ctx,msg); 方法,将想要传递的对象作为 msg 传递给下一个 handler

内部其实调用的是 fireChannelRead 方法

1
2
3
4
5
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

如果某一个 handler 没有调用此方法,那么传递链就会断开

channelwritectxwrite 的区别!(重要)

这两个容易搞混,如图示:

  • channelwrite 是从 tail 开始向前找到第一个 Outboundhandler(注意:是从 tail 开始!找的 OutBoundHandler,不找 InboundHandler
  • ctxwrite 是从当前 handler 开始向前找 Outboundhandler(注意:是从当前 handler 开始,也是找的 outboundhandler

ByteBuf

ByteBuf 是 netty 对 NIO ByteBuffer 的一个增强

创建

创建一个 ByteBuf 不能直接 new,需要 ByteBufAllocator 来创建

  • 使用 ByteBufAllocator.DEFAULT.buffer()
  • 默认字节数组长度 256 字节,可以自己指定
  • 如果存满,会自动扩容(NIO 会报错)
1
2
3
4
5
6
7
8
9
10
11
12
13
// 可以传入一个Capacity,默认是256字节,而且可以扩容
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(byteBuf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("2");
}
byteBuf.writeBytes(sb.toString().getBytes());
System.out.println(byteBuf);
/* 输出:(ridx读指针、widx写指针)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
*/
  • 默认创建的就是直接内存,也可以创建堆内存
1
2
3
ByteBufAllocator.DEFAULT.buffer();// 默认就创建直接内存
ByteBufAllocator.DEFAULT.directBuffer();// 创建直接内存
ByteBufAllocator.DEFAULT.heapBuffer();// 创建堆内存

直接内存与堆内存的区别:

1、管理机制不同:直接内存由 OS 管理,堆内存由 JVM 管理

2、创建速度:直接内存创建比较麻烦,堆内存创建十分迅速

综上:

直接内存的优点是读写性能好(会少一次数据 Copy 的过程),但是创建比较慢,而且需要自己进行释放

而堆内存的优点是创建速度快,但是读写性能会低一点

池化思想

为什么ByteBuf 引入池化思想?

Netty采用了直接内存,但是直接内存的缺点是创建比较慢,索引引入了池化思想,优化池功能在4.1前的不成熟,默认是非池化的

4.1之后,默认是池化创建(非Android平台)

可以配置参数设置是否开启池化

组成

四部分:废弃部分、可读部分、可写部分、可扩容部分

image-20240912190457295

  • 有读指针和写指针
  • 最大容量代表int类型最大值

相比NIO Buffer, ByteBuffer的优势

  1. 可以扩容(有扩容)
  2. 不必来回切换读写模式(有读指针和写指针)

扩容规则

写入后容量大小是否超过512

  • 超过:选择下一个2^n

  • 未超过:选择下一个16的整数倍

(扩容不能超过最大容量)

核心API

写操作

ByteBuf 给每一个基本数据类型都提供了写方法,此处捡重点说一下

  • writeBoolean(boolean):ByteBuf 内部存储时,使用 0 表示 false,1 表示 true

  • writeInt(int) & writeIntLE(int)
    

    :这两个方法有什么区别呢?

    • writeInt(int) 就是常用的方法,他是大端存储
    • writeIntLE(int) 是小端存储的

大端存储 6,其结构就是 0000 0110,而小端存储为 0110 0000

(一般内存设计都是大端存储的,极个别的内存厂商使用小端存储)

  • writeBytes(xxx):这个方法可传入的参数非常多,甚至包括 NIO 的 ByteBuf
  • writeCharSequence(CharSequence子类, 字符集):这个方法可以传入字符串等等 CharSequence 的子类

读操作

读操作主要的方法有三个

  • readByte():读取 1 个字节
  • readInt():读取 4 个字节
  • markReaderIndex():标记当前的读位置
  • resetReaderIndex():重置读位置到标记的位置

RPCProject

什么是RPC? 有一篇RPC简单讲述了RPC

角色

RPC是构建现代分布式系统和微服务架构的关键技术之一,使得构建跨网络和跨平台的应用程序变得更加容易和高效。

构成

  • Netty搭建的服务端
  • Netty搭建的客户端
  • 一些通用类

服务启动流程

创建RpcNettyServer

  • 启动需要两个地址,两个地址用于创建父类NettyServer

    • 服务IP地址
    • Zookeeper服务器地址(服务注册地址) 2181
  • 实现了ApplicationContextAware

    重写setApplicationContext方法扫描所有带有注解@RpcService的方法

  • 实现了InitializingBean,afterPropertiesSet方法中启动父类NettyServer

  • 实现了DisposableBean 在服务关闭的时候,关闭NettyServer

NettyServer启动

  • RpcNettyServer调用了父类NettyServer的start方法,启动Netty服务器
  • 创建NettyServerInitializer
  • Netty启动ServerBootStrap
    • 相关配置
      • Netty容器
        1. boosGroup 处理Accept请求
        2. workerGroup 负责READ、WRITE请求
      • 处理器
        1. 入站处理器
          • IdleStateHandler 心跳状态感知
          • LengthFieldBasedFrameDecoder 解决TCP存在的粘包半粘包问题(负责处理基于长度字段的数据帧,RpcRequest制定了数据帧的格式)
          • RpcMsgDecoder 反序列化,可以接入不同的序列化工具本项目使用的是Protostuff
          • RpcServerHandler 负责执行客户端传来的请求,执行RPC调用
        2. 出站处理器
          • RpcMsgEncoder 序列化

服务注册流程

扫描@RpcService

扫描所有的@RpcService服务,将它们放入一个名叫serviceMap的服务,并将他们放入一个名叫serviceMap的HashMap

  • key 是接口名 +版本
  • value是对应的实现类

遍历serviceMap

  • 将serviceMap里的数据拆开,封装为RpcServiceInfo对象(RpcServiceInfo对象的属性有serviceName, version)
  • 将所有的RpcServiceInfo封装到一个RpcProtocol对象内(RpcProtocol的属性host, port, List<RpcServiceInfo>
  • 调用JsonUtil的序列化方法,使用Jackson将RpcProtocol转换为一个Json字符串
  • 使用Zookeeper的连接curatorClient
    • 创建对应的Zookeeper路径节点创建临时顺序节点
    • 给这个节点加一个监听器,如果连接状态变为RECONNECTED,那么重新进行服务注册

客户端启动

使用一个IP启动RpcClient,会启动服务发现ServiceDiscovery,开始服务发现,服务发现结束客户端启动过程就结束

服务发现

客户端启动时,会根据IP创建一个服务发现类

服务发现类创建后进行服务发现操作:

  1. 通过curatorClient,获取Zookeeper路径上的所有临时节点,读到的是一个List<String>
  2. 遍历这个List,将所有的String序列化为RpcProtocol
  3. 转换后的RpcProtocol需要存放在本地,这里涉及到一个新的类ConnectionManager
    • ConnectionManager是单例的,每个客户端只有一个
    • ConnectionManager关键使用到了两个Map来保存数据
      • 类型为CopyOnWriteArraySet的rpcProtocolSet的rpcProtocolSet
        1. 采用cow的思想,适合用在读多写少的情况
        2. 存放当前所有能连接到Server的RpcProtocol
      • 类型为concurrenthashMap的connectedServerNodeMap
        1. 使用了concurrent的hasMap
        2. key存放这个RpcProtocol
        3. value存放连接中获取到的RpcClienthandler
  4. 将RpcProtocol作为参数,调用ConnectionManager的方法,获取与更新服务
    • 将RpcProtocol中的List<RpcServiceInfo>,取出来,放入set去重,然后遍历这个set
      判断rpcProtocolSet中是否存在,如果不存在,就尝试连接这个Server节点,并将与Server的连接存放在connectedServerNodeMap,将这个RpcProtocol存放在rpcProtocolSet
      • 启动Netty客户端,根据RpcProtocol中的host与IP创建与Server的连接
      • 客户端的Netty的处理器配置为
        image-20240912151349099
      • 使用Netty的异步ChannelFuture,连接如果完成,将可以获得到此次连接用到的处理器RpcClientHandler
      • 将RpcProtocol和获取到的RpcClientHandler存放到connectedServerNodeMap
    • 遍历rpcProtocolSet,找出set中不存在的那些RpcProtocol,说明这些服务已经停止提供,删除他们的连接
      • connectionServerNodeMap删除对应信息
      • rpcProtocolSet删除对应信息
  5. 通过curatorClient添加对应的监听器,获取事件的类型与数据,分情况进行处理
    • CONNECTION_RECONNECTION 重新进行服务发现
    • CHILD_ADDED 存储这些新添加的
    • CHILD_UPDATED
    • CHILD_REMOVED
  6. 服务发现结束

Rpc过程

client过程

  1. 客户端提供了异步通信的方式,假如客户现在要进行一个远程方法调用(即客户只知道接口,但是没有实现类)

    需要传输

    • 要使用接口的class
    • 版本信息
  2. 使用动态代理ObjectProxy提交信息

    • 类ObjectProxy是一个实现了InvocationHandler的类(JDK动态代理)
      • 实现了InvocationHandler,需要重写Invoke方法,三个参数,proxy,方法,参数
      • Invoke 方法会在代理对象的其他方法执行时自动执行
      • invoke具体过程
        1. 封装一个RpcRequest
          • String requestId:UUID生成
          • String className: 当前执行方法所属class名子,通过反射的getDeclaringCalss获得
          • String methodName: 当前方法名
          • Class<?>[] parameterTypes: 当前的参数类型
          • Object version: 版本号
        2. 使用类名和版本号这两个信息从ConnectionManager的connectedServerNodeMap中通过RpcProtocol找到RpcClientHandler (找的过程使用了负载均衡)
        3. 调用RpcClientHandler的方法发送第一步生成的RpcRequest,客户端会得到一个RpcFuture,这是一个Futrue对象,可以异步获取结果

Server过程

  1. client发送过来的RpcRequest,将进入server端的入站处理器
  2. server端过滤掉心跳信息
  3. 通过传过来的rpcRequest,从serviceMap中找到执行这一个方法的class对象
    • 通过Class对象获取到Method对象
    • 利用反射机制执行method.invoke(serviceBean, parameters)
    • 将执行完成的结果封装到RpcResponse内,返回给客户端

客户端收到RpcFuture对象后,执行get(3000, TimeUnit.MILLSECONDS)获得执行的结果

两个Handler

RpcClientHandler

  1. 发送RpcFuture时,会将RpcFuture异步对象放入一个ConcurrentMap的pendingRpcMap内保存
    保存在pendingRpcMap,key为RpcRequest的ID,value是RpcFuture
  2. 收到服务器返回的结果时,通过response获得request的id,然后查pendingRpcMap,并将执行结果存放在RpcFuture内

RpcServerHandler

根据RpcRequest,利用反射执行,然后返回执行的结果RpcResponse