随笔分类
RocketMQ启动源码(上)
NamesrvStartup
这是 RocketMQ启动类入口
// RocketMQ启动入口
public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
public static void main(String[] args) {
// 如果我们启动时有去使用 -c、-p去设置参数的话(在命令行中), 那么 args将会承载着这些参数
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建 Namesrv控制器
// Namesrv控制器:初始化 Namesrv, 启动 Namesrv, 关闭 Namesrv
// 这里创建其的目的主要是读取配置信息, 对 Namesrv进行初始化
NamesrvController controller = createNamesrvController(args);
// 启动 Namesrv控制器
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
这里引入了 NamesrvController,这是 Namesrv的控制器,其主要去做的便是:初始化 Namesrv、关闭 Namesrv,这里主要来做的便是对 Namesrv进行初始化
首先便是通过 createNamesrvController
创建出了 NamesrvController,即:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 设置了下系统属性
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 启动时的参数由 commandLine来进行管理了 - 这不是我们关注的核心
// 即, 原来由 args承载着的配置参数信息转交给 command来去承载了
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// Namesrv配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// netty服务器配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Namesrv服务器监听端口修改为 9876
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 读取 -c选项的值
String file = commandLine.getOptionValue('c');
if (file != null) {
// 读取 config文件中的数据到 properties中去
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 如果 config配置文件内的配置涉及到 namesrvConfig 或者 nettyServerConfig中的一些字段的话, 这里会去进行覆写
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 将读取到的配置文件路径存储到 namesrvConfig字段 configStorePath中去
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 一般也不会去使用 -p选项
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
// 可见, 若是使用到了 -p选项, 这里会进行退出逻辑
System.exit(0);
}
// 将启动时命令行设置的 kv复写到 namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 这里来检查下 RocketMQ环境变量进行了配置与否
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 创建日志对象
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建 Namesrv控制器
// 参数一:namesrvConfig
// 参数二:网络层配置 nettyServerConfig
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
主要干的事情:解析启动 Rocketmq时配置的参数信息,将其封装在对象 commandLine
中,然后创建出两个配置类:NamesrvConfig、NettyServerConfig,将配置信息中涉及到此两个配置类的相关采参数信息,这里会对其对应的字段进行覆写,之后便是去根据上述的两个配置类去创建出 NamesrvController
NamesrvConfig、NettyServerConfig,前者存放的是关于 Namesrv的配置信息,后者根据其名字,我们可以猜想出 Namesrv的服务端是基于 Netty来实现的,NettyServerConfig中也是存放着 Netty服务端的相关配置信息
那么这两个配置类中包含了那些信息呢?先来瞅一瞅:
// 与 NameSrv相关配置信息
public class NamesrvConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 获取 ROCKETMQ_HOME值(环境变量)
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// 获取 kvConfig.json路径, RocketMQ会将一些键值对持久化到该 json文件中, 启动时再去读取到内存中去
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 顺序消息是否开启 - 默认是关闭着的
private boolean orderMessageEnable = false;
对于 NamesrvConfig,我们主要关注于便是顺序消息的配置默认是关闭的
public class NettyServerConfig implements Cloneable {
// 服务端启动时监听的端口号
private int listenPort = 8888;
// 业务线程池的线程数量
private int serverWorkerThreads = 8;
// 根据该值创建 remotingServer内部的一个 publicExecutor
private int serverCallbackExecutorThreads = 0;
// netty workerGroup中线程数目
private int serverSelectorThreads = 3;
// 服务端 单向访问 客户端时的并发限制
private int serverOnewaySemaphoreValue = 256;
// 服务端 异步访问 客户端时的并发控制
private int serverAsyncSemaphoreValue = 64;
// channel的最大存活时间:2min
private int serverChannelMaxIdleTimeSeconds = 120;
// 发送缓冲区大小 - 65535
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 接收缓冲区大小 - 65535
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// 是否有去使用 Netty的池化内存 - 默认是开启着的
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
// 表示默认是否去使用基于 epoll模式的 selector - Linux会去使用 epoll
private boolean useEpollNativeSelector = false;
Namesrv的服务端是基于 Netty来去实现的,因此我们关注的便是:线程池线程数目、监听端口、收发缓冲区大小等信息,其默认也配置着去使用池化内存,如果是 Linux os,会去使用基于 epoll
实现的 Selector
那么要让 NamesrvController起作用,我们需要干的便是启动它,即 start(controller)
在分析 start()之前,先去看看 NamesrvController的源码,从此层面去剖析 NamesrvController的职责
NamesrvController
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 存放与 Namesrv相关配置
private final NamesrvConfig namesrvConfig;
// 可以看到, Namesrv的服务端是基于 Netty来实现的
private final NettyServerConfig nettyServerConfig;
// 调度线程池, 执行定时任务, 主要感两件事:1.检查 broker的存活状态 2.打印配置
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
// 管理 KV配置
private final KVConfigManager kvConfigManager;
// 管理路由信息的对象
private final RouteInfoManager routeInfoManager;
// 网络层封装对象
private RemotingServer remotingServer;
// ChannelEventListener, 用于监听 channel状态, 当 channel状态发生改变时 close、idle等,
// 会向 "事件队列"发起事件, 事件最终会由该 brokerHousekeepingService来进行处理
private BrokerHousekeepingService brokerHousekeepingService;
// 业务线程池, Netty线程的主要任务是解析报文, 将报文解析成 RemotingCommand对象,
// 然后将该对象交给业务线程池再继续处理
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
除了上述已经讲过的那两个配置类外,可以看到 NamesrvController中通过对象 RouteInfoManager
对象来去管理着路由信息,即针对与 Topic到 broker ip的一个路由;其次,这里存在着两个线程池:scheduledExecutorService、remotingExecutor,前者是需要调度的线程,后者则是业务线程池
对于前者,主要去干两件事:a.检查 broker的存活状态 b.打印配置
对于后者,主要去处理业务相关的事,Netty线程主要去解析报文,将其解析为 remotingCommand对象,然后再讲对象转交给业务线程池来去进行处理
除了这些字段外,还有着一个比较核心的:RemotingServer,其实针对于网络层的封装对象
initialize()便是对 NamesrvController的初始化:
public boolean initialize() {
// 加载本地 KV配置
this.kvConfigManager.load();
// 创建网络服务器对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建业务线程池对象, 线程数默认是 8, 可以进行修改的
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 这里来注册了一个协议处理器(缺省的协议处理器)
this.registerProcessor();
// 接下来往调度线程池中去注册了两个定时任务
// 定时任务一:初始延迟 5s, 周期 10s, 检查 broker的存活状态, 将 idle状态的 broker移除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定时任务二:初始延迟 1min, 周期 10min, 打印 kv配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
主要干的便是:对上述所涉及到的两个线程池进行初始化,并且往调度线程池中注册了两个定时任务:检查 broker存活状态、定时打印 KV配置;还需要注意的便是,去默认注册了一个缺省的协议处理器:RequestDefaultProcessor,主要干的便是去创建出一个 Pair,K对应于缺省的协议处理器,V对应于业务线程池对象:
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else { // 我们主要考虑这条分支
// 注册一个缺省的 协议处理器
// 参数一:缺省的协议处理器
// 参数二:处理器工作时使用的线程池, 这里使用的是业务线程池
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
// 参数一:缺省的协议处理器
// 参数二:处理器工作时使用的线程池, 这里使用的是业务线程池
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
现在已经完成了对 NamesrvController的创建、初始化,现在便是去启动它!
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化 controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// JVM HOOK, 平滑关闭的逻辑;当 JVM被关闭时, 主动调用 controller.shutdown()方法, 让服务器平滑关机
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动服务器
controller.start();
return controller;
}
public void start() throws Exception {
// 启动服务器网络层对象
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
可以看到,真正去启动的其实还是我们的网络层封装对象:remotingServer
那么这个网络层封装对象究竟起着什么个作用?下一期对其进行剖析