DispatchThread线程
pika使用的是多线程模型,使用多个工作线程来进行读写操作,由底层blackwidow引擎来保证线程安全,线程分为11种:
PikaServer:主线程
DispatchThread:监听端口1个端口,接收用户连接请求
WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中
Trysync:尝试与master建立首次连接,并在以后出现故障后发起重连
BinlogSender:存在多个(动态创建销毁,本master节点挂多少个slave节点就有多少个),每个线程根据slave节点发来的同步偏移量,从binlog指定的偏移开始实时同步命令给slave节点
BinlogReceiver:存在1个(动态创建销毁,一个slave节点同时只能有一个master),将用户指定或当前的偏移量发送给master节点并开始接收执行master实时发来的同步命令,在本地使用和master完全一致的偏移量来追加binlog
SlavePing:slave用来向master发送心跳进行存活检测
bgsave:后台dump线程
HeartBeat:master用来接收所有slave发送来的心跳并回复进行存活检测
scan:后台扫描keyspace线程
purge:后台删除binlog线程
Monitor: 实时打印出Pika服务器接收到的命令
Pub/Sub: 用来支持Pika的订阅功能
在看这一系列线程之前,
看一下最简单的线程编程,我们只用关心线程创建、线程资源回收以及线程终止。
posix接口:
一、线程创建:
int pthread_create(pthread_t *thread, const pthread_attr_t attr, void (start_routine)(void) void *arg);
线程资源回收:
int pthread_join(pthread_t thread, void **retval);
不回收:
1)已经退出的线程,其空间没有被释放,仍然在进程地址空间;
2)新创建的线程,无法复用退出线程的地址空间。
1,2同时发生就会造成内存泄露。所以同多进程编程一样,所有的高级语言都需要调用wait,不然会造成僵尸进程。
线程终止:
void pthread_exit( void * value_ptr );
线程DispatchThread是用来bind一个服务端口,listen客户端connot,并创建连接的一个线程。
再pika(server端)中负责建立连接的线程底层实现为:
third/pink/pink/src/pink_thread.cc
void* Thread::RunThread(void *arg) {
Thread* thread = reinterpret_cast<Thread*>(arg);
if (!(thread->thread_name().empty())) {
SetThreadName(pthread_self(), thread->thread_name());
}
thread->ThreadMain();
return nullptr;
}
int Thread::StartThread() { //pthread_create
slash::MutexLock l(&running_mu_);
should_stop_ = false;
if (!running_) {
running_ = true;
return pthread_create(&thread_id_, nullptr, RunThread, (void *)this);
}
return 0;
}
int Thread::StopThread() { //线程退出,这个不是正真意义上的pthread_exit。其实最终还是调用pthread_join,也就是在整个服务的过程中,该线程是不会退出的,只会随着进程退出而退出。
slash::MutexLock l(&running_mu_);
should_stop_ = true;
if (running_) {
running_ = false;
return pthread_join(thread_id_, nullptr);
}
return 0;
}
int Thread::JoinThread() { //pthread_join
return pthread_join(thread_id_, nullptr);
}
在代码pthread_create(&thread_id_, nullptr, RunThread, (void *)this)中,RunThread是创建的线程需要执行的函数。
thread->ThreadMain(),这个thread其实就是 ServerThread。
二、Server端的网络编程实现很简单:
bind()—>listen()—>accept()
在 ServerThread中实现了socket 的逻辑,具体不展开。
third/pink/pink/src/server_thread.cc)
connfd = accept(fd, (struct sockaddr *) &cliaddr, &clilen);
socket_p = new ServerSocket(port_)中实现listen(),bind()逻辑。
三、接着看如何从pikaServer入口,调用到最底层实现。
文件 | 代码部分 | 关键调用部分 |
---|---|---|
./src/pika_server.cc | pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, 3000, worker_queue_limit) | |
src/pika_dispatch_thread.cc | pink::NewDispatchThread(ips, port, work_num, &conn_factory_,cron_interval, queue_limit, &handles_); | |
third/pink/pink/src/dispatch_thread.cc | return new DispatchThread(port, work_num, conn_factory, cron_interval, queue_limit, handle) |
|
hird/pink/pink/src/dispatch_thread.cc | DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle) : ServerThread::ServerThread(ips, port, cron_interval, handle),last_thread_(0), work_num_(work_num), queue_limit_(queue_limit). | ServerThread::ServerThread(ips, port, cron_interval, handle) |
third/pink/pink/src/server_thread.cc | int ServerThread::StartThread() { int ret = 0; ret = InitHandle(); //bind,listern实现 if (ret != kSuccess) return ret; return Thread::StartThread(); //线程accept实现 } |
return Thread::StartThread(); |
third/pink/pink/src/pink_thread.cc | void* Thread::RunThread(void arg) { Thread thread = reinterpret_cast<Thread*>(arg); if (!(thread->thread_name().empty())) { SetThreadName(pthread_self(), thread->thread_name()); } |
thread->ThreadMain(); |
third/pink/pink/src/server_thread.cc | ThreadMain() |