Jan 12, 2021, 12:00 AM by Wolfgang Sourdeau
Please note that the data units used in this article use the old notation where “1 megabyte” means “1,024 kilobytes” and is abbreviated “MB” instead of “MiB”. The same goes for kilobytes and gigabytes.
Our benchmark installation for Logibec NOAH is made of one service and three virtual machines (VM) in total.
The service is an Azure PostgreSQL database which is configured to use 2 vCores to play the role of the source system.
The first VM is a B2ms that plays the role of the on-premise processing machine. B2ms VMs have 2 cores and 8 GB of RAM. This VM is equipped with a hard drive rated at 500 IOPS, while the VM itself is reportedly able to reach 1,920 IOPS. The software components installed on it are the Debezium CDC Connector for PostgreSQL, a Kafka broker, a Kafka consumer and a Kafka producer.
The Kafka consumer reads the messages from the broker and stores them on disk. The Kafka producer then reads those messages back into memory and sends them to a Kafka broker hosted on the second VM. The consumer and producer components are meant to emulate a somewhat complex setup that makes use of a third-party system that we left out for the sake of simplicity.
The second VM is an A4m v2 with 4 cores and 32 GB of RAM. It plays the role of the external Kafka broker that receives the messages from the first VM. Nothing else will be measured on that VM.
Finally, we have a third VM dedicated to the collection and display of metrics via Prometheus, a metrics aggregator, and Grafana, a data analysis and visualization tool.
The benchmark was meant to answer these two questions:
- What’s the minimum amount of memory and CPU cores required to handle 5 GB of data in a day?
- How many gigabytes per day can that configuration support?
The metrics we’re interested in are the CPU and memory usage of the overall system, the amount of memory that’s allocated on the Java heap, and the rate at which each Kafka process handles the messages. The Kafka processes are run with the agent provided with the Prometheus JMX Exporter, which exposes metrics in Prometheus format on a configurable HTTP port.
Since the release of Spark 3, it has also been possible to export Spark metrics using a Prometheus servlet. Although we managed to make it work, we were unable to collect metrics about the message rate, which remained at a constant 0 messages/second.
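To illustrate what a scrape of such an endpoint gives us, here is a minimal sketch that parses two snapshots of the Prometheus text format and derives a per-second rate from a counter. The metric name `kafka_messages_in_total` and the sample values are made up for the example; the real names depend on the rules file given to the JMX Exporter.

```python
def parse_metrics(text):
    """Parse samples (without trailing timestamps) from Prometheus text format."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and the "# HELP" / "# TYPE" comment lines.
        if not line or line.startswith("#"):
            continue
        # The value is the last whitespace-separated token on the line.
        name, value = line.rsplit(" ", 1)
        samples[name] = float(value)
    return samples


def per_second_rate(before, after, metric, interval_seconds):
    """Average increase per second of a counter between two scrapes."""
    return (after[metric] - before[metric]) / interval_seconds


# Hypothetical scrapes taken 15 seconds apart (metric name is an assumption).
SCRAPE_1 = """\
# TYPE kafka_messages_in_total counter
kafka_messages_in_total 100000
"""
SCRAPE_2 = """\
# TYPE kafka_messages_in_total counter
kafka_messages_in_total 124750
"""

rate = per_second_rate(parse_metrics(SCRAPE_1), parse_metrics(SCRAPE_2),
                       "kafka_messages_in_total", 15)
print(f"{rate:.0f} messages/second")
```

In practice Prometheus does this computation itself; the sketch only shows where a "messages per second" figure on a Grafana panel comes from.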
To emulate 5 GB of data, the PostgreSQL database was populated with 32 tables of 256 columns and 65,536 records each. Each data cell contains a string of 10 random alphanumeric characters. As we’re using JSON-encoded messages, there’s a slight overhead with each message.
In total, the real amount of data exchanged through Kafka comes to around 7 GB. We consider the difference to be normal transport overhead and ignore it in our calculations. Excluding that overhead, the size of each message is 2,560 bytes for a total of 2,097,152 messages.
From the logs, we could see that the first message was emitted from the CDC connector at around 4:44 PM and the last message was received by the Cloud broker at 5:06 PM, 22 minutes later.
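The article does not show how the tables were filled. One simple way to produce cells of this shape, sketched here with the standard library only, is the following; the function names are ours and the column count passed in is arbitrary.

```python
import random
import string

# Letters and digits only, matching "alphanumeric" cells.
ALPHABET = string.ascii_letters + string.digits


def random_cell(length=10, rng=random):
    """Return one string of `length` random alphanumeric characters."""
    return "".join(rng.choice(ALPHABET) for _ in range(length))


def random_row(columns, rng=random):
    """Return one record as a list of `columns` random cells."""
    return [random_cell(rng=rng) for _ in range(columns)]


# A seeded generator keeps the example reproducible.
row = random_row(4, random.Random(42))
print(row)
```

Each row could then be written out as CSV or passed to an INSERT statement, depending on how the database is loaded.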
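As a quick sanity check, the message size and message count quoted above do multiply out to the 5 GB target (with 1 GB meaning 1,024³ bytes, as stated at the top of the article):

```python
MESSAGE_BYTES = 2_560        # useful payload per message, from the article
MESSAGE_COUNT = 2_097_152    # total number of messages, from the article

total_bytes = MESSAGE_BYTES * MESSAGE_COUNT
total_gb = total_bytes / 1024**3  # binary gigabytes

print(f"{total_bytes:,} bytes = {total_gb} GB")
```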
This gives us a rate of 232 MB of useful data per minute, 3.87 MB per second. We can extrapolate this to be 326 gigabytes per day.
The overall message rate of the system was around 95,325 messages per minute, or 1,588.75 messages per second.
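These figures can be reproduced from the payload size, the message count and the 22-minute duration. The sketch below computes them without intermediate rounding; the 3.87 MB/s and 326 GB/day quoted above seem to come from truncating the per-minute rate to 232 MB before extrapolating, which is why the unrounded results are slightly higher.

```python
USEFUL_MB = 5 * 1024          # 5 GB of useful data, in MB (1 GB = 1,024 MB)
DURATION_MIN = 22             # 4:44 PM to 5:06 PM
MESSAGE_COUNT = 2_097_152

mb_per_minute = USEFUL_MB / DURATION_MIN          # about 232.7
mb_per_second = mb_per_minute / 60                # about 3.88
gb_per_day = mb_per_minute * 60 * 24 / 1024       # about 327.3

# Same extrapolation starting from the truncated 232 MB/minute.
truncated_gb_per_day = int(mb_per_minute) * 60 * 24 / 1024  # 326.25

messages_per_minute = MESSAGE_COUNT / DURATION_MIN  # about 95,325
messages_per_second = messages_per_minute / 60      # about 1,588.75

print(f"{mb_per_minute:.1f} MB/min, {mb_per_second:.2f} MB/s, "
      f"{gb_per_day:.1f} GB/day ({truncated_gb_per_day} from 232 MB/min)")
print(f"{messages_per_minute:.0f} msg/min, {messages_per_second:.2f} msg/s")
```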
We can conclude that the on-premise machine configuration is more than enough to handle 5 GB per day. However, from the graphs, we can see that the overall system consumption climbs to a bit more than 5 GB of RAM. Therefore, 8 GB of system RAM appears to be a comfortable minimum.
Note that we used the default memory configuration switches for these processes and that no optimisation was performed.
On the Grafana panel for the CDC Message rate, we notice that there were around 1,650 messages/second. Interestingly, the memory consumption of the CDC component seems to oscillate and to reach suspicious highs of more than 1.4 GB.
Considering the fact that each second would require only 4 MB for one copy of all the messages and that the heap consumption of the Kafka broker was lower than 380 MB during the whole experiment, we’re a bit perplexed. This phenomenon might be the subject of a future article.
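To put the two numbers side by side, here is the back-of-the-envelope calculation behind the 4 MB figure, along with how many seconds of traffic a 1.4 GB heap would represent if it held nothing but one copy of each message. That last assumption is ours and is certainly too simple; it is only meant to give a sense of scale.

```python
MESSAGES_PER_SECOND = 1_650   # read from the Grafana panel
MESSAGE_BYTES = 2_560         # useful payload per message
HEAP_PEAK_GB = 1.4            # observed peak of the CDC component

mb_per_second = MESSAGES_PER_SECOND * MESSAGE_BYTES / 1024**2
seconds_of_traffic = HEAP_PEAK_GB * 1024 / mb_per_second

print(f"{mb_per_second:.2f} MB of messages per second")
print(f"1.4 GB is about {seconds_of_traffic:.0f} seconds of traffic")
```

By this rough measure, the peak corresponds to several minutes' worth of messages.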
Although it’s not surprising that a modern system would be able to handle our 5 GB payload at this rate, we wanted to explore ways of making things even faster.
The rule of the game here was that we could change the VM parameters or the software parameters on the VM, but nothing else. Although we wrote the two Spark components ourselves, code optimization was out of the question, as we wanted to keep the initial software components unchanged as a baseline.
Optimizing the database performance or the on-cloud Kafka server was out of scope. We measured the throughput based on the time the first message was processed by the Debezium connector and the time the last message was sent by the Spark producer.
The measurement we performed using the initial parameters gave us a total delay of 16 minutes and 35 seconds.
First, we tried to see if performance could be improved by using a separate volume for the broker “log files” while the Spark consumer process kept storing its files on the first disk. The B2ms VM we’re using offers 1,920 IO operations per second. Since a standard disk offers 500 IOPS, we hypothesized that a second one would allow a total of 1,000 IOPS, which could potentially improve the throughput. This turned out to have no effect, no matter which storage type was chosen (HDD Standard, SSD Standard or SSD Premium). This suggests that Kafka keeps most of its messages in memory.
Then, we increased num.partitions from 1 to 2 in the broker’s configuration, and we used an additional consumer. We suspected it would allow us to benefit further from the additional number of cores, but the performance remained the same.
In hindsight, it might have had an impact if we had doubled the number of cores once more.
The Kafka broker has a num.network.threads parameter that was initially set to 3. Considering the fact that the broker had at least one connection to the Debezium connector and another one to the Spark consumer, we lowered it to 2.
We thought that it might decrease the performance, but it had no impact.
Decreasing it further to 1 led to a serious drop in performance, increasing the total time to 46:49 minutes. A second try confirmed this spectacular drop.
Setting the number of threads to 2 brought the total time down to 18:09 minutes.
We hypothesized that adding more cores wouldn’t have much of an impact since the Kafka workflow is presumably IO bound. However, it might reduce the contention between the 4 active processes of the stack.
We replaced our VM model with a B4ms, giving us 4 cores and 16 GB of RAM instead of 2 cores and 8 GB, respectively. We did not expect the increase of RAM to have an impact, since the total memory usage was already far below the 8 GB available. This configuration upgrade brought our total time down to 13:30 minutes.
The further increase of num.io.threads from 2 to 5 threads in the broker’s configuration reduced that number to 13:01 minutes. At this point the improvement was too small for us to consider this change to have any impact.
We then proceeded with further attempts to reduce the contention for resources by testing one data transfer between two components at a time.
First, we measured the transfer times individually. From the Debezium connector to the Kafka server, the transfer took 14:48 minutes. From the Kafka server to the consumer, it took 8:59 minutes.
Lastly, we measured the time to transfer the resulting files using our producer program to be 4:19 minutes. We also used the kafkacat utility to measure the feeding time of our Kafka server while getting rid of the latency induced by the network connection to PostgreSQL.
The data transfer from kafkacat to the Kafka server took 14:49 minutes, which is very similar to the performance with the Debezium connector.
Then, we modified the size of the network buffers in the broker’s configuration by modifying both socket.send.buffer.bytes and socket.receive.buffer.bytes to 4,194,304 bytes.
This change allows a bigger chunk of data to be handled at a time, thereby reducing the number of instructions to execute and consequently the CPU usage. It improved the execution time of kafkacat, which dropped to 7:09 minutes.
However, the performance of the Debezium connector remained the same at 15:24 minutes, which is surprisingly similar to its initial result. The performance of the consumer remained unaffected by this change.
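For readers who want to try the same broker settings, the following sketch applies a set of overrides to the text of a properties file. The helper name and the sample file content are ours; the keys and values are the ones mentioned in this article.

```python
def apply_overrides(properties_text, overrides):
    """Return properties text with the given keys set, appending missing ones."""
    remaining = dict(overrides)
    out = []
    for line in properties_text.splitlines():
        key = line.split("=", 1)[0].strip()
        is_setting = "=" in line and not line.lstrip().startswith("#")
        if is_setting and key in remaining:
            # Replace the existing value in place.
            out.append(f"{key}={remaining.pop(key)}")
        else:
            out.append(line)
    # Keys that were not present in the file are appended at the end.
    out.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(out) + "\n"


# Hypothetical excerpt of a broker server.properties file.
SAMPLE = """\
# broker settings before tuning
num.network.threads=3
num.partitions=1
"""

TUNED = apply_overrides(SAMPLE, {
    "num.partitions": 2,
    "socket.send.buffer.bytes": 4_194_304,
    "socket.receive.buffer.bytes": 4_194_304,
})
print(TUNED)
```

The broker has to be restarted for changes made to its properties file to take effect.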
We pushed it a little further by changing the configuration of the Debezium connector. The database buffers can be modified through the snapshot.fetch.size and max.batch.size parameters. We set both to 65,536. This is roughly 3 times as much as the default value of 20,240.
Again, the performance of the connector remained the same, at 15:22 minutes.
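As an illustration, here is what those two settings could look like in a connector definition, assuming the connector is deployed through Kafka Connect and registered as JSON. The connector name is hypothetical and the connection settings (host, credentials, database) are deliberately left out. Note that Debezium also has a max.queue.size setting that its documentation says should be larger than max.batch.size; this article does not cover it, so it is not shown here.

```python
import json

# Only the two settings discussed in the article, plus the connector class.
connector_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "snapshot.fetch.size": "65536",
    "max.batch.size": "65536",
}

# "noah-benchmark-source" is a made-up connector name.
payload = json.dumps(
    {"name": "noah-benchmark-source", "config": connector_config},
    indent=2,
)
print(payload)
```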
Since it seemed that we had nothing left to change at the individual process level, we ran the full chain of programs together with the new parameters.
With the B4ms VM, the results were very similar to the initial results, with a timing of 15:05 minutes. Surprisingly, after a switch back to the B2ms model, the results were again very similar, at 15:59 minutes.
A final execution of the full chain using kafkacat was timed at 10:49 minutes. We attribute the difference to the significant bottleneck between PostgreSQL and the Debezium CDC connector.
To complement our analysis, we could have played with the database parameters. We also could have taken a deeper approach to performance diagnosis by using a Java profiler or even “strace”. This would have given us a peek at what our threads were really spending time on.
It would be interesting to test Kafka’s behavior with more connectors, or even to experiment with a more classic, kafkaesque setup with one or more machines each hosting a single Kafka server.
The 2-core, 8 GB configuration is more than enough to handle 5 GB per day. Surprisingly, we found that we could handle roughly 326 gigabytes per day, which is more than 60 times as much.
The parameter that had the biggest impact on performance was num.io.threads in the broker’s configuration. With an optimized configuration, we could reach a performance of 450 gigabytes per day, which is amazing considering how cheap this setup is nowadays.
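The daily figures can be derived from any of the timings reported in this article by scaling the 5 GB payload to 24 hours. The sketch below does so for a few of the full-chain runs; that the 450 GB figure corresponds to the 15:59 run on the B2ms is our assumption, based on the numbers matching.

```python
def seconds(mmss):
    """Convert a "minutes:seconds" timing into seconds."""
    minutes, secs = mmss.split(":")
    return int(minutes) * 60 + int(secs)


def gb_per_day(mmss, payload_gb=5):
    """Extrapolate a single transfer of `payload_gb` to a full day."""
    return payload_gb * 86_400 / seconds(mmss)


# Timings taken from the article; labels are ours.
RUNS = [
    ("initial parameters, B2ms", "16:35"),
    ("num.network.threads=1, B2ms", "46:49"),
    ("new parameters, B4ms", "15:05"),
    ("new parameters, B2ms", "15:59"),
]

for label, timing in RUNS:
    print(f"{label}: {timing} -> {gb_per_day(timing):.0f} GB/day")
```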