Saurabh Thakur

How to throttle that queue?

Development, Kafka · 5 min read

You go to bed peacefully after deploying code that took a long time to develop. Finally there is a new microservice that handles orders asynchronously: the API pushes every order it receives from a client onto a Kafka queue, and this service consumes the data from that queue and updates the database and cache.

The stock market is going to open in the morning. You ran all the tests, and they passed.

But wait! 🚨

The stock market opens and a panic alert wakes you up: everything is going haywire. Big news has broken: Stark Industries has finished its prototype Iron Suit and is ready to launch it on the market. People have gone crazy over Stark stock, and thousands of orders arrived just as the market opened. 📈

Your system wasn't able to handle the burst of orders. The queue collected data from multiple API servers, and our new service started consuming all of it at once. The database and cache choked. The dreaded DoS got the best of you.

Identifying the bottleneck

The consumer service shouldn't have consumed all of the data from the queue at once. It should have applied back pressure so that orders could be handled gracefully.

Kafka for the queue

Kafka has become an industry standard for implementing queues and event-based services.

But everything has its pros and cons.

Because of the way Kafka is built, a consumer doesn't receive data one message at a time; it fetches data in batches. One batch may contain 10 messages or maybe 1,000. Fetches are sized in bytes rather than in messages, so there's no way to tell your service to fetch data message by message.

One thing you can do is configure the maximum number of bytes the consumer receives in a single batch, but you still can't predict how many messages that will be, because you can't forecast the size of each message.
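To see why a byte limit doesn't pin down the number of messages, here is a small sketch that packs messages into one fetch the way a byte-limited fetch would. It is a simplification: it ignores Kafka's per-record and per-batch overhead, so real counts will be somewhat lower, and `countMessagesInFetch` is a hypothetical helper, not part of kafka-node.

```javascript
// Simplified model of a byte-limited fetch: take messages in order until
// the next one would push the total past fetchMaxBytes. Kafka's own record
// and batch overhead is ignored, so real-world counts will be a bit lower.
function countMessagesInFetch(messageSizes, fetchMaxBytes) {
  let used = 0;
  let count = 0;
  for (const size of messageSizes) {
    if (used + size > fetchMaxBytes) break;
    used += size;
    count += 1;
  }
  return count;
}

const fetchMaxBytes = 100 * 1024; // 100 KB, the limit used later in this post

// The same byte limit yields very different batch sizes:
const smallOrders = new Array(1000).fill(200); // 200-byte messages
const largeOrders = new Array(1000).fill(4096); // 4 KB messages

console.log(countMessagesInFetch(smallOrders, fetchMaxBytes)); // 512
console.log(countMessagesInFetch(largeOrders, fetchMaxBytes)); // 25
```

Note the edge case: a single message larger than the limit gives a batch of zero messages in this model.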

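WARNING: If you set the maximum fetch size to less than the size of a message, the partition can get stuck: you won't receive that message, or any newer ones, from that particular partition. (Newer brokers and fetch protocol versions return an oversized first message anyway so the consumer can make progress, but don't rely on that unless you know your client and broker support it.)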
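One way to avoid this trap is to fail fast at startup when the fetch limit is smaller than the largest message your producers can send. The sketch below assumes you know that upper bound (for example from the topic's `max.message.bytes` setting); `assertFetchSizeFitsMessages` and `LARGEST_ORDER_BYTES` are hypothetical names used for illustration.

```javascript
// Hypothetical upper bound on a single order message, in bytes. In practice,
// take it from your topic/broker configuration or producer settings, and
// leave some headroom for Kafka's record overhead.
const LARGEST_ORDER_BYTES = 64 * 1024;

// Throw at startup if one message could be larger than one fetch, which
// could otherwise leave a partition stuck at runtime.
function assertFetchSizeFitsMessages(fetchMaxBytes, largestMessageBytes) {
  if (!Number.isInteger(fetchMaxBytes) || fetchMaxBytes <= 0) {
    throw new Error(`fetchMaxBytes must be a positive integer, got ${fetchMaxBytes}`);
  }
  if (fetchMaxBytes < largestMessageBytes) {
    throw new Error(
      `fetchMaxBytes (${fetchMaxBytes}) is smaller than the largest possible ` +
        `message (${largestMessageBytes}); the consumer could get stuck`
    );
  }
}

// 100 KB fetches comfortably fit 64 KB messages, so this check passes.
assertFetchSizeFitsMessages(100 * 1024, LARGEST_ORDER_BYTES);
```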

We will play around with some of the features Kafka provides and set up exactly what we want.

Throttling that darn queue

For the examples below I am using Node.js, but the same logic can be written in any language. The library I am using is kafka-node. Again, any library can be used, because the features we rely on (fetch size limits, manual commits, pausing and resuming a consumer) are Kafka concepts and library-agnostic.

Execution phase

We will write the code step by step and explain it along the way:

1. Create a consumer instance

const { ConsumerGroup } = require('kafka-node');

const options = {
  kafkaHost: 'broker:9092', // Kafka broker to connect to
  groupId: 'order-service', // consumer group this service joins
};

const consumer = new ConsumerGroup(options, 'OrderTopic'); // subscribe to OrderTopic

consumer.on('message', (data) => {
  // queue data available here
});

People who are already familiar with Kafka might notice that we created a ConsumerGroup instead of a Consumer. This may get its own post in the future; in short, a ConsumerGroup joins a Kafka consumer group, so the topic's partitions are shared among all running instances that use the same groupId.
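To give a rough idea of what the group buys you: the members sharing a `groupId` divide the topic's partitions among themselves, so each partition is read by only one member at a time. The sketch below is a simplified round-robin split for a single topic, meant only to illustrate the idea; it is not kafka-node's or Kafka's actual assignment code, and the instance names and partition count are made up.

```javascript
// Simplified round-robin assignment of one topic's partitions to the
// members of a consumer group. Illustration only: real assignors also
// handle multiple topics, rebalances and member metadata.
function assignRoundRobin(partitionCount, memberIds) {
  const members = [...memberIds].sort();
  if (members.length === 0) return {};
  const assignment = Object.fromEntries(members.map((id) => [id, []]));
  for (let partition = 0; partition < partitionCount; partition++) {
    const member = members[partition % members.length];
    assignment[member].push(partition);
  }
  return assignment;
}

// Hypothetical: OrderTopic with 6 partitions and 3 running instances of
// the service, all using groupId 'order-service'.
const assignment = assignRoundRobin(6, ['order-service-c', 'order-service-a', 'order-service-b']);
// order-service-a gets partitions [0, 3], -b gets [1, 4], -c gets [2, 5]
console.log(assignment);
```

If you run more instances than there are partitions, the extra instances get no partitions and sit idle.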

2. Set max size of the batch

const { ConsumerGroup } = require('kafka-node');

const options = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // at most 100 KB per fetch
};

const consumer = new ConsumerGroup(options, 'OrderTopic');

consumer.on('message', (data) => {
  // queue data available here
});

fetchMaxBytes limits the amount of data received in a single batch, which makes sure that a large amount of data doesn't pile up in the memory of the instance running the service. On its own, though, this isn't reliable and can be very dangerous: auto-commit commits the messages once they are received, not once they are processed, so if the application crashes and restarts, or the instance stops, the messages that were in memory are left unprocessed.
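A toy simulation makes the risk concrete. It models a partition as an array, commits either when a batch is received (roughly what auto-commit does, since it doesn't know whether messages were processed) or only after processing, and crashes partway through a batch. Everything here, including `simulateCrash`, is a hypothetical in-memory model, not Kafka client code.

```javascript
// Toy model, not Kafka client code: a partition is an array of messages and
// an offset is an array index. `committed` is the offset a restarted
// consumer resumes from.
function simulateCrash({ partition, batchSize, crashAfter, commitOnReceive }) {
  let committed = 0;
  const processed = [];

  // First run: receive one batch. Auto-commit style moves the offset now.
  const batch = partition.slice(committed, committed + batchSize);
  if (commitOnReceive) committed += batch.length;

  // Process only part of the batch, then crash. A consumer that commits
  // after processing never reaches its commit, so `committed` stays put.
  processed.push(...batch.slice(0, crashAfter));

  // Restart: resume from the last committed offset and process the rest.
  processed.push(...partition.slice(committed));

  const lost = partition.filter((message) => !processed.includes(message));
  const duplicates = processed.length - new Set(processed).size;
  return { lost, duplicates };
}

const partition = ['order-1', 'order-2', 'order-3', 'order-4', 'order-5', 'order-6'];
const scenario = { partition, batchSize: 4, crashAfter: 1 };

// Commit on receipt: order-2, order-3 and order-4 are never processed.
console.log(simulateCrash({ ...scenario, commitOnReceive: true }));

// Commit after processing: nothing is lost, order-1 is processed twice.
console.log(simulateCrash({ ...scenario, commitOnReceive: false }));
```

The second result is also why the manual commit must actually happen: everything after the last committed offset is processed again after a restart.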

3. Stop auto-committing the messages

const { ConsumerGroup } = require('kafka-node');

const options = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // 100 KB
  autoCommit: false, // we commit offsets ourselves, after processing
};

const consumer = new ConsumerGroup(options, 'OrderTopic');

consumer.on('message', (data) => {
  // queue data available here
});

We already discussed how auto-commit can be dangerous: you might end up losing the processing of messages that could have been very important.

Make sure that after switching off auto-commit you commit the messages manually. This step is essential: skip it and you will process data more than once, because after a restart the consumer starts again from the last committed offset. We will look at where and how to commit manually in the coming steps.

4. Pause the queue on data reception

For our throttling to work, the consumer shouldn't be pulling data rapidly and continuously from a tightly packed queue. Some implementations throttle processing with logic in the application itself: the consumer pulls all the data from the topic, and delay logic in the application spaces out the processing. This is not a good solution, and it brings a number of problems with it:

  1. As discussed already, data held in memory is lost if the application restarts for any reason.
  2. The throttling logic becomes application-dependent, and any bug (like a callback that is never called) can lead to false throttling or even block the application's queue entirely.
  3. During high-traffic periods, the service's memory consumption can climb sharply, because processing is delayed while messages keep pouring in.
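Point 3 above is simple arithmetic: if messages arrive faster than the delayed processing drains them, the in-memory backlog grows linearly with time. The rates and message size below are made-up numbers for illustration, and `backlogAfter` is a hypothetical helper.

```javascript
// In-memory backlog after `seconds` of sustained traffic, when every
// incoming message is pulled into the application but processing is
// throttled to `processedPerSecond`.
function backlogAfter(seconds, arrivalsPerSecond, processedPerSecond) {
  return Math.max(0, (arrivalsPerSecond - processedPerSecond) * seconds);
}

// Hypothetical market-open burst: 1,000 orders/s arrive, 200/s are processed.
const pending = backlogAfter(60, 1000, 200);
console.log(pending); // 48000

// At a hypothetical ~2 KB per order, that is roughly 94 MiB of payloads alone.
console.log(Math.round((pending * 2 * 1024) / (1024 * 1024))); // 94
```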

To counter these problems, we should keep the data safely in Kafka and process it batch by batch. With Kafka we can do this by pausing the consumer as soon as it receives a message from the topic.

const { ConsumerGroup } = require('kafka-node');

const options = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // 100 KB
  autoCommit: false,
};

const consumer = new ConsumerGroup(options, 'OrderTopic');

consumer.on('message', (data) => {
  consumer.pause(); // stop fetching new batches until this one is processed
  // queue data available here
});

When we pause the consumer, a batch containing multiple messages is already in application memory. We will resume the consumer once we have processed that whole batch.
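To make the pause behaviour concrete, here is a toy consumer built on Node's EventEmitter. It is a hypothetical stand-in, not kafka-node: it assumes, as this approach relies on, that `pause()` only stops further fetches, while the messages of the batch already fetched are still emitted one by one.

```javascript
const { EventEmitter } = require('events');

// Toy consumer (not kafka-node): each "fetch" takes `batchSize` messages from
// an in-memory partition and emits them one by one. pause() stops further
// fetches; messages of the batch already fetched are still emitted.
class ToyConsumer extends EventEmitter {
  constructor(partition, batchSize) {
    super();
    this.partition = partition;
    this.batchSize = batchSize;
    this.position = 0;
    this.paused = false;
  }

  pause() {
    this.paused = true;
  }

  resume() {
    this.paused = false;
    this.poll();
  }

  // Keep fetching batches until paused or the partition is exhausted.
  poll() {
    while (!this.paused && this.position < this.partition.length) {
      const batch = this.partition.slice(this.position, this.position + this.batchSize);
      this.position += batch.length;
      for (const message of batch) this.emit('message', message);
    }
  }
}

const consumer = new ToyConsumer(['o1', 'o2', 'o3', 'o4', 'o5'], 2);
const received = [];
consumer.on('message', (message) => {
  consumer.pause(); // stop fetching; the rest of this batch still arrives
  received.push(message);
});

consumer.poll();
console.log(received); // [ 'o1', 'o2' ]

consumer.resume();
console.log(received); // [ 'o1', 'o2', 'o3', 'o4' ]
```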

But how do we know we have processed the whole batch?

5. Setting up a local, throttled queue

Wait a minute, didn't you just say that setting up an application level queue is a bad thing?

Well, yes I did!

But that was a different problem, where the throttling logic was applied after bringing all of the consumer's data into the application, which defeats the whole purpose of using Kafka. Bringing 10 messages into memory, processing them, then bringing in the next 10, and so on, is much better than bringing in 1,000 messages at once. Geddit??

I use the async library's queue to implement the local queue, as it is well tested, widely used across the industry, and has an intuitive interface.

const async = require('async');
const { ConsumerGroup } = require('kafka-node');

const consumerConfig = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // 100 KB
  autoCommit: false,
};

const consumer = new ConsumerGroup(consumerConfig, 'OrderTopic');

// Worker the async queue calls for each message
async function processData(queueData) {
  // perform data processing here
}

const concurrency = 5; // max number of messages processed in parallel
const q = async.queue(processData, concurrency);

consumer.on('message', (data) => {
  consumer.pause();
  q.push(data); // hand the message to the local, throttled queue
});

As you can see in the message handler, we push the consumer data into the async queue as soon as we receive it. This pushes every message of the batch into the async queue, which processes them at most `concurrency` at a time. Set the concurrency after benchmarking what suits your application.
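If you're curious what `async.queue` does for us here, this is a stripped-down sketch of the same idea: run at most `concurrency` tasks at once and call a drain callback when everything has finished. It is a simplified illustration, not the async library's implementation (which also handles errors, per-task callbacks, pausing and more), and `createQueue` is a hypothetical name.

```javascript
// Minimal concurrency-limited queue with a drain callback, in the spirit of
// async.queue. Tasks are handled by an async worker function.
function createQueue(worker, concurrency) {
  const pending = [];
  let running = 0;
  let scheduled = false;
  let onDrain = () => {};

  function startMore() {
    scheduled = false;
    while (running < concurrency && pending.length > 0) {
      const task = pending.shift();
      running += 1;
      Promise.resolve()
        .then(() => worker(task))
        .catch((err) => console.error('task failed:', err))
        .finally(() => {
          running -= 1;
          if (running === 0 && pending.length === 0) onDrain();
          else startMore();
        });
    }
  }

  return {
    // Start work on a later tick, so every message pushed synchronously
    // (e.g. one fetched batch) is queued before drain can fire.
    push(task) {
      pending.push(task);
      if (!scheduled) {
        scheduled = true;
        setImmediate(startMore);
      }
    },
    drain(fn) {
      onDrain = fn;
    },
  };
}

// Usage: process 7 "orders" with at most 3 in flight at a time.
let inFlight = 0;
let maxInFlight = 0;
const done = [];

const q = createQueue(async (order) => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10)); // fake DB write
  inFlight -= 1;
  done.push(order);
}, 3);

q.drain(() => {
  console.log(`processed ${done.length} orders, max ${maxInFlight} at once`);
});

for (let i = 1; i <= 7; i++) q.push(`order-${i}`);
// prints "processed 7 orders, max 3 at once"
```

Deferring the start of work matters: if processing began synchronously on the first push, a fast task could drain the queue before the rest of the batch was pushed, and the commit would fire mid-batch.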

6. Commit the messages

The async queue lets us pass a function that runs each time the queue is drained, i.e. when its last task has finished. That is one of the main reasons I use it as part of this whole process.

So what is the first thing we should do when the queue gets drained?
We should commit the messages that we just finished processing.

const async = require('async');
const { ConsumerGroup } = require('kafka-node');

const consumerConfig = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // 100 KB
  autoCommit: false,
};

const consumer = new ConsumerGroup(consumerConfig, 'OrderTopic');

async function processData(queueData) {
  // perform data processing here
}

const concurrency = 5;
const q = async.queue(processData, concurrency);

// Runs each time the async queue has finished every message pushed into it
q.drain(() => {
  consumer.commit(); // the whole batch is processed, so save our position
});

consumer.on('message', (data) => {
  consumer.pause();
  q.push(data);
});

What does committing the messages mean?

By committing the messages, you are telling Kafka that you have processed messages up to this point. It's just like saving a checkpoint in a video game: if the application restarts now, you will receive messages from the last saved checkpoint, i.e. the most recently committed offset. Isn't that awesome?
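Because the committed offset is a checkpoint, it should never move past a message that hasn't been processed yet. With `concurrency = 5`, messages can finish out of order, which is one reason committing on `drain`, once the whole batch is done, is convenient. If you ever manage offsets yourself and commit partway through a batch, a safe value is one past the highest offset with no unprocessed gaps before it, following Kafka's convention that the committed offset is the next offset to read. `safeCommitOffset` below is a hypothetical helper sketching that rule.

```javascript
// Given the offset a batch started at and the set of offsets that have
// finished processing (in any order), return the offset that is safe to
// commit: one past the highest offset with no gaps before it.
function safeCommitOffset(batchStartOffset, processedOffsets) {
  let next = batchStartOffset;
  while (processedOffsets.has(next)) next += 1;
  return next;
}

// Batch covers offsets 100..104. Offsets 100, 101 and 103 are done, but 102
// is still running, so the checkpoint may only advance to 102.
console.log(safeCommitOffset(100, new Set([100, 101, 103]))); // 102

// Once the queue drains, every offset in the batch is done.
console.log(safeCommitOffset(100, new Set([100, 101, 102, 103, 104]))); // 105
```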

7. Resume the queue

const async = require('async');
const { ConsumerGroup } = require('kafka-node');

const consumerConfig = {
  kafkaHost: 'broker:9092',
  groupId: 'order-service',
  fetchMaxBytes: 100 * 1024, // 100 KB
  autoCommit: false,
};

const consumer = new ConsumerGroup(consumerConfig, 'OrderTopic');

async function processData(queueData) {
  // perform data processing here
}

const concurrency = 5;
const q = async.queue(processData, concurrency);

q.drain(() => {
  consumer.commit(); // save our position for the processed batch
  consumer.resume(); // then fetch the next batch
});

consumer.on('message', (data) => {
  consumer.pause();
  q.push(data);
});

Now that you have finished processing the current batch and safely committed it, it's time to bring in the next batch of messages. We do this by resuming the consumer right after committing.

And so it goes on, processing data batch by batch in a well-mannered way.
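One refinement worth considering: committing talks to the broker, so it is asynchronous, and in the code above `consumer.resume()` may start fetching the next batch before the commit has been acknowledged. The sketch below waits for the commit before resuming and stays paused if the commit fails. It uses a hypothetical consumer whose `commit` takes a Node-style callback; `createFakeConsumer` and `commitThenResume` are illustrative names, so check your client's documentation for the exact commit signature before adapting it.

```javascript
// Hypothetical consumer with pause(), resume() and an asynchronous
// commit(callback). This fake records the calls so the sketch runs alone.
function createFakeConsumer({ failCommit = false } = {}) {
  const calls = [];
  return {
    calls,
    pause() {
      calls.push('pause');
    },
    resume() {
      calls.push('resume');
    },
    commit(callback) {
      calls.push('commit');
      // Simulate the round trip to the broker.
      setTimeout(() => callback(failCommit ? new Error('commit failed') : null), 5);
    },
  };
}

// Intended for the queue's drain handler: commit first, and resume fetching
// only after the commit succeeds. On failure, stay paused and report the
// error so the caller can retry or alert instead of pulling the next batch.
function commitThenResume(consumer, onError) {
  consumer.commit((err) => {
    if (err) {
      onError(err);
      return;
    }
    consumer.resume();
  });
}

// Usage: one batch arrives (pause), is processed, then the queue drains.
const ok = createFakeConsumer();
ok.pause();
commitThenResume(ok, console.error);
setTimeout(() => console.log(ok.calls), 20); // [ 'pause', 'commit', 'resume' ]
```

With a real client this would be wired up along the lines of `q.drain(() => commitThenResume(consumer, handleCommitError))`, where `handleCommitError` is a hypothetical function of your own.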


That was a quick walkthrough of working code for a throttled queue using Kafka. There are a lot of details I wasn't able to cover here, because they are specific to particular use cases and you may or may not need them.