Build Production Grade Debezium Cluster With Confluent Kafka
We are living in the DataLake world. Now almost every organizations wants their reporting in Near Real Time. Kafka is of the best streaming platform for realtime reporting. Based on the Kafka connector, RedHat designed the Debezium which is an OpenSource product and high recommended for real time CDC from transnational databases. I referred many blogs to setup this cluster. But I found just basic installation steps. So I setup this cluster for AWS with Production grade and publishing this blog.
A shot intro:
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
Basic Tech Terms:
Kafka Broker: Brokers are the core for the kafka streaming, they’ll keep your messages and giving it to the consumers.
Zookeeper: It’ll maintain the cluster status and node status. It’ll help to make the Kafka’s availability.
Producers: The component who will send the messages(data) to the Broker.
Consumers: The component who will get the messages from the Queue for further analytics.
Confluent: Confluent is having their own steaming platform which basically using Apache Kafka under the hood. But it has more features.
Here Debezium is our data producer and S3sink is our consumer. For this setup, Im going to stream the MySQL data changes to S3 with customized format.
Kafka and Zookeepers are installed on the same EC2. We we’ll deploy 3 node confluent Kafka cluster. Each node will be in a different availability zone.
172.31.47.152 - Zone A
172.31.38.158 - Zone B
172.31.46.207 - Zone C
For Producer(debezium) and Consumer(S3sink) will be hosted on the same Ec2. We’ll 3 nodes for this.
172.31.47.12 - Zone A
172.31.38.183 - Zone B
172.31.46.136 - Zone C
Kafka nodes are generally needs Memory and Network Optimized. You can choose either Persistent and ephemeral storage. I prefer Persistent SSD Disks for Kafka storage. So add n GB size disk to your Kafka broker nodes. For Normal work loads its better to go with R5 instance Family.
Mount the Volume in /kafkadata location.
Use a new Security group which allows the below ports.
Install the Java and Kafka on all the Broker nodes.
We need to configure Zookeeper and Kafaka properties, Edit the /etc/kafka/zookeeper.properties on all the kafka nodes
We need to assign a unique ID for all the Zookeeper nodes.
Now we need to configure Kafka broker. So edit the /etc/kafka/server.properties on all the kafka nodes.
The next step is optimizing the Java JVM Heap size, In many places kafka will go down due to the less heap size. So Im allocating 50% of the Memory to Heap. But make sure more Heap size also bad. Please refer some documentation to set this value for very heavy systems.
The another major problem in the kafka system is the open file descriptors. So we need to allow the kafka to open at least up to 100000 files.
Here the cp-kafka is the default user for the kafka process.
Create Kafka data dir:
Start the Kafka cluster:
Make sure the Kafka has to automatically starts after the Ec2 restart.
Now our kafka cluster is ready. To check the list of system topics run the following command.
Install the confluent connector and debezium MySQL connector on all the producer nodes.
Edit the /etc/kafka/connect-distributed.properties on all the producer nodes to make our producer will run on a distributed manner.
Install Debezium MySQL Connector:
it’ll ask for making some changes just select Y for everything.
Run the distributed connector as a service:
Start the Service:
Configure Debezium MySQL Connector:
Create a mysql.json file which contains the MySQL information and other formatting options.
“database.whitelist” - List of databases to get the CDC.
key.converter and value.converter and transforms parameters - By default Debezium output will have more detailed information. But I don’t want all of those information. Im only interested in to get the new row and the timestamp when its inserted.
If you don’t want to customize anythings then just remove everything after the database.whitelist
Register the MySQL Connector:
Check the status:
Test the MySQL Consumer:
Now insert something into any tables in proddb or test (because we have whilelisted only these databaes to capture the CDC.
We can get these values from the Kafker brokers. Open any one the kafka node and run the below command.
I prefer confluent cli for this. By default it’ll not be available, so download manually.
Listen the below topic:
mysql-db01.test.rohi This is the combination of servername.databasename.tablename servername(you mentioned this in as a server name in mysql json file).
Setup S3 Sink connector in All Producer Nodes:
I want to send this data to S3 bucket. So you must have an EC2 IAM role which has access to the target S3 bucket. Or install awscli and configure access and secret key(but its not recommended)
Install S3 Connector:
Create s3.json file.
"topics.regex": "mysql-db01" - It’ll send the data only from the topics which has mysql-db01 as prefix. In our case all the MySQL databases related topics will start with this prefix.
"flush.size" - The data will uploaded to S3 only after these many number of records stored. Or after "rotate.schedule.interval.ms" this duration.
Register this S3 sink connector:
Check the Status:
Test the S3 sync:
Insert the 10000 rows into the rohi table. Then check the S3 bucket. It’ll save the data in JSON format with GZIP compression. Also in a HOUR wise partitions.
Refer this post to setup monitoring for MySQL Connector.
Replication Factor is the other main parameter to the data durability.
Use internal IP addresses as much as you can.
By default debezium uses 1 Partition per topic. You can configure this based on your work load. But more partitions more through put needed.