6-ETS

    每次我们要找一个bucket时,都要发消息给注册表进程。在某些情况下,这意味着注册表进程会变成性能瓶颈!

    本章我们将学习ETS(Erlang Term Storage),以及如何把它当成缓存使用。
    之后我们会拓展它的功能,把数据从监督者保存到其孩子上。这样即使崩溃,数据也能存续。

    ETS可以把Erlang/Elixir的词语(term)存储在内存表中。
    使用来操作:

    在创建一个ETS表时,需要两个参数:表名和一组选项。对于在上面的例子,在可选的选项中我们传递了表类型和访问规则。
    我们选择了:set类型,意思是键不能有重复(集合论)。
    我们选择的访问规则是:protected,意思是对于这个表,只有创建该表的进程可以修改,而其它进程只能读取。
    这两个选项是默认的,这里就不多说了。

    ETS表可以被命名,可以通过名字访问:

    1. iex> :ets.new(:buckets_registry, [:named_table])
    2. :buckets_registry
    3. iex> :ets.insert(:buckets_registry, {"foo", self})
    4. true
    5. iex> :ets.lookup(:buckets_registry, "foo")
    6. [{"foo", #PID<0.41.0>}]

    好了,现在我们使用ETS表,修改KV.Registry
    我们对事件管理器和bucket的监督者使用相同的技术,显式传递ETS表名给start_link
    记住,有了服务器以及ETS表的名字,本地进程就可以访问那个表。

    打开lib/kv/registry.ex,修改里面的实现。加上注释来标明我们的修改:

    1. defmodule KV.Registry do
    2. use GenServer
    3. ## Client API
    4. @doc """
    5. Starts the registry.
    6. """
    7. def start_link(table, event_manager, buckets, opts \\ []) do
    8. # 1. We now expect the table as argument and pass it to the server
    9. GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
    10. end
    11. @doc """
    12. Looks up the bucket pid for `name` stored in `table`.
    13. Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
    14. """
    15. def lookup(table, name) do
    16. # 2. lookup now expects a table and looks directly into ETS.
    17. # No request is sent to the server.
    18. case :ets.lookup(table, name) do
    19. [{^name, bucket}] -> {:ok, bucket}
    20. [] -> :error
    21. end
    22. end
    23. @doc """
    24. Ensures there is a bucket associated with the given `name` in `server`.
    25. """
    26. def create(server, name) do
    27. GenServer.cast(server, {:create, name})
    28. end
    29. ## Server callbacks
    30. def init({table, events, buckets}) do
    31. # 3. We have replaced the names HashDict by the ETS table
    32. ets = :ets.new(table, [:named_table, read_concurrency: true])
    33. refs = HashDict.new
    34. {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
    35. end
    36. # 4. The previous handle_call callback for lookup was removed
    37. def handle_cast({:create, name}, state) do
    38. # 5. Read and write to the ETS table instead of the HashDict
    39. case lookup(state.names, name) do
    40. {:ok, _pid} ->
    41. {:noreply, state}
    42. :error ->
    43. {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
    44. ref = Process.monitor(pid)
    45. refs = HashDict.put(state.refs, ref, name)
    46. :ets.insert(state.names, {name, pid})
    47. GenEvent.sync_notify(state.events, {:create, name, pid})
    48. {:noreply, %{state | refs: refs}}
    49. end
    50. end
    51. def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    52. # 6. Delete from the ETS table instead of the HashDict
    53. {name, refs} = HashDict.pop(state.refs, ref)
    54. :ets.delete(state.names, name)
    55. GenEvent.sync_notify(state.events, {:exit, name, pid})
    56. {:noreply, %{state | refs: refs}}
    57. end
    58. def handle_info(_msg, state) do
    59. {:noreply, state}
    60. end
    61. end

    注意,修改前的KV.Registry.lookup/2给服务器发送请求;修改后,它就直接从ETS表里面读取数据了。该表是对各进程都共享的。
    这就是我们实现的缓存机制的大体想法。

    为了让缓存机制工作,新建的ETS起码需要:protected访问规则(默认的),这样客户端才能从中读取数据。
    否则就只有KV.Registry进程才能访问。
    我们还在启动ETS表时设置了:read_concurrency,为表的并发访问稍作优化。

    我们以上的改动导致测试都挂了。一个重要原因是我们在启动注册表进程时,需要多传递一个参数给KV.Registry.start_link/3
    让我们重写setup回调来修复测试代码test/kv/registry_test.exs

    1. setup do
    2. {:ok, sup} = KV.Bucket.Supervisor.start_link
    3. {:ok, manager} = GenEvent.start_link
    4. {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
    5. GenEvent.add_mon_handler(manager, Forwarder, self())
    6. {:ok, registry: registry, ets: :registry_table}
    7. end

    注意我们传递了一个表名:registry_tableKV.Registry.start_link/3
    其后返回了ets: :registry_table,成为了测试的上下文。

    修改了这个回调后,测试仍有fail,差不多都是这个样子:

    1. 1) test spawns buckets (KV.RegistryTest)
    2. test/kv/registry_test.exs:38
    3. ** (ArgumentError) argument error
    4. (stdlib) :ets.lookup(#PID<0.99.0>, "shopping")
    5. (kv) lib/kv/registry.ex:22: KV.Registry.lookup/2

    这是因为我们传递了注册表进程的pid给函数KV.Registry.lookup/2,而它期待的却是ETS的表名。
    为了修复我们要把所有的:

    1. KV.Registry.lookup(registry, ...)

    都改为:

    1. KV.Registry.lookup(ets, ...)

    像这样,我们对测试进行修改,把ets传递给lookup/2。一旦我们完成这些修改,有些测试还是会失败。
    你还会观察到,每次执行测试,成功和失败不是稳定的。例如,对于“派生bucket进程”这个测试来说:

    1. test "spawns buckets", %{registry: registry, ets: ets} do
    2. assert KV.Registry.lookup(ets, "shopping") == :error
    3. KV.Registry.create(registry, "shopping")
    4. assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
    5. KV.Bucket.put(bucket, "milk", 1)
    6. assert KV.Bucket.get(bucket, "milk") == 1
    7. end

    有可能会在这行失败:

    1. assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")

    但是假如我们在这行之前创建一个bucket,还会失败吗?

    原因在于(嗯哼!基于教学目的),我们犯了两个错误:

    1. 我们过于冒进地使用缓存来优化
    2. 我们使用的是cast/2,它应该是call/2

    用Elixir编程不会让你避免竞争状态。但是Elixir关于“没啥是共享”的这个特点可以帮助你很容易找到导致竞争状态的根本原因。

    我们测试中发生的事儿是延迟—-介于我们操作和我们观察到ETS表被改动之间。下面是我们期望发生的:

    1. 我们执行KV.Registry.create(registry, "shopping")
    2. 注册表进程创建了bucket,并且更新了缓存表
    3. 我们用KV.Registry.lookup(ets, "shopping")从表中获取信息
    4. 上面的命令返回{:ok, bucket}

    但是,因为KV.Registry.create/2使用cast操作,命令在真正修改表之前先返回了结果!换句话说,其实发生了下面的事:

    1. 我们执行KV.Registry.create(registry, "shopping")
    2. 我们用KV.Registry.lookup(ets, "shopping")从表中获取信息
    3. 命令返回:error
    4. 注册表进程创建了bucket,并且更新了缓存表

    要修复这个问题,只需要让KV.Registry.create/2同步操作,使用call/2而不是cast/2
    这就能保证客户端只会在表被修改后才能继续下面的操作。让我们来修改相应函数和回调:

    1. def create(server, name) do
    2. GenServer.call(server, {:create, name})
    3. end
    4. def handle_call({:create, name}, _from, state) do
    5. case lookup(state.names, name) do
    6. {:ok, pid} ->
    7. {:reply, pid, state} # Reply with pid
    8. :error ->
    9. {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
    10. ref = Process.monitor(pid)
    11. refs = HashDict.put(state.refs, ref, name)
    12. :ets.insert(state.names, {name, pid})
    13. GenEvent.sync_notify(state.events, {:create, name, pid})
    14. {:reply, pid, %{state | refs: refs}} # Reply with pid
    15. end
    16. end

    我们只是简单地把回调里的handle_cast/2改成了handle_call/3,并且返回创建的bucket的pid。

    现在执行下测试。这次,我们要使用--trace选项:

    1. $ mix test --trace

    如果你的测试中有死锁或者竞争条件时,--trace选项非常有用。因为它可以同步执行所有测试(而async: true没啥效果),并且显式每条测试的详细信息。这次我们应该只有一条失败(可能也是间歇性的):

    1. 1) test removes buckets on exit (KV.RegistryTest)
    2. test/kv/registry_test.exs:48
    3. Assertion with == failed
    4. code: KV.Registry.lookup(ets, "shopping") == :error
    5. lhs: {:ok, #PID<0.103.0>}
    6. rhs: :error
    7. stacktrace:
    8. test/kv/registry_test.exs:52

    根据错误信息,我们期望表中没有bucket,但是它却有。
    这个问题和我们刚刚解决的相反:之前的问题是创建bucket的命令与更新表之间的延迟,而现在是bucket处理退出操作与清除它在表中的记录之间的延迟。

    不幸的是,这次我们无法简单地把handle_info/2改成一个同步的操作。但是我们可以用事件管理器的通知来修复该失败。
    先来看看我们handle_info/2的实现:

    1. def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    2. # 5. Delete from the ETS table instead of the HashDict
    3. {name, refs} = HashDict.pop(state.refs, ref)
    4. :ets.delete(state.names, name)
    5. GenEvent.sync_notify(state.event, {:exit, name, pid})
    6. {:noreply, %{state | refs: refs}}
    7. end

    注意我们在发通知之前就从ETS表中进行删除操作。这是有意为之的。
    这意味着当我们收到{:exit, name, pid}通知的时候,表即已经是最新了。让我们更新剩下的代码:

    我们对测试稍作调整,保证先收到{:exit, name, pid}消息,再执行KV.Registry.lookup/2```。

    为方便,下面给出能通过的测试全文:

    1. defmodule KV.RegistryTest do
    2. use ExUnit.Case, async: true
    3. defmodule Forwarder do
    4. use GenEvent
    5. def handle_event(event, parent) do
    6. send parent, event
    7. {:ok, parent}
    8. end
    9. end
    10. setup do
    11. {:ok, sup} = KV.Bucket.Supervisor.start_link
    12. {:ok, manager} = GenEvent.start_link
    13. {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
    14. GenEvent.add_mon_handler(manager, Forwarder, self())
    15. {:ok, registry: registry, ets: :registry_table}
    16. end
    17. test "sends events on create and crash", %{registry: registry, ets: ets} do
    18. KV.Registry.create(registry, "shopping")
    19. {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
    20. assert_receive {:create, "shopping", ^bucket}
    21. Agent.stop(bucket)
    22. assert_receive {:exit, "shopping", ^bucket}
    23. end
    24. test "spawns buckets", %{registry: registry, ets: ets} do
    25. assert KV.Registry.lookup(ets, "shopping") == :error
    26. KV.Registry.create(registry, "shopping")
    27. assert KV.Bucket.get(bucket, "milk") == 1
    28. end
    29. test "removes buckets on exit", %{registry: registry, ets: ets} do
    30. KV.Registry.create(registry, "shopping")
    31. {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
    32. Agent.stop(bucket)
    33. assert_receive {:exit, "shopping", ^bucket} # Wait for event
    34. assert KV.Registry.lookup(ets, "shopping") == :error
    35. end
    36. test "removes bucket on crash", %{registry: registry, ets: ets} do
    37. KV.Registry.create(registry, "shopping")
    38. {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
    39. # Kill the bucket and wait for the notification
    40. Process.exit(bucket, :shutdown)
    41. assert_receive {:exit, "shopping", ^bucket}
    42. assert KV.Registry.lookup(ets, "shopping") == :error
    43. end
    44. end

    随着测试通过,我们只需更新监督者init/1回调函数的代码(文件lib/kv/supervisor.ex),传递ETS表的名字作为参数给注册表工人:

    1. @manager_name KV.EventManager
    2. @registry_name KV.Registry
    3. @ets_registry_name KV.Registry
    4. @bucket_sup_name KV.Bucket.Supervisor
    5. def init(:ok) do
    6. children = [
    7. worker(GenEvent, [[name: @manager_name]]),
    8. supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
    9. worker(KV.Registry, [@ets_registry_name, @manager_name,
    10. @bucket_sup_name, [name: @registry_name]])
    11. ]
    12. supervise(children, strategy: :one_for_one)
    13. end

    注意我们仍使用KV.Registry作为ETS表的名字,好让debug方便些,因为它指明了使用它的模块。ETS名和进程名分别存储在不同的注册表,以避免冲突。

    到目前为止,我们在初始化注册表的时候创建了一个ETS表,而没有操心在注册表结束时关闭该ETS表。
    这是因为ETS表是“连接”(某种修辞上说)着创建它的进程的。如果那进程挂了,表也会自动关闭。

    这作为默认行为实在是太方便了,我们可以在将来更多地利用这个特点。
    记住,注册表和bucket监督者之间有依赖。注册表挂,我们希望bucket监督者也挂。
    因为一旦注册表挂,所有连接bucket进程的信息都会丢失。
    但是,假如我们能保存注册表的数据怎么样?
    如果我们能做到这点,就可以去除注册表和bucket监督者之间的依赖了,让:one_for_one成为监督者最合适的策略。

    要做到这点需要些小改动。首先我们需要在监督者内启动ETS表。其次,我们需要把表的访问类型从:protected改成:public
    因为表的所有者是监督者,但是进行修改操作的仍然是时间管理者。

    让我们从修改KV.Supervisorinit/1回调开始:

    1. def init(:ok) do
    2. ets = :ets.new(@ets_registry_name,
    3. [:set, :public, :named_table, {:read_concurrency, true}])
    4. children = [
    5. worker(GenEvent, [[name: @manager_name]]),
    6. supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
    7. worker(KV.Registry, [ets, @manager_name,
    8. @bucket_sup_name, [name: @registry_name]])
    9. ]
    10. supervise(children, strategy: :one_for_one)
    11. end

    接下来,我们修改KV.Registryinit/1回调,因为它不再需要创建一个表,而是需要一个表作为参数:

    1. def init({table, events, buckets}) do
    2. refs = HashDict.new
    3. {:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
    4. end

    最终,我们修改test/kv/registry_test.exs中的setup回调,来显式地创建ETS表。
    我们还将用这个机会分离setup的功能,放到一个方便的私有函数中:

    1. setup do
    2. ets = :ets.new(:registry_table, [:set, :public])
    3. registry = start_registry(ets)
    4. {:ok, registry: registry, ets: ets}
    5. end
    6. defp start_registry(ets) do
    7. {:ok, sup} = KV.Bucket.Supervisor.start_link
    8. {:ok, manager} = GenEvent.start_link
    9. {:ok, registry} = KV.Registry.start_link(ets, manager, sup)
    10. GenEvent.add_mon_handler(manager, Forwarder, self())
    11. registry
    12. end

    这之后,我们的测试应该都绿啦!

    现在只剩下一个场景需要考虑:一旦我们收到了ETS表,可能有现存的bucket的pid在这个表中。
    这是我们这次改动的目的。
    但是,新启动的注册表进程没有监视这些bucket,因为它们是作为之前的注册表的一部分创建的,现在那些注册表已经不存在了。
    这意味着表将被严重拖累,因为我们都不去清除已经挂掉的bucket。

    来增加一个测试来暴露这个bug:

    1. test "monitors existing entries", %{registry: registry, ets: ets} do
    2. bucket = KV.Registry.create(registry, "shopping")
    3. # Kill the registry. We unlink first, otherwise it will kill the test
    4. Process.unlink(registry)
    5. Process.exit(registry, :shutdown)
    6. # Start a new registry with the existing table and access the bucket
    7. start_registry(ets)
    8. assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}
    9. # Once the bucket dies, we should receive notifications
    10. Process.exit(bucket, :shutdown)
    11. assert_receive {:exit, "shopping", ^bucket}
    12. assert KV.Registry.lookup(ets, "shopping") == :error
    13. end

    执行这个测试,它将失败:

    这是我们期望的。如果bucket不被监视,在它挂的时候,注册表将得不到通知,因此也没有事件发生。
    我们可以修改KV.Registryinit/1回调来修复这个问题。给所有表中的现存条目设置监视器:

    1. def init({table, events, buckets}) do
    2. refs = :ets.foldl(fn {name, pid}, acc ->
    3. HashDict.put(acc, Process.monitor(pid), name)
    4. end, HashDict.new, table)
    5. {:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
    6. end

    我们用:ets.foldl/3来遍历表中所有条目,类似于Enum.reduce/3。它为每个条目执行提供的函数,并且用一个累加器累加结果。
    在函数回调中,我们监视每个表中的pid,并相应地更新存放引用信息的字典。
    如果有某个条目是挂掉的,我们还能收到消息,稍后可以清除它们。