部署一个 Mongo sink 环境

  1. 开始一个 Mongo 服务。

  2. 创建数据库和收藏集。

    1. docker exec -it pulsar-mongo /bin/bash
    2. mongo
    3. > use pulsar
    4. > db.createCollection('messages')
    5. > exit
  3. 安装 Pulsar 独立模式.

    1. docker pull apachepulsar/pulsar:2.4.0
    2. docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --link pulsar-mongo --name pulsar-mongo-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
  4. 使用 Mongo-sink config.yaml 文件配置 Mongo sink

    1. configs:
    2. mongoUri: "mongodb://pulsar-mongo:27017"
    3. database: "pulsar"
    4. collection: "messages"
    5. batchSize: 2
    6. batchTimeMs: 500
    1. docker cp mongo-sink-config.yaml pulsar-mongo-standalone:/pulsar/
  5. 下载 Mongo sink nar 包。

使用 localrun 命令以本地模式启动 Mongo sink。

  1. ./bin/pulsar-admin sinks localrun \
  2. --archive pulsar-io-mongo-2.4.0.nar \
  3. --tenant public --namespace default \
  4. --inputs test-mongo \
  5. --name pulsar-mongo-sink \
  6. --sink-config-file mongo-sink-config.yaml \
  7. --parallelism 1

Use one of the following methods to get a connector log in localrun mode:

  • After executing the localrun command, the log is automatically printed on the console.

  • The log is located at:

    1. logs/functions/tenant/namespace/function-name/function-name-instance-id.log

    示例

    The path of the Mongo sink connector is:

    1. logs/functions/public/default/pulsar-mongo-sink/pulsar-mongo-sink-0.log

为了清楚地解释日志信息,此处将大块信息分解成小块,并为每个块添加描述。

    1. 08:21:54.132 [main] INFO org.apache.pulsar.common.nar.NarClassLoader - Created class loader with paths: [file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/, file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/META-INF/bundled-dependencies/,
  • This piece of log information illustrates the basic information about the Mongo sink connector, such as tenant, namespace, name, parallelism, resources, and so on, which can be used to check whether the Mongo sink connector is configured correctly or not.

  • 此日志信息显示与 Mongo 连接和配置信息的状态。

    1. 08:21:56.231 [cluster-ClusterId{value='5d6396a3c9e77c0569ff00eb', description='null'}-pulsar-mongo:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:8}] to pulsar-mongo:27017
    2. 08:21:56.326 [cluster-ClusterId{value='5d6396a3c9e77c0569ff00eb', description='null'}-pulsar-mongo:27017] INFO org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=pulsar-mongo:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 2, 0]}, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=89058800}
  • 这张日志信息解释了消费者和客户的配置,包括主题名称、订阅名称、订阅类型等等。

    1. 08:21:56.719 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
    2. "topicNames" : [ "test-mongo" ],
    3. "topicsPattern" : null,
    4. "subscriptionName" : "public/default/pulsar-mongo-sink",
    5. "subscriptionType" : "Shared",
    6. "receiverQueueSize" : 1000,
    7. "acknowledgementsGroupTimeMicros" : 100000,
    8. "negativeAckRedeliveryDelayMicros" : 60000000,
    9. "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
    10. "consumerName" : null,
    11. "ackTimeoutMillis" : 0,
    12. "tickDurationMillis" : 1000,
    13. "priorityLevel" : 0,
    14. "properties" : {
    15. "application" : "pulsar-sink",
    16. "id" : "public/default/pulsar-mongo-sink",
    17. "instance_id" : "0"
    18. },
    19. "readCompacted" : false,
    20. "subscriptionInitialPosition" : "Latest",
    21. "patternAutoDiscoveryPeriod" : 1,
    22. "regexSubscriptionMode" : "PersistentOnly",
    23. "deadLetterPolicy" : null,
    24. "replicateSubscriptionState" : false,
    25. "resetIncludeHead" : false
    26. }
    27. 08:21:56.726 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
    28. "serviceUrl" : "pulsar://localhost:6650",
    29. "authPluginClassName" : null,
    30. "authParams" : null,
    31. "operationTimeoutMs" : 30000,
    32. "statsIntervalSeconds" : 60,
    33. "numIoThreads" : 1,
    34. "numListenerThreads" : 1,
    35. "connectionsPerBroker" : 1,
    36. "useTcpNoDelay" : true,
    37. "useTls" : false,
    38. "tlsTrustCertsFilePath" : null,
    39. "tlsAllowInsecureConnection" : false,
    40. "tlsHostnameVerificationEnable" : false,
    41. "concurrentLookupRequest" : 5000,
    42. "maxLookupRequest" : 50000,
    43. "maxNumberOfRejectedRequestPerConnection" : 50,
    44. "keepAliveIntervalSeconds" : 30,
    45. "connectionTimeoutMs" : 10000,
    46. "requestTimeoutMs" : 60000,
    47. "defaultBackoffIntervalNanos" : 100000000,
    48. "maxBackoffIntervalNanos" : 30000000000
    49. }

You can use the following methods to debug a connector in cluster mode:

使用连接器日志

在集群模式下,多个连接器可以运行在一个工作器。 要找到指定连接器的日志路径,请使用 workerId 来定位连接器日志。

Pulsar admin CLI helps you debug Pulsar connectors with the following subcommands:

创建 Mongo sink

  1. ./bin/pulsar-admin sinks create \
  2. --archive pulsar-io-mongo-2.4.0.nar \
  3. --tenant public \
  4. --namespace default \
  5. --inputs test-mongo \
  6. --name pulsar-mongo-sink \
  7. --sink-config-file mongo-sink-config.yaml \
  8. --parallelism 1

get

使用 获取 命令,获取关于 Mongo sink 连接器的基本信息,例如租户、命名空间、名称、平行状态等等。

  1. ./bin/pulsar-admin sinks get --tenant public --namespace default --name pulsar-mongo-sink
  2. {
  3. "tenant": "public",
  4. "namespace": "default",
  5. "name": "pulsar-mongo-sink",
  6. "className": "org.apache.pulsar.io.mongodb.MongoSink",
  7. "inputSpecs": {
  8. "test-mongo": {
  9. "isRegexPattern": false
  10. }
  11. },
  12. "configs": {
  13. "mongoUri": "mongodb://pulsar-mongo:27017",
  14. "collection": "messages",
  15. "batchSize": 2.0,
  16. "batchTimeMs": 500.0
  17. },
  18. "parallelism": 1,
  19. "retainOrdering": false,
  20. "autoAck": true
  21. }

使用 状态 命令获取关于 Mongo sink 连接器的当前状态。 例如实例的数量、运行实例的数量、实例Id、WorkerId等。

topics stats

使用 主题统计 命令获取主题及其关联的生产者和消费者的统计信息 如本专题是否收到电文,是否有信息积压、现有许可和其他关键信息。 All rates are computed over a 1-minute window and are relative to the last completed 1-minute period.

  1. ./bin/pulsar-admin topics stats test-mongo
  2. {
  3. "msgRateIn" : 0.0,
  4. "msgThroughputIn" : 0.0,
  5. "msgRateOut" : 0.0,
  6. "msgThroughputOut" : 0.0,
  7. "averageMsgSize" : 0.0,
  8. "storageSize" : 1,
  9. "publishers" : [ ],
  10. "subscriptions" : {
  11. "public/default/pulsar-mongo-sink" : {
  12. "msgRateOut" : 0.0,
  13. "msgThroughputOut" : 0.0,
  14. "msgRateRedeliver" : 0.0,
  15. "msgBacklog" : 0,
  16. "blockedSubscriptionOnUnackedMsgs" : false,
  17. "msgDelayed" : 0,
  18. "unackedMessages" : 0,
  19. "type" : "Shared",
  20. "msgRateExpired" : 0.0,
  21. "consumers" : [ {
  22. "msgRateOut" : 0.0,
  23. "msgThroughputOut" : 0.0,
  24. "msgRateRedeliver" : 0.0,
  25. "consumerName" : "dffdd",
  26. "availablePermits" : 999,
  27. "unackedMessages" : 0,
  28. "blockedConsumerOnUnackedMsgs" : false,
  29. "metadata" : {
  30. "instance_id" : "0",
  31. "application" : "pulsar-sink",
  32. "id" : "public/default/pulsar-mongo-sink"
  33. },
  34. "connectedSince" : "2019-08-26T08:48:07.582Z",
  35. "clientVersion" : "2.4.0",
  36. "address" : "/172.17.0.3:57790"
  37. } ],
  38. "isReplicated" : false
  39. }
  40. },
  41. "replication" : { },
  42. "deduplicationStatus" : "Disabled"
  43. }

此清单显示调试连接器时要检查的主要区域。 它提醒应寻找什么办法来确保彻底审查和评估工具来获得连接器的地位。

  • Pulsar 是否成功启动?

  • 外部服务运行正常吗?

  • nar 包是否已完成?

  • 连接器配置文件正确吗?

  • 在本地运行模式下,运行连接器并检查控制台上打印的信息(连接器日志)。

  • 在集群模式中:

    • 使用 获取 命令来获取基本信息。

    • 使用 状态 命令获取当前状态。

    • 使用 命令获取特定主题及其关联的生产者和消费者的统计信息。

    • 检查连接器日志。