8-Task模块和通用TCP服务器(gen_tcp)

    我们首先实现一个Echo(回声)服务器来开始我们的TCP服务器之旅。它只是简单地返回从请求中收到的文字。
    我们会慢慢地改进这个服务器,使它有监督者来监督,并且可以处理大量连接。

    一个TCP服务器,总的来说,实现以下几步:

    1. 在可用端口建立socket连接,监听这个端口
    2. 等待这个端口的客户端连接,有了就接受它
    3. 读取客户端请求并且写回复

    我们来实现这些步骤。在程序中,打开文件lib/kv_server.ex,添加以下函数:

    我们通过调用KVServer.accept(4040)来启动服务器,其中4040是端口号。在accept/1中,第一步是去监听这个端口,知道socket变成可用状态,然后调用loop_acceptor/1。函数loop_acceptor/1只是一个循环,来接受客户端的连接。
    对于每次接受的客户端连接,我们调用serve/1函数。

    函数serve/1也是个循环,它一次从socket中读取一行,并将其写进给socket的回复。
    注意serve/1使用了管道运算符 |>来表达操作流程。
    管道运算符计算左侧函数计算的结果,并将其作为第一个参数传递给右侧函数调用。如:

    1. socket |> read_line() |> write_line(socket)

    相当于:

    1. write_line(read_line(socket), socket)

    函数read_line/2中使用:gen_tcp.recv/2接收从socket传来的数据。
    write_line/2中使用:gen_tcp.send/2向socket写入数据。

    这差不多就是我们为实现这个回声服务器所要做的。让我们试一试。

    iex -S mixkv_server应用程序中启动对话,执行:

    1. iex> KVServer.accept(4040)
    1. $ telnet 127.0.0.1 4040
    2. Trying 127.0.0.1...
    3. Connected to localhost.
    4. Escape character is '^]'.
    5. hello
    6. hello
    7. is it me
    8. is it me
    9. you are looking for?
    10. you are looking for?

    输入“hello”,按回车,你就会得到“hello”字样的回复。好牛逼!

    退出telnet客户端方法不一,有些用ctrl + ],有些是quit按回车。

    一旦你退出telnet客户端,你会发现IEx会话中打印出一个错误信息:

    1. ** (MatchError) no match of right hand side value: {:error, :closed}
    2. (kv_server) lib/kv_server.ex:41: KVServer.read_line/1
    3. (kv_server) lib/kv_server.ex:33: KVServer.serve/1
    4. (kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1

    这是因为我们还期望从:gen_tcp.recv/2拿数据,但是客户端断了。我们将来要处理这个问题才行。

    目前还有个更重要的bug要修:假如TCP接收者挂了怎么办?意为它没有监督者,不会自己重启,要是挂了我们将不能在处理更多的请求。
    这就是为啥我们要将它挪进监督树。

    我们已经学习了Agent,通用服务器以及事件管理器。它们都可以进行多消息协作,或者管理状态。
    但是,若是只需要处理一些任务,选什么呢?

    为此提供了所需的功能。
    例如,它有start_link/3函数,接受一个模块名、一个函数和函数的参数,从而执行这个传入的函数,并且还是作为监督树的一部分。

    我们来试试。打开lib/kv_server.ex,修改下里start/2函数里的监督者:

    1. def start(_type, _args) do
    2. import Supervisor.Spec
    3. worker(Task, [KVServer, :accept, [4040]])
    4. ]
    5. opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    6. Supervisor.start_link(children, opts)

    改动的意思是要让KVServer.accept(4040)成为一个工人来运行。目前我们暂时hardcode这个端口号,之后再讨论如何修改。

    现在,这个服务器是监督树的一部分了,它应该会随着应用程序启动而自动运行。
    在终端中输入mix run --no-halt,然后再次用telnet客户端来试试看是否还一切正常:

    看,它还是好使!这回就算退了客户端,服务器挂了,你会看到又一个立马起来了。嗯,不错。。。不过它可伸缩性如何?

    试着打开两个telnet客户端一起连接,你会注意到,第二个客户端根本不能回声:

    1. $ telnet 127.0.0.1 4040
    2. Trying 127.0.0.1...
    3. Connected to localhost.
    4. Escape character is '^]'.
    5. hello
    6. hello?
    7. HELLOOOOOO?

    为了让我们的服务器能够处理并发连接,我们需要让一个进程来当接收者,然后派生其它的进程来服务接收到的连接。
    一个方案是:

    1. defp loop_acceptor(socket) do
    2. {:ok, client} = :gen_tcp.accept(socket)
    3. serve(client)
    4. loop_acceptor(socket)
    5. end

    函数Task.start_link/1类似Task.start_link/3,但是它可以接受一个匿名函数而不是(模块,函数,参数)的组合:

    1. defp loop_acceptor(socket) do
    2. {:ok, client} = :gen_tcp.accept(socket)
    3. Task.start_link(fn -> serve(client) end)
    4. loop_acceptor(socket)
    5. end

    我们翻过这个错了,记得吗?

    和我们当时在注册表进程中调用KV.Bucket.start_link/0犯的错差不多。它意味着一个bucket挂会导致整个注册表进程挂。

    上面的代码页犯了相同的错误:如果我们把serve(client)这个任务和接收者连接起来,那么在处理请求时发生的小事故就会导致请求接收者挂,继而导致连接都挂掉。

    当时我们解决这个问题是用了一个简单的一对一监督者。这里我们也将使用相同的办法,除了一点:这个模式在Task中实在是太通用了,
    所有Task已经为之提供了一个解决方案—-一个简单的一对一监督者加上临时工(临时的工人),这个我们在之前的监督树中就是这么用的。

    让我们再次修改下start/2函数,加个监督者:

    1. def start(_type, _args) do
    2. import Supervisor.Spec
    3. children = [
    4. supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    5. worker(Task, [KVServer, :accept, [4040]])
    6. ]
    7. opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    8. Supervisor.start_link(children, opts)
    9. end

    我们简单地启动了一个Task.Supervisor进程,
    名字叫Task.Supervisor。记住,因为接收者任务依赖于这个监督者,因此该监督者必须先启动。

    现在我们只需修改loop_acceptor/2,使用Task.Supervisor来处理每个请求:

    用命令mix run --no-halt启动新的服务器,现在就可以打开多个客户端来连接了。而且你会发现一个客户端退出不会让接收者挂掉。
    好棒!

    一下是完整的服务器实现,在单个模块中:

    1. defmodule KVServer do
    2. @doc false
    3. def start(_type, _args) do
    4. import Supervisor.Spec
    5. children = [
    6. supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
    7. ]
    8. opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    9. Supervisor.start_link(children, opts)
    10. end
    11. @doc """
    12. Starts accepting connections on the given `port`.
    13. """
    14. def accept(port) do
    15. {:ok, socket} = :gen_tcp.listen(port,
    16. [:binary, packet: :line, active: false])
    17. IO.puts "Accepting connections on port #{port}"
    18. loop_acceptor(socket)
    19. end
    20. defp loop_acceptor(socket) do
    21. {:ok, client} = :gen_tcp.accept(socket)
    22. Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    23. loop_acceptor(socket)
    24. end
    25. defp serve(socket) do
    26. socket
    27. |> read_line()
    28. |> write_line(socket)
    29. serve(socket)
    30. end
    31. defp read_line(socket) do
    32. {:ok, data} = :gen_tcp.recv(socket, 0)
    33. data
    34. end
    35. defp write_line(line, socket) do
    36. :gen_tcp.send(socket, line)
    37. end

    因为我们修改了监督者的需求,我们会问:我们的监督者策略还适用吗?

    下一章我们将开始解析客户请求,然后发送回复,从而完成我们的服务器。