‘Exactly Once’ processing with Spark Structured Streaming
Abstract
A stream processing system executes a set of operators continuously on a never-ending set of records. As events (also called tuples, records, messages, or rows) occur in the real world, they are fed into the streaming system as and when they happen, so that sense can be made of them in conjunction with other sources of tuples, historical context, and/or dynamic lookups in real time. Spark Streaming is one such system: it can process tuples in real time and generate business value from them. Because these jobs run continuously on unbounded data, certain characteristics are expected of them, such as end-to-end latency, throughput, ordered evaluation, recovery, failure handling, and guaranteed processing and delivery semantics. In this blog we primarily explore how to achieve ‘exactly once’ delivery semantics within the scope of Spark Structured Streaming, followed by a few code samples from the experiments done.
Introduction
Generally, a stream processing application supports one or more of the following delivery semantics.
- At most once: The consuming application will see the tuple only once or never. There might be data loss, but no duplicates are allowed. This is used for applications where the loss of a few records is tolerable and end-to-end latency is a major requirement.
- At least once: The consuming application will always see the tuple, but duplicates might be generated. This is the most commonly used SLA, since the majority of use cases can tolerate duplicates but not data loss.
- Exactly once: The holy grail of delivery semantics. The consuming application sees each tuple only once and processes it only once, irrespective of failures, rebalancing, sharding, restarts, upgrades, etc.
- Ordered delivery: The consuming application sees the tuples in the same order as generated by the source system. Typically, this is achieved only within a partition for partition-enabled sources, as total ordering is very costly (although achievable).
Exactly once is a hard problem but with some support from the target system and the stream processing engine it can be achieved.
Traditionally we have looked at it from the producer’s perspective, as to whether the producing application can write a tuple once and only once for the consumer to consume. However, if we look at it from the consumer’s perspective, what matters is whether the consuming application can see only the unique records and discard the rest at the source before it’s given to the downstream processing elements.
Why is ‘Exactly Once’ hard?
- In a distributed processing system, any failure in the pipeline could cause tuples to be reprocessed, generating duplicates.
- The source system might resend the tuples if it didn’t receive a commit message.
- An operator can reprocess its tuples if it fails and restarts processing from the last stored state/tuples.
- The target system might go down after inserting the tuple but before sending an ack back, causing the processing system to retry the delivery.
- There could be a network partition due to which ack/commit messages are not delivered and/or delivered late.
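The lost-ack case above can be reproduced in a few lines. The following is a minimal sketch, not taken from any real system: `FlakyTarget` and `RetryingSender` are hypothetical names, and the target is an in-memory list that deliberately drops its first acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory target that stores a record but loses the first ack. */
class FlakyTarget {
    final List<String> stored = new ArrayList<>();
    private boolean dropNextAck = true;

    /** Returns true if the ack reaches the sender, false if it is lost after the write. */
    boolean write(String record) {
        stored.add(record);      // the write itself always succeeds
        if (dropNextAck) {
            dropNextAck = false;
            return false;        // ack lost: the sender cannot tell that the write happened
        }
        return true;
    }
}

/** Naive at-least-once sender: retries until an ack is seen. */
class RetryingSender {
    /** Returns the number of attempts that were needed to get an ack. */
    static int send(FlakyTarget target, String record, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.write(record)) {
                return attempt;
            }
        }
        throw new IllegalStateException("no ack after " + maxAttempts + " attempts");
    }
}
```

Sending one record through this pair leaves two copies in `stored`: the retry is correct from the sender's point of view, yet the target now holds a duplicate.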
So if it’s so hard, how do systems such as Kafka Streams support it?
- The Kafka producer implicitly retries the delivery of messages if an ack is not received from the broker within a timeout or if it gets an exception. This could lead to duplicates if the broker is slow in responding, or if it commits but fails to send an ack back.
- Each batch of Kafka messages is associated with a unique, monotonically increasing id which is sent along with the batch. The broker persists this id, and if it sees a message with the same or a lesser id it discards the message as a duplicate. This effectively makes the send operation idempotent.
- Idempotency alone doesn’t solve end-to-end exactly once. The consumer can still generate duplicates, or a process can fail and reprocess tuples.
- Kafka added support for transactional writes across topics and partitions, and the consumer can also be acked in the same transaction. This makes read -> process -> write part of the same transaction, making the entire operation atomic.
- Zombie processes are automatically handled by associating a transactional id and a version number with each producer. Any producer with a version number lower than the current one is fenced.
- Consumers set to isolation level ‘read_committed’ read only committed messages.
Please refer to the following excellent post for the details of transaction handling in Kafka https://www.confluent.io/blog/transactions-apache-kafka/
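The sequence-id check described above can be illustrated with a small in-memory model. This is a simplified sketch of the idea only, not Kafka's broker code: `DedupLog` is a hypothetical class, and it keeps one "last seen id" per producer rather than modelling partitions or persistence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical broker-side log that discards batches whose id was already seen. */
class DedupLog {
    // Last accepted batch id per producer (illustrative; not persisted here).
    private final Map<String, Long> lastSequence = new HashMap<>();
    final List<String> log = new ArrayList<>();

    /** Appends the batch unless its id is the same as or lower than the last one seen. */
    boolean append(String producerId, long sequence, List<String> batch) {
        Long last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false;                      // duplicate delivery: discard
        }
        log.addAll(batch);
        lastSequence.put(producerId, sequence);
        return true;
    }
}
```

With this check in place a producer can retry the same batch as often as it likes; only the first delivery changes the log.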
What is provided in Spark Streaming?
- Spark provides a hook to uniquely identify a partition and a monotonically increasing epoch/version/batchId for each minibatch generated that is maintained even across failovers and restarts.
- Using these two pieces of information (partitionId, epoch), we can uniquely identify a duplicate attempt to write to an external target for a particular partition in a minibatch.
- In case of a failure at any point in the pipeline Spark guarantees to replay the transformations following the lineage graph it maintains.
- With a deterministic set of transformations and an immutable set of data picked up from the external replayable source, the same result is generated along with the same (partitionId, epoch).
- Spark provides the ForEach sink, whose open/process/close hooks let a minibatch’s partition be committed atomically.
- The (partitionId, epoch) is maintained in the checkpoint directory across failovers/restarts.
ForEachWrite Action
Modus Operandi in Spark for Exactly Once
@Override
public boolean open(long partitionId, long epoch) {
    // Check if (partitionId, epoch) is committed.
    // If so exit, else continue.
}

@Override
public void process(String record) {
    // Write record to external system.
}

@Override
public void close(Throwable errorOrNull) {
    // Commit the data records and the (partitionId, epoch) in the same call/transaction.
}
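To make the three hooks concrete, here is a minimal sketch that mirrors their shape in plain Java. It does not extend Spark's writer class; `InMemoryTarget` and `ExactlyOnceWriter` are hypothetical names, and the "external system" is an in-memory object whose `commit` stands in for a single atomic call or transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical target whose commit applies the records and the marker together. */
class InMemoryTarget {
    final List<String> records = new ArrayList<>();
    private final Set<String> committed = new HashSet<>();

    synchronized boolean isCommitted(long partitionId, long epoch) {
        return committed.contains(partitionId + ":" + epoch);
    }

    /** Stand-in for one atomic call/transaction on a real target. */
    synchronized void commit(long partitionId, long epoch, List<String> batch) {
        records.addAll(batch);
        committed.add(partitionId + ":" + epoch);
    }
}

/** Mirrors the open/process/close hooks; buffers records until close. */
class ExactlyOnceWriter {
    private final InMemoryTarget target;
    private final List<String> buffer = new ArrayList<>();
    private long partitionId;
    private long epoch;
    private boolean active;

    ExactlyOnceWriter(InMemoryTarget target) {
        this.target = target;
    }

    boolean open(long partitionId, long epoch) {
        this.partitionId = partitionId;
        this.epoch = epoch;
        buffer.clear();
        // Skip the partition if this (partitionId, epoch) was already committed.
        active = !target.isCommitted(partitionId, epoch);
        return active;
    }

    void process(String record) {
        buffer.add(record);
    }

    void close(Throwable errorOrNull) {
        // Commit only for a non-skipped, error-free attempt; otherwise drop the buffer.
        if (active && errorOrNull == null) {
            target.commit(partitionId, epoch, buffer);
        }
        active = false;
        buffer.clear();
    }
}
```

Buffering until `close` is a design choice of this sketch: nothing reaches the target unless the whole partition succeeded, so a replay of the same (partitionId, epoch) is either skipped in `open` or starts from a clean slate.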
Achieving exactly once with specific targets
Now let’s see how we can implement this for certain target systems. We need to either use the transactional capabilities of the target system or use an atomic operation intelligently.
1. Relational Databases
- Leverage transactional capabilities of relational databases
- Have an auxiliary table in the database which maintains the last successfully committed (partitionId, epoch).
- Insert the above tuple in the same transaction as the actual records in the primary tables.
- When attempting to insert any new partition of data, first check whether the existing epoch number is greater than or equal to the epoch received for that particular partition. If so, discard the attempt to write.
- If not, insert the records of that partition and the (partitionId, epoch) tuple in the database as part of the same transaction and commit.
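The steps above could look roughly like the following JDBC sketch. The table names (`committed_partitions`, `events`), column names, and the class `JdbcExactlyOnce` are assumptions made for illustration; locking and isolation-level tuning for concurrent attempts on the same partition are deliberately left out.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

class JdbcExactlyOnce {
    // Hypothetical auxiliary and primary tables.
    static final String SELECT_EPOCH =
        "SELECT epoch FROM committed_partitions WHERE partition_id = ?";
    static final String UPDATE_EPOCH =
        "UPDATE committed_partitions SET epoch = ? WHERE partition_id = ?";
    static final String INSERT_EPOCH =
        "INSERT INTO committed_partitions (partition_id, epoch) VALUES (?, ?)";
    static final String INSERT_RECORD = "INSERT INTO events (payload) VALUES (?)";

    /** True when the stored epoch is greater than or equal to the incoming one. */
    static boolean isDuplicate(Long lastCommittedEpoch, long epoch) {
        return lastCommittedEpoch != null && lastCommittedEpoch >= epoch;
    }

    static Long lastCommittedEpoch(Connection conn, long partitionId) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(SELECT_EPOCH)) {
            ps.setLong(1, partitionId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? Long.valueOf(rs.getLong(1)) : null;
            }
        }
    }

    /** Returns true if the partition was written, false if it was skipped as a duplicate. */
    static boolean writePartition(Connection conn, long partitionId, long epoch,
                                  List<String> records) throws SQLException {
        conn.setAutoCommit(false);
        try {
            if (isDuplicate(lastCommittedEpoch(conn, partitionId), epoch)) {
                conn.rollback();
                return false;
            }
            try (PreparedStatement ins = conn.prepareStatement(INSERT_RECORD)) {
                for (String record : records) {
                    ins.setString(1, record);
                    ins.addBatch();
                }
                ins.executeBatch();
            }
            int updated;
            try (PreparedStatement upd = conn.prepareStatement(UPDATE_EPOCH)) {
                upd.setLong(1, epoch);
                upd.setLong(2, partitionId);
                updated = upd.executeUpdate();
            }
            if (updated == 0) {   // first commit ever seen for this partition
                try (PreparedStatement ins = conn.prepareStatement(INSERT_EPOCH)) {
                    ins.setLong(1, partitionId);
                    ins.setLong(2, epoch);
                    ins.executeUpdate();
                }
            }
            conn.commit();        // records and marker become visible together
            return true;
        } catch (SQLException e) {
            conn.rollback();      // neither records nor marker survive a failure
            throw e;
        }
    }
}
```

The update-then-insert pair is used only because upsert syntax differs between databases; a database-specific upsert would serve equally well.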
2. HDFS Target
- Leverage atomic ‘rename’ operation of HDFS intelligently
- For each partition create a part- file with the extension ‘.uncommitted’
- Write the records for that partition then rename the file to ‘.committed’ atomically.
- The consumer (custom HDFS Reader) can have an isolation level set to either ‘read_committed’ or ‘read_all’. With ‘read_committed’ it only reads the files with the ‘.committed’ extension; otherwise it sees all data, including intermediate data.
- The file name should contain the (partitionId, epoch) tuple. If an attempt is made to write a partition for which a ‘.committed’ file already exists, the attempt has to be discarded as a duplicate.
- Otherwise the ‘.uncommitted’ file can be overwritten with the partition data, discarding the previous contents, and committed (renamed) at the end.
- The consuming application, if it is configured for ‘read_committed’, sees the data exactly once.
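The rename protocol above can be sketched with `java.nio.file` on a local directory. This is a stand-in only: it assumes the local file system's atomic move behaves like the HDFS rename described above, and the class name `RenameCommitter` and the `part-<partitionId>-<epoch>` naming scheme are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

class RenameCommitter {
    /** The file name carries the (partitionId, epoch) tuple. */
    static Path file(Path dir, long partitionId, long epoch, String extension) {
        return dir.resolve("part-" + partitionId + "-" + epoch + extension);
    }

    /** Returns true if this call committed the partition, false if it was a duplicate. */
    static boolean writePartition(Path dir, long partitionId, long epoch, List<String> records) {
        Path committed = file(dir, partitionId, epoch, ".committed");
        if (Files.exists(committed)) {
            return false;                        // already committed: discard the attempt
        }
        Path uncommitted = file(dir, partitionId, epoch, ".uncommitted");
        try {
            // Default options create the file or truncate a leftover from a failed attempt.
            Files.write(uncommitted, records, StandardCharsets.UTF_8);
            Files.move(uncommitted, committed, StandardCopyOption.ATOMIC_MOVE);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A 'read_committed' reader: only files with the '.committed' extension are visible. */
    static List<String> readCommitted(Path dir) {
        List<String> lines = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "*.committed")) {
            for (Path f : files) {
                lines.addAll(Files.readAllLines(f, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

A reader using `readCommitted` never observes a half-written partition, because the only step that makes data visible is the single rename at the end.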
3. Amazon S3
- Leverage S3’s atomic ‘close’ operation.
- Amazon S3 SDK v3 uploads data in chunks (multipart upload), collates them when the close method is called, and makes the S3 object available to the consumer only when the close is successful. The consumer doesn’t see the object if close is not called or is unsuccessful.
- Create the S3 object with the (partitionId, epoch) in the name.
- In the close() method of ForEach, call S3’s close() which will atomically make it available.
- If Spark fails to commit to Kafka (source) after a successful S3 close(), the partition would be reattempted, causing the operation to be performed again. But this would be idempotent, since the S3 consumer would see the same file with the same data. There won’t be any intermediate or duplicate file.
Confluent’s new S3 connect adapter takes the same approach for claiming exactly once support. Check out the following article https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once.
4. Kafka
- Leverage Kafka’s transactional capabilities.
- Enable idempotent writes to handle producer duplicates when the producer retries delivery.
- Create a system topic ‘committed_batch_partitions’.
- Along with the actual data written to the Kafka target topic, also write the (partitionId, epoch) to the system topic in the same transaction, so that either both are committed or neither is.
- When a new (partitionId, epoch) is seen in the open() method, first read all the messages from the system topic ‘committed_batch_partitions’ and check whether the (partitionId, epoch) already exists. If so, discard the write attempt. If not, proceed.
- The retention of the ‘committed_batch_partitions’ topic is ideally set to a low value.
- The downstream Kafka consumer should be set to isolation level ‘read_committed’.
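The client settings named in this list map onto Kafka configuration keys. The sketch below builds them with plain `java.util.Properties`; the class `KafkaExactlyOnceConfig`, the marker format, and any transactional id value passed in are assumptions for illustration. The transactional send itself (writing data and marker between the Kafka producer's begin and commit calls) needs the Kafka client library and is not shown.

```java
import java.util.Properties;

class KafkaExactlyOnceConfig {
    /** Producer settings for idempotent, transactional writes. */
    static Properties producerProps(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");       // de-duplicate producer retries
        props.setProperty("acks", "all");                       // required alongside idempotence
        props.setProperty("transactional.id", transactionalId); // caller-chosen, hypothetical value
        return props;
    }

    /** Downstream consumer settings: only committed messages are visible. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    /** Hypothetical marker value written to the 'committed_batch_partitions' topic. */
    static String marker(long partitionId, long epoch) {
        return partitionId + ":" + epoch;
    }
}
```

The same `consumerProps` would also suit the reader of the system topic in open(), so that markers from aborted transactions are never mistaken for committed ones.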
Assumptions
- It is assumed that the upstream system (external source) is not generating duplicate records. If it is, key-based deduplication logic needs to be applied. Spark provides the ‘dropDuplicates’ function for this purpose, which should be used in this case.
- If the Spark libraries are updated such that a different optimisation algorithm is enabled, or the source produces a different number of partitions or a different dataset between restarts, then exactly once cannot be guaranteed with the (partitionId, batchId) tuple. All the operations have to be deterministic and the dataset read from the source has to be repeatable.
- Spark’s DataSourceV2 API, which provides ‘almost’ transactional capabilities to atomically commit all the partitions in a minibatch, is not explored here.