Elixir 编程Elixir & PhoenixElixir

Elixir 简明笔记(十九) --- 多进程

2016-06-30  本文已影响856人  人世间

多进程

Elixir强大的并发来自其actor并发模型,简而言之就是可以使用大量的进程来实现并发。elixir中的进程依托与erlang虚拟机的存在,这个进程与操作系统的进程不一样,虽然他们可以像原生进程一样在处理器中运行,但是他们比原生进程轻,在普通机器创建十万个进程是轻而易举的事情,甚至比普通语言创建线程还要轻便。下面就看看elixir的多进程是如何work。

使用erlang的timer模块,模拟进程中耗时的操作。定义一个匿名函数,运行函数之后,可以看到iex在两秒之后才打印消息,连续调用5次,则一共耗时10秒:


iex(1)> run_query = fn query_def ->
...(1)>     :timer.sleep 2000
...(1)>     "#{query_def} result"
...(1)> end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(2)> run_query.("query 1")
"query 1 result"
iex(3)> 1..5 |> Enum.map(&(run_query.("query #{&1}")))
["query 1 result", "query 2 result", "query 3 result", "query 4 result",
 "query 5 result"]

创建进程,Elixir创建进程只需要使用spawn宏即可,sqwan/1 接受一个匿名函数,创建另外一个新进程。

spawn(
    fn ->
        expression_1
        ...
        expression_2
    end
)

修改上面的例子,使用spawn创建一个新进程。可以看到,执行spawn之后,马上返回了函数调用的结果,为新进程新pid。两秒后,新进程执行并打印内容返回到iex中。

iex(4)> spawn(fn -> IO.puts run_query.("1") end )
#PID<0.65.0>
1 result

因此可以定义一个异步的查询函数并执行,可以发现每次执行函数都马上返回,而新创建的进程将会在后台运行,并打印最终结果:

iex(5)> async_query = fn query_def ->
...(5)>     spawn(fn -> IO.puts run_query.(query_def) end) end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(6)> async_query.("query 1")
#PID<0.71.0>
query 1 result
iex(7)> 1..5 |> Enum.map(&(async_query.("query #{&1}")))
[#PID<0.73.0>, #PID<0.74.0>, #PID<0.75.0>, #PID<0.76.0>, #PID<0.77.0>]
query 1 result
query 2 result
query 3 result
query 4 result
query 5 result

Elixir中,所有代码都运行在一个在进程中,iex也是运行在进程中,并且是shell中的主进程。上面的例子中,进程是并发运行的,因此并不会按照顺序输出结果。每一个进程都是独立的,不同的进程是不能读取对方的数据的,而进程之间想要通信需要通过message

进程中的message

通常的语言中,使用多线程进行并发,所有线程共享内存数据。而elixir提供的是aotor的进程模型。进程通过message同步数据。进程A想要让进程B做点事情,需要A给B的mailbox发送异步消息,B进程读取mailbox的消息,解析后执行。因为进程见是无法共享内存的,因此消息发送的时候存在着深拷贝(deep-copied)。

发送信息使用 send函数,接受消息使用 receive do 结构。 send提供两个参数,第一个是进程的标识pid,第二个是所需要发送的数据。receive的结构如下

receive do
  pattern_1 -> do_something
  pattern_2 -> do_something_else
end

receive 结构和 case 结构十分相似,也支持mailbox的模式匹配。在iex中,self表示当前的进程,下面使用self来对消息做实验:

iex(8)> send(self, "a message")
"a message"
iex(9)> receive do
...(9)>   message -> IO.puts message
...(9)> end
a message
:ok
iex(10)> send(self, {:message, 1})
{:message, 1}
iex(11)> receive do
...(11)>   {:message, id} -> IO.puts "received message #{id}"
...(11)> end
received message 1
:ok
iex(13)> receive do
...(13)>    {_, _, _} ->
...(13)>        IO.puts "received"
...(13)> end

send调用之后会返回发送的内容。由于是自身给自身发消息,所以可以在当前的进程中调用receive结构拉取自身的mailbox的message。如果模式匹配失败,当前的进程会被block。可以设置一个after分支,当匹配失败之后,执行after的内容。

iex(2)> receive do
...(2)>   {_,_,_} -> 'nonthing'
...(2)> after 3000 -> "message not received"
...(2)> end
"message not received"
iex(3)>

receive结构主要从当前的mailbox中pull数据。如果当前消息模式匹配失败,这个消息将会被放回进程的mailbox之中,仅需读取下一条消息。总结receive的工作流如下:

  1. 从mailbox中读取第一条消息。
  2. 尝试使用 receive中模式和消息进行模式匹配,从上到下依次进行匹配。
  3. 匹配成功则执行对应分支的代码。
  4. 如果模式匹配失败,将消息放回原处,接着出现下一条消息。
  5. 如果mailbox的队列没有消息了,则等待下一个消息到达,消息一到达,则重复开始第一步。
  6. 如果存在after语句,在等到消息未到到或匹配失败之后,执行after的代码分支逻辑。

进程通信

通常send消息都是异步执行的,进程发送消息之后就返回,然后当前进程并不知道子进程的执行状况。通常我们需要子进程的执行过程,然后子进程将会send消息来反馈主进程。创建了子进程将会返回进程标识pid,基于pid和message可以进行进程间的通信。重写async_query 函数并调用:

iex(1)> run_query = fn(query_def) ->
...(1)>           :timer.sleep(2000)
...(1)>           "#{query_def} result"
...(1)>         end
#Function<6.54118792/1 in :erl_eval.expr/5>
iex(3)> async_query = fn(query_def) ->
...(3)>     caller = self
...(3)>     spawn(fn query_def ->
...(3)>         send(caller, {:query_result, run_query.(query_def)})
...(3)>     end)
...(3)> end
iex(4)> Enum.each(1..5, &async_query.("query #{&1}"))
:ok

新创建的子进程给主进程的mailbox发送了消息,主进程再把这些消息读取出来

iex(7)> get_result = fn ->
...(7)>     receive do
...(7)>         {:query_result, result} -> result
...(7)>     end
...(7)> end
iex(10)> get_result.()
"query 1 result"
iex(11)> get_result.()
"query 2 result"
iex(12)> get_result.()
"query 3 result"

客户端服务端状态

通过message和receive可以实现进程间的通信,通常把主进程当成客户端进程,新创建的进程当成服务端进程。那么cs之间的进程通信会涉及到一下状态(state)的操作。所谓的server process 是指一些长时间监听消息的进程,就像服务器进程一样永远运行,处于一个无限循环当中,监听客户端的消息,处理消息。

receive结构会将它mailbox,模式匹配不成功的时候会block进程。可是一旦mailbox中的messge消费完了,receive的监听也就结束了,进程会结束。因此需要在message消费完毕之后仍然运行进程。使用while结构很容易实现这样的程序逻辑,elixir没有循环,可是有递归。

下面以数据库服务为例来做说明。其基本结构如下:

defmodule DatabaseServer do
    
    def start do
        spawn(&loop/0)
    end

    defp loop do
        receive do
            # pass
        end
        loop
    end

end

DatabaseServer模块实现了一个服务器循环进程loop,给客户端(主进程,调用者)提供了一个启动入口start函数。这个函数将会创建一个服务端进程,用于监听客户端的发送的消息,韩寒处理返回。有人可能有以为,start和loop都是模块中的函数,分别运行在不同进程中。其实模块和进程本身没有特别的关系,模块就是函数的集合,这些函数可以运行在进程中,仅此而已。后期关于类似的实现,可以用到更高级的gen_server。

接下来实现loop中的逻辑,以及数据库的服务端和客户端的查询方法。

defmodule DatabaseServer do
    
    def start do
        spawn(&loop/0)
    end


    def run_async(server_pid, query_def) do
        send(server_pid, {:run_query, self, query_def})
    end

    defp loop do
        receive do
            
            {:run_query, caller, query_def} -> send(caller, {:query_result, run_query(query_def)})
            
        end
        loop
    end

    defp run_query(query_def) do
        :timer.sleep(2000)
        "#{query_def} result"
    end
end

run_query 为服务端的查询方法,run_async为客户端的查询方法,run_async将查询信息和自身的pid发给服务端,服务端匹配之后查询处理,然后再给客户端pid发送查询结果。客户端同样也使用receive结构pull查询结果:

defmodule DatabaseServer do
    
    def start do
        spawn(&loop/0)
    end

    def run_async(server_pid, query_def) do
        send(server_pid, {:run_query, self, query_def})
    end

    def get_result do
        
        receive do
            {:query_result, result} -> result
        after 5000 ->
            {:error, :timeout}
        end
    end

    defp loop do
        receive do
        
            {:run_query, caller, query_def} -> send(caller, {:query_result, run_query(query_def)})
        
        end
        loop
    end

    defp run_query(query_def) do
        :timer.sleep(2000)
        "#{query_def} result"
    end
end

运行测试

iex(1)> server_pid = DatabaseServer.start
#PID<0.63.0>
iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
"query 1 result"
iex(4)> DatabaseServer.run_async(server_pid, "query 2")
{:run_query, #PID<0.61.0>, "query 2"}
iex(5)> DatabaseServer.get_result
"query 2 result"
iex(6)> DatabaseServer.get_result
{:error, :timeout}

把 timer.sleep 改成 10s后, 进程客户端马上就返回并且监听服务端的返回,可以服务端异步长时间处理,不能马上返回,客户端超时断开了。第二次调用get_result的时候,此时服务端已经处理完毕,并发送结果给客户端的mailbox。

iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
{:error, :timeout}
iex(4)> DatabaseServer.get_result
"query 1 result"

服务端进程都是顺序的

尽管实现了服务端进程来处理查询请求,可是服务端进程监听的是自己进程的mailbos,消费消息却是顺序的。如果客户端调用十个查询请求,服务端同样需要执行10秒。为了避免这样的情况,一个简单的处理就是每一个请求实现一个服务端进程,也就是服务端实现一个进程池。面对大量的客户端就能处理了。等等,你一定以为实现进程池是一个夸张的做法,毕竟直觉上进程的创建和销毁十分耗资源。感谢Erlang的并发模式,我们可以在Elixir中轻而易举的创建大量的进程,这个进程和操作系统进程概念不一样,它甚至比操作系统的线程还要轻量级。

下面演示进程池的用法:

iex(1)> pool = 1..100 |> Enum.map(fn _ -> DatabaseServer.start end)
[#PID<0.64.0>, #PID<0.65.0>, #PID<0.66.0>, #PID<0.67.0>, #PID<0.68.0>,
 #PID<0.69.0>, #PID<0.70.0>, #PID<0.71.0>, #PID<0.72.0>, #PID<0.73.0>,
 #PID<0.74.0>, #PID<0.75.0>, #PID<0.76.0>, #PID<0.77.0>, #PID<0.78.0>,
 #PID<0.79.0>, #PID<0.80.0>, #PID<0.81.0>, #PID<0.82.0>, #PID<0.83.0>,
 #PID<0.84.0>, #PID<0.85.0>, #PID<0.86.0>, #PID<0.87.0>, #PID<0.88.0>,
 #PID<0.89.0>, #PID<0.90.0>, #PID<0.91.0>, #PID<0.92.0>, #PID<0.93.0>,
 #PID<0.94.0>, #PID<0.95.0>, #PID<0.96.0>, #PID<0.97.0>, #PID<0.98.0>,
 #PID<0.99.0>, #PID<0.100.0>, #PID<0.101.0>, #PID<0.102.0>, #PID<0.103.0>,
 #PID<0.104.0>, #PID<0.105.0>, #PID<0.106.0>, #PID<0.107.0>, #PID<0.108.0>,
 #PID<0.109.0>, #PID<0.110.0>, #PID<0.111.0>, #PID<0.112.0>, #PID<0.113.0>, ...]
iex(2)> 1..5 |>
...(2)>     Enum.each(fn query_def ->
...(2)>         server_pid = Enum.at(pool, :random.uniform(100) - 1)
...(2)>         DatabaseServer.run_async(server_pid, query_def)
...(2)>     end)
:ok
iex(3)> 1..5 |>
...(3)>           Enum.map(fn(_) -> DatabaseServer.get_result end)
["3 result", "5 result", "4 result", "1 result", "2 result"]

运行的结果中,并没有超过十秒,而是很快就返回了结果。

状态

设想一下,如果需要跟数据库服务交互的时候,首先当然是需要建立一个连接。连接就必须保持socket能够正确的工作。因此也需要在进程中保持状态,可以修改loop函数实现。

defmodule DatabaseServer do
    
    def start do
        spawn(fn ->
            connection = :random.uniform(1000)
            loop(connection)
        end)
    end

    def run_async(server_pid, query_def) do
        send(server_pid, {:run_query, self, query_def})
    end

    def get_result do
        receive do
            {:query_result, result} -> result
        after 5000 ->
            {:error, :timeout}
        end
    end

    defp loop(connection) do
        receive do  
            {:run_query, from_pid, query_def} -> 
                query_result = run_query(connection, query_def)
                send(from_pid, {:query_result, query_result})

        end
        loop(connection)
    end

    defp run_query(connection, query_def) do
        :timer.sleep(2000)
        "Connection #{connetion}: #{query_def} result"
    end
end

iex(1)> server_pid = DatabaseServer.start
#PID<0.63.0>
iex(2)> DatabaseServer.run_async(server_pid, "query 1")
{:run_query, #PID<0.61.0>, "query 1"}
iex(3)> DatabaseServer.get_result
"Connection 444: query 1 result"
iex(4)> DatabaseServer.run_async(server_pid, "query 2")
{:run_query, #PID<0.61.0>, "query 2"}
iex(5)> DatabaseServer.get_result
"Connection 444: query 2 result"

start 函数中创建了一些连接,然后loop中把这个状态传递到进程执行代码的地方。从iex的结果可以看出,这个状态一直被保持,两次请求服务,都是同一个连接的状态。实际的服务器环境中,往往状态不是一层不变的。此时我们需要更新状态。一个简单的技巧就是在loop函数中更新状态。

def loop(state) do
    new_state = receive do      # 捕捉新状态
        msg1 -> ...
        msg2 -> ...
    end
    loop(new_state)             # 更新状态
end 

下面实现一个计算器服务来阐明状态更新技巧。

defmodule Calculator do
    
    def start do
        spawn(fn ->loop(0) end)
    end

    def loop(current_value) do
        new_value = receive do
            {:value, caller} ->
                send(caller, {:response, current_value})
                current_value
            {:add, value} -> current_value + value
            {:sub, value} -> current_value - value
            {:mul, value} -> current_value * value
            {:div, value} -> current_value / value

            invalid_request ->
                IO.puts "invalid request #{inspect invalid_request}"
                current_value
        end
        loop(new_value)
    end

    def value(server_pid) do
        send(server_pid, {:value, self})
        receive do
            {:response, value} -> value
        end
    end

    def add(server_pid, value), do: send(server_pid, {:add, value})
    def sub(server_pid, value), do: send(server_pid, {:sub, value})
    def mul(server_pid, value), do: send(server_pid, {:mul, value})
    def div(server_pid, value), do: send(server_pid, {:div, value})

end

iex(1)> calculator_pid = Calculator.start
#PID<0.63.0>
iex(2)> Calculator.value(calculator_pid)
0
iex(3)> Calculator.add(calculator_pid, 10)
{:add, 10}
iex(4)> Calculator.sub(calculator_pid, 5)
{:sub, 5}
iex(5)> Calculator.mul(calculator_pid, 3)
{:mul, 3}
iex(6)> Calculator.div(calculator_pid, 5)
{:div, 5}
iex(7)> Calculator.value(calculator_pid)
3.0

通常情况下,状态远远比一个数字复杂。不过技术手段都是一样的,只需要在loop函数中操作状态即可。当应用的状态变得复杂的时候,是非有必要对代码进行组织。服务端进程的模块可以剥离出来专注请求的处理。下面针对之前的todo应用,使用多进程进行改下一下:

defmodule TodoServer do
    
    def start do
        spawn(fn -> loop(TodoList.new) end)
    end

    defp loop(todo_list) do
        new_todo_list = receive do
            message -> process_message(todo_list, message)
        end
        loop(new_todo_list)
    end


    def process_message(todo_list, {:add_entry, new_entry}) do
        TodoList.add_entry(todo_list, new_entry)
    end

    def process_message(todo_list, {:entries, caller, date}) do
        send(caller, {:todo_entries, TodoList.entries(todo_list, date)})
        todo_list
    end

    def add_entry(todo_server, new_entry) do
        send(todo_server, {:add_entry, new_entry})
    end

    def entries(todo_server, date) do
        send(todo_server, {:entries, self, date})
        receive do
            {:todo_entries, entries} -> entries
        after 5000 ->
            {:error, :timeout}
        end
    end
end

调用方式如下:

iex(1)> todo_server = TodoServer.start
#PID<0.66.0>
iex(2)> TodoServer.add_entry(todo_server,
...(2)>                   %{date: {2013, 12, 19}, title: "Dentist"})
{:add_entry, %{date: {2013, 12, 19}, title: "Dentist"}}
iex(3)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
iex(8)> TodoServer.add_entry(todo_server,
...(8)>                   %{date: {2013, 12, 20}, title: "Shopping"})
{:add_entry, %{date: {2013, 12, 20}, title: "Shopping"}}
iex(9)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 1, title: "Dentist"}]
iex(10)> TodoServer.entries(todo_server, {2013, 12, 20})
[%{date: {2013, 12, 20}, id: 2, title: "Shopping"}]
iex(11)> TodoServer.add_entry(todo_server,
...(11)>                   %{date: {2013, 12, 19}, title: "Movies"})
{:add_entry, %{date: {2013, 12, 19}, title: "Movies"}}
iex(12)> TodoServer.entries(todo_server, {2013, 12, 19})
[%{date: {2013, 12, 19}, id: 3, title: "Movies"},
 %{date: {2013, 12, 19}, id: 1, title: "Dentist"}]

这样的调用方式,客户端需要知道开启的后台进程号。如果这个过程隐藏在模块中,岂不是更简洁。elixir提供了Process 模块的register函数,实现了针对进程的设置别名的应用。

其用法如下:

iex(1)> Process.register(self, :some_name)
iex(2)> send(:some_name, :msg)
iex(3)> receive do
          msg -> IO.puts "received #{msg}"
        end
received msg

修改TodoServer如下:

defmodule TodoServer do
    
    def start do
        pid = spawn(fn -> loop(TodoList.new) end)
        Process.register(pid, :todo_server)
    end

    defp loop(todo_list) do
        new_todo_list = receive do
            message -> process_message(todo_list, message)
        end
        loop(new_todo_list)
    end


    def process_message(todo_list, {:add_entry, new_entry}) do
        TodoList.add_entry(todo_list, new_entry)
    end

    def process_message(todo_list, {:entries, caller, date}) do
        send(caller, {:todo_entries, TodoList.entries(todo_list, date)})
        todo_list
    end

    def add_entry(new_entry) do
        send(:todo_server, {:add_entry, new_entry})
    end

    def entries(date) do
        send(:todo_server, {:entries, self, date})
        receive do
            {:todo_entries, entries} -> entries
        after 5000 ->
            {:error, :timeout}
        end
    end
end

调用方式如下:

iex(1)> TodoServer.start
true
iex(2)> TodoServer.add_entry(%{date: {2013, 12, 19}, title: "Dentist"})
{:add_entry, %{date: {2013, 12, 19}, title: "Dentist"}}
iex(3)> TodoServer.add_entry(%{date: {2013, 12, 20}, title: "Shopping"})
{:add_entry, %{date: {2013, 12, 20}, title: "Shopping"}}
iex(4)> TodoServer.add_entry(%{date: {2013, 12, 19}, title: "Movies"})
{:add_entry, %{date: {2013, 12, 19}, title: "Movies"}}
iex(5)> TodoServer.entries({2013, 12, 19})
[%{date: {2013, 12, 19}, id: 3, title: "Movies"},
 %{date: {2013, 12, 19}, id: 1, title: "Dentist"}]

可见,后台执行任务的进程,相对客户端被隐藏啦。

上一篇 下一篇

猜你喜欢

热点阅读