DispatchThread线程

2020-04-21  本文已影响0人  白馨_1114

pika使用的是多线程模型,使用多个工作线程来进行读写操作,由底层blackwidow引擎来保证线程安全,线程分为11种:
PikaServer:主线程
DispatchThread:监听端口1个端口,接收用户连接请求
WorkerThread:存在多个(用户配置),每个线程里有若干个用户客户端的连接,负责接收处理用户命令并返回结果,每个线程执行写命令后,追加到binlog中
Trysync:尝试与master建立首次连接,并在以后出现故障后发起重连
BinlogSender:存在多个(动态创建销毁,本master节点挂多少个slave节点就有多少个),每个线程根据slave节点发来的同步偏移量,从binlog指定的偏移开始实时同步命令给slave节点
BinlogReceiver:存在1个(动态创建销毁,一个slave节点同时只能有一个master),将用户指定或当前的偏移量发送给master节点并开始接收执行master实时发来的同步命令,在本地使用和master完全一致的偏移量来追加binlog
SlavePing:slave用来向master发送心跳进行存活检测
bgsave:后台dump线程
HeartBeat:master用来接收所有slave发送来的心跳并回复进行存活检测
scan:后台扫描keyspace线程
purge:后台删除binlog线程
Monitor: 实时打印出Pika服务器接收到的命令
Pub/Sub: 用来支持Pika的订阅功能

在看这一系列线程之前,
看一下最简单的线程编程,我们只用关心线程创建、线程资源回收以及线程终止。
posix接口:
一、线程创建:
int pthread_create(pthread_t *thread, const pthread_attr_t attr, void (start_routine)(void) void *arg);
线程资源回收:
int pthread_join(pthread_t thread, void **retval);
不回收:
1)已经退出的线程,其空间没有被释放,仍然在进程地址空间;
2)新创建的线程,无法复用退出线程的地址空间。
1,2同时发生就会造成内存泄露。所以同多进程编程一样,所有的高级语言都需要调用wait,不然会造成僵尸进程。
线程终止:
void pthread_exit( void * value_ptr );

线程DispatchThread是用来bind一个服务端口,listen客户端connot,并创建连接的一个线程。

再pika(server端)中负责建立连接的线程底层实现为:
third/pink/pink/src/pink_thread.cc

void* Thread::RunThread(void *arg) {
  Thread* thread = reinterpret_cast<Thread*>(arg);
  if (!(thread->thread_name().empty())) {
    SetThreadName(pthread_self(), thread->thread_name());
  }
  thread->ThreadMain();
  return nullptr;
}
int Thread::StartThread() { //pthread_create
  slash::MutexLock l(&running_mu_);
  should_stop_ = false;
  if (!running_) {
    running_ = true;
    return pthread_create(&thread_id_, nullptr, RunThread, (void *)this);
  }
  return 0;
}
int Thread::StopThread() { //线程退出,这个不是正真意义上的pthread_exit。其实最终还是调用pthread_join,也就是在整个服务的过程中,该线程是不会退出的,只会随着进程退出而退出。
  slash::MutexLock l(&running_mu_);
  should_stop_ = true;
  if (running_) {
    running_ = false;
    return pthread_join(thread_id_, nullptr);
  }
  return 0;
}
int Thread::JoinThread() { //pthread_join
  return pthread_join(thread_id_, nullptr);
}

在代码pthread_create(&thread_id_, nullptr, RunThread, (void *)this)中,RunThread是创建的线程需要执行的函数。
thread->ThreadMain(),这个thread其实就是 ServerThread。

二、Server端的网络编程实现很简单:
bind()—>listen()—>accept()
在 ServerThread中实现了socket 的逻辑,具体不展开。
third/pink/pink/src/server_thread.cc)
connfd = accept(fd, (struct sockaddr *) &cliaddr, &clilen);
socket_p = new ServerSocket(port_)中实现listen(),bind()逻辑。

三、接着看如何从pikaServer入口,调用到最底层实现。

文件 代码部分 关键调用部分
./src/pika_server.cc pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, 3000, worker_queue_limit)
src/pika_dispatch_thread.cc pink::NewDispatchThread(ips, port, work_num, &conn_factory_,cron_interval, queue_limit, &handles_);
third/pink/pink/src/dispatch_thread.cc return new DispatchThread(port, work_num, conn_factory,
cron_interval, queue_limit, handle)
hird/pink/pink/src/dispatch_thread.cc DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle) : ServerThread::ServerThread(ips, port, cron_interval, handle),last_thread_(0), work_num_(work_num), queue_limit_(queue_limit). ServerThread::ServerThread(ips, port, cron_interval, handle)
third/pink/pink/src/server_thread.cc int ServerThread::StartThread() {
int ret = 0; ret = InitHandle(); //bind,listern实现
if (ret != kSuccess) return ret; return Thread::StartThread(); //线程accept实现
}
return Thread::StartThread();
third/pink/pink/src/pink_thread.cc void* Thread::RunThread(void arg) { Thread thread = reinterpret_cast<Thread*>(arg);
if (!(thread->thread_name().empty())) {
SetThreadName(pthread_self(),
thread->thread_name());
}
thread->ThreadMain();
third/pink/pink/src/server_thread.cc ThreadMain()
上一篇下一篇

猜你喜欢

热点阅读