RabbitMQ 引擎
可以让你:
- 发布或订阅数据流。
- 在数据流可用时进行处理。
必要参数:
rabbitmq_host_port
– 主机名:端口号 (比如,localhost:5672
).rabbitmq_exchange_name
– RabbitMQ exchange 名称.rabbitmq_format
– 消息格式. 使用与SQLFORMAT
函数相同的标记,如JSONEachRow
。 更多信息,请参阅 Formats 部分.
可选参数:
rabbitmq_exchange_type
– RabbitMQ exchange 的类型:direct
,fanout
,topic
,headers
,consistent_hash
. 默认是:fanout
.rabbitmq_routing_key_list
– 一个以逗号分隔的路由键列表.rabbitmq_row_delimiter
– 用于消息结束的分隔符.rabbitmq_schema
– 如果格式需要模式定义,必须使用该参数。比如, 需要模式文件的路径以及根schema.capnp:Message
对象的名称.rabbitmq_num_queues
– 队列的总数。默认值:1
. 增加这个数字可以显著提高性能.rabbitmq_queue_base
- 指定一个队列名称的提示。这个设置的使用情况如下.rabbitmq_deadletter_exchange
- 为dead letter exchange指定名称。你可以用这个 exchange 的名称创建另一个表,并在消息被重新发布到 dead letter exchange 的情况下收集它们。默认情况下,没有指定 dead letter exchange。Specify name for a .- - 如果设置为 1 (true), 在插入查询中交付模式将被设置为 2 (将消息标记为 ‘persistent’). 默认是:
0
. rabbitmq_skip_broken_messages
– RabbitMQ 消息解析器对每块模式不兼容消息的容忍度。默认值:0
. 如果rabbitmq_skip_broken_messages = N
,那么引擎将跳过 N 个无法解析的 RabbitMQ 消息(一条消息等于一行数据)。rabbitmq_max_block_size
rabbitmq_flush_interval_ms
同时,格式的设置也可以与 rabbitmq 相关的设置一起添加。
示例:
必要配置:
可选配置:
SELECT
对于读取消息不是特别有用(除了调试),因为每个消息只能读取一次。使用物化视图创建实时线程更为实用。要做到这一点:
- 使用引擎创建一个 RabbitMQ 消费者,并将其视为一个数据流。
- 创建一个具有所需结构的表。
- 创建一个物化视图,转换来自引擎的数据并将其放入先前创建的表中。
当物化视图
加入引擎时,它开始在后台收集数据。这允许您持续接收来自 RabbitMQ 的消息,并使用 SELECT
将它们转换为所需格式。
一个 RabbitMQ 表可以有多个你需要的物化视图。
数据可以根据rabbitmq_exchange_type
和指定的rabbitmq_routing_key_list
进行通道。
每个表不能有多于一个 exchange。一个 exchange 可以在多个表之间共享 - 因为可以使用路由让数据同时进入多个表。
direct
- 路由是基于精确匹配的键。例如表的键列表:key1,key2,key3,key4,key5
, 消息键可以是等同他们中的任意一个.fanout
- 路由到所有的表 (exchange 名称相同的情况) 无论是什么键都是这样.headers
- 路由是基于key=value
的匹配,设置为x-match=all
或x-match=any
. 例如表的键列表:x-match=all,format=logs,type=report,year=2020
.- - 数据在所有绑定的表之间均匀分布 (exchange 名称相同的情况). 请注意,这种 exchange 类型必须启用 RabbitMQ 插件:
rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
.
设置rabbitmq_queue_base
可用于以下情况:
- 来让不同的表共享队列, 这样就可以为同一个队列注册多个消费者,这使得性能更好。如果使用
rabbitmq_num_consumers
和/或rabbitmq_num_queues
设置,在这些参数相同的情况下,实现队列的精确匹配。 - 以便在不是所有消息都被成功消费时,能够恢复从某些持久队列的阅读。要从一个特定的队列恢复消耗 - 在
rabbitmq_queue_base
设置中设置其名称,不要指定rabbitmq_num_consumers
和rabbitmq_num_queues
(默认为1)。要恢复所有队列的消费,这些队列是为一个特定的表所声明的 - 只要指定相同的设置。rabbitmq_queue_base
,rabbitmq_num_consumers
,rabbitmq_num_queues
。默认情况下,队列名称对表来说是唯一的。 - 以重复使用队列,因为它们被声明为持久的,并且不会自动删除。可以通过任何 RabbitMQ CLI 工具删除)
为了提高性能,收到的消息被分组为大小为 的块。如果在stream_flush_interval_ms毫秒内没有形成数据块,无论数据块是否完整,数据都会被刷到表中。
如果rabbitmq_num_consumers
和/或rabbitmq_num_queues
设置与rabbitmq_exchange_type
一起被指定,那么:
- 必须启用
rabbitmq-consistent-hash-exchange
插件. - 必须指定已发布信息的
message_id
属性(对于每个信息/批次都是唯一的)。
对于插入查询时有消息元数据,消息元数据被添加到每个发布的消息中:messageID
和republished
标志(如果值为true,则表示消息发布不止一次) - 可以通过消息头访问。
不要在插入和物化视图中使用同一个表。
_exchange_name
- RabbitMQ exchange 名称._channel_id
- 接收消息的消费者所声明的频道ID._delivery_tag
- 收到消息的DeliveryTag. 以每个频道为范围._redelivered
- 消息的redelivered
标志._message_id
- 收到的消息的ID;如果在消息发布时被设置,则为非空.