Cluster

    Distributed Erlang/OTP programs consist of interconnected Erlang runtime systems. Each Erlang runtime system is called a node, and nodes are interconnected over TCP to form a network.

    Erlang nodes are identified by a unique node name, which consists of two parts separated by @: <name>@<host>, for example node1@127.0.0.1.

    Nodes address each other by node name. For example, open four shell terminals locally and use the -name parameter to start one Erlang node in each:

        erl -name node1@127.0.0.1 -setcookie my_nodes
        erl -name node2@127.0.0.1 -setcookie my_nodes
        erl -name node3@127.0.0.1 -setcookie my_nodes
        erl -name node4@127.0.0.1 -setcookie my_nodes

    node(). shows the name of the current node, and nodes(). lists the other nodes that have established a connection with it. Go to the console of 'node1@127.0.0.1' and check the current node name and connected nodes:

        (node1@127.0.0.1) 1> node().
        'node1@127.0.0.1'
        (node1@127.0.0.1) 2> nodes().
        []

    Then let node1 initiate connections to the other nodes:

        (node1@127.0.0.1) 3> net_kernel:connect_node('node2@127.0.0.1').
        true
        (node1@127.0.0.1) 4> net_kernel:connect_node('node3@127.0.0.1').
        true
        (node1@127.0.0.1) 5> net_kernel:connect_node('node4@127.0.0.1').
        true

    Now check which nodes are connected to node1:

        (node1@127.0.0.1) 6> nodes().
        ['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']

    We can see that node2, node3, and node4 have established distributed connections with node1, and these four nodes now form a cluster. Note that whenever a new node joins the cluster, it establishes a TCP connection with every node already in the cluster. At this point, the four nodes form the full-mesh structure shown in the following figure:

    Security

    Erlang nodes authenticate each other with cookies. A cookie is a string, and two nodes can establish a connection only if they have the same cookie. In the previous section, we used the -setcookie my_nodes parameter to give all four nodes the same cookie, my_nodes.

    See http://erlang.org/doc/reference_manual/distributed.html for details.

    Using TLS for backplane connections

    TLS encryption can be enabled for the backplane connections, at the cost of increased CPU load.

    1. Create a root CA using the openssl tool:

        # Create a self-signed root CA:
        openssl req -nodes -x509 -sha256 -days 1825 -newkey rsa:2048 -keyout rootCA.key -out rootCA.pem -subj "/O=LocalOrg/CN=LocalOrg-Root-CA"

    2. Generate CA-signed certificates for the nodes using the rootCA.pem created in step 1:

        # Create a private key:
        openssl genrsa -out domain.key 2048
        # Create the openssl extfile:
        cat <<EOF > domain.ext
        authorityKeyIdentifier=keyid,issuer
        basicConstraints=CA:FALSE
        subjectAltName = @alt_names
        [alt_names]
        DNS.1 = backplane
        EOF
        # Create a CSR:
        openssl req -key domain.key -new -out domain.csr -subj "/O=LocalOrg"
        # Sign the CSR with the root CA:
        openssl x509 -req -CA rootCA.pem -CAkey rootCA.key -in domain.csr -out domain.pem -days 365 -CAcreateserial -extfile domain.ext

      All the nodes in the cluster must use certificates signed by the same CA.

    3. Copy the generated domain.pem, domain.key, and rootCA.pem files to /var/lib/emqx/ssl on each node of the cluster. Make sure the files are readable by the emqx user and that their permissions are set to 600.

    4. For Enterprise edition 4.4.0, add the following configuration to the end of ./releases/4.4.0/emqx.schema:

        {mapping, "rpc.default_client_driver", "gen_rpc.default_client_driver",
         [{default, tcp}, {datatype, {enum, [tcp, ssl]}}]}.

    5. Add the following configuration to etc/rpc.conf for the Enterprise edition (for the Community edition, add it to the corresponding configuration file):

        rpc.driver=ssl
        rpc.default_client_driver=ssl
        rpc.certfile=/var/lib/emqx/ssl/domain.pem
        rpc.cacertfile=/var/lib/emqx/ssl/rootCA.pem
        rpc.keyfile=/var/lib/emqx/ssl/domain.key
        rpc.enable_ssl=5369

    EMQX Broker Cluster protocol settings

    Each node in the Erlang cluster can be connected through TCPv4, TCPv6, or TLS, and the connection method can be configured in etc/emqx.conf.

    EMQX Broker Distributed cluster design

    The basic function of a distributed EMQX Broker is to forward published messages to the subscribers on each node, as shown in the following figure:

    To achieve this, EMQX Broker maintains three related data structures: the subscription table, the route table, and the topic trie.

    Subscription Table: Topics-Subscribers

    When an MQTT client subscribes to a topic, EMQX Broker maintains a Subscription Table for the Topic -> Subscriber mapping. The subscription table exists only on the EMQX Broker node where the subscriber is located.

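    Route Table: Topic-Node

    The route table maps each topic to the nodes that have subscribers for it, and it is replicated to every node in the cluster, for example: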

        topic1 -> node1, node2
        topic2 -> node3
        topic3 -> node2, node4

    In addition to the route table, each node in the EMQX Broker cluster also maintains a copy of the Topic Trie.

    The following topic-subscription relationship is an example:

    When all subscriptions are completed, EMQX Broker maintains the following Topic Trie and Route Table:

    [Figure: Topic Trie and Route Table]
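    The Topic Trie lookup can be illustrated with a minimal Erlang sketch. It is not EMQX's implementation: the module topic_trie and its functions are hypothetical, the trie is kept as nested maps rather than in database tables, and special cases such as topics beginning with $ are ignored. Each level of a topic filter becomes one trie edge; a lookup follows both the exact level and the + wildcard at every step, and collects any # child it passes.

```erlang
%% topic_trie: hypothetical sketch of an MQTT topic trie, not EMQX's code.
-module(topic_trie).
-export([new/0, insert/2, match/2]).

%% Each trie node records whether a filter ends here and its child levels.
new() -> #{filter => false, children => #{}}.

%% Split a topic or filter into levels: <<"t/a">> -> [<<"t">>, <<"a">>].
words(Topic) -> binary:split(Topic, <<"/">>, [global]).

%% Add a topic filter such as <<"t/#">> to the trie.
insert(Filter, Trie) -> insert_words(words(Filter), Trie).

insert_words([], Node) ->
    Node#{filter := true};
insert_words([W | Rest], #{children := Children} = Node) ->
    Child = maps:get(W, Children, new()),
    Node#{children := Children#{W => insert_words(Rest, Child)}}.

%% Return every stored filter that matches the concrete topic name, sorted.
match(Topic, Trie) ->
    lists:usort(match_words(words(Topic), Trie, [])).

%% Prefix holds the filter levels walked so far, in reverse order.
match_words(Words, #{children := Children} = Node, Prefix) ->
    %% A '#' child matches all remaining levels, including none.
    Hash = case maps:find(<<"#">>, Children) of
               {ok, #{filter := true}} -> [join([<<"#">> | Prefix])];
               _ -> []
           end,
    Hash ++ match_rest(Words, Node, Prefix).

%% No levels left: the walked path is a match if a filter ends here.
match_rest([], #{filter := true}, Prefix) -> [join(Prefix)];
match_rest([], _Node, _Prefix) -> [];
%% Otherwise follow both the exact level and the '+' wildcard.
match_rest([W | Rest], #{children := Children}, Prefix) ->
    lists:append([match_words(Rest, Child, [Key | Prefix])
                  || Key <- lists:usort([W, <<"+">>]),
                     {ok, Child} <- [maps:find(Key, Children)]]).

%% Rebuild a filter binary from its reversed levels.
join(RevWords) ->
    iolist_to_binary(lists:join(<<"/">>, lists:reverse(RevWords))).
```

    For example, after inserting t/a, t/#, and t/+/x, topic_trie:match(<<"t/a">>, Trie) returns [<<"t/#">>, <<"t/a">>], the same two filters that node1 finds in the message distribution example.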

    Message Distribution Process

    When an MQTT client publishes a message, the node it is connected to looks up the message topic in the route table and forwards the message to the relevant nodes. Each of those nodes then looks up its local subscription table and delivers the message to the matching subscribers.

    For example, when client1 publishes a message to the topic t/a, the message is routed and distributed between nodes as follows:

    1. client1 publishes a message with the topic t/a to node1.
    2. By querying the topic trie, node1 learns that t/a matches two existing topic filters: t/a and t/#.
    3. By querying the route table, node1 learns that topic t/a has subscribers only on node3, and topic t/# has subscribers only on node2, so node1 forwards the message to node2 and node3.
    4. After node2 receives the forwarded t/a message, it queries its local subscription table for the subscribers of t/# on this node and delivers the message to them.
    5. After node3 receives the forwarded t/a message, it queries its local subscription table for the subscribers of t/a on this node and delivers the message to them.
    6. Message forwarding and distribution are complete.
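    The process above can be sketched in Erlang as two functions: forward/2 runs on the publishing node and decides, from the replicated route table, which nodes receive the message and for which filters; dispatch/2 runs on each receiving node against its local subscription table. This is a simplified model, not EMQX's code: the module msg_route, the map-based table layouts, and the client names in the example are hypothetical, and the route table is scanned linearly instead of through the topic trie.

```erlang
%% msg_route: hypothetical sketch of cross-node routing, not EMQX's code.
-module(msg_route).
-export([match/2, forward/2, dispatch/2]).

%% true if the concrete topic Name matches the topic filter Filter.
match(Name, Filter) ->
    match_words(binary:split(Name, <<"/">>, [global]),
                binary:split(Filter, <<"/">>, [global])).

match_words(_, [<<"#">>]) -> true;                 % '#' matches the rest
match_words([], []) -> true;
match_words([_ | Ns], [<<"+">> | Fs]) -> match_words(Ns, Fs);
match_words([W | Ns], [W | Fs]) -> match_words(Ns, Fs);
match_words(_, _) -> false.

%% Publishing node: RouteTable is #{Filter => [Node]}, replicated on every
%% node. Returns #{Node => [MatchedFilter]}, i.e. where to forward.
forward(Topic, RouteTable) ->
    Acc = maps:fold(
            fun(Filter, Nodes, Acc0) ->
                    case match(Topic, Filter) of
                        true ->
                            lists:foldl(
                              fun(N, A) ->
                                      maps:update_with(N, fun(Fs) -> [Filter | Fs] end,
                                                       [Filter], A)
                              end, Acc0, Nodes);
                        false ->
                            Acc0
                    end
            end, #{}, RouteTable),
    maps:map(fun(_Node, Filters) -> lists:sort(Filters) end, Acc).

%% Receiving node: SubTable is this node's local #{Filter => [Client]}.
%% Returns the local subscribers of the forwarded filters.
dispatch(Filters, SubTable) ->
    lists:usort(lists:append([maps:get(F, SubTable, []) || F <- Filters])).
```

    With the route table from the example (t/a on node3, t/# on node2), msg_route:forward(<<"t/a">>, Routes) returns #{node2 => [<<"t/#">>], node3 => [<<"t/a">>]}, which corresponds to step 3; node2 would then call dispatch([<<"t/#">>], LocalSubTable) as in step 4.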

    Data partition and sharing

    EMQX Broker’s subscription table is partitioned in the cluster, while the topic tree and routing table are replicated.

    EMQX Broker supports Autocluster based on the Ekka library. Ekka is a cluster management library developed for Erlang/OTP applications; it supports Service Discovery, Autocluster, Network Partition Autoheal, and Autoclean of Erlang nodes.

    EMQX supports multiple node discovery strategies:

    Note: the mcast discovery strategy has been deprecated and will be removed in a future release.

    Creating a cluster manually

    By default, clusters are created manually: nodes are added to the cluster with the ./bin/emqx_ctl cluster join <Node> command:

        cluster.discovery = manual

    Autocluster based on static node list

    Configure a fixed node list to automatically discover and create clusters:

        cluster.discovery = static
        cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
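    Conceptually, static discovery gives every node the same seed list to connect to. The sketch below is hypothetical (the module static_seeds is not part of EMQX or Ekka): it only parses a comma-separated value like cluster.static.seeds and opens Erlang distribution connections with net_kernel:connect_node/1. Ekka's actual join logic, including cluster database membership, is not shown.

```erlang
%% static_seeds: hypothetical sketch, not part of EMQX or Ekka.
-module(static_seeds).
-export([parse/1, connect_all/1]).

%% "emqx1@127.0.0.1,emqx2@127.0.0.1" -> ['emqx1@127.0.0.1','emqx2@127.0.0.1']
%% Surrounding spaces and empty entries are ignored.
-spec parse(string()) -> [node()].
parse(Seeds) ->
    [list_to_atom(Name)
     || Part <- string:split(Seeds, ",", all),
        Name <- [string:trim(Part)],
        Name =/= ""].

%% Try to connect to every seed except this node itself.
%% net_kernel:connect_node/1 returns true, false, or ignore
%% (ignore when the local node is not alive).
connect_all(Seeds) ->
    [{Node, net_kernel:connect_node(Node)}
     || Node <- parse(Seeds), Node =/= node()].
```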

    Autocluster based on mcast

    Automatically discover and create clusters based on UDP multicast:

        cluster.discovery = mcast
        cluster.mcast.addr = 239.192.0.1
        cluster.mcast.ports = 4369,4370
        cluster.mcast.iface = 0.0.0.0
        cluster.mcast.ttl = 255
        cluster.mcast.loop = on

    Autocluster based on DNS A records

    Automatically discover and create clusters based on DNS A records:

        cluster.discovery = dns
        cluster.dns.name = localhost
        cluster.dns.app = ekka
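    DNS-based discovery can be pictured as turning each A record of cluster.dns.name into a candidate node name. The sketch below assumes, as we understand Ekka's DNS strategy to work, that every node is named <cluster.dns.app>@<IP>, so with the configuration above the nodes would be named ekka@<IP>. The module dns_discovery is hypothetical; inet_res:lookup/3 is the standard OTP DNS client call.

```erlang
%% dns_discovery: hypothetical sketch of DNS A-record node discovery.
%% Assumption: every node is named App@IP, with App = cluster.dns.app.
-module(dns_discovery).
-export([node_names/2, discover/2]).

%% Build node names such as 'ekka@127.0.0.1' from IPv4 address tuples.
-spec node_names(string(), [inet:ip4_address()]) -> [node()].
node_names(App, IPs) ->
    [list_to_atom(App ++ "@" ++ inet:ntoa(IP)) || IP <- IPs].

%% Query the A records of Name (the cluster.dns.name value) and return
%% the sorted, de-duplicated candidate node names. The result depends on
%% the DNS server and may be an empty list.
-spec discover(string(), string()) -> [node()].
discover(Name, App) ->
    IPs = inet_res:lookup(Name, in, a),
    lists:usort(node_names(App, IPs)).
```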

    Autocluster based on etcd

    Automatically discover and create clusters based on etcd:

        cluster.discovery = etcd
        cluster.etcd.server = http://127.0.0.1:2379
        cluster.etcd.prefix = emqcl
        cluster.etcd.node_ttl = 1m

    Autocluster based on kubernetes

    Automatically discover and create clusters based on Kubernetes:

        cluster.discovery = k8s
        cluster.k8s.apiserver = http://10.110.111.204:8080
        cluster.k8s.service_name = ekka
        cluster.k8s.address_type = ip
        cluster.k8s.app_name = ekka

    Introduction to manual cluster management

    Deploy an EMQX Broker cluster on two servers, s1.emqx.io and s2.emqx.io.

    Tip

    The node name has the format <name>@<host>, where the host must be an IP address or an FQDN (hostname.domain).

    Configure emqx@s1.emqx.io node

    emqx/etc/emqx.conf:

        node.name = emqx@s1.emqx.io
        # or

    Configure through environment variables:

    Configure emqx@s2.emqx.io node

    emqx/etc/emqx.conf:

        node.name = emqx@s2.emqx.io
        # or
        node.name = emqx@192.168.0.20

    Node joins the cluster

    After both nodes have started, run the join command on s2.emqx.io:

        $ ./bin/emqx_ctl cluster join emqx@s1.emqx.io
        Join the cluster successfully.
        Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

    Or run it on s1.emqx.io:

        $ ./bin/emqx_ctl cluster join emqx@s2.emqx.io
        Join the cluster successfully.
        Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

    Query the cluster status on any node:

        $ ./bin/emqx_ctl cluster status
        Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]

    Exit the cluster

    There are two ways for a node to exit the cluster:

    1. leave: make this node leave the cluster
    2. force-leave: remove another node from the cluster

    Let emqx@s2.emqx.io actively exit the cluster:

        $ ./bin/emqx_ctl cluster leave

    Or remove the emqx@s2.emqx.io node from the cluster on s1.emqx.io:

        $ ./bin/emqx_ctl cluster force-leave emqx@s2.emqx.io

    Start a cluster on single machine

    If you have only one server, you can use the pseudo-distributed mode. Note that to start two or more nodes on the same machine, you must change the listening ports of the additional nodes to avoid port conflicts.

    The basic process is to make a copy of the emqx folder named emqx2, and then add an offset to every listening port of the original emqx to obtain the listening ports of the emqx2 node. For example, you can change the MQTT/TCP listening port of emqx2 from the default 1883 to 2883. Refer to the Configuration Instructions for details on these operations.

    Network Partition Autoheal

    EMQX supports Network Partition Autoheal, which can be configured in etc/emqx.conf:

        cluster.autoheal = on

    Network Partition Autoheal Process:

    1. The node confirms the Network Partition 3 seconds after receiving the inconsistent_database event from Mnesia;
    2. After confirming that a Network Partition has occurred, the node reports it to the Leader node (the earliest-started node in the cluster);
    3. After a delay, the Leader node creates a SplitView once all nodes are online;
    4. The Leader node selects the self-healing Coordinator node in the majority partition;

    EMQX also supports Autoclean, which automatically removes nodes that have been down for longer than the configured interval from the cluster. It can be configured in etc/emqx.conf:

        cluster.autoclean = 5m

    Firewall settings

    The Node Discovery Ports

    If the environment variable WITH_EPMD=1 is set before emqx is started, epmd (listening on port 4369) is enabled for node discovery. This is called epmd mode.

    If the environment variable WITH_EPMD is not set, epmd is not enabled when emqx is started, and ekka is used for node discovery instead. This has been the default node discovery method since version 4.0 and is called ekka mode.

    epmd mode:

    If there is a firewall between cluster nodes, it must open TCP port 4369 on each node so that peers can query each other's listening ports. The firewall must also allow nodes to connect to ports in the configurable range from node.dist_listen_min to node.dist_listen_max (inclusive; both default to 6369).

    ekka mode (default since version 4.0):

    In ekka mode, the port mapping follows a fixed convention instead of being dynamic as in epmd mode, so the node.dist_listen_min and node.dist_listen_max settings have no effect.

    If there is a firewall between the cluster nodes, it must allow the nodes to connect to each other on these conventional listening ports. The port mapping rule for ekka mode is described below.

    Erlang distribution port mapping rule in ekka mode: ListeningPort = BasePort + Offset, where BasePort is 4370 (not configurable) and Offset is the numeric suffix of the node's name. If the node name has no numeric suffix, Offset is 0.

    For example, having node.name = emqx@192.168.0.12 in emqx.conf should make the node listen on port 4370, and port 4371 for emqx1 (or emqx-1), and so on.

    The Cluster RPC Port

    Each emqx node also listens on a conventional port for the RPC channels, which the firewall must also allow. The port mapping rule is the same as for the node discovery ports in ekka mode, but with BasePort = 5370. That is, with node.name = emqx@192.168.0.12 in emqx.conf the node listens on port 5370, while emqx1 (or emqx-1) listens on port 5371, and so on.
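    Both ekka-mode port rules can be expressed as a small Erlang function, which may help when writing firewall rules. This is a sketch of the rule as stated above, not code taken from EMQX: the module emqx_ports is hypothetical, and it assumes the numeric suffix is read from the name part before @, so digits in the host (such as the 12 in 192.168.0.12) are ignored.

```erlang
%% emqx_ports: hypothetical sketch of the ekka-mode port rules.
-module(emqx_ports).
-export([offset/1, dist_port/1, rpc_port/1]).

-define(DIST_BASE_PORT, 4370). %% Erlang distribution port in ekka mode
-define(RPC_BASE_PORT, 5370).  %% cluster RPC port

%% Offset = numeric suffix of the name part (before '@'), or 0 if none.
%% 'emqx@192.168.0.12' -> 0, 'emqx1@...' -> 1, 'emqx-1@...' -> 1
offset(Node) when is_atom(Node) -> offset(atom_to_list(Node));
offset(Node) when is_list(Node) ->
    [Name | _] = string:split(Node, "@"),
    case re:run(Name, "[0-9]+$", [{capture, first, list}]) of
        {match, [Digits]} -> list_to_integer(Digits);
        nomatch -> 0
    end.

%% ListeningPort = BasePort + Offset for each channel.
dist_port(Node) -> ?DIST_BASE_PORT + offset(Node).
rpc_port(Node) -> ?RPC_BASE_PORT + offset(Node).
```

    For example, emqx_ports:dist_port('emqx1@192.168.0.12') returns 4371 and emqx_ports:rpc_port('emqx1@192.168.0.12') returns 5371, so a firewall between the nodes would need to allow both ports for that node.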