Sidekiq Source Code Analysis (Part 1)

2019-11-02  will2yang

It all starts with include Sidekiq::Worker

class SimpleWorker
  include Sidekiq::Worker

  def perform(*args)
    # some code
  end
end

SimpleWorker.perform_async

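This is about the simplest worker possible. It includes the Sidekiq::Worker module, and calling the class method perform_async enqueues the job so that the worker's perform code runs asynchronously.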

module Sidekiq
  module Worker
    def self.included(base)
      raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }

      base.include(Options)
      base.extend(ClassMethods)
    end
  end
end

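Note the overridden included hook: when a class includes Worker, the hook also includes the Options module into that class and extends it with ClassMethods (and it raises an ArgumentError if the class is an ActiveJob). Let's focus on perform_async first.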
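Before following perform_async, the hook mechanism itself can be tried in isolation. The sketch below is a minimal stand-in, not Sidekiq's code: DemoMixin, its empty Options module, and a perform_async that merely returns the job hash are all hypothetical. It only shows that a single include triggers self.included, which in turn mixes in a second module and adds class-level methods.

```ruby
# Hypothetical names: DemoMixin, DemoMixin::Options, DemoMixin::ClassMethods.
module DemoMixin
  # Stand-in for Sidekiq::Worker::Options; empty here, we only check it lands in the ancestors.
  module Options; end

  # Stand-in for Sidekiq::Worker::ClassMethods.
  module ClassMethods
    def perform_async(*args)
      # Instead of pushing to Redis, just return the job hash that would be pushed.
      { "class" => self, "args" => args }
    end
  end

  # Ruby calls this hook whenever DemoMixin is included; base is the including class.
  def self.included(base)
    base.include(Options)     # Options joins the class's ancestor chain
    base.extend(ClassMethods) # ClassMethods become class-level (singleton) methods
  end
end

class DemoWorker
  include DemoMixin
end

puts DemoWorker.ancestors.include?(DemoMixin::Options) # => true
puts DemoWorker.perform_async(1, "two")["args"].inspect # => [1, "two"]
```

Because extend runs inside the hook, users only write one include line, which is the same ergonomics Sidekiq::Worker aims for.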

def perform_async(*args)
  client_push("class" => self, "args" => args)
end
def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
  # stringify
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end
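The "stringify" loop in client_push is easy to misread, since it mutates the hash it iterates over. It is safe because item.keys returns a fresh Array. The helper below (stringify_keys! is a hypothetical name) isolates that loop so its effect can be checked directly:

```ruby
# Same loop as in client_push, extracted into a hypothetical helper.
def stringify_keys!(item)
  # item.keys is a snapshot Array, so deleting/adding keys inside the loop is safe.
  item.keys.each do |key|
    # delete returns the old value, which is re-added under the String key.
    item[key.to_s] = item.delete(key)
  end
  item
end

job = stringify_keys!({ class: "SimpleWorker", args: [1, 2], "queue" => "default" })
puts job.keys.inspect # => ["class", "args", "queue"]
```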

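perform_async essentially wraps the worker class and its args into a job hash and hands it to Sidekiq::Client, which pushes it to Redis through the selected connection pool (a thread-local pool, the worker's "pool" option, or the global Sidekiq.redis_pool). Worker also provides perform_in and perform_at, which run a job after a delay or at a given point in time. Despite the different names, they are the same method: perform_at is an alias of perform_in.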

def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  payload = @opts.merge("class" => @klass, "args" => args)
  # Optimization to enqueue something now that is scheduled to go out now or in the past
  payload["at"] = ts if ts > now
  @klass.client_push(payload)
end
alias_method :perform_at, :perform_in

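The value of interval decides how it is interpreted: anything below 1_000_000_000 is treated as a delay in seconds from now, anything larger as an absolute Unix timestamp. The "at" key is only set when the resulting time is still in the future; otherwise the job is enqueued immediately. Either way, the method ends in client_push and therefore in push. (The @opts and @klass instance variables show that this snippet is the variant defined on Sidekiq::Worker::Setter, the object returned by set; the ClassMethods variant builds the same hash from self.)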
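The timestamp rule can be exercised without Redis. The function below is a hypothetical extraction of the interval logic from perform_in, with now injectable so the results are deterministic and the class name hard-coded for illustration:

```ruby
# Hypothetical extraction of perform_in's interval handling.
def schedule_payload(interval, args, now: Time.now.to_f)
  int = interval.to_f
  # Small numbers are relative delays; large ones are absolute epoch seconds.
  ts = int < 1_000_000_000 ? now + int : int
  payload = { "class" => "SimpleWorker", "args" => args }
  # Only schedule if the time is still in the future; otherwise enqueue right away.
  payload["at"] = ts if ts > now
  payload
end

now = 1_600_000_000.0
p schedule_payload(60, [1], now: now)["at"]                    # => 1600000060.0
p schedule_payload(Time.at(now + 30), [1], now: now)["at"]     # => 1600000030.0
p schedule_payload(now - 30, [1], now: now).key?("at")         # => false
```

Passing a Time works because Time#to_f yields epoch seconds, which is why perform_at(some_time) and perform_in(300) can share one implementation.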
Next, let's look at what push does:

def push(item)
  normed = normalize_item(item)
  payload = process_single(item["class"], normed)

  if payload
    raw_push([payload])
    payload["jid"]
  end
end

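normalize_item mainly validates the item and fills in created_at and jid. process_single then passes the item through the client middleware chain; if a payload comes back, raw_push writes it to Redis and push returns the job's jid.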
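To make the normalization step concrete, here is a deliberately simplified, hypothetical normalize_item_sketch. The real Sidekiq method performs more checks and merges worker options; the "default" queue fallback and the exact error messages here are assumptions for illustration. It assumes jid is a random hex string produced with SecureRandom and created_at is a float epoch time.

```ruby
require "securerandom"

# Hypothetical, simplified normalize_item: validate, then fill in defaults.
def normalize_item_sketch(item)
  unless item.is_a?(Hash) && item.key?("class") && item.key?("args")
    raise ArgumentError, "Job must be a Hash with 'class' and 'args' keys"
  end
  raise ArgumentError, "Job args must be an Array" unless item["args"].is_a?(Array)

  normed = item.dup
  normed["class"] = normed["class"].to_s      # store the class name, not the Class object
  normed["queue"] ||= "default"               # assumed fallback queue
  normed["jid"] ||= SecureRandom.hex(12)      # 24 hex characters
  normed["created_at"] ||= Time.now.to_f      # float epoch seconds
  normed
end

job = normalize_item_sketch({ "class" => "SimpleWorker", "args" => [1] })
puts job["jid"].length # => 24
```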

def raw_push(payloads)
  @redis_pool.with do |conn|
    conn.multi do
      atomic_push(conn, payloads)
    end
  end
  true
end
def atomic_push(conn, payloads)
  if payloads.first.key?("at")
    conn.zadd("schedule", payloads.map { |hash|
      at = hash.delete("at").to_s
      [at, Sidekiq.dump_json(hash)]
    })
  else
    queue = payloads.first["queue"]
    now = Time.now.to_f
    to_push = payloads.map { |entry|
      entry["enqueued_at"] = now
      Sidekiq.dump_json(entry)
    }
    conn.sadd("queues", queue)
    conn.lpush("queue:#{queue}", to_push)
  end
end

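These two methods write the job data to Redis. raw_push relies on the connection_pool gem to manage a pool of Redis connections: with blocks until a connection is available and raises an error if none becomes free before the timeout. Calling multi on conn wraps the commands in a Redis MULTI/EXEC transaction, so they are executed atomically.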
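The checkout behavior of with can be illustrated with a toy pool. TinyPool below is hypothetical and much simpler than the connection_pool gem; it only demonstrates the pattern described above: block on a condition variable until a connection is returned, give up with an error once a monotonic deadline passes, and always check the connection back in.

```ruby
# Toy connection pool (hypothetical); not the connection_pool gem's implementation.
class TinyPool
  class TimeoutError < StandardError; end

  def initialize(size:, timeout:, &factory)
    @available = Array.new(size) { factory.call }
    @timeout = timeout
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Yields a connection, blocking until one is free; raises after @timeout seconds.
  def with
    conn = checkout
    begin
      yield conn
    ensure
      checkin(conn) # returned even if the block raises
    end
  end

  private

  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @mutex.synchronize do
      while @available.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "no connection available within #{@timeout}s" if remaining <= 0
        @cond.wait(@mutex, remaining) # loop re-checks after wakeups
      end
      @available.pop
    end
  end

  def checkin(conn)
    @mutex.synchronize do
      @available.push(conn)
      @cond.signal # wake one waiting thread
    end
  end
end

pool = TinyPool.new(size: 1, timeout: 0.1) { Object.new }
pool.with do |_conn|
  # While the only connection is held, another thread times out.
  result = Thread.new do
    pool.with { |_| :got_connection }
  rescue TinyPool::TimeoutError => e
    e.message
  end.value
  puts result # => no connection available within 0.1s
end
```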
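atomic_push branches on whether the payloads carry an "at" key. If they do, each job is added to the "schedule" sorted set with its timestamp as the score. Otherwise, the queue name is added to the "queues" set, each job is stamped with enqueued_at, and the jobs are LPUSHed onto the corresponding "queue:<name>" list.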
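The routing can be observed without a Redis server. FakeRedis below is a hypothetical in-memory stand-in that loosely mimics only the three commands atomic_push uses, and atomic_push_sketch copies the branching above with JSON.generate in place of Sidekiq.dump_json:

```ruby
require "json"

# Hypothetical in-memory stand-in for zadd/sadd/lpush.
class FakeRedis
  attr_reader :zsets, :sets, :lists

  def initialize
    @zsets = Hash.new { |h, k| h[k] = {} } # key => { member => score }
    @sets  = Hash.new { |h, k| h[k] = [] }
    @lists = Hash.new { |h, k| h[k] = [] }
  end

  # Accepts [[score, member], ...] as passed by atomic_push.
  def zadd(key, pairs)
    pairs.each { |score, member| @zsets[key][member] = score.to_f }
  end

  def sadd(key, member)
    @sets[key] << member unless @sets[key].include?(member)
  end

  # LPUSH inserts each value at the head of the list.
  def lpush(key, values)
    Array(values).each { |v| @lists[key].unshift(v) }
  end
end

# Same branching as atomic_push.
def atomic_push_sketch(conn, payloads)
  if payloads.first.key?("at")
    conn.zadd("schedule", payloads.map { |hash|
      at = hash.delete("at").to_s
      [at, JSON.generate(hash)]
    })
  else
    queue = payloads.first["queue"]
    now = Time.now.to_f
    to_push = payloads.map { |entry|
      entry["enqueued_at"] = now
      JSON.generate(entry)
    }
    conn.sadd("queues", queue)
    conn.lpush("queue:#{queue}", to_push)
  end
end

redis = FakeRedis.new
atomic_push_sketch(redis, [{ "class" => "SimpleWorker", "args" => [1], "queue" => "default" }])
atomic_push_sketch(redis, [{ "class" => "SimpleWorker", "args" => [2], "queue" => "default", "at" => 1_600_000_060.0 }])
puts redis.sets["queues"].inspect            # => ["default"]
puts redis.lists["queue:default"].size       # => 1
puts redis.zsets["schedule"].values.inspect  # => [1600000060.0]
```

Scheduled jobs never touch the queue list here; in Sidekiq they wait in the sorted set until their score is due.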
