RemotingServer
This is the network-layer wrapper object; it encapsulates the details of the network connection.
As we know, Netty already hides the details of NIO, so establishing a connection and performing the related initialization is just a matter of configuring the bootstrap. RemotingServer, then, is one more layer wrapped around Netty's connection handling. Of course, Netty is already quite easy to use, so a plain wrapper would add little value; what makes this layer interesting is the number of extensions built on top of it.
Fields
A good way to dissect NettyRemotingServer is to start from its fields and then drill into the implementation details:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// Netty server-side bootstrap
private final ServerBootstrap serverBootstrap;
// workerGroup
private final EventLoopGroup eventLoopGroupSelector;
// bossGroup
private final EventLoopGroup eventLoopGroupBoss;
// Netty server-side network configuration
private final NettyServerConfig nettyServerConfig;
// shared thread pool; used when a processor is registered without its own executor
private final ExecutorService publicExecutor;
// the houseKeepingService; RocketMQ has two of them:
// brokerHouseKeepingService - used by the namesrv to watch its connections to brokers
// clientHousekeepingService - used by the broker to watch its connections to clients
private final ChannelEventListener channelEventListener;
// timer that runs the scanResponseTable task - essentially cleanup of expired response futures
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// when a handler is added to the pipeline with an explicit group, network events
// reaching that handler are executed on a thread from that group
private DefaultEventExecutorGroup defaultEventExecutorGroup;
// the port the server is bound to
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers - can be shared across multiple channels
// related to TLS: verifies the identity of both ends of the connection
private HandshakeHandler handshakeHandler;
private NettyEncoder encoder;
// watches the channel's connection state; when it changes (active, inactive, connect, idle),
// a Netty event is generated and handed off for processing (a thread from nettyEventExecutor
// handles it and finally passes it to the houseKeepingService)
private NettyConnectManageHandler connectionManageHandler;
// handler for inbound events
private NettyServerHandler serverHandler; // this one is key
The comments speak for themselves. Let's start with what the NettyRemotingServer constructor initializes:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
/**
* Requests the server sends to clients are subject to concurrency limits - the server config holds
* a. the concurrency limit for oneway requests
* b. the concurrency limit for async requests
* Those two settings are used here to create the corresponding Semaphores
*/
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
// the shared pool size falls back to 4 threads
publicThreadNums = 4;
}
// create the fixed-size shared thread pool
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
// the thread naming scheme is specified here
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// instantiate bossGroup and workerGroup
// on Linux this condition holds by default
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
Short and clear: it simply initializes a few fields from the configuration that was passed in - bossGroup, workerGroup, the shared publicExecutor, and so on. One thing worth noting: we usually think of requests as flowing client -> server, but server -> client requests exist as well, and those are subject to concurrency limits. Two request types are limited here - oneway and async - and the underlying mechanism is simple: a Semaphore permit must be acquired before each send.
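The permit-based limiting can be sketched with plain JDK primitives. The class and method names below are illustrative stand-ins, not the actual RocketMQ fields:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreThrottle {
    // stands in for semaphoreOneway / semaphoreAsync in NettyRemotingAbstract
    private final Semaphore permits;

    public SemaphoreThrottle(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // mirrors the tryAcquire(timeout) call at the top of invokeOnewayImpl/invokeAsyncImpl
    public boolean tryAcquire(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // mirrors SemaphoreReleaseOnlyOnce.release() (minus the once-only CAS guard)
    public void release() {
        permits.release();
    }

    public static void main(String[] args) throws InterruptedException {
        SemaphoreThrottle t = new SemaphoreThrottle(1);
        System.out.println(t.tryAcquire(10)); // true  - a permit is free
        System.out.println(t.tryAcquire(10)); // false - in-flight limit reached
        t.release();
        System.out.println(t.tryAcquire(10)); // true  - the permit was returned
    }
}
```

When the limit is reached, the real code either throws RemotingTooMuchRequestException or RemotingTimeoutException, as we will see in invokeAsyncImpl() and invokeOnewayImpl() below.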
Startup
We can now dive straight into remotingServer.start():
@Override
public void start() {
// instantiate the group that pipeline handlers may run on
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// instantiate the sharable handlers
prepareSharableHandlers();
ServerBootstrap childHandler =
// configure the serverBootstrap
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// server-side channel options
.option(ChannelOption.SO_BACKLOG, 1024) // length of the accept (backlog) queue - 1024
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
// options for accepted (child) channels
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) // send buffer size - 65535
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) // receive buffer size - 65535
// set the server's bind port (the bind port and the listen port are configured separately)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// initialize each accepted channel's pipeline - a few handlers are added by default,
// all of them running on defaultEventExecutorGroup
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
// IdleStateHandler - watches the channel's idle time; once the idle time
// reaches the configured threshold, the channel is closed
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
// true by default
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// use the pooled allocator PooledByteBufAllocator.DEFAULT for accepted channels
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// this is where the server binds its port
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
// store the bound port in the `port` field
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// i.e. if a houseKeepingService was supplied
if (this.channelEventListener != null) {
// this effectively starts the network-event processor;
// it is worth looking at what the nettyEventExecutor thread actually does
this.nettyEventExecutor.start();
}
/**
* Schedule a periodic task, run once per second:
* scan the responseTable and remove expired responseFutures
*/
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
start() does quite a bit: it creates the defaultEventExecutorGroup that the pipeline handlers run on, instantiates the sharable handlers, and configures the serverBootstrap so everything is ready when connections arrive. There are also two key extension points: it starts the network-event processor, and it schedules a periodic task that scans the responseFuture table and removes expired entries - executing any pending async callback found in an expired future along the way.
Instantiating the handlers:
private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
serverHandler = new NettyServerHandler();
}
connectionManageHandler is a duplex handler (the "network event handler" mentioned above). It handles changes in a channel's connection state: it wraps each change in a Netty event and hands it to the nettyEventExecutor, which in turn passes it on to the houseKeepingService.
serverHandler handles what the client sends, distinguishing two cases: a request from the client, and a client's response to an earlier server request.
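The hand-off just described (a handler enqueues an event, a background thread drains the queue and calls the listener) can be sketched with a plain BlockingQueue. All type names here are simplified stand-ins for the real NettyEventExecutor / ChannelEventListener classes:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventDispatchSketch {
    public enum EventType { CONNECT, CLOSE, IDLE, EXCEPTION }

    // stands in for ChannelEventListener (the houseKeepingService)
    public interface Listener { void onEvent(EventType type, String remoteAddr); }

    private static final class Event {
        final EventType type; final String remoteAddr;
        Event(EventType type, String remoteAddr) { this.type = type; this.remoteAddr = remoteAddr; }
    }

    private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Listener listener;
    private volatile boolean stopped = false;

    public EventDispatchSketch(Listener listener) { this.listener = listener; }

    // called from an I/O handler when a channel's state changes
    public void putEvent(EventType type, String remoteAddr) {
        queue.offer(new Event(type, remoteAddr));
    }

    // the single dispatch thread, standing in for nettyEventExecutor's run loop
    public void start() {
        Thread t = new Thread(() -> {
            while (!stopped) {
                try {
                    Event e = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (e != null) listener.onEvent(e.type, e.remoteAddr);
                } catch (InterruptedException ie) {
                    return;
                }
            }
        }, "EventDispatchSketch");
        t.setDaemon(true);
        t.start();
    }

    public void shutdown() { stopped = true; }
}
```

The key design point is decoupling: the I/O thread only pays the cost of an enqueue, while the (potentially slow) housekeeping logic runs on its own thread.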
Periodically scanning the responseFuture table:
/**
 * <p>
 * This method is periodically invoked to scan and expire deprecated requests.
 * </p>
 */
// periodically scan the responseTable and remove expired responseFutures
public void scanResponseTable() {
// collects the removed responseFutures
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
// scan the table
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// true - this responseFuture has expired
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
// execute any async callbacks attached to the expired responseFutures
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
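Note that the expiry condition grants an extra second of grace on top of each request's own timeout, so the waiter's own timeout path always fires before the scanner reaps the entry. Restated in isolation (a hypothetical helper, not RocketMQ code):

```java
public class ExpiryCheck {
    // the fixed grace period added on top of each request's own timeout
    static final long GRACE_MILLIS = 1000;

    // hypothetical restatement of the condition in scanResponseTable
    static boolean expired(long beginTimestamp, long timeoutMillis, long now) {
        return beginTimestamp + timeoutMillis + GRACE_MILLIS <= now;
    }

    public static void main(String[] args) {
        long begin = 0, timeout = 3000;
        System.out.println(expired(begin, timeout, 3500)); // false - timed out, but still within the grace window
        System.out.println(expired(begin, timeout, 4000)); // true  - timeout plus 1s grace has elapsed
    }
}
```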
The server sending requests:
invokeSyncImpl():
How is the synchronous call implemented? When the server writes data (a RemotingCommand) to the channel, it first creates a ResponseFuture. Every RemotingCommand carries a unique id, opaque, which also keys the ResponseFuture in the response table. Once the data has been written to the outbound buffer and flushed to the socket buffer, the business thread is suspended. When the client's response comes back, it arrives as an inbound event on the pipeline; the corresponding handler processes it and unparks the business thread, which can then act on the response data.
/**
 *
 * @param channel the client channel
 * @param request the transport object - here, the object sent to the client
 * @param timeoutMillis the timeout
 * Synchronous call
 */
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
// request id - uniquely identifies a RemotingCommand within the process
final int opaque = request.getOpaque();
try {
// create a ResponseFuture
// param 1: the client channel
// param 2: the request id
// param 3: the timeout
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// key - the request id
// store the responseFuture in the responseTable
this.responseTable.put(opaque, responseFuture);
// the client's remote address
final SocketAddress addr = channel.remoteAddress();
// write the data to the client channel, with a listener watching the send
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// the write succeeded
responseFuture.setSendRequestOK(true);
return;
} else {
// it failed...
responseFuture.setSendRequestOK(false);
}
// reaching here means the send failed - nothing went out, so no response can come back
// remove this request's responseFuture from the table
responseTable.remove(opaque);
// record the failure cause
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
/**
 * The business thread is suspended here - implemented with a CountDownLatch.
 * When does the result get set into the responseFuture?
 * The serverHandler processes the client's inbound request/response and fills it in.
 */
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
// two ways to reach this line:
// a. the client's response arrived and the I/O thread woke the business thread up
// b. timeout
// condition true - no result - timed out
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
// i.e. sending the RemotingCommand to the client failed
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
// return the client's response
return responseCommand;
} finally {
// the result is in hand (or the call failed), so the responseFuture can safely be removed
this.responseTable.remove(opaque);
}
}
Suspending the business thread:
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
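The other half of this suspend/wake pair lives on the I/O side: when the response arrives, putResponse() stores it and counts the latch down. A minimal self-contained sketch of both halves (a simplified stand-in for ResponseFuture; T plays the role of RemotingCommand):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// simplified stand-in for ResponseFuture, not the real class
public class FutureSketch<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // business thread: park until the response arrives or the timeout elapses
    public T waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response; // null on timeout, just like the real waitResponse
    }

    // I/O thread: publish the result, then wake the waiter
    public void putResponse(T responseCommand) {
        this.response = responseCommand;
        latch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        FutureSketch<String> f = new FutureSketch<>();
        new Thread(() -> f.putResponse("pong")).start();
        System.out.println(f.waitResponse(1000)); // pong
    }
}
```

Writing the response field before counting down the latch is what makes the result safely visible to the unparked waiter.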
invokeAsyncImpl():
The async version is simpler: pass in a callback object, and when the client's response comes back, the callback is executed. The business thread never blocks in between.
/**
 *
 * @param channel the client channel
 * @param request the transport object
 * @param timeoutMillis the timeout
 * @param invokeCallback the callback invoked with the request's result
 * Asynchronous call
 */
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// start time
long beginStartTime = System.currentTimeMillis();
// request id
final int opaque = request.getOpaque();
// try to acquire a permit from the semaphore
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
// condition true - permit acquired; the server->client concurrency has not hit the limit yet
if (acquired) {
// `once` wraps the permit release so it happens at most once
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// how long it took to get here
long costTime = System.currentTimeMillis() - beginStartTime;
// true - already timed out; no point issuing the RPC at all
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
// create a responseFuture
// param 1: the client channel
// param 2: the request id
// param 3: the remaining timeout
// param 4: the response callback
// param 5: the permit-release wrapper
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
// add the new responseFuture to the table
this.responseTable.put(opaque, responseFuture);
try {
// the business thread hands the data to the Netty I/O thread, which writes and flushes it;
// a listener is then registered for this write
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// true - write and flush succeeded
if (f.isSuccess()) {
// mark the responseFuture as sent OK
responseFuture.setSendRequestOK(true);
return; // the listener returns here
// (clearly, the async callback is executed later by some other thread)
}
// reaching here means the send failed
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else { // reaching here means concurrency is high and the permit could not be acquired
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
invokeOnewayImpl():
A oneway request: the server neither expects nor cares about any response from the client.
/**
 * The server proactively sends a oneway request; it does not care about the client's response
 * @param channel the client channel
 * @param request the transport object
 * @param timeoutMillis the timeout
 */
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// mark the RemotingCommand so the peer can tell what kind of request this is
request.markOnewayRPC();
// try to acquire a permit - oneway requests have their own concurrency limit
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
// acquired successfully
if (acquired) {
// `once` wraps the permit release
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
// hand the data to the Netty I/O thread, which writes and flushes it,
// and register a listener
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// the permit is released here
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
Handling client events
What arrives may be a request to the server or a response to the server, so the two must be told apart; that distinction is made in serverHandler:
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
// handles inbound events carrying a RemotingCommand
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
/**
* Entry of incoming command processing.
*
* <p>
* <strong>Note:</strong>
* The incoming remoting command may be
* <ul>
* <li>An inquiry request from a remote peer component;</li>
* <li>A response to a previous request issued by this very participant.</li>
* </ul>
* </p>
*
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
// the type field of the transport object tells us what kind of inbound event this is
switch (cmd.getType()) {
case REQUEST_COMMAND: // a request from the client
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: // a response from the client
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
A client response to the server:
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
// record the client's response
responseFuture.setResponseCommand(cmd);
// once the response is recorded, the responseFuture can be removed
responseTable.remove(opaque);
// condition true - an async callback exists - this was an async call
if (responseFuture.getInvokeCallback() != null) {
// a thread from the shared pool executes the callback
executeInvokeCallback(responseFuture);
} else {
// set the result; this calls countDownLatch.countDown(),
// after which the business thread can pick up the result and act on it
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
If the server's call was synchronous, the responseFuture.release() branch is taken:
public void release() {
if (this.once != null) {
this.once.release();
}
}
// the permit is actually released here
public void release() {
if (this.semaphore != null) {
if (this.released.compareAndSet(false, true)) {
this.semaphore.release();
}
}
}
If the call was asynchronous, the callback object is executed asynchronously via executeInvokeCallback(responseFuture).
Who runs the async task? A thread from the shared pool:
/**
 * Execute callback in callback executor. If callback executor is null, run directly in current thread
 */
// invoke the async callback - on a thread from the shared pool
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
// get the callback executor - in practice, the shared publicExecutor
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
// the async callback finally runs here
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
The client's requests:
This part involves quite a lot; a separate article will cover it.
Other extensions:
RPCHook
Hooks can be registered; through them we can implement AOP-like behavior around each call:
/**
* Hook for business operation, similar as aop
*/
public interface RPCHook {
/**
 * Invoked before the request is sent
 * @param remoteAddr
 * @param request
 */
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
/**
 * Invoked after the response has come back
 * @param remoteAddr
 * @param request
 * @param response
 */
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
final RemotingCommand response);
}
/**
 * Register hooks
 * Hooks let business code attach AOP-like logic around each call as needed
 * @param rpcHook
 */
@Override
public void registerRPCHook(RPCHook rpcHook) {
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
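As a usage sketch, here is a hypothetical hook that measures per-call latency. RemotingCommand and RPCHook are redeclared as minimal stand-ins so the example compiles on its own; with the real classes, the hook would simply be passed to remotingServer.registerRPCHook(...) before start():

```java
import java.util.concurrent.atomic.AtomicLong;

public class LatencyHookSketch {
    // minimal stand-in for the real transport object
    static class RemotingCommand { }

    // redeclared locally so the sketch compiles on its own
    interface RPCHook {
        void doBeforeRequest(String remoteAddr, RemotingCommand request);
        void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);
    }

    // hypothetical hook that records how long each call took
    static class LatencyHook implements RPCHook {
        private final ThreadLocal<Long> begin = new ThreadLocal<>();
        final AtomicLong lastCostMillis = new AtomicLong(-1);

        @Override
        public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
            begin.set(System.currentTimeMillis());
        }

        @Override
        public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            Long start = begin.get();
            if (start != null) {
                lastCostMillis.set(System.currentTimeMillis() - start);
            }
            begin.remove(); // avoid leaking the ThreadLocal across pooled threads
        }
    }

    public static void main(String[] args) {
        LatencyHook hook = new LatencyHook();
        hook.doBeforeRequest("127.0.0.1:9876", new RemotingCommand());
        hook.doAfterResponse("127.0.0.1:9876", null, new RemotingCommand());
        System.out.println(hook.lastCostMillis.get() >= 0); // true
    }
}
```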