How to debug Pulsar connectors

Deploy a Mongo sink environment

  1. Start a Mongo service.

  2. Create a DB and a collection.

    docker exec -it pulsar-mongo /bin/bash
    mongo
    > use pulsar
    > db.createCollection('messages')
    > exit

  3. Start Pulsar standalone.

    docker pull apachepulsar/pulsar:2.4.0
    docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --link pulsar-mongo --name pulsar-mongo-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone

  4. Configure the Mongo sink with the mongo-sink-config.yaml file.

    configs:
      mongoUri: "mongodb://pulsar-mongo:27017"
      database: "pulsar"
      collection: "messages"
      batchSize: 2
      batchTimeMs: 500

    Then copy the file into the Pulsar container:

  ```
  docker cp mongo-sink-config.yaml pulsar-mongo-standalone:/pulsar/
  ```

  5. Download the Mongo sink nar package.

    docker exec -it pulsar-mongo-standalone /bin/bash
    curl -O http://apache.01link.hk/pulsar/pulsar-2.4.0/connectors/pulsar-io-mongo-2.4.0.nar

Start the Mongo sink in localrun mode using the localrun command.

tip

For more information about the localrun command, see localrun.

Use one of the following methods to get a connector log in localrun mode:

  • After executing the localrun command, the log is automatically printed on the console.

  • The log is located at:

    logs/functions/tenant/namespace/function-name/function-name-instance-id.log

    **Example**

    The path of the Mongo sink connector is:

  ```
  logs/functions/public/default/pulsar-mongo-sink/pulsar-mongo-sink-0.log
  ```
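
The path pattern above can be filled in programmatically. The following is a minimal sketch, assuming the log root is `logs/functions` relative to the Pulsar directory as in the example; the helper name `connector_log_path` is hypothetical and not part of Pulsar.

```python
from pathlib import PurePosixPath


def connector_log_path(tenant: str, namespace: str, name: str,
                       instance_id: int, log_root: str = "logs/functions") -> str:
    """Build the log path of one connector instance from the pattern
    logs/functions/tenant/namespace/function-name/function-name-instance-id.log
    """
    return str(
        PurePosixPath(log_root) / tenant / namespace / name / f"{name}-{instance_id}.log"
    )


# Reproduces the Mongo sink example path from the text.
print(connector_log_path("public", "default", "pulsar-mongo-sink", 0))
```

With parallelism greater than 1, calling the helper once per instance ID gives one log path per instance.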

To make the log easier to follow, the output is broken into smaller blocks below, each with a description.

  • This piece of log information shows the paths where the nar file is unpacked.

    08:21:54.132 [main] INFO org.apache.pulsar.common.nar.NarClassLoader - Created class loader with paths: [file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/, file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/META-INF/bundled-dependencies/,

  ##### tip

  If a `class cannot be found` exception is thrown, check whether the nar file has been unpacked into the folder `file:/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked/META-INF/bundled-dependencies/`.
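
The check described in the tip can be scripted. This is a rough sketch, assuming the unpacked directory shown in the log line above; the function name `check_nar_unpacked` is hypothetical, and the check only confirms that the `META-INF/bundled-dependencies` folder exists and is not empty, not that every dependency is present.

```python
from pathlib import Path
from typing import List


def check_nar_unpacked(unpacked_dir: str) -> List[str]:
    """Return a list of problems found in an unpacked nar directory.

    An empty list means the directory and its bundled-dependencies
    folder exist and the folder contains at least one entry.
    """
    problems = []
    root = Path(unpacked_dir)
    deps = root / "META-INF" / "bundled-dependencies"
    if not root.is_dir():
        problems.append(f"missing directory: {root}")
    elif not deps.is_dir():
        problems.append(f"missing directory: {deps}")
    elif not any(deps.iterdir()):
        problems.append(f"no bundled dependencies in: {deps}")
    return problems


if __name__ == "__main__":
    # Path taken from the class loader log line above.
    for problem in check_nar_unpacked("/tmp/pulsar-nar/pulsar-io-mongo-2.4.0.nar-unpacked"):
        print(problem)
```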
  • This piece of log information illustrates the basic information about the Mongo sink connector, such as tenant, namespace, name, parallelism, resources, and so on, which can be used to check whether the Mongo sink connector is configured correctly or not.

    08:21:55.390 [main] INFO org.apache.pulsar.functions.runtime.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=853d60a1-0c48-44d5-9a5c-6917386476b2, functionVersion=c2ce1458-b69e-4175-88c0-a0a856a2be8c, functionDetails=tenant: "public"
    namespace: "default"
    name: "pulsar-mongo-sink"
    className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
    autoAck: true
    parallelism: 1
    source {
      typeClassName: "[B"
      inputSpecs {
        key: "test-mongo"
        value {
        }
      }
      cleanupSubscription: true
    }
    sink {
      className: "org.apache.pulsar.io.mongodb.MongoSink"
      configs: "{\"mongoUri\":\"mongodb://pulsar-mongo:27017\",\"database\":\"pulsar\",\"collection\":\"messages\",\"batchSize\":2,\"batchTimeMs\":500}"
      typeClassName: "[B"
    }
    resources {
      cpu: 1.0
      ram: 1073741824
    }
    componentType: SINK
    , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=38459, clusterName=local)

  • This piece of log information demonstrates the status of the connections to Mongo and configuration information.

  • This piece of log information explains the configuration of consumers and clients, including the topic name, subscription name, subscription type, and so on.

    08:21:56.719 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
      "topicNames" : [ "test-mongo" ],
      "topicsPattern" : null,
      "subscriptionName" : "public/default/pulsar-mongo-sink",
      "subscriptionType" : "Shared",
      "acknowledgementsGroupTimeMicros" : 100000,
      "negativeAckRedeliveryDelayMicros" : 60000000,
      "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
      "consumerName" : null,
      "ackTimeoutMillis" : 0,
      "tickDurationMillis" : 1000,
      "priorityLevel" : 0,
      "cryptoFailureAction" : "CONSUME",
      "properties" : {
        "application" : "pulsar-sink",
        "id" : "public/default/pulsar-mongo-sink",
        "instance_id" : "0"
      },
      "readCompacted" : false,
      "subscriptionInitialPosition" : "Latest",
      "patternAutoDiscoveryPeriod" : 1,
      "regexSubscriptionMode" : "PersistentOnly",
      "deadLetterPolicy" : null,
      "autoUpdatePartitions" : true,
      "replicateSubscriptionState" : false,
      "resetIncludeHead" : false
    }
    08:21:56.726 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
      "serviceUrl" : "pulsar://localhost:6650",
      "authPluginClassName" : null,
      "authParams" : null,
      "operationTimeoutMs" : 30000,
      "statsIntervalSeconds" : 60,
      "numIoThreads" : 1,
      "numListenerThreads" : 1,
      "connectionsPerBroker" : 1,
      "useTcpNoDelay" : true,
      "useTls" : false,
      "tlsTrustCertsFilePath" : null,
      "tlsAllowInsecureConnection" : false,
      "tlsHostnameVerificationEnable" : false,
      "concurrentLookupRequest" : 5000,
      "maxLookupRequest" : 50000,
      "maxNumberOfRejectedRequestPerConnection" : 50,
      "keepAliveIntervalSeconds" : 30,
      "connectionTimeoutMs" : 10000,
      "requestTimeoutMs" : 60000,
      "defaultBackoffIntervalNanos" : 100000000,
      "maxBackoffIntervalNanos" : 30000000000
    }

You can use the following methods to debug a connector in cluster mode:

Use connector log

In cluster mode, multiple connectors can run on a worker. To find the log path of a specified connector, use the workerId to locate the connector log.

Pulsar admin CLI helps you debug Pulsar connectors with the following subcommands:

Create a Mongo sink

    ./bin/pulsar-admin sinks create \
      --archive pulsar-io-mongo-2.4.0.nar \
      --tenant public \
      --namespace default \
      --inputs test-mongo \
      --name pulsar-mongo-sink \
      --sink-config-file mongo-sink-config.yaml \
      --parallelism 1

get

Use the get command to get the basic information about the Mongo sink connector, such as tenant, namespace, name, parallelism, and so on.

    ./bin/pulsar-admin sinks get --tenant public --namespace default --name pulsar-mongo-sink
    {
      "tenant": "public",
      "namespace": "default",
      "name": "pulsar-mongo-sink",
      "className": "org.apache.pulsar.io.mongodb.MongoSink",
      "inputSpecs": {
        "test-mongo": {
          "isRegexPattern": false
        }
      },
      "configs": {
        "mongoUri": "mongodb://pulsar-mongo:27017",
        "database": "pulsar",
        "batchSize": 2.0,
        "batchTimeMs": 500.0
      },
      "parallelism": 1,
      "processingGuarantees": "ATLEAST_ONCE",
      "retainOrdering": false,
      "autoAck": true
    }

tip

For more information about the get command, see get.
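
To check the reported configuration against what you intended to deploy, the JSON printed by `sinks get` can be compared with the expected settings. The sketch below assumes the output has already been parsed into a dictionary (for example with `json.loads`); the function name `diff_sink_config` and the sample data are illustrative only.

```python
def diff_sink_config(expected: dict, sink_info: dict) -> dict:
    """Compare expected settings with the `configs` object of `sinks get` output.

    Returns a dict mapping each differing key to (actual, expected);
    a key absent from the output is reported as ("missing", expected).
    """
    actual = sink_info.get("configs", {})
    diff = {}
    for key, want in expected.items():
        if key not in actual:
            diff[key] = ("missing", want)
        elif actual[key] != want:
            # Numbers compare by value, so 2.0 in the output matches 2 in the YAML.
            diff[key] = (actual[key], want)
    return diff


# Illustrative data: the expected values mirror mongo-sink-config.yaml,
# the "actual" output is a made-up example with a wrong database name.
expected = {"mongoUri": "mongodb://pulsar-mongo:27017", "database": "pulsar",
            "collection": "messages", "batchSize": 2, "batchTimeMs": 500}
sink_info = {"configs": {"mongoUri": "mongodb://pulsar-mongo:27017",
                         "database": "test", "collection": "messages",
                         "batchSize": 2.0, "batchTimeMs": 500.0}}
for key, (got, want) in diff_sink_config(expected, sink_info).items():
    print(f"{key}: got {got!r}, expected {want!r}")
```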

status

Use the status command to get the current status of the Mongo sink connector, such as the number of instances, the number of running instances, instanceId, workerId, and so on.

    ./bin/pulsar-admin sinks status \
      --tenant public \
      --namespace default \
      --name pulsar-mongo-sink
    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-5d202832fd18-8080"
        }
      } ]
    }

tip

For more information about the status command, see status. If multiple connectors are running on a worker, use the workerId to locate the worker on which the specified connector is running.
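
The fields in the status output can be scanned automatically for the usual warning signs. The following is a minimal sketch, assuming the output has the shape shown above and has been parsed into a dictionary; the function name `check_sink_status` and the choice of which fields to flag are assumptions, not an official health check.

```python
import json


def check_sink_status(status: dict) -> list:
    """Return human-readable findings from `pulsar-admin sinks status` output."""
    findings = []
    if status["numRunning"] < status["numInstances"]:
        findings.append(
            f"only {status['numRunning']} of {status['numInstances']} instances are running"
        )
    for instance in status["instances"]:
        instance_id = instance["instanceId"]
        detail = instance["status"]
        worker = detail.get("workerId", "unknown")
        if not detail.get("running", False):
            findings.append(
                f"instance {instance_id} on worker {worker} is not running: "
                f"{detail.get('error', '')}"
            )
        # Non-zero exception counters point at the connector log on that worker.
        for counter in ("numSystemExceptions", "numSinkExceptions"):
            if detail.get(counter, 0) > 0:
                findings.append(
                    f"instance {instance_id} on worker {worker}: {counter}={detail[counter]}"
                )
    return findings


# Abbreviated copy of the sample output above.
sample = json.loads("""
{"numInstances": 1, "numRunning": 1, "instances": [{"instanceId": 0, "status": {
  "running": true, "error": "", "numRestarts": 0, "numSystemExceptions": 0,
  "numSinkExceptions": 0, "workerId": "c-standalone-fw-5d202832fd18-8080"}}]}
""")
print(check_sink_status(sample) or "no problems found")
```

Each finding names the workerId, which is the value needed to locate the connector log in cluster mode.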

topics stats

Use the topics stats command to get the stats for a topic and its connected producer and consumer, such as whether the topic has received messages or not, whether there is a backlog of messages or not, the available permits and other key information. All rates are computed over a 1-minute window and are relative to the last completed 1-minute period.

    ./bin/pulsar-admin topics stats test-mongo
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 1,
      "publishers" : [ ],
      "subscriptions" : {
        "public/default/pulsar-mongo-sink" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Shared",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "dffdd",
            "availablePermits" : 999,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : {
              "instance_id" : "0",
              "application" : "pulsar-sink",
              "id" : "public/default/pulsar-mongo-sink"
            },
            "connectedSince" : "2019-08-26T08:48:07.582Z",
            "clientVersion" : "2.4.0",
            "address" : "/172.17.0.3:57790"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
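
The key indicators named above (incoming messages, backlog, available permits) can be pulled out of the stats with a few lines of code. This is a sketch under the assumption that the output has been parsed into a dictionary and that a consumer with zero or negative `availablePermits` cannot currently receive messages; the function name `check_subscription` is hypothetical.

```python
def check_subscription(stats: dict, subscription: str) -> list:
    """Return findings for one subscription in `pulsar-admin topics stats` output."""
    sub = stats.get("subscriptions", {}).get(subscription)
    if sub is None:
        return [f"subscription {subscription} does not exist on this topic"]
    findings = []
    if not stats.get("publishers") and stats.get("msgRateIn", 0) == 0:
        findings.append("topic has no publishers and no incoming messages")
    if not sub.get("consumers"):
        findings.append("no consumer is connected to the subscription")
    if sub.get("msgBacklog", 0) > 0:
        findings.append(f"backlog of {sub['msgBacklog']} messages")
    for consumer in sub.get("consumers", []):
        if consumer.get("availablePermits", 0) <= 0:
            findings.append(
                f"consumer {consumer.get('consumerName')} has no available permits"
            )
    return findings


# Reduced version of the sample output above (an idle topic).
stats = {
    "msgRateIn": 0.0,
    "publishers": [],
    "subscriptions": {
        "public/default/pulsar-mongo-sink": {
            "msgBacklog": 0,
            "consumers": [{"consumerName": "dffdd", "availablePermits": 999}],
        }
    },
}
for finding in check_subscription(stats, "public/default/pulsar-mongo-sink"):
    print(finding)
```

For the sink in this guide, the subscription name is `public/default/pulsar-mongo-sink`, as shown in the consumer configuration log.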

This checklist indicates the major areas to check when you debug connectors. It serves both as a reminder of what to look for during a thorough review and as a tool for evaluating the status of connectors.

  • Does Pulsar start successfully?

  • Does the external service run normally?

  • Is the nar package complete?

  • Is the connector configuration file correct?

  • In localrun mode, run a connector and check the printed information (connector log) on the console.

  • In cluster mode:

    • Use the get command to get the basic information.

    • Use the status command to get the current status.

    • Use the topics stats command to get the stats for a specified topic and its connected producers and consumers.

  • Log in to the external system and verify the result.
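
The cluster-mode part of the checklist can be run in one pass. The sketch below only assembles the three `pulsar-admin` invocations used in this guide and, when executed as a script, runs them in order; the function names are hypothetical and the `./bin/pulsar-admin` path assumes the script is started from the Pulsar directory.

```python
import subprocess


def debug_commands(tenant: str, namespace: str, name: str, topic: str,
                   admin: str = "./bin/pulsar-admin") -> list:
    """Build the get, status and topics stats commands for one sink connector."""
    sink_args = ["--tenant", tenant, "--namespace", namespace, "--name", name]
    return [
        [admin, "sinks", "get", *sink_args],
        [admin, "sinks", "status", *sink_args],
        [admin, "topics", "stats", topic],
    ]


def run_all(commands: list) -> None:
    """Run each command and print its output (or its error output on failure)."""
    for argv in commands:
        print("$", " ".join(argv))
        result = subprocess.run(argv, capture_output=True, text=True)
        print(result.stdout if result.returncode == 0 else result.stderr)


if __name__ == "__main__":
    run_all(debug_commands("public", "default", "pulsar-mongo-sink", "test-mongo"))
```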