Slow Consumers

    Some client libraries handle bursty message traffic by buffering incoming messages for a subscription. If an application can handle 10 messages per second but sometimes receives 20 messages per second, the library may hold the extra 10 to give the application time to catch up. To the server, the application appears to be keeping up, so the server considers the connection healthy. If the buffer fills up, most client libraries notify the application of a SlowConsumer error and discard messages.
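The arithmetic in the paragraph above can be sketched with a small simulation. This is a hypothetical model, not code from any NATS client: the function name `simulate` and its parameters are assumptions made for illustration, and time is divided into one-second ticks.

```python
def simulate(arrivals_per_tick, handled_per_tick, max_pending):
    """Model a client-side subscription buffer (hypothetical, not a NATS API).

    arrivals_per_tick: messages received in each one-second tick
    handled_per_tick:  messages the application can process per tick
    max_pending:       how many unprocessed messages the buffer may hold

    Returns (peak_pending, dropped).
    """
    pending = 0
    peak = 0
    dropped = 0
    for arrivals in arrivals_per_tick:
        backlog = pending + arrivals
        # The application works through as much of the backlog as it can.
        backlog -= min(handled_per_tick, backlog)
        # Whatever is left has to fit in the buffer; the overflow is discarded.
        if backlog > max_pending:
            dropped += backlog - max_pending
            backlog = max_pending
        pending = backlog
        peak = max(peak, pending)
    return peak, dropped


# A one-second burst of 20 messages against a 10 msg/s application:
# the buffer holds the extra 10 and nothing is lost.
burst = simulate([20, 0], handled_per_tick=10, max_pending=100)

# A sustained 20 msg/s load eventually overflows a 15-message buffer.
sustained = simulate([20, 20, 20], handled_per_tick=10, max_pending=15)
```

The second call shows why buffering only helps with bursts: a continuous mismatch fills any finite buffer and messages start being dropped.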

    Receiving and dropping messages from the server keeps the connection to the server healthy, but creates an application requirement: the application has to cope with messages that never reach it. There are several common patterns for this:

    • Use request/reply to throttle the sender and prevent overloading the subscriber
    • Use a queue with multiple subscribers splitting the work
    • Persist messages with something like NATS Streaming
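As an illustration of the first pattern, here is a minimal sketch of request/reply throttling. It uses only `asyncio` queues and futures to stand in for the messaging layer, so none of the names (`subscriber`, `publisher`, `run_demo`) are NATS APIs; they are assumptions for this example. The point is that the sender waits for each reply before sending the next message, so it can never get ahead of the subscriber.

```python
import asyncio


async def subscriber(requests, handled, delay):
    """Slow worker: handle one request, then reply so the sender may continue."""
    while True:
        payload, reply = await requests.get()
        if payload is None:          # sentinel: no more work
            reply.set_result(None)
            return
        await asyncio.sleep(delay)   # simulate slow processing
        handled.append(payload)
        reply.set_result(b"ack")


async def publisher(requests, payloads):
    """Send each payload and wait for its reply before sending the next."""
    loop = asyncio.get_running_loop()
    max_outstanding = 0
    for payload in list(payloads) + [None]:
        reply = loop.create_future()
        await requests.put((payload, reply))
        max_outstanding = max(max_outstanding, requests.qsize())
        await reply                  # the reply is what throttles the sender
    return max_outstanding


async def run_demo(count=5):
    requests = asyncio.Queue()
    handled = []
    worker = asyncio.create_task(subscriber(requests, handled, 0.001))
    peak = await publisher(requests, [f"msg {i}".encode() for i in range(count)])
    await worker
    return handled, peak
```

Running `asyncio.run(run_demo())` returns the handled payloads in order, together with the largest number of requests that were ever waiting. The trade-off is throughput: the sender is limited to one round trip per message.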

    Libraries that cache incoming messages may provide two controls on the incoming queue, also called pending messages. These are useful if the problem is bursty publishers and not a continuous performance mismatch. Disabling these limits can be dangerous in production; setting them to 0 may help find problems, but is just as risky there.

    The incoming cache is usually per subscriber, but check the documentation for your specific client library.

    The incoming queue can be limited in two ways: by message count and by total size in bytes. For example, to limit the incoming cache to 1,000 messages or 5 MB, whichever comes first:

    Java

    Dispatcher d = nc.createDispatcher((msg) -> {
        // do something
    });
    d.subscribe("updates");
    d.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a dispatcher
    // Subscribe
    Subscription sub = nc.subscribe("updates");
    sub.setPendingLimits(1_000, 5 * 1024 * 1024); // Set limits on a subscription
    // Do something
    // Close the connection
    nc.close();

    JavaScript

    // slow pending limits are not configurable on node-nats

    Python

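    import asyncio
    from nats.aio.client import Client as NATS

    # This excerpt runs inside an async function.
    nc = NATS()
    await nc.connect(servers=["nats://demo.nats.io:4222"])
    future = asyncio.Future()

    async def cb(msg):
        # Resolve the future with the first message only; calling
        # set_result() a second time would raise InvalidStateError.
        if not future.done():
            future.set_result(msg)

    # Set limits of 1000 messages or 5MB
    await nc.subscribe("updates", cb=cb, pending_bytes_limit=5*1024*1024, pending_msgs_limit=1000)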

    Ruby

    TypeScript

    // slow pending limits are not configurable on TypeScript NATS client.

    C

    natsConnection      *conn = NULL;
    natsSubscription    *sub1 = NULL;
    natsSubscription    *sub2 = NULL;
    natsStatus          s     = NATS_OK;

    s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
    // Subscribe
    if (s == NATS_OK)
        s = natsConnection_Subscribe(&sub1, conn, "updates", onMsg, NULL);
    // Set limits of 1000 messages or 5MB, whichever comes first
    if (s == NATS_OK)
        s = natsSubscription_SetPendingLimits(sub1, 1000, 5*1024*1024);
    // Subscribe
    if (s == NATS_OK)
        s = natsConnection_Subscribe(&sub2, conn, "updates", onMsg, NULL);
    // Set no limits for this subscription
    if (s == NATS_OK)
        s = natsSubscription_SetPendingLimits(sub2, -1, -1);
    (...)
    // Destroy objects that were created
    natsSubscription_Destroy(sub1);
    natsSubscription_Destroy(sub2);
    natsConnection_Destroy(conn);
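Independent of any particular client, the "whichever comes first" rule used in the examples above can be sketched as a buffer that checks both limits on every incoming message. The class `PendingBuffer` and its methods are hypothetical names for illustration only. Treating a negative limit as "no limit" mirrors the `-1, -1` call in the C example; whether other libraries use the same convention is an assumption to verify against their documentation.

```python
class PendingBuffer:
    """Hypothetical per-subscription buffer with a count and a byte limit."""

    def __init__(self, max_msgs, max_bytes):
        self.max_msgs = max_msgs      # negative means no count limit
        self.max_bytes = max_bytes    # negative means no byte limit
        self.msgs = []
        self.pending_bytes = 0
        self.dropped = 0

    def offer(self, payload):
        """Try to buffer a message; return False if it had to be dropped."""
        over_count = self.max_msgs >= 0 and len(self.msgs) + 1 > self.max_msgs
        over_bytes = (self.max_bytes >= 0
                      and self.pending_bytes + len(payload) > self.max_bytes)
        if over_count or over_bytes:  # whichever limit is hit first
            self.dropped += 1
            return False
        self.msgs.append(payload)
        self.pending_bytes += len(payload)
        return True

    def take(self):
        """Hand the oldest buffered message to the application."""
        payload = self.msgs.pop(0)
        self.pending_bytes -= len(payload)
        return payload


LIMIT_MSGS = 1_000
LIMIT_BYTES = 5 * 1024 * 1024

# Small messages: the count limit is reached long before the byte limit.
small = PendingBuffer(LIMIT_MSGS, LIMIT_BYTES)
small_results = [small.offer(b"x" * 1024) for _ in range(1_001)]

# Large messages: the byte limit is reached after only five messages.
large = PendingBuffer(LIMIT_MSGS, LIMIT_BYTES)
one_mib = bytes(1024 * 1024)
large_results = [large.offer(one_mib) for _ in range(6)]
```

Which limit matters in practice depends on message size, which is one reason to set both rather than only one.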

    Detect a Slow Consumer and Check for Dropped Messages

    Some libraries, like Java, will not send the slow consumer notification on every dropped message because that could be noisy. Instead, the notification may be sent once each time the subscriber falls behind. Libraries may also provide a way to get a count of dropped messages so that applications can at least detect that a problem is occurring.
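The behavior described above, one notification per episode plus a running drop count, can be sketched as follows. This is a hypothetical model rather than any library's implementation: the class `TrackedSubscription`, its callback, and the rule that an episode ends when the buffer has fully drained are all assumptions made for the example.

```python
from collections import deque


class TrackedSubscription:
    """Hypothetical subscription that reports slow-consumer episodes once."""

    def __init__(self, max_pending, on_slow_consumer):
        self.pending = deque()
        self.max_pending = max_pending
        self.on_slow_consumer = on_slow_consumer
        self.dropped = 0          # total dropped messages, always counted
        self._slow = False        # True while an episode is in progress

    def deliver(self, msg):
        """Called for each incoming message."""
        if len(self.pending) >= self.max_pending:
            self.dropped += 1
            if not self._slow:    # notify only on the first drop of an episode
                self._slow = True
                self.on_slow_consumer(self)
            return
        self.pending.append(msg)

    def next_msg(self):
        """Called by the application to consume the oldest message."""
        msg = self.pending.popleft()
        if not self.pending:
            self._slow = False    # assumption: a drained buffer ends the episode
        return msg


notifications = []
sub = TrackedSubscription(2, lambda s: notifications.append(s.dropped))

for i in range(5):                # first episode: 2 buffered, 3 dropped
    sub.deliver(i)
sub.next_msg()
sub.next_msg()                    # application catches up
for i in range(3):                # second episode: 2 buffered, 1 dropped
    sub.deliver(i)
```

After this run the callback has fired twice while `sub.dropped` is 4, which is why an application that needs the exact loss should read the drop counter rather than count notifications.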

    Go

    // Set the callback that will be invoked when an asynchronous error occurs.
    // logSlowConsumer is defined elsewhere and must match nats.ErrHandler:
    // func(*nats.Conn, *nats.Subscription, error)
    nc, err := nats.Connect("demo.nats.io", nats.ErrorHandler(logSlowConsumer))
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()
    // Do something with the connection

    Java

    JavaScript

    // slow consumer detection is not configurable on NATS JavaScript client.

    Python

    import asyncio
    import nats.aio.errors
    from nats.aio.client import Client as NATS

    # This excerpt runs inside an async function.
    nc = NATS()

    async def error_cb(e):
        if type(e) is nats.aio.errors.ErrSlowConsumer:
            print("Slow consumer error, unsubscribing from handling further messages...")
            await nc.unsubscribe(e.sid)

    await nc.connect(
        servers=["nats://demo.nats.io:4222"],
        error_cb=error_cb,
    )

    msgs = []
    future = asyncio.Future()

    async def cb(msg):
        msgs.append(msg)
        if len(msgs) == 3:
            # Head of line blocking on other messages caused
            # by single message processing taking too long...
            await asyncio.sleep(1)

    await nc.subscribe("updates", cb=cb, pending_msgs_limit=5)

    for i in range(0, 10):
        await nc.publish("updates", "msg #{}".format(i).encode())
        await asyncio.sleep(0)

    # The future is never resolved, so this waits up to 1 second
    # to give the subscription time to receive messages.
    try:
        await asyncio.wait_for(future, 1)
    except asyncio.TimeoutError:
        pass

    for msg in msgs:
        print("[Received]", msg)

    await nc.close()

    Ruby

    # The Ruby NATS client currently does not have an option to customize slow consumer limits per sub.

    C

    static void
    errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
    {
        // Do something
        printf("Error: %d - %s\n", s, natsStatus_GetText(s));
    }
    (...)
    natsConnection      *conn = NULL;
    natsOptions         *opts = NULL;
    natsStatus          s     = NATS_OK;

    s = natsOptions_Create(&opts);
    if (s == NATS_OK)
        s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
    if (s == NATS_OK)
        s = natsConnection_Connect(&conn, opts);
    (...)
    // Destroy objects that were created
    natsConnection_Destroy(conn);
    natsOptions_Destroy(opts);