源码阅读笔记--job

2018-09-17  本文已影响0人  sxr008

job就是对work的封装(应该对应于咱们项目中的task,只不过facebook做了一个封装)。
job可以触发或者分派在下面三种情况:

Job definition

struct ph_job_def {
  // The callback to run when the job is dispatched.
  // Will be copied to job->callback during ph_job_alloc()
  ph_job_func_t callback;
  // The memtype to use to allocate the job
  ph_memtype_t memtype;
  // Function to be called prior to freeing the job
  void (*dtor)(ph_job_t *job);
};

1 ph_nbio_init()初始化NBIO
calloc()分配num_schedulers个emitter, 定义在struct ph_nbio_emitter, 每个emitter会跟1个事件循环,和1个thread绑定

2 ph_sched_run调度NBIO
ph_sched_run()根据num_thread_spawn确定创建线程.每个线程对应一个emitters[i].

ph_result_t ph_sched_run(void)
{
  ph_thread_t *me = ph_thread_self();
  uint32_t i;
  void *res;

  emitters[0].thread = me;
  me->is_emitter = &emitters[0];

  for (i = 1; i < num_schedulers; i++) {
    emitters[i].thread = ph_thread_spawn(sched_loop, &emitters[i]);  //一直不太懂emitter。和time wheel有关,
    emitters[i].thread->is_emitter = &emitters[i];
  }

  _ph_job_pool_start_threads(); //各个线程池开始工作
  process_deferred(me, NULL);

  gc_interval = ph_config_query_int("$.nbio.epoch_interval", 5000);
  if (gc_interval > 0) {
#ifdef USE_GIMLI
    if (getenv("GIMLI_HB_FD")) {
      hb = gimli_heartbeat_attach();
      if (hb) {
        gimli_heartbeat_set(hb, GIMLI_HB_STARTING);
      }
    }
#endif

    ph_job_init(&gc_job);
    gc_job.callback = epoch_gc;
    ph_job_set_timer_in_ms(&gc_job, gc_interval);
  }

  sched_loop(me->is_emitter);

  for (i = 1; i < num_schedulers; i++) {
    ph_thread_join(emitters[i].thread, &res);
  }
  free(emitters);
  emitters = NULL;

  ph_job_pool_shutdown();
  ph_thread_epoch_barrier();
  return PH_OK;
}
上一篇 下一篇

猜你喜欢

热点阅读