DynamoDB Streams is one of my favorite features of DynamoDB. It powers a bunch of cool integrations (hello, Global Tables and Rockset)! It also enables really useful patterns within your own applications, such as materialized aggregations or updates to reduce transaction contention.
My friend Rahul emailed me with some great questions about DynamoDB Streams. In responding to him, I realized a lot of this would be useful to share publicly. Hence, this post.
I begin with some introductory information about DynamoDB Streams, but I don't go deep there -- the DynamoDB documentation on Streams is pretty good. The heart of this post is practical -- some common questions with opinionated guidance.
What are DynamoDB Streams?
DynamoDB Streams is a combination of change data capture on your DynamoDB table with a stream-based mechanism for processing those events. Let's unpack that.
Change data capture refers to a process for recording all changes that happen within your database. Every time you change your data -- inserting a new record, updating an existing record, or deleting a record -- something will record that change for use by another system.
Streams are a pattern for storing data and enabling processing by multiple, independent consumers. They're everywhere in computer science and have been for ages, but they've become much more popular with the rise of Apache Kafka. For more on streams, check out the "What should I know about streams?" section below.
The combination of these two elements means that you get an ordered*, complete history of all changes in your DynamoDB table that you can programmatically process.
This is very powerful.
* - Mostly! See caveats in the DynamoDB Streams vs. Kinesis Streams section below.
How would I use DynamoDB Streams?
Ok, ok, I have a changelog of all the updates in my database. Who cares? Shouldn't I be happier with the data in my database?
Yes and no. The organized, current record of your database is helpful, but the record of events is useful as well.
I usually split the use cases for DynamoDB Streams into three buckets:
- Adding additional functionality within my service;
- Sharing information with other services;
- Exporting data for analytics.
For the first use case, I'm taking other actions within my given service based on changes in my database.
Often, this involves making other changes within my DynamoDB table. For example, I may want to maintain pre-calculated aggregations, such as the number of likes for a Tweet or the number of available workers in a call center queue. By listening to records being inserted, updated, or deleted, I know how to increment my counters accordingly.
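Here's a minimal sketch of the counter pattern: it turns a batch of stream records into per-Tweet deltas. It assumes a hypothetical "Like" item with a string TweetId attribute and a stream configured to include item images; writing the deltas back to the table is left out so the example runs on its own.

```python
from collections import Counter


def like_deltas(records):
    """Return {tweet_id: delta} for a batch of DynamoDB Stream records.

    Assumes hypothetical Like items that carry a string TweetId attribute.
    """
    deltas = Counter()
    for record in records:
        event = record["eventName"]
        change = record["dynamodb"]
        if event == "INSERT":
            # A new Like item appeared: increment the Tweet's counter.
            deltas[change["NewImage"]["TweetId"]["S"]] += 1
        elif event == "REMOVE":
            # A Like item was deleted: the old image tells us which Tweet it was.
            deltas[change["OldImage"]["TweetId"]["S"]] -= 1
        # MODIFY on a Like doesn't change the count, so it is ignored.
    return dict(deltas)
```

Each non-zero delta could then be applied with a single UpdateItem call using an "ADD LikeCount :delta" update expression, which keeps the write count proportional to the number of Tweets touched rather than the number of Likes.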
This work could often be done in the initial request using a DynamoDB Transaction. However, it is often better to avoid transactions where possible, particularly in high-contention use cases.
For the second category, I may want to share information with other services. If I have a microservices architecture, one service may need to update its state or perform some actions based on data changes within a different service. A new e-commerce Order may trigger a payment attempt elsewhere or a new User creation can trigger an onboarding sequence. By hooking into the database events, I can avoid the latency and difficulty around partial failures that would occur if I were to perform all of these actions in the initial flow. Many of the patterns from event-driven architectures are applicable here.
Finally, I may want to send my DynamoDB data to an external analytics system. DynamoDB is great at OLTP but not so great at OLAP patterns. By hooking into the DynamoDB Stream, I can continuously send data to more OLAP-focused systems that specialize in that work.
What should I know about streams?
Streams are a tricky beast for those new to them. There are much better resources out there about streams (see I Heart Logs to get started), but there are two key points that are worth pointing out.
First, streams are immutable. This isn't like a normal queue where you insert messages and delete them as they are processed. A stream is more comparable to an append-only file.
The append-only nature enables low-latency, high-throughput reads and writes. It also (in theory) enables fan-out to multiple independent consumers. Because records are immutable, each consumer can read independently but is in charge of maintaining its position within the stream.
This adds some complications around failures -- see the discussion on failures below for more.
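To make the "immutable log, independent positions" idea concrete, here's a toy single-shard stream. It's an illustration of the concept only, not how DynamoDB Streams is implemented: reading never removes anything, and each consumer just moves its own position forward.

```python
class AppendOnlyStream:
    """Toy single-shard stream: records are appended, never deleted."""

    def __init__(self):
        self._records = []

    def append(self, record):
        self._records.append(record)
        return len(self._records) - 1  # the new record's position

    def read(self, position, limit=100):
        """Return up to `limit` records starting at `position`."""
        return self._records[position:position + limit]


class Consumer:
    """Each consumer owns its position; other consumers are unaffected."""

    def __init__(self, stream):
        self.stream = stream
        self.position = 0

    def poll(self, limit=100):
        batch = self.stream.read(self.position, limit)
        # Move this consumer's position past the records it just read.
        self.position += len(batch)
        return batch
```

Two consumers can read the same records at different speeds, because the only thing that changes when one of them reads is its own position.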
The second key point of streams is that they scale horizontally through the use of "shards" (DynamoDB / Kinesis) or "partitions" (Kafka). The names are different, but the concepts are the same. When writing records to streams, they generally require a shard or partition key that will be used to route the record to the proper partition. You may be familiar with a similar concept through DynamoDB partitions.
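As a sketch of key-based routing, the function below follows the approach Kinesis Data Streams describes (an MD5 hash of the partition key mapped onto shard hash key ranges). The evenly split ranges are an assumption for illustration, and DynamoDB Streams' internal shard assignment isn't something you control or see directly.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 digests are 128-bit values


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Map a partition key to one of `shard_count` evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    range_size = HASH_SPACE // shard_count
    # The last shard absorbs any remainder left by the integer division.
    return min(hash_value // range_size, shard_count - 1)
```

The important property is determinism: the same key always lands on the same shard, which is what makes per-key ordering possible.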
The use of shards helps with scalability, but it also increases the maintenance burden for you. Depending on the streaming solution you're using, you may need to manage the number of shards in order to handle all of your events. Further, each of your consumers now needs to manage not just a single position in a stream but its position for all shards in a stream. Because of this, most people use higher-level stream processing libraries or services to manage state around processing.
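Here's a minimal sketch of the per-shard bookkeeping a single consumer needs. It records the last processed sequence number for each shard and turns it into the ShardIteratorType / SequenceNumber arguments that the DynamoDB Streams GetShardIterator API accepts. Persisting these checkpoints durably, which is what libraries like the KCL do for you, is left out.

```python
class ShardCheckpoints:
    """Track one consumer's last processed sequence number for each shard."""

    def __init__(self):
        self._positions = {}  # shard_id -> last processed sequence number

    def record(self, shard_id: str, sequence_number: str) -> None:
        self._positions[shard_id] = sequence_number

    def iterator_args(self, shard_id: str) -> dict:
        """Arguments for GetShardIterator when (re)starting on `shard_id`."""
        sequence_number = self._positions.get(shard_id)
        if sequence_number is None:
            # Never read this shard (e.g. a new child shard): start at the oldest record.
            return {"ShardIteratorType": "TRIM_HORIZON"}
        # Resume just after the last record we finished processing.
        return {
            "ShardIteratorType": "AFTER_SEQUENCE_NUMBER",
            "SequenceNumber": sequence_number,
        }
```

Multiply this by every shard, every shard split, and every consumer, and it becomes clear why most people hand the job to a library or managed integration.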
Frequently Asked Questions
Should I use Lambda or KCL (or EventBridge Pipes!) to process my DynamoDB Streams?
When processing streams, whether DynamoDB Streams, Kinesis Streams, or Kafka Streams, there's a lot of management work related to the stream itself rather than to your business logic. For example:
- Keeping track of the number of shards to ensure you're consuming from all of them;
- Maintaining a consumer's position within a particular shard;
- Handling shard splits and shard merges (though I don't believe DynamoDB Streams has shard merges. Other implementations do).
Further, if one of your workers dies, you'll want to detect that and spin up a new worker to avoid falling behind.
This is a lot of work! AWS might even describe it as 'undifferentiated heavy lifting'. What you really care about is the logic to process your stream records as they come through.
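To help with this, there are two ways to process your DynamoDB Streams:
- The Kinesis Client Library (KCL), used with the DynamoDB Streams Kinesis Adapter;
- The AWS Lambda integration.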
The KCL is pretty interesting. It's basically a Java wrapper around your stream processing application. The Java code handles all the management work around stream processing, and your application only has to think about how to operate on the records it receives.
Likewise, the AWS Lambda integration really simplifies stream processing for you. Your Lambda function will receive a batch of stream records to process. You signal that the batch was processed by returning successfully from your function, which tells the event source mapping that it can update its checkpoint for the consumer.
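For a sense of how little code that leaves you with, here's a minimal Python handler sketch. The process_record function is a hypothetical stand-in for your business logic; everything about shards and checkpoints happens outside the function.

```python
def process_record(record):
    """Hypothetical business logic for a single stream record."""
    keys = record["dynamodb"]["Keys"]
    print(f"{record['eventName']} {keys}")


def handler(event, context):
    # Lambda delivers a batch: event["Records"] is a list of stream records.
    for record in event["Records"]:
        process_record(record)
    # Returning normally marks the whole batch as processed, so the event
    # source mapping can advance its checkpoint. Raising an exception would,
    # by default, cause the batch to be retried.
```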
Both of these make it much easier for you to process DynamoDB Streams. So which one should you use?
My recommendation is to use Lambda for processing your DynamoDB Streams in almost all situations. It's dead simple and as fully managed as you can get. Further, it may be cheaper. You generally have to pay $0.02 per 100,000 DynamoDB Stream read request units (basically, GetRecords calls that return up to 1MB of data). However, you are not charged for this if you're using AWS Lambda to process your streams.
Another benefit of using the AWS Lambda integration is that you can filter records at the stream level. This means that you can filter out records that you don't care about before they even get to your Lambda function. This can save you money and processing time. In general, Lambda is pretty cheap so this won't save you a ton of money, but it can help in some situations.
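Here's a sketch of what a filter looks like. The FilterCriteria shape (a list of filters whose Pattern is a JSON string) is what you pass when creating the event source mapping; the "Like" item type is hypothetical. The matches function is only a simplified local approximation of exact-value matching so you can see which records would get through; Lambda's real filtering supports more operators.

```python
import json

# Only deliver INSERT events for hypothetical items whose Type attribute is "Like".
pattern = {
    "eventName": ["INSERT"],
    "dynamodb": {"NewImage": {"Type": {"S": ["Like"]}}},
}
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}


def matches(pattern, record):
    """Simplified local check: each leaf list must contain the record's value.

    Models exact-value matching only (no prefix, numeric, or exists operators).
    """
    for key, expected in pattern.items():
        if key not in record:
            return False
        if isinstance(expected, dict):
            if not isinstance(record[key], dict) or not matches(expected, record[key]):
                return False
        elif record[key] not in expected:
            return False
    return True
```

Records that don't match are dropped before your function is invoked, so you don't pay for invocations that would just return early.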
For high-volume applications with predictable load, it could be cheaper to use the KCL. In order to save money here, you'd need to bank on the reduced compute cost of getting high utilization from an EC2 instance as compared to Lambda. I generally don't see Lambda bills that break the bank, but it could be different for extremely high volume stream use cases (think: FINRA processing trading data).
Remember: if you opt in to the KCL world, you'll be on the hook for additional operational burdens as well. Further, you will have to pay for the DynamoDB Streams GetRecords API calls. In weighing potential savings, make sure to factor these in. Also, if you're using the DynamoDB Streams integration (as opposed to Kinesis Data Streams, discussed below), you can only use v1 of the KCL.
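To put rough numbers on the GetRecords cost, here's a back-of-the-envelope estimator. It uses the $0.02 per 100,000 read request units rate quoted above (check current pricing for your region) and assumes one GetRecords call per shard per second per consumer; the KCL's actual polling cadence is configurable and may differ.

```python
PRICE_PER_100K_READ_UNITS = 0.02  # USD, the rate quoted above; verify current pricing


def monthly_getrecords_cost(shards, polls_per_second, consumers=1, days=30):
    """Estimate DynamoDB Streams read cost when you poll the stream yourself.

    Each GetRecords call is billed as one streams read request unit,
    whether or not it returns any records.
    """
    calls = shards * polls_per_second * consumers * 86_400 * days
    return calls / 100_000 * PRICE_PER_100K_READ_UNITS
```

For example, 4 shards polled once per second by one consumer for 30 days is 10,368,000 calls, or about $2.07. That's small on its own, but it's a line item the Lambda integration doesn't have.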
Update: Nik Pinski, a Principal Engineer on the EventBridge team, reminded me that EventBridge Pipes are able to connect to DynamoDB Streams as well. This is a great option if you're moving your DynamoDB Stream data straight into another AWS system, such as SNS, SQS, EventBridge, or Firehose. This likely fits best with the second and third categories of use cases above -- sharing with other services (e.g. via SNS or an EventBridge Bus) or for exporting to an analytics system (e.g. via Firehose). It won't work quite as well if you're doing something more custom, such as incrementing an aggregation in your DynamoDB table or sending welcome emails to new users.
Should I use DynamoDB Streams or Kinesis Streams for DynamoDB change data capture?
Throughout this post, I've been using the term "DynamoDB Streams" when a more accurate term might be "DynamoDB change data capture". The original implementation of DynamoDB change data capture was DynamoDB Streams. However, in 2020, the DynamoDB team announced that you could send your change events to Amazon Kinesis instead of using DynamoDB Streams.
Amazon Kinesis is a dedicated streaming solution within AWS that can be used for many things, not just DynamoDB. It's actually a whole suite of streaming solutions, including Kinesis Video Streams for video streaming and Kinesis Data Firehose for piping data to S3 or Redshift. The most directly comparable service to DynamoDB Streams is Kinesis Data Streams.
I still recommend DynamoDB Streams in most cases. However, there are two main benefits to using Kinesis Streams:
More consumers. DynamoDB wants you to use no more than two consumers on your DynamoDB Stream. Further, if you are using Global Tables, that counts as one of your consumers.
Kinesis, on the other hand, allows for up to 20 consumers of a stream when using enhanced fan-out. Further, you'll get higher throughput and lower latency from the enhanced fan-out implementation than you will from standard DynamoDB Streams.
Direct connection to Kinesis Firehose. As mentioned above, Kinesis Firehose is a tool for buffering data and sending it to a downstream destination like S3, Redshift, or OpenSearch. It's very common to use Firehose to archive your data or to enable analytics queries with Amazon Athena.
With Kinesis Data Streams, I can create a Firehose that uses the stream as a source directly. However, if I'm using DynamoDB Streams, then I need to process the stream and manually push the data into Kinesis Firehose. This creates additional work for me and counts as one of the consumers on my DynamoDB Stream.
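Here's a minimal sketch of that manual step. It serializes each record's dynamodb payload as newline-delimited JSON and sends it with PutRecordBatch, which accepts at most 500 records per call. The firehose_client parameter is expected to behave like boto3.client("firehose"), and the delivery stream name is up to you; the sketch doesn't enforce the per-call size limit.

```python
import json

MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def to_firehose_records(stream_records):
    """Serialize each stream record's payload as one line of JSON."""
    return [
        {"Data": (json.dumps(r["dynamodb"]) + "\n").encode("utf-8")}
        for r in stream_records
    ]


def forward(stream_records, firehose_client, delivery_stream_name):
    """Push a batch of stream records into a Firehose delivery stream."""
    entries = to_firehose_records(stream_records)
    failed = 0
    for start in range(0, len(entries), MAX_BATCH):
        response = firehose_client.put_record_batch(
            DeliveryStreamName=delivery_stream_name,
            Records=entries[start:start + MAX_BATCH],
        )
        failed += response["FailedPutCount"]
    if failed:
        # Raising makes Lambda retry the batch (by default), so nothing is lost.
        raise RuntimeError(f"{failed} records were not accepted by Firehose")
```

Note that a retry resends every record in the batch, including ones Firehose already accepted, so downstream consumers of the archive should tolerate duplicates.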
There are a few other benefits of Kinesis Streams, such as a configurable retention period, that may factor in as well.
Given these benefits of Kinesis Data Streams, why do I still recommend DynamoDB Streams in most cases? Three main reasons, one of which looms large over the others:
Fully managed. If I use Kinesis Data Streams, I need to manage the number of shards in my stream. This means scaling up and down to meet load or, more likely, being over-provisioned to account for peak traffic.
With DynamoDB Streams, I don't need to think about that at all. DynamoDB manages all the shards for me. Serverless FTW.
Ordering and duplicates. With DynamoDB Streams, I'm guaranteed not to receive duplicate records of write operations in my stream. Further, records within a shard will be strictly ordered according to the time the writes occurred.
If you're using the Kinesis Data Streams integration, you can receive duplicate or out-of-order records in rare circumstances. Ideally you're developing your application in a way that can handle this, but it can add complications.
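One way to tolerate duplicates and reordering is to keep a version number on each item and ignore any change that isn't newer than the last one you applied. This is a sketch under a few assumptions: a hypothetical numeric Version attribute that your application increments on every write, change records already decoded into the same shape as a DynamoDB Streams record, and an in-memory last_seen map standing in for durable state. Deletes need their own strategy and are rejected here.

```python
import json


def key_of(record):
    """Stable string form of the item's primary key (DynamoDB JSON)."""
    return json.dumps(record["dynamodb"]["Keys"], sort_keys=True)


def apply_if_newer(last_seen, record):
    """Return True if the change should be applied, False if stale or duplicate.

    `last_seen` maps item key -> highest Version applied so far.
    """
    if record["eventName"] == "REMOVE":
        # A REMOVE has no NewImage; deletes need separate handling (e.g. soft deletes).
        raise ValueError("this sketch only handles INSERT and MODIFY records")
    version = int(record["dynamodb"]["NewImage"]["Version"]["N"])
    key = key_of(record)
    if version <= last_seen.get(key, -1):
        return False  # already applied this version or a newer one
    last_seen[key] = version
    return True
```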
Pricing. DynamoDB Streams don't cost anything to enable, whereas you need to provision Kinesis Data Streams and pay on both a per-shard and a per-request rate. Further, as discussed above, you don't pay for DynamoDB Streams reads when processing with Lambda. Save those pennies and upsize your value meal.
For most people, the key question is which they value more -- the ability to process with more consumers or the benefit of avoiding shard maintenance. I really dislike shard maintenance and thus opt for DynamoDB Streams, but the benefit of more consumers is really big.
So while we're here, my #awswishlist is to make everybody happy by increasing the number of consumers on a DynamoDB Stream. I don't need all 20 of Kinesis, but 5 would be awesome.
What counts as a consumer for my consumer limit?
I mentioned above that DynamoDB Streams wants you to have no more than two consumers per stream. Note that this includes all consumers of your stream, including some surprising ones.
Each of the following counts as a consumer:
- AWS Lambda function via EventSourceMapping;
- Kinesis Client Library worker;
- EventBridge Pipes configuration;
- DynamoDB Global Tables (Global Tables use DDB Streams for cross-region replication!);
- Your home-rolled AWS SDK integration.
Note that the implementation for this limit is at the API level -- the DynamoDB Streams GetRecords API will start throttling your requests. While you could muddle through with more consumers and a delay, I wouldn't recommend it.
Do DynamoDB Streams consume read or write throughput on my table?
Nope! DynamoDB Streams are separate from the RCU and WCU consumption on your table. Thus, you don't have to worry about table-level throttling from your stream. If you read 1 MB of data from your DynamoDB Streams, it won't count as read requests against your table.
However, remember that DynamoDB Streams do have their own limits. The two-consumer limit on your DynamoDB Streams is downstream of a read-request limit on the stream. If you make too many read requests per second, you will be throttled.
How do I handle errors in my DynamoDB Stream?
This is the thing that trips people up the most when working with streams for the first time.
Remember that in processing streams, the records in the stream persist. You don't delete them like you might with a queue. Rather, the consumer is responsible for keeping track of its location in the stream as a way to keep moving forward.
This means that errors are hard! What happens if you receive a record that you don't know how to process or that is failing in some way? If you throw an error in your Lambda function or your KCL consumer, you won't make progress on your stream. You'll receive the same batch of records for re-processing. If the error isn't transitory, this could stop all progress in your consumer.
In general, you need to think about two high-level concerns:
- What would happen to my downstream application if all processing stopped?
- What would happen to my downstream application if I skipped a record (even temporarily)?
Most applications won't tolerate long periods of paused processing. That triggers an urgent alarm as users will see stale data. Further, notice that it wouldn't be all data that's delayed. It would be only the data within the particular shard that had an error. This could result in systems getting out of sync and in weird states.
However, many systems will have issues with skipping a record unless you specifically design for it. This could happen, for example, if an item is going through some workflow. Your application may find it difficult to move from State A to State C without having the State B transition.
In general, you can usually design your system to handle temporarily skipped records, so consider what that looks like with stream processing.
Additionally, consider the impact of batches on your processing. Imagine you receive a batch of 100 records, and the 45th record ends up throwing an error. If you fail the batch, you'll receive all the records again, including the 44 that completed successfully. If you're not careful about idempotency, this could lead to wonky results in your downstream applications.
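A simple guard is to remember which records you've already handled, keyed by each stream record's eventID, and skip them on a retry. In this sketch processed_ids is an in-memory set and handle is a hypothetical per-record function; in production the set would need to be durable (for example, a table written with a conditional put), and there is still a narrow window if the function dies between handling a record and marking it done.

```python
def process_batch_idempotently(records, processed_ids, handle):
    """Handle each record at most once across retries of the same batch.

    Returns the number of records handled in this attempt.
    """
    newly_processed = 0
    for record in records:
        event_id = record["eventID"]
        if event_id in processed_ids:
            continue  # handled in an earlier attempt; don't apply it twice
        handle(record)
        processed_ids.add(event_id)  # mark done only after handle() succeeds
        newly_processed += 1
    return newly_processed
```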
If idempotency is a problem, AWS Lambda also provides failure handling mechanisms to avoid retries in the face of partial failures. You can indicate some items succeeded while others failed, or you can bisect your batch to discover the poison pill record.
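Here's a sketch of the partial-failure mechanism. With ReportBatchItemFailures enabled on the event source mapping, the function returns the SequenceNumber of the record that failed; for streams, Lambda retries from the lowest reported sequence number. Because everything after that point gets retried anyway, this handler stops at the first failure to preserve ordering. The "corrupt" status check is a hypothetical stand-in for real business logic.

```python
def process_record(record):
    """Hypothetical business logic that fails on a record it cannot handle."""
    status = record["dynamodb"].get("NewImage", {}).get("Status", {}).get("S")
    if status == "corrupt":
        raise ValueError("cannot process this item")


def handler(event, context):
    """Report a partial batch failure instead of failing the whole batch."""
    for record in event["Records"]:
        try:
            process_record(record)
        except Exception:
            # Records before this one are checkpointed; retries resume here.
            return {"batchItemFailures": [
                {"itemIdentifier": record["dynamodb"]["SequenceNumber"]}
            ]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```

If you'd rather have Lambda hunt down the poison pill for you, the BisectBatchOnFunctionError setting on the mapping splits a failing batch in half and retries each half separately.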
Update: Heitor Lessa, a Principal Architect at AWS, reminded me that Lambda Powertools has helpers for error handling. These are super helpful -- the BatchProcessor will generate a proper error response in the case of partial failures, and the Idempotency decorator will help guard against duplicate processing of a function. Check these out to see how they work, but the key point is that they help abstract a lot of the boilerplate around these tricky areas. Then, you only need to think about processing an individual record and throwing an error if something fails.
All of these failure scenarios require careful consideration of your application requirements and potential failure modes.
Will every write operation to my DynamoDB table be reflected in my DynamoDB Stream?
Yes and no! This was a surprising lesson I learned recently from the excellent Pete Naylor.
Let's start with the yes part -- for the region in which a write occurs, you will see a record of that write operation in the DynamoDB Stream for that region.
However, the case is more nuanced if you're using DynamoDB Global Tables. With Global Tables, DynamoDB will use your DynamoDB Stream to replicate writes across tables in multiple regions. In the region that was not the source for the original write, DynamoDB may skip the application of earlier writes if there are multiple write operations to the same item in a short period.
To make this more practical, imagine you have a DynamoDB table in two regions -- us-east-1 and eu-west-1. There is an item, "Item A", that receives three updates ("Update X", "Update Y", and "Update Z") in a short period of time in the us-east-1 region.
The DynamoDB Stream in us-east-1 will have a record of each of the three write operations. However, in eu-west-1, it's possible that the service doing the replication will see that Item A has three updates in close succession and thus only apply Update Z to the table in eu-west-1. Accordingly, the DynamoDB Stream in eu-west-1 would only have a record of the last write operation.
Note that this only happens in non-source regions with Global Tables -- each write will be reflected in the DynamoDB Stream of the source region (though you could have multiple source regions for your table!). Further, you will never receive a record in your DynamoDB Stream that shows an item in a state that never existed (such as if it was a combination of Update X and Update Z but not showing Update Y yet).
Thanks again to Pete Naylor for this one -- check out his thread on this topic.
What do my DynamoDB Stream records look like?
In the follow-up to this post, I got a few questions and comments related to the shape of the records when consuming from a DynamoDB Stream.
I don't want to take up space with the actual format of a Stream record as the reference documentation is pretty good for that. However, I do want to cover a few main points about the shape.
First, you will receive a batch of records. Stream processing almost always works in batches as you're working with high-velocity data. You can control the batch size when processing your DynamoDB Stream, but you'll still get an array even if you set the batch size to one. Make sure you're handling the array accordingly, including thinking about processing in parallel and how that will be affected if you have multiple records affecting the same key.
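If you do want parallelism within a batch, one approach is to group records by primary key so different keys run concurrently while records for the same key stay in stream order. This is a minimal sketch; handle is a hypothetical per-record function, and the thread pool size is arbitrary.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def group_by_key(records):
    """Group records by primary key, preserving stream order within each key."""
    groups = {}
    for record in records:
        key = json.dumps(record["dynamodb"]["Keys"], sort_keys=True)
        groups.setdefault(key, []).append(record)
    return groups


def process_in_parallel(records, handle, max_workers=8):
    """Different keys run concurrently; records for one key run sequentially."""
    def run_group(group):
        for record in group:
            handle(record)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() drains the results so any worker exception is re-raised here.
        list(pool.map(run_group, group_by_key(records).values()))
```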
Second, you have some control over the data included in the record. When configuring a DynamoDB Stream, you get to choose how much data is included in the stream record. At the sparse end, you can use the KEYS_ONLY configuration so that only the primary key of the item affected will be included in the record. At the verbose end, you can use NEW_AND_OLD_IMAGES to have each stream record include both the item as it looked before the write operation and the item as it looked after the write operation. For in-between options, there are NEW_IMAGE and OLD_IMAGE configurations as well.
I generally go for the NEW_AND_OLD_IMAGES option as it provides the most flexibility. The main reasons to use smaller images are to (1) increase the number of records in a single batch, and (2) reduce processing costs. If you're using AWS Lambda for processing your stream, the second point doesn't really apply as you don't pay for GetRecords requests from your stream.
Note that you cannot change the stream view type after you create the stream. You would need to delete your stream and create a new one, which could cause some data loss if you rely heavily on streams in your processing.
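As an example of the flexibility full images give you, here's a sketch that reports which attributes changed in a write. It reads OldImage and NewImage from the record's dynamodb payload; which of those exist depends on the stream view type (reported in StreamViewType) and the event type, so the helper fails loudly when one is missing.

```python
def images(record):
    """Return (old_image, new_image); either may be None.

    KEYS_ONLY records carry neither image, an INSERT has no OldImage,
    and a REMOVE has no NewImage.
    """
    data = record["dynamodb"]
    return data.get("OldImage"), data.get("NewImage")


def changed_attributes(record):
    """Sorted names of attributes whose values differ between the two images."""
    old, new = images(record)
    if old is None or new is None:
        view_type = record["dynamodb"].get("StreamViewType")
        raise ValueError(f"need both images; stream view type is {view_type}")
    return sorted(name for name in old.keys() | new.keys() if old.get(name) != new.get(name))
```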
Third, each record indicates the type of modification. In each record, there will be an eventName property that indicates how the item was altered. It will be one of three values:
- INSERT, indicating the item did not exist before and was created as part of the write operation;
- MODIFY, indicating the item did exist before and was altered (but not deleted) as part of the write;
- REMOVE, indicating the item did exist before and was deleted as part of the write.
Importantly, the eventName does not correspond 1:1 with the PutItem, UpdateItem, and DeleteItem operations in the DynamoDB API!
If you do a PutItem operation, you will get an INSERT or MODIFY record depending on whether a previous item existed. The same principle holds for the UpdateItem operation. Further, the DeleteItem operation won't throw an error if the item doesn't exist, so even a successful DeleteItem request won't necessarily result in a REMOVE record in your stream. Think carefully about this when designing your stream processor!
Thanks to Mike Roberts for noting this behavior on the
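To see the mapping between API calls and event names, here's a toy in-memory table that emits the event name a write would produce. It's purely illustrative. It also models the documented behavior that a put which doesn't change any data produces no stream record.

```python
class ToyTable:
    """In-memory stand-in that records the eventName each write would emit."""

    def __init__(self):
        self.items = {}
        self.stream = []

    def put_item(self, key, item):
        if key in self.items:
            if self.items[key] == item:
                return  # no data changed, so no stream record
            event = "MODIFY"  # a put over an existing item is a MODIFY
        else:
            event = "INSERT"
        self.items[key] = item
        self.stream.append(event)

    def delete_item(self, key):
        # Like DeleteItem, deleting a missing key succeeds but emits nothing.
        if key in self.items:
            del self.items[key]
            self.stream.append("REMOVE")
```

Five successful calls below (three puts, two deletes) yield only three stream records.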
Can I distinguish between TTL deletes and my own application deletes?
Some background: DynamoDB has a feature called Time to Live (TTL) that allows you to set an expiration time on items in your table. When the expiration time is reached, DynamoDB will automatically delete the item from your table. This is a great feature for data that has a natural expiration time, such as session data or temporary data.
When an item is deleted due to TTL, it will be reflected in your DynamoDB Stream. However, the eventName property will be REMOVE just like a normal delete. In certain cases, you may want to distinguish between a TTL delete and a normal delete. For example, you may want to log TTL deletes to a different location than normal deletes or perform certain cleanup actions when an item is deleted due to TTL.
If your item was deleted by DynamoDB TTL, the stream record will have a userIdentity property with a type of "Service" and a principalId of "dynamodb.amazonaws.com".
If you see this in your Stream, you know the delete was from a TTL operation.
Note that the userIdentity property does not exist for other operations, so be careful around accessing it!
Thanks to Shahar Mosek for this note and others in this post!
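Putting that together, here's a small check for TTL deletes. It assumes the userIdentity shape AWS documents for TTL deletes (type "Service", principalId "dynamodb.amazonaws.com") and uses .get() because the property is absent on ordinary writes.

```python
def is_ttl_delete(record):
    """True if a REMOVE record was produced by DynamoDB TTL, not by your app."""
    identity = record.get("userIdentity")  # absent for ordinary writes
    return (
        record.get("eventName") == "REMOVE"
        and identity is not None
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )
```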
If a DynamoDB Stream flows into a river when no one is around, does it make a sound?
I hope this was a helpful overview of DynamoDB Streams. As I mentioned earlier, this came out of an email discussion. If you have additional questions about DynamoDB Streams, hit me up and I'll add them to this post.
Further, if you have other questions about DynamoDB or other AWS services, I'm available to discuss them as well!