Kafka as a Data Integration Solution — Part 2 — Transactional Consistency
For a database, consistency is a must-have capability. A financial booking of +100 is made for one account, its counter booking of -100 is assigned to another account, and both records are committed together.
It is all or nothing: either both changes succeed or neither does, the Atomicity of a database's ACID guarantees. The same applies on the reading side, the Consistency: no matter at what point in time somebody queries the sum of all bookings, the result is zero, because the entire financial book must be balanced at all times.
With Kafka this becomes a problem in multiple, sometimes surprising, places.
Last or not last record, that is the question
Starting with the simplest case, a single topic partition contains the stream of changes above. The consumer reads the changes, but when does it commit the data in the target database? The standard approach of Kafka Connect and others is to commit after either n seconds or m records, whichever comes first. That is guaranteed to create inconsistencies. Suppose the +100 change happens to be record number 1,000 in the Kafka partition and therefore the last record of a batch; such a consumer commits it right away. The -100 record falls into the next batch, and in the worst case it takes another n seconds until that one gets committed. Not good.
In short, in stream processing we don’t know when the last record of a transaction was read.
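To make this concrete, here is a minimal sketch of such a batch-committing consumer, assuming a plain Java KafkaConsumer; the targetDb calls are a hypothetical stand-in for the database writes, and the thresholds play the role of the m-records-or-n-seconds rule.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "bookings-sink");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("bookings"));
            int recordsInBatch = 0;
            long batchStart = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // stage the row in the target database (hypothetical helper)
                    // targetDb.insert(record.value());
                    recordsInBatch++;
                }
                boolean maxRecordsReached = recordsInBatch >= 1000;                        // "m records"
                boolean maxWaitReached = System.currentTimeMillis() - batchStart > 10_000; // "n seconds"
                if (maxRecordsReached || maxWaitReached) {
                    // Commit whatever happens to be in the batch, even if it cuts a source
                    // transaction in half: the +100 can land in this commit and the -100
                    // only in the next one.
                    // targetDb.commit();
                    consumer.commitSync();
                    recordsInBatch = 0;
                    batchStart = System.currentTimeMillis();
                }
            }
        }
    }
}
```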
Kafka transactions
Things get dangerous when two features carry the same name but mean different things, and Kafka transactions are a case in point. The problem the Kafka team solved two years ago is this: a producer writes into multiple topics and fails in the middle; the data it wrote should then remain invisible to consumers in all topics. Kafka transactions are the technical solution for this problem: committed data becomes visible to consumers atomically. Every produced record gets a transaction id assigned, and a separate internal topic records which transactions were aborted and which were committed. A consumer reading the data pauses the first time it encounters a transaction that is neither committed nor aborted.
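For reference, this is roughly what the mechanism looks like with the plain Kafka client API; topic names and values are invented for the booking example, and error handling is simplified.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "booking-producer-1"); // turns on Kafka transactions

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Both writes become visible to read_committed consumers atomically,
                // or not at all if the transaction is aborted.
                producer.send(new ProducerRecord<>("bookings", "account-1", "+100"));
                producer.send(new ProducerRecord<>("counter-bookings", "account-2", "-100"));
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction(); // simplified; fatal producer errors would require close() instead
                throw e;
            }
        }
        // On the reading side, a consumer only hides uncommitted data when configured
        // with isolation.level=read_committed; it will not read past the first open
        // (neither committed nor aborted) transaction in a partition.
    }
}
```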
This solves atomicity to some degree (questions like "Is this the last record?" and "Has all data of all topics been read?" remain unanswered), and the same goes for consistency. At the same time it creates a whole lot of trouble we are not used to from the database world.
We can stall any such consumer simply by writing a single transactional record to the topic and not committing it for a long time. The entire consumer pauses at that point. If an initial load of several hundred million records is performed as one transaction, it will indeed take a while and thus block all other data in the same topic.
Kafka's answer to this problem is a transaction timeout, by default set to 60 seconds, with the unintended consequence that every transaction must complete within that time. That rules out most initial loads, and even large deltas might break the producer. Increasing the timeout helps the producer but creates even more trouble for the consumers.
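The knob in question is the producer configuration transaction.timeout.ms; a small sketch, with the defaults documented for recent Kafka versions quoted in the comments.

```java
import java.util.Properties;

public class TransactionTimeoutConfig {

    static Properties transactionalProducerProps() {
        Properties props = new Properties();
        // bootstrap.servers, serializers and transactional.id as in the previous sketch ...

        // The broker aborts any transaction that stays open longer than this
        // (producer setting transaction.timeout.ms, default 60000 ms = 60 seconds).
        props.put("transaction.timeout.ms", "300000");

        // The broker caps what a producer may request via its own setting
        // transaction.max.timeout.ms (default 15 minutes), so raising the value
        // further also requires a broker configuration change.
        return props;
    }
}
```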
No, Kafka transactions are aimed at millions of tiny, short-lived transactions, as in the IoT world, not at database transactions.
Transaction topic
The solution is for the producer to add the source transaction number to every record: the finance booking with all its columns plus one additional column containing the source database transaction number. But even that is not sufficient; we still do not know whether the current record is the last one of its transaction.
A convenient option would be for the producer to flag the last record of a transaction as the last one. That makes the producer logic a bit messy, but the bigger problem is that it does not cover all scenarios either. If all data goes into a single topic partition, it works. But when the data is spread across two partitions and one consumer reads each partition, one consumer sees the commit flag and the other never gets one.
It would only work if the producer, not Kafka, kept tight control over which partition each record goes to and inserted a commit record into every partition it wrote to. Doable, but it takes away a lot of Kafka's flexibility.
Hence the only solution is a separate transaction topic, handled differently from Kafka transactions.
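One possible shape of such a producer, sketched under a few assumptions: every change record carries the source transaction number as a record header, and once the source transaction is complete, a commit record listing the highest written offset per partition goes to a separate transaction topic. The topic names, header name, and payload format are invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SourceTransactionProducer {

    private final KafkaProducer<String, String> producer;
    // Highest offset written per "topic-partition" within the current source transaction.
    private final Map<String, Long> lastOffsets = new HashMap<>();

    public SourceTransactionProducer(Properties kafkaProps) {
        this.producer = new KafkaProducer<>(kafkaProps);
    }

    /** Write one change record, tagged with the source database transaction number. */
    public void sendChange(String topic, String key, String payload, long sourceTxnId) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, payload);
        // The source transaction number travels with the record, here as a header
        // (it could just as well be an extra column in the payload).
        record.headers().add("source_txn_id", Long.toString(sourceTxnId).getBytes(StandardCharsets.UTF_8));
        RecordMetadata meta = producer.send(record).get(); // blocking send, for simplicity
        lastOffsets.merge(meta.topic() + "-" + meta.partition(), meta.offset(), Math::max);
    }

    /** After the last change of the source transaction, publish its commit record. */
    public void commitSourceTransaction(long sourceTxnId) throws Exception {
        // Invented payload format: "<txnId>|bookings-0=1000,bookings-1=998"
        String offsets = lastOffsets.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
        producer.send(new ProducerRecord<>("transactions", Long.toString(sourceTxnId),
                sourceTxnId + "|" + offsets)).get();
        lastOffsets.clear();
    }
}
```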
Load the data but do not commit
The consumer opens a new database transaction for each source transaction and uses the transaction topic to decide when to commit the data in the target. The transaction record holds the offset of the last record in each partition; otherwise the transaction topic might be read prematurely, before all data from the other topics has been consumed. Remember: the order of messages is guaranteed only within a single partition, not across partitions!
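A consumer-side sketch under the same assumptions: change rows are buffered per source transaction, and the target database commit happens only once every partition has been read up to the offsets listed in the commit record. Offset management and error handling are deliberately simplified; the targetDb call is again hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TransactionAwareSink {

    record CommitRecord(long txnId, Map<String, Long> lastOffsets) {}

    // Change rows buffered per source transaction number, waiting for their commit record.
    private final Map<Long, List<String>> pendingRows = new HashMap<>();
    // Commit records whose data partitions are not yet fully consumed.
    private final List<CommitRecord> pendingCommits = new ArrayList<>();
    // Highest offset consumed so far per "topic-partition".
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    public void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("bookings", "transactions"));
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                if (rec.topic().equals("transactions")) {
                    pendingCommits.add(parseCommitRecord(rec.value()));
                } else {
                    long txnId = Long.parseLong(new String(
                            rec.headers().lastHeader("source_txn_id").value(), StandardCharsets.UTF_8));
                    pendingRows.computeIfAbsent(txnId, id -> new ArrayList<>()).add(rec.value());
                    consumedOffsets.merge(rec.topic() + "-" + rec.partition(), rec.offset(), Math::max);
                }
            }
            // A source transaction may be committed in the target only once every
            // partition has been read at least up to the offsets in its commit record.
            Iterator<CommitRecord> it = pendingCommits.iterator();
            while (it.hasNext()) {
                CommitRecord commit = it.next();
                boolean complete = commit.lastOffsets().entrySet().stream()
                        .allMatch(e -> consumedOffsets.getOrDefault(e.getKey(), -1L) >= e.getValue());
                if (complete) {
                    List<String> rows = pendingRows.remove(commit.txnId());
                    // targetDb.writeAndCommit(rows); // hypothetical: one database transaction per source transaction
                    consumer.commitSync();            // simplified; ideally store offsets together with the database commit
                    it.remove();
                }
            }
        }
    }

    // Parses the invented commit record format "<txnId>|<topic-partition>=<offset>,..."
    private static CommitRecord parseCommitRecord(String value) {
        String[] parts = value.split("\\|");
        Map<String, Long> offsets = new HashMap<>();
        for (String entry : parts[1].split(",")) {
            String[] kv = entry.split("=");
            offsets.put(kv[0], Long.parseLong(kv[1]));
        }
        return new CommitRecord(Long.parseLong(parts[0]), offsets);
    }
}
```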
Produce objects, not table rows
The ideal solution would be for a single message to contain all related data. In the example above, the payload would not be two rows, booking and counter booking, but one message with an array of bookings. Granted, a bit odd, and if millions of rows are produced in one transaction it is not feasible at all.
A sales order, on the other hand, that contains the order header plus an array of all line items, a complete sales order document so to speak, is actually a good idea for other reasons as well. It does not solve consistency problems with other data (does the buyer's master data record exist already?), but it does limit the inconsistency problem somewhat.
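To illustrate the document-style payload, a small sketch of such a message structure with invented field names; the whole object would be serialized, for example to JSON or Avro, as one Kafka record.

```java
import java.math.BigDecimal;
import java.util.List;

// One message carries the complete sales order document: header plus all line items.
// Because header and items travel together, a consumer never sees one without the other.
public record SalesOrder(String orderId, String buyerId, List<LineItem> items) {}

record LineItem(int lineNumber, String materialId, int quantity, BigDecimal price) {}
```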
Where does that leave us?
Frankly, not in a good position. For this requirement to be met, producer and consumer must work together in harmony. There is no out-of-the-box solution in Kafka, and probably none that is feasible without severe downsides.
If your project can cope with some inconsistency, fine. If not, be very careful!