首发地址
day05 多线程实现都需要注意什么?
项目仓库地址
https://github.com/lzs123/CProxy,欢迎fork and star!
往期教程
day01-从一个基础的socket服务说起
day02 真正的高并发还得看IO多路复用
day03 C++项目开发配置最佳实践(vscode远程开发配置、格式化、代码检查、cmake管理配置)
day04 高性能服务设计思路
工作线程如何初始化?
在我们的设计中,工作线程本身是一个事件循环,启动后会陷入阻塞,等待事件发生。为了达到这个效果,线程启动时需要做一些初始化工作。
我们定义了EventLoopThread
类,类的定义如下
class EventLoopThread {
public:
EventLoopThread()
: thread_(std::bind(&EventLoopThread::ThreadFunc, this)),
mutex_(),
cond_(){};
~EventLoopThread();
void StartLoop();
void ThreadFunc();
void AddChannel(SP_Channel);
void AddConn(SP_Conn);
SP_EventLoop GetLoop() { return loop_; }
private:
SP_EventLoop loop_;
bool started_;
std::thread thread_;
std::mutex mutex_;
std::condition_variable cond_;
};
SP开头的代表对应类的shared_ptr智能指针类型;比如SP_EventLoop =std::shared_ptr\<EventLoop>
在EventLoopThread
构造函数中,创建了一个std::thread
对象,并以EventLoopThread::ThreadFunc
作为线程执行函数。
void EventLoopThread::ThreadFunc() try {
if (loop_) {
throw "loop_ is not null";
}
loop_ = SP_EventLoop(new EventLoop());
{
std::unique_lock<std::mutex> lock(mutex_);
started_ = true;
cond_.notify_all();
}
loop_->Loop();
} catch (std::exception& e) {
SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
abort();
}
我们先看看第5行:loop_ = SP_EventLoop(new EventLoop());
初始化了一个EventLoop
并赋值到EventLoopThread
成员变量loop_
上,我们先看看EventLoop
的定义
class EventLoop {
public:
EventLoop() : poller_(SP_Epoll(new Epoll())){};
void Loop();
void AddToPoller(SP_Channel channel);
void UpdateToPoller(SP_Channel channel);
void RemoveFromPoller(SP_Channel channel);
private:
SP_EventDispatcher poller_;
};
可以看到,在EventLoop
的构造函数中,初始化了一个Epoll
对象赋值到变量poller_
上。poller_
本身是个EventDispacther
对象,Epoll
继承了EventDispatcher
,表示基于epoll实现的事件分发;
这样做的好处是如果要新增一种事件分发机制,比如项目要支持mac环境,我们需要用kqueue
代替epoll
实现事件分发。这个时候我们只需要修改EventLoop
的构造函数,将新的事件分发对象Kqueue
赋值给poller_
即可。
我们再看看Epoll
在初始化时做了什么。
Epoll::Epoll() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)), epoll_events_(EVENTSNUM) {
assert(epoll_fd_ > 0);
}
调用了epoll_create1
,创建了一个epoll
实例,也就是说,一个Epoll
对象初始化时,就已经在内核准备好了一个eventpoll
对象,我们可以添加套接字并监听相关事件了。
看回EventLoopThread::ThreadFunc
, 在初始化一个EventLoop
对象后,EventLoopThread::ThreadFunc
会再调用loop_->Loop()
,这个就是我们之前说的事件循环了。
void EventLoop::Loop() {
std::vector<SP_Channel> ready_channels;
for (;;) {
ready_channels.clear();
ready_channels = poller_->WaitForReadyChannels();
for (SP_Channel chan : ready_channels) {
chan->HandleEvents();
}
}
}
EventLoop::Loop
本身是个死循环,大概逻辑也比较简单,就是不断从poller_
中获取触发事件的channel
列表,再遍历该列表,调用对应的HandleEvent
事件处理函数。
当没有事件时,该循环会阻塞在WaitForReadyChannels
处,底层其实是阻塞在epoll_wait
。(阻塞过程中线程是挂起状态,并不会占用cpu)。
我们简单回顾下线程的初始化。
- 创建了一个
EventLoop
对象,底层通过调用epoll_create1
创建了epoll
实例,可以通过该epoll
实例添加事件监听。 -
之后调用
EventLoop::Loop
,在没有事件时,线程会陷入阻塞;当有事件发生时,会调用注册时对应的handleEvents
方法进行处理。如何控制线程启动的顺序?
上面我们讲了线程的初始化,但初始化后,
EventLoopThread
还需要调用StartLoop
才能开始工作。这其实是为了让主线程等待线程池中的工作线程完成初始化。为什么要控制?
首先讲讲主线程为什么要等待工作线程完成初始化。
在我们的线程模型设计中,主线程负责监听接收新连接请求,然后选择线程池中的一个工作线程,将新连接套接字交给工作线程处理。
假设工作线程不需要StartLoop
,在工作线程初始化后直接加入到线程池。void EventLoopThreadPool::start() { for (int i = 0; i < thread_num_; i++) { SP_EventLoopThread t(new EventLoopThread()); // t->StartLoop(); threads_.emplace_back(t); } }
当有新连接时,主线程通过从线程池中获取一个工作线程。但这个时候,我们是没法保证选出的工作线程是已经初始化了
loop_
的。因为EventLoopThread::ThreadFunc
的执行是异步的,执行顺序可能如下。
在主线程选中将新连接添加到工作线程中时,工作线程的loop_
此时还未初始化,可能会导致程序直接coredump。
所以,我们必须想办法让工作线程的EventLoop
初始化在主线程开始接收新连接请求之前。如何控制?
这实际上是一个多线程的通知问题,我们主要采用的是
mutex
和condition
这两个武器,通过条件变量来完成通知。在C++中,我们通常使用
condition_variable
搭配互斥量mutex
来处理线程间同步问题。主要用到的是condition_variable::notify_xx
和condition_variable::wait
函数。
顾名思义,wait
就是一个等待的作用,假设我们在线程A进行wait
,流程如下:- lock获取到锁,
- 调用
wait
,会先自动unlock释放锁,然后将线程阻塞住; - 被其他线程唤醒后,会自动lock获取锁,继续执行
wait
的下一行代码
唤醒线程的函数是
notify_all
和notify_one
,两者的区别在于notify_one()
每次只唤醒一个线程,那么notify_all()
函数会唤醒所有等待中的线程(当最终能抢到锁的只有一个线程)。调用流程如下:- lock获取到锁
- 调用
notify_all/notify_one
,唤醒等待中的线程 - 释放锁
我们为EventLoopThread
引入了StartLoop
方法,大概效果如下
为了阅读方便,在这里再贴一遍EventLoopThread
相关的代码
// lib/event_loop_thread.h
class EventLoopThread {
public:
...
void StartLoop();
void ThreadFunc();
...
private:
...
bool started_;
std::mutex mutex_;
std::condition_variable cond_;
};
// lib/event_loop_thread.cpp
void EventLoopThread::ThreadFunc() try {
if (loop_) {
throw "loop_ is not null";
}
loop_ = SP_EventLoop(new EventLoop());
{
std::unique_lock<std::mutex> lock(mutex_);
started_ = true;
cond_.notify_all();
}
loop_->Loop();
} catch (std::exception& e) {
SPDLOG_CRITICAL("EventLoopThread::ThreadFunc exception: {}", e.what());
abort();
}
void EventLoopThread::StartLoop() {
std::unique_lock<std::mutex> lock(mutex_);
while (!started_) cond_.wait(lock);
}
首先,我们需要明确,在工作线程初始化loop_
后,就代表该线程已经准备完成,可以接收处理套接字了。所以我们在完成loop_
的初始化后,将started_
置为true
,然后就发送notify通知唤醒等待线程。
而在StartLoop
函数中,我们先检查started_
是否为false
,如果是true
,代表工作线程已经初始化完loop_
了,这种情况StartLoop
不再需要wait,直接返回即可;如果started_
为false
,则陷入等待,直到工作线程完成loop_
初始化后唤醒。
如何将套接字添加到工作线程?
最后,我们仔细聊聊新连接套接字是如何添加到工作线程中的。
没有请求时,主线程会阻塞在accept
调用,当有新连接请求时,accept
会返回新连接套接字accept_fd
。
主线程会先将accept_fd
封装成一个Conn
对象,上一节《day04 高性能服务设计思路》讲到项目中有多种连接,这些连接有一个共同的基类Conn
, Conn
主要是将套接字封装成一个Channel
,并设置该Channel
各种事件回调处理逻辑。不同类型的Conn
有各自的回调处理逻辑。
接下来,主线程通过EventLoopThreadPool::PickRandThread
获取一个工作线程。
SP_EventLoopThread EventLoopThreadPool::PickRandThread() {
SP_EventLoopThread t;
{
std::unique_lock<std::mutex> lock(thread_mutex_);
t = threads_[next_work_thread_Idx_];
next_work_thread_Idx_ = (next_work_thread_Idx_ + 1) % thread_num_;
}
return t;
}
这里我们直接采用轮训策略选取线程池中的线程。
获取到工作线程后,我们直接调用EventLoopThread::AddConn
,将该连接交由工作线程处理。
// lib/event_loop_thread.cpp
void EventLoopThread::AddConn(SP_Conn conn) {
loop_->AddToPoller(conn->GetChannel());
}
// lib/event_loop.cpp
void EventLoop::AddToPoller(SP_Channel channel) {
poller_->PollAdd(channel);
}
// lib/epoll.cpp
void Epoll::PollAdd(SP_Channel channel) {
int fd = channel->getFd();
epoll_event event;
event.data.fd = fd;
event.events = channel->GetEvents();
if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) {
SPDLOG_CRITICAL("epoll_ctl fd: {} err: {}", fd, strerror(errno));
} else {
fd2chan_[fd] = channel;
}
}
可以发现,底层是调用epoll_ctl
将套接字fd
加到对应工作线程的epoll实例上。
这里值得注意的是,【套接字添加到工作线程的epoll实例】这个动作是在主线程上完成,由于epoll是线程安全的,所以在主线程直接操作工作线程的epoll实例是没有问题的。
继续思考
有没有办法将【套接字添加到工作线程的epoll实例】这个动作放到工作线程上完成呢?其实这种做法更为普遍,比如有些时候为了避免加锁,提高操作效率,某些操作需要由主线程触发,由工作线程执行。
这里的难点在于工作线程是本身是个无限循环,在没有事件发生时,会一直阻塞在epoll_wait
上,这种情况下,主线程如何通知工作线程执行操作呢?
这里介绍一种思路,我们可以在EventLoop
初始化时,通过eventfd()
调用创建一个套接字event_fd
,EventLoop
添加监听event_fd
的读事件。
在EventLoop::Loop
函数中,每次处理完一轮读写后,都会再执行一个函数doPendingFns()
, 伪代码如下
void EventLoop::Loop() {
std::vector<SP_Channel> ready_channels;
for (;;) {
ready_channels.clear();
ready_channels = poller_->WaitForReadyChannels();
for (SP_Channel chan : ready_channels) {
chan->HandleEvents();
}
doPendingFns();
}
}
void doPendingFns() {
std::vector<Functor> fns;
{
MutexLockGuard lock(mutex_);
fns.swap(pendingFns_);
}
for (auto fn : fns) {
fn();
}
}
主线程在需要工作线程执行某个函数时,只需要往工作线程的pendingFns
列表添加对应的函数,再往event_fd
随便写一些数据,让工作线程退出阻塞,工作线程最终会在doPendingFns
遍历执行pendingFns
列表中的全部函数。
如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容