1. 安装 。
  2. 安装 Pulsar 内置连接器

要使用 Pulsar SQL 查询数据,请完成以下步骤。

  1. 启动 Pulsar 独立集群。
  1. 启动 Pulsar SQL 工作器。
  1. 初始化 Pulsar 独立集群和 SQL 工人后,运行 SQL CLI 。
  1. 使用 SQL 命令测试。
  1. presto> show catalogs;
  2. Catalog
  3. ---------
  4. pulsar
  5. system
  6. (2 rows)
  7. Query 20180829_211752_00004_7qpwh, FINISHED, 1 node
  8. Splits: 19 total, 19 done (100.00%)
  9. 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
  10. presto> show schemas in pulsar;
  11. Schema
  12. -----------------------
  13. public/default
  14. public/functions
  15. sample/standalone/ns1
  16. (4 rows)
  17. Splits: 19 total, 19 done (100.00%)
  18. 0:00 [4 rows, 89B] [21 rows/s, 471B/s]
  19. presto> show tables in pulsar."public/default";
  20. Table
  21. -------
  22. (0 rows)
  23. Query 20180829_211839_00006_7qpwh, FINISHED, 1 node
  24. Splits: 19 total, 19 done (100.00%)
  25. 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
  1. Start the built-in connector DataGeneratorSource and ingest some mock data.

然后可以在命名空间“公共/默认”中查询一个主题。

  1. presto> show tables in pulsar."public/default";
  2. Table
  3. ----------------
  4. generator_test
  5. (1 row)
  6. Query 20180829_213202_00000_csyeu, FINISHED, 1 node
  7. Splits: 19 total, 19 done (100.00%)
  8. 0:02 [1 rows, 38B] [0 rows/s, 17B/s]

可以查询模拟数据。

  1. public static class Foo {
  2. private int field1 = 1;
  3. private String field2;
  4. private long field3;
  5. }
  6. public static void main(String[] args) throws Exception {
  7. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  8. Producer<Foo> producer = pulsarClient.newProducer(AvroSchema.of(Foo.class)).topic("test_topic").create();
  9. for (int i = 0; i < 1000; i++) {
  10. Foo foo = new Foo();
  11. foo.setField1(i);
  12. foo.setField2("foo" + i);
  13. foo.setField3(System.currentTimeMillis());
  14. producer.newMessage().value(foo).send();
  15. }
  16. producer.close();
  17. pulsarClient.close();