文章目录
- 一、前言
- 二、Seata Server启动
-
- 1、找入口
- 2、整体执行流程
-
- 1)对配置文件做参数解析
- 2)初始化监控
- 3)创建TC与RM/TM通信的RPC服务器
- 4)初始化UUID生成器
-
- IdWorker
- 1> initTimestampAndSequence()
- 2> initWorkerId(Long)
- 5)设置事务会话(`SessionHolder`)、全局锁(`LockManager`)的持久化方式并初始化
-
- 1> SessionHolder
- 2> LockerManager
- 6)创建并初始化事务协调器(`DefaultCoordinator`)
- 7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator
- 8)启动NettyServer(NettyRemotingServer)
-
- 1> 首先注册消息处理器
- 2> 初始化`NettyRemotingServer`
-
- AbstractNettyRemotingServer.ServerHandler类
- 三、总结和后续
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
本文着重聊一聊seata-server启动时都做了什么?
PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。
二、Seata Server启动
Seata Server包含几个主要模块:Config(配置TC)、Store(TC运行时全局事务以及分支事务的相关信息通过Store持久化)、Coordinator(TC实现事务协调的核心)、Netty-RPC(负责TC与TM/RM交互)、Lock(资源全局锁的实现);
1、找入口
当要启动一个seata-server时,只需要执行压缩包中bin/目录下的seata-server.sh
,在这个脚本中会运行seata-server.jar
;
即对应于源码工程中的server目录 / seata-server 模块,由于seata-server是一个SpringBoot项目,找到其启动类ServerApplication
,里面仅仅指定了一个包扫描路径为io.seata
,并无其余特殊配置;
在启动类的同级目录下,有一个ServerRunner
类;
ServerRunner
类实现了CommandLineRunner
接口:
而CommandLineRunner
接口主要用于实现在Spring容器初始化后执行,并且在整个应用生命周期内只会执行一次;也就是说在Spring容器初始化后会执行ServerRunner#run()
方法;
ServerRunner#run()
方法中仅仅调用了Server#start()
方法;因此可以确定入口为io.seata.server.Server
类的start()方法;
2、整体执行流程
Server#start()
方法:
public class Server {
/**
* The entry point of application.
*
* @param args the input arguments
*/
public static void start(String[] args) {
// create logger
final Logger logger = LoggerFactory.getLogger(Server.class);
//initialize the parameter parser
//Note that the parameter parser should always be the first line to execute.
//Because, here we need to parse the parameters needed for startup.
// 1. 对配置文件做参数解析:包括registry.conf、file.conf的解析
ParameterParser parameterParser = new ParameterParser(args);
// 2、初始化监控,做metric指标采集
MetricsManager.get().init();
// 将Store资源持久化方式放到系统的环境变量store.mode中
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
// seata server里netty server 的io线程池(核心线程数50,最大线程数100)
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(),
NettyServerConfig.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 3、创建TC与RM/TM通信的RPC服务器--netty
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
// 4、初始化UUID生成器(雪花算法)
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
// 5、设置事务会话的持久化方式,有三种类型可选:file/db/redis
SessionHolder.init(parameterParser.getSessionStoreMode());
LockerManagerFactory.init(parameterParser.getLockStoreMode());
// 6、创建并初始化事务协调器,创建时后台会启动一堆线程
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
// 将DefaultCoordinator作为Netty Server的transactionMessageHandler;
// 用于做AT、TCC、SAGA等不同事务类型的逻辑处理
nettyRemotingServer.setHandler(coordinator);
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
// 7、注册ServerRunner销毁(Spring容器销毁)的回调钩子函数
ServerRunner.addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
// 8、启动netty Server,用于接收TM/RM的请求
nettyRemotingServer.init();
}
}
Server端的启动流程大致做了八件事:
- 对配置文件(包括registry.conf、file.conf)做参数解析;
- 初始化监控,做metric指标采集;
- 创建TC与RM/TM通信的RPC服务器(
NettyRemotingServer
)–netty;- 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
- 设置事务会话(
SessionHolder
)、全局锁(LockManager
)的持久化方式并初始化,有三种类型可选:file/db/redis;- 创建并初始化事务协调器(
DefaultCoordinator
),后台启动一堆线程做定时任务,并将DefaultCoordinator
绑定到RPC服务器上做为transactionMessageHandler
;- 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
- 启动netty Server,用于接收TM/RM的请求;
1)对配置文件做参数解析
具体代码执行流程如下:
ParameterParser的init()方法中:
- 首先从启动命令(运行时参数)中解析;
- 接着判断server端是否在容器中启动,是则从容器环境中获取seata环境、host、port、serverNode、storeMode存储模式等信息;
- 如果storeMode不存在,则从配置中心/文件中获取配置。
// 解析运行期参数,默认什么里面什么都没有
private void getCommandParameters(String[] args) {
JCommander jCommander = JCommander.newBuilder().addObject(this).build();
jCommander.parse(args);
if (help) {
jCommander.setProgramName(PROGRAM_NAME);
jCommander.usage();
System.exit(0);
}
}
// server端在容器中启动,则从容器环境中读取环境、host、port、server节点以及StoreMode存储模式
private void getEnvParameters() {
// 设置seata的环境
if (StringUtils.isBlank(seataEnv)) {
seataEnv = ContainerHelper.getEnv();
}
// 设置Host
if (StringUtils.isBlank(host)) {
host = ContainerHelper.getHost();
}
// 设置端口号
if (port == 0) {
port = ContainerHelper.getPort();
}
if (serverNode == null) {
serverNode = ContainerHelper.getServerNode();
}
if (StringUtils.isBlank(storeMode)) {
storeMode = ContainerHelper.getStoreMode();
}
if (StringUtils.isBlank(sessionStoreMode)) {
sessionStoreMode = ContainerHelper.getSessionStoreMode();
}
if (StringUtils.isBlank(lockStoreMode)) {
lockStoreMode = ContainerHelper.getLockStoreMode();
}
}
2)初始化监控
默认不开启,此处不做过多介绍
3)创建TC与RM/TM通信的RPC服务器
单纯的new一个NettyRemotingServer
,也没啥可说的;
4)初始化UUID生成器
UUID底层采用雪花算法,其用于生成全局事务id和分支事务id;
代码执行流程如下:
UUIDGenerator
会委托IdWorker来生成雪花id,生成的雪花Id由0、10位的workerId、41位的时间戳、12位的sequence序列号组成。
IdWorker
IdWorker中有8个重要的成员变量/常量:
/**
* Start time cut (2020-05-03)
*/
private final long twepoch = 1588435200000L;
/**
* The number of bits occupied by workerId
*/
private final int workerIdBits = 10;
/**
* The number of bits occupied by timestamp
*/
private final int timestampBits = 41;
/**
* The number of bits occupied by sequence
*/
private final int sequenceBits = 12;
/**
* Maximum supported machine id, the result is 1023
*/
private final int maxWorkerId = ~(-1 << workerIdBits);
/**
* business meaning: machine ID (0 ~ 1023)
* actual layout in memory:
* highest 1 bit: 0
* middle 10 bit: workerId
* lowest 53 bit: all 0
*/
private long workerId;
/**
* 又是一个雪花算法(64位,8字节)
* timestamp and sequence mix in one Long
* highest 11 bit: not used
* middle 41 bit: timestamp
* lowest 12 bit: sequence
*/
private AtomicLong timestampAndSequence;
/**
* 从一个long数组类型中抽取出一个时间戳伴随序列号,偏向一个辅助性质
* mask that help to extract timestamp and sequence from a long
*/
private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));
变量/常量解释:
- 常量
twepoch
表示我们的时间戳时间从2020-05-03
开始计算,即当前时间的时间戳需要减去twepoch
的值1588435200000L
;- 常量
workerIdBits
表示机器号workerId占10位;- 常量
timestampBits
表示时间戳timestamp占41位;- 常量
sequenceBits
表示序列化占12位;- 常量
maxWorkerId
表示机器号的最大值为1023;- long类型的变量
workerId
本身也是一个雪花算法,只是从开头往后数,第2位开始,一共10位用来表示workerId,其余位全是0;- AtomicLong类型的变量
timestampAndSequence
,其本身也是一个雪花算法,头11位不使用,中间41位表示timestamp,最后12位表示sequence;- long类型的常量
timestampAndSequenceMask
,用于从一个完整的雪花ID(long类型)中摘出timestamp 和 sequence
IdWorker构造器中会分别初始化TimestampAndSequence、WorkerId。
1> initTimestampAndSequence()
initTimestampAndSequence()方法负责初始化timestamp
和sequence
;
private void initTimestampAndSequence() {
// 拿到当前时间戳 - (2020-05-03 时间戳)的数值,即当前时间相对2020-05-03的时间戳
long timestamp = getNewestTimestamp();
// 把时间戳左移12位,后12位流程sequence使用
long timestampWithSequence = timestamp << sequenceBits;
// 把混合sequence(默认为0)的时间戳赋值给timestampAndSequence
this.timestampAndSequence = new AtomicLong(timestampWithSequence);
}
// 获取当前时间戳
private long getNewestTimestamp() {
//当前时间的时间戳减去2020-05-03的时间戳
return System.currentTimeMillis() - twepoch;
}
2> initWorkerId(Long)
initWorkerId(Long workerId)方法负责初始化workId,默认不会传过来workerId,如果传过来则使用传过来的workerId,并校验其不能大于1023,然后将其左移53位;
private void initWorkerId(Long workerId) {
if (workerId == null) {
// workid为null时,自动生成一个workerId
workerId = generateWorkerId();
}
// workerId最大只能是1023,因为其只占10bit
if (workerId > maxWorkerId || workerId < 0) {
String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
throw new IllegalArgumentException(message);
}
this.workerId = workerId << (timestampBits + sequenceBits);
}
如果没传则基于MAC地址生成;
如果基于MAC地址生成workerId出现异常,则也1023为基数生成一个随机的workerId;
最后同样,校验workerId不能大于1023,然后将其左移53位;
5)设置事务会话(SessionHolder
)、全局锁(LockManager
)的持久化方式并初始化
1> SessionHolder
SessionHolder负责事务会话Session的持久化,一个session对应一个事务,事务又分为全局事务和分支事务;
SessionHolder支持db,file和redis的持久化方式,其中redis和db支持集群模式,项目上推荐使用redis或db模式;
SessionHolder有五个重要的属性,如下:
// 用于管理所有的Setssion,以及Session的创建、更新、删除等
private static SessionManager ROOT_SESSION_MANAGER;
// 用于管理所有的异步commit的Session,包括创建、更新以及删除
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试commit的Session,包括创建、更新以及删除
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于管理所有的重试rollback的Session,包括创建、更新以及删除
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
// 用于管理分布式锁
private static DistributedLocker DISTRIBUTED_LOCKER;
这五个属性在SessionHolder#init()
方法中初始化,init()方法源码如下:
public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
}
StoreMode storeMode = StoreMode.get(mode);
// 根据storeMode采用SPI机制初始化SessionManager
// db模式
if (StoreMode.DB.equals(storeMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
} else if (StoreMode.FILE.equals(storeMode)) {
// 文件模式
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
DEFAULT_SESSION_STORE_FILE_DIR);
if (StringUtils.isBlank(sessionStorePath)) {
throw new StoreException("the {store.file.dir} is empty.");
}
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
} else if (StoreMode.REDIS.equals(storeMode)) {
// redis模式
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
} else {
// unknown store
throw new IllegalArgumentException("unknown store mode:" + mode);
}
// 根据storeMode重新加载
reload(storeMode);
}
init()方法中根据storeMode采用SPI机制初始化SessionManager,SessionManager
有三个实现类:
2> LockerManager
和SessionHolder
一样,LockManagerFactory#init()
方法同样根据storeMode采用SPI机制初始化LockManager,LockManager
有三个实现类:
6)创建并初始化事务协调器(DefaultCoordinator
)
DefaultCoordinator
是事务协调的核心,比如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是通过DefaultCoordinator进行协调处理的。
(1)先来看DefaultCoordinator的创建;
使用Double Check Lock(DCL-双重检查锁)机制获取到单例的DefaultCoordinator
;如果DefaultCoordinator
为实例化过,则new一个:
在DefaultCoordinator
的类构造器中,首先绑定远程通信的Server的具体实现到内部成员中,然后实例化一个DefaultCore
,DefaultCore是AT、TCC、XA、Saga四种分布式事务模式的具体实现类;
DefaultCore
的类构造器中首先通过SPI机制加载出所有的AbstractCore的子类,一共有四个:ATCore、TccCore、SagaCore、XACore;然后将AbstractCore
子类可以处理的事务模式作为Key、AbstractCore
子类作为Value存储到一个缓存Map(Map<BranchType, AbstractCore> coreMap
)中;
private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
后续通过BranchType(分支类型)就可以从coreMap中获取到相应事务模式的具体AbstractCore实现类。
(2)初始化DefaultCoordinator;
所谓的初始化,其实就是后台启动一堆线程做定时任务;去定时处理重试回滚、重试提交、异步提交、超时的检测,以及定时清理undo_log。
除定时清理undo_log外,其余定时任务的处理逻辑基本都是:
- 首先获取所有可回滚的全局事务会话Session,如果可回滚的分支事务为空,则直接返回;
- 否者,遍历所有的可回滚Session;为了防止重复回滚,如果session的状态是正在回滚中并且session不是死亡的,则直接返回;
- 如果Session重试回滚超时,从缓存中删除已经超时的回滚Session;
- 发布session回滚完成事件给到Metric,对回滚中的Session添加Session生命周期的监听;
- 使用DefaultCoordinator组合的DefaultCore执行全局回滚。
以处理重试回滚的方法handleRetryRollbacking()
为例:
protected void handleRetryRollbacking() {
SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
sessionCondition.setLazyLoadBranch(true);
// 获取所有的可回滚的全局事务session
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
// 如果可回滚的分支事务为空,则直接返回
if (CollectionUtils.isEmpty(rollbackingSessions)) {
return;
}
long now = System.currentTimeMillis();
// 遍历所有的可回滚Session,
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
// prevent repeated rollback
// 防止重复回滚:如果session的状态是正在回滚中并且session不是死亡的,则直接返回。
if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
&& !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'.
return;
}
// 判断回滚是否重试超时
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
rollbackingSession.clean();
}
// Prevent thread safety issues
// 删除已经超时的回滚Session
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
SessionHelper.endRollbackFailed(rollbackingSession, true);
// rollback retry timeout event
// 发布session回滚完成事件给到Metric
MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
//The function of this 'return' is 'continue'.
return;
}
// 对回滚中的Session添加Session生命周期的监听
rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 使用DefaultCoordinator组合的DefaultCore执行全局回滚
core.doGlobalRollback(rollbackingSession, true);
} catch (TransactionException ex) {
LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
}
});
}
7)注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator
8)启动NettyServer(NettyRemotingServer)
启动NettyRemotingServer时会做两件事:注册消息处理器、初始化并启动NettyServerBootstrap
;
1> 首先注册消息处理器
消息处理器是用来处理消息的,其根据消息的不同类型选择不同的消息处理器来处理消息(属于典型的策略模式);
每个消息类型和对应的处理器关系如下:
所谓的注册消息处理器本质上就是将处理器RemotingProcessor
和处理消息的线程池ExecutorService
包装成一个Pair
,然后将Pair作为Value,messageType作为key放入一个Map(processorTable
)中;
/**
* This container holds all processors.
* processor type {@link MessageType}
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
2> 初始化NettyRemotingServer
在初始化NettyRemotingServer
之前会通过AtomicBoolean
类型的原子变量initialized
+ CAS操作确保仅会有一个线程进行NettyRemotingServer
的初始化;
再看NettyRemotingServer
的类继承图:
CAS成功后进入到NettyRemotingServer
的父类AbstractNettyRemotingServer#init()
方法;
方法中:
(1)首先调用父类AbstractNettyRemoting
的init()方法:
启动一个延时3s,每3s执行一次的定时任务,做请求超时检查;
(2)紧接着启动ServerBootstrap
(就正常的nettyServer启动):
NettyRemotingServer在启动的过程中设置了4个ChannelHandler:
- IdleStateHandler:处理心跳
- ProtocolV1Decoder:消息解码器
- ProtocolV1Encoder:消息编码器
- AbstractNettyRemotingServer.ServerHandler:处理各种消息
AbstractNettyRemotingServer.ServerHandler类
ServerHandler类上有个@ChannelHandler.Sharable
注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。
processMessage(ctx, (RpcMessage) msg)
方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。
/**
* Rpc message processing.
*
* @param ctx Channel handler context.
* @param rpcMessage rpc message.
* @throws Exception throws exception process message error.
* @since 1.3.0
*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息对应的处理器设置了线程池,则放到线程池中执行
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
// 线程池拒绝策略之一,抛出异常:RejectedExecutionException
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
// 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
三、总结和后续
本文我们聊了Seata Server启动时都做了哪些事?博主总结一共八件事:
- 对配置文件(包括registry.conf、file.conf)做参数解析;
- 初始化监控,做metric指标采集;
- 创建TC与RM/TM通信的RPC服务器(
NettyRemotingServer
)–netty;- 初始化UUID生成器(雪花算法),用于生成全局事务id和分支事务id;
- 设置事务会话(
SessionHolder
)、全局锁(LockManager
)的持久化方式并初始化,有三种类型可选:file/db/redis;- 创建并初始化事务协调器(
DefaultCoordinator
),后台启动一堆线程做定时任务,并将DefaultCoordinator
绑定到RPC服务器上做为transactionMessageHandler
;- 注册ServerRunner销毁(Spring容器销毁)的回调钩子函数DefaultCoordinator;
- 启动netty Server,用于接收TM/RM的请求;
下一篇文章我们聊一下Seata Client(AT模式下仅作为RM时)启动时都做了什么?
转载请注明:【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】 | 胖虎的工具箱-编程导航