Cluster
Distributed programs on the Erlang/OTP platform are composed of interconnected Erlang runtime systems. Each Erlang runtime system is called a node, and nodes connect to one another over TCP to form a network.
Erlang nodes are identified by a unique node name, which consists of two parts separated by `@`: a name part and a host part.
Communication between nodes is addressed by node name. For example, start four shell terminals locally, and then use the `-name` parameter to start four Erlang nodes:

```bash
erl -name node1@127.0.0.1 -setcookie my_nodes
erl -name node2@127.0.0.1 -setcookie my_nodes
erl -name node3@127.0.0.1 -setcookie my_nodes
erl -name node4@127.0.0.1 -setcookie my_nodes
```
`node().` can be used to view the name of the local node, and `nodes().` can be used to view the other nodes that have established a connection with the current node. We now go to the console of node1@127.0.0.1 and check the current node name and connected nodes:
```bash
(node1@127.0.0.1) 4> node().
'node1@127.0.0.1'
(node1@127.0.0.1) 4> nodes().
[]
```
Then we let node1 initiate connections with other nodes:
```bash
(node1@127.0.0.1) 1> net_kernel:connect_node('node2@127.0.0.1').
true
(node1@127.0.0.1) 2> net_kernel:connect_node('node3@127.0.0.1').
true
(node1@127.0.0.1) 3> net_kernel:connect_node('node4@127.0.0.1').
true
```
Now we can check other nodes that are already connected to node1:
```bash
(node1@127.0.0.1) 4> nodes().
['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']
```
We can see that node2, node3, and node4 have established a distributed connection with node1, and these four nodes form a cluster. Note that whenever a new node joins the cluster, it establishes a TCP connection to every node already in the cluster. At this point, the four nodes have formed the fully meshed structure shown in the following figure:
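Since every joining node connects to every existing member, an n-node cluster maintains n(n-1)/2 TCP links in total. A quick sketch of this count (illustrative only, not EMQX code):

```python
def mesh_links(n: int) -> int:
    """Number of TCP links in a fully meshed cluster of n nodes."""
    # Each of the n nodes links to the other n-1 nodes; every link is
    # shared by two nodes, hence the division by two.
    return n * (n - 1) // 2

print(mesh_links(4))  # the 4-node example above maintains 6 links
```

This quadratic growth is why very large fully meshed Erlang clusters become expensive to maintain.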
Security
Cookies are used for authentication between interconnecting Erlang nodes. A cookie is a string, and only nodes with the same cookie can establish a connection. In the example above, we used the `-setcookie my_nodes` parameter to set the same cookie, `my_nodes`, on all four nodes.
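The cookie itself is not sent in clear text; the Erlang distribution handshake uses an MD5 challenge-response over it. A simplified sketch of the idea (the function names here are made up for illustration, and this is not the actual wire protocol):

```python
import hashlib
import os

def gen_digest(cookie: str, challenge: int) -> bytes:
    # Digest over the cookie and the decimal challenge, mirroring
    # Erlang's erlang:md5([Cookie, integer_to_list(Challenge)]).
    return hashlib.md5((cookie + str(challenge)).encode()).digest()

def handshake_ok(cookie_a: str, cookie_b: str) -> bool:
    # Node B sends a random challenge; node A answers with a digest
    # computed from its own cookie. B accepts only if the digests match.
    challenge = int.from_bytes(os.urandom(4), "big")
    answer = gen_digest(cookie_a, challenge)    # computed by A
    expected = gen_digest(cookie_b, challenge)  # checked by B
    return answer == expected

print(handshake_ok("my_nodes", "my_nodes"))  # True
print(handshake_ok("my_nodes", "other"))     # False
```

Because only a digest crosses the wire, an eavesdropper does not learn the cookie directly, but the scheme is authentication only; it does not encrypt the connection (see the TLS section below).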
See http://erlang.org/doc/reference_manual/distributed.html for details.
Using TLS for backplane connections
It is possible to enable TLS encryption for the backplane connections. It comes at the cost of increased CPU load, though.
Create a root CA using the `openssl` tool:

```bash
# Create a self-signed root CA:
openssl req -nodes -x509 -sha256 -days 1825 -newkey rsa:2048 -keyout rootCA.key -out rootCA.pem -subj "/O=LocalOrg/CN=LocalOrg-Root-CA"
```
Generate CA-signed certificates for the nodes using the rootCA.pem created in the previous step:

```bash
# Create a private key:
openssl genrsa -out domain.key 2048
# Create an openssl extfile:
cat <<EOF > domain.ext
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
subjectAltName = @alt_names
[alt_names]
DNS.1 = backplane
EOF
# Create a CSR:
openssl req -key domain.key -new -out domain.csr -subj "/O=LocalOrg"
# Sign the CSR with the Root CA:
openssl x509 -req -CA rootCA.pem -CAkey rootCA.key -in domain.csr -out domain.pem -days 365 -CAcreateserial -extfile domain.ext
```
All the nodes in the cluster must use certificates signed by the same CA.
Put the generated `domain.pem`, `domain.key` and `rootCA.pem` files into `/var/lib/emqx/ssl` on each node of the cluster. Make sure the emqx user can read these files, and that their permissions are set to `600`.

For Enterprise edition 4.4.0, add the following configuration to the end of `./releases/4.4.0/emqx.schema`:

```erlang
{mapping, "rpc.default_client_driver", "gen_rpc.default_client_driver",
 [{default, tcp}, {datatype, {enum, [tcp, ssl]}}]}.
```
Add the following configuration to `etc/rpc.conf` for the Enterprise edition (and to the corresponding configuration file for the community edition):

```
rpc.driver=ssl
rpc.default_client_driver=ssl
rpc.certfile=/var/lib/emqx/ssl/domain.pem
rpc.cacertfile=/var/lib/emqx/ssl/rootCA.pem
rpc.keyfile=/var/lib/emqx/ssl/domain.key
rpc.enable_ssl=5369
```
EMQX Broker Cluster protocol settings
Each node in the Erlang cluster can be connected through TCPv4, TCPv6 or TLS, and the connection method can be configured in `etc/emqx.conf`:
EMQX Broker Distributed cluster design
The basic function of EMQX Broker distribution is to forward and publish messages to subscribers on each node, as shown in the following figure:
To achieve this, EMQX Broker maintains several data structures related to it: subscription tables, routing tables, and topic trees.
Subscription Table: Topics-Subscribers
When an MQTT client subscribes to a topic, EMQX Broker maintains a Subscription Table for the Topic -> Subscriber mapping. The subscription table exists only on the EMQX Broker node where the subscriber is located, for example:
Route Table: Topic-Node

The route table maps each topic to the nodes that have subscribers for it, for example:

```
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
```
In addition to the route table, each node in the EMQX Broker cluster also maintains a replica of the Topic Trie.
Take the following topic-subscription relationships as an example:
When all subscriptions are completed, EMQX Broker maintains the following Topic Trie and Route Table:
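Matching a published topic against the trie boils down to MQTT filter matching, where `+` spans exactly one level and `#` spans all remaining levels. A minimal sketch of those rules (not EMQX's actual trie implementation; edge cases such as `sport/#` matching `sport` itself are omitted):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Minimal MQTT filter matching: '+' spans one level, '#' the rest."""
    f_parts, t_parts = filter_.split("/"), topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":          # multi-level wildcard: matches everything left
            return True
        if i >= len(t_parts):  # filter is longer than the topic
            return False
        if f != "+" and f != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)

print(topic_matches("t/#", "t/a"))   # True
print(topic_matches("t/+", "t/a"))   # True
print(topic_matches("t/a", "t/b"))   # False
```

A real trie answers the same question for many filters at once in one walk, instead of testing each filter separately.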
Message Distribution Process
When an MQTT client publishes a message, the node it is connected to looks up the route table by message topic and forwards the message to the relevant nodes; each of those nodes then looks up its local subscription table and delivers the message to the matching subscribers.

For example, when client1 publishes a message to the topic `t/a`, the routing and distribution of the message between nodes proceed as follows:
- client1 publishes a message with the topic `t/a` to node1.
- By querying the topic trie, node1 learns that `t/a` matches the two existing topics `t/a` and `t/#`.
- By querying the route table, node1 learns that topic `t/a` has subscribers only on node3, and topic `t/#` has subscribers only on node2, so node1 forwards the message to node2 and node3.
- After node2 receives the forwarded `t/a` message, it queries its local subscription table for the subscribers to `t/#` on this node and distributes the message to them.
- After node3 receives the forwarded `t/a` message, it queries its local subscription table for the subscribers to `t/a` on this node and distributes the message to them.
- Message forwarding and distribution are complete.
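The steps above can be modeled end-to-end in a few lines. This is an illustrative toy model only: the tables, the simplified matcher (which ignores `+`), and the `publish` helper are stand-ins, not EMQX internals:

```python
# Route table (replicated cluster-wide): topic filter -> nodes.
route_table = {"t/a": {"node3"}, "t/#": {"node2"}}
# Subscription tables (one per node): topic filter -> local subscribers.
subs = {"node2": {"t/#": {"client2"}}, "node3": {"t/a": {"client3"}}}

def matches(filter_: str, topic: str) -> bool:
    # Exact match, or a '#' tail covering the remaining levels.
    f, t = filter_.split("/"), topic.split("/")
    return f == t or (f[-1] == "#" and f[:-1] == t[:len(f) - 1])

def publish(topic: str, message: str) -> dict:
    # Step 1: the publishing node consults the route table and picks
    # the nodes holding subscribers for any matching filter.
    targets = {}
    for filt, nodes in route_table.items():
        if matches(filt, topic):
            for node in nodes:
                targets.setdefault(node, set()).add(filt)
    # Step 2: each target node delivers from its local subscription table.
    delivered = {}
    for node, filts in targets.items():
        for filt in filts:
            for client in subs[node].get(filt, ()):
                delivered.setdefault(client, []).append(message)
    return delivered

print(publish("t/a", "hello"))  # client2 (via t/#) and client3 (via t/a)
```

Note how the route table is only consulted on the publishing node, while subscription tables are only consulted on the nodes that actually hold subscribers.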
Data partition and sharing
EMQX Broker’s subscription table is partitioned in the cluster, while the topic tree and routing table are replicated.
EMQX Broker supports Autocluster based on the Ekka library. Ekka is a cluster-management library developed for Erlang/OTP applications; it supports service discovery, autocluster, network partition autoheal, and autoclean of Erlang nodes.
EMQX supports multiple node discovery strategies:
Note: the mcast discovery strategy has been deprecated and will be removed in future releases.
Creating a cluster manually
The default configuration is to create a cluster manually; nodes are added with the `./bin/emqx_ctl cluster join <Node>` command:

```
cluster.discovery = manual
```
Autocluster based on static node list
Configure a fixed node list to automatically discover and create the cluster:

```
cluster.discovery = static
cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
```
Autocluster based on mcast
Automatically discover and create the cluster based on UDP multicast:

```
cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on
```
Autocluster based on DNS A records
Automatically discover and create the cluster based on DNS A records:

```
cluster.discovery = dns
cluster.dns.name = localhost
cluster.dns.app = ekka
```
Autocluster based on etcd
Automatically discover and create the cluster based on etcd:

```
cluster.discovery = etcd
cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m
```
Autocluster based on kubernetes
Automatically discover and create the cluster based on Kubernetes:

```
cluster.discovery = k8s
cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka
cluster.k8s.address_type = ip
cluster.k8s.app_name = ekka
```
Introduction to manual cluster management
Deploy an EMQX Broker cluster on the two servers s1.emqx.io and s2.emqx.io:
Tip
The format of a node name is name@host, and host must be an IP address or an FQDN (host name plus domain name).
Configure emqx@s1.emqx.io node
Configure in `emqx/etc/emqx.conf`:

```
node.name = emqx@s1.emqx.io
# or use the node's IP address, e.g. node.name = emqx@192.168.0.10
```

Alternatively, the node name can be configured through environment variables.
Configure emqx@s2.emqx.io node
Configure in `emqx/etc/emqx.conf`:

```
node.name = emqx@s2.emqx.io
# or
node.name = emqx@192.168.0.20
```
Node joins the cluster
After starting the two nodes, the join can be executed on s2.emqx.io:

```bash
$ ./bin/emqx_ctl cluster join emqx@s1.emqx.io

Join the cluster successfully.
Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
```
Or executed on s1.emqx.io:

```bash
$ ./bin/emqx_ctl cluster join emqx@s2.emqx.io

Join the cluster successfully.
Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
```
Query the cluster status on any node:

```bash
$ ./bin/emqx_ctl cluster status

Cluster status: [{running_nodes,['emqx@s1.emqx.io','emqx@s2.emqx.io']}]
```
Exit the cluster
There are two ways for a node to exit the cluster:
- leave: make this node leave the cluster
- force-leave: remove another node from the cluster
Let emqx@s2.emqx.io actively exit the cluster:

```bash
$ ./bin/emqx_ctl cluster leave
```

Or remove the emqx@s2.emqx.io node from the cluster on s1.emqx.io:

```bash
$ ./bin/emqx_ctl cluster force-leave emqx@s2.emqx.io
```
Start a cluster on single machine
For users who have only one server, the pseudo-distributed starting mode can be used. Note that to start two or more nodes on one machine, the listening ports of the additional nodes must be adjusted to avoid port conflicts.

The basic process is to copy the emqx folder to a second folder named emqx2, and then add a fixed offset to every listening port of the original emqx to obtain the listening ports of the emqx2 node. For example, the MQTT/TCP listening port can be changed from the default 1883 to 2883 for emqx2. Please refer to the Configuration Instructions for details of the above operations.
Network Partition Autoheal
EMQX supports Network Partition Autoheal, which can be configured in `etc/emqx.conf`:

```
cluster.autoheal = on
```
Network Partition Autoheal Process:
- The node performs a network-partition confirmation 3 seconds after receiving the `inconsistent_database` event from Mnesia;
- After the node confirms that a network partition has occurred, it reports the event to the Leader node (the node with the earliest start time in the cluster);
- After a delay, once all nodes are online, the Leader node creates a SplitView of the partitions;
- The Leader node selects a self-healing Coordinator node in the majority partition;
- The Coordinator node restarts the nodes in the minority partition to restore the cluster.
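The "majority partition" choice above can be illustrated with a toy helper (the election and tie-breaking details of the real algorithm are not modeled here):

```python
def pick_majority(partitions):
    """Return the partition (set of node names) with the most members.

    A stand-in for the Leader's choice of where the self-healing
    Coordinator lives; real tie-breaking rules are not modeled.
    """
    return max(partitions, key=len)

# After a split, the 3-node side hosts the heal coordinator:
views = [{"emqx1", "emqx2", "emqx3"}, {"emqx4", "emqx5"}]
print(pick_majority(views))
```

Restarting the smaller side minimizes the number of nodes that have to drop their (possibly divergent) state.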
Autoclean of cluster nodes
EMQX supports autoclean of disconnected nodes from the cluster, which can be configured in `etc/emqx.conf`:

```
cluster.autoclean = 5m
```
Firewall settings
The Node Discovery Ports
If the environment variable WITH_EPMD=1 is set in advance, epmd (listening on port 4369) is started for node discovery when emqx starts; this is called `epmd mode`.

If the environment variable WITH_EPMD is not set, epmd is not started and emqx ekka is used for node discovery instead; this has been the default method of node discovery since version 4.0 and is called `ekka mode`.
epmd mode:

If there is a firewall between cluster nodes, the firewall needs to open TCP port 4369 for each node, to allow peers to query each other's listening port. The firewall should also allow nodes to connect to ports in the configurable range from `node.dist_listen_min` to `node.dist_listen_max` (inclusive; both default to `6369`).
ekka mode (default since version 4.0):

In `ekka` mode, the port mapping is conventional, rather than dynamic as in `epmd` mode. The configurations `node.dist_listen_min` and `node.dist_listen_max` take no effect in this case.

If there is a firewall between the cluster nodes, the conventional listening port should be allowed for nodes to connect to each other. See below for the port-mapping rule in `ekka` mode.

Erlang distribution port-mapping rule in `ekka` mode: `ListeningPort = BasePort + Offset`, where `BasePort` is 4370 (which is not configurable), and `Offset` is the numeric suffix of the node's name. If the node name does not have a numeric suffix, `Offset` is 0.
For example, having `node.name = emqx@192.168.0.12` in `emqx.conf` makes the node listen on port 4370, port 4371 for `emqx1` (or `emqx-1`), and so on.
The Cluster RPC Port
Each emqx node also listens on a (conventional) port for the RPC channels, which should also be allowed by the firewall. The port-mapping rule is similar to the node discovery ports in `ekka` mode, but with `BasePort = 5370`. That is, having `node.name = emqx@192.168.0.12` in `emqx.conf` makes the node listen on port 5370, port 5371 for `emqx1` (or `emqx-1`), and so on.