QLDB Streams is a feature that allows changes made to the journal to be continuously written in near real time to a destination Kinesis Data Stream. Consumers can subscribe to the stream and take appropriate action. There are a number of advantages to this approach:
- QLDB Streams provides a continuous flow of data from a specified ledger in near real time
- QLDB Streams provides an at-least-once delivery guarantee
- Multiple streams can be created with different start/end dates and times. This provides the ability to go back and replay all document revisions from a specific point in time.
- Up to 20 consumers (soft limit) can be configured to consume data from a Kinesis Data Stream
To try out QLDB Streams for yourself, take a look at our demo application - QLDB Demo App.
## Configuring QLDB Stream Resource
There is CloudFormation support for configuring a QLDB Stream resource. There are a number of required properties that need to be set:
- InclusiveStartTime - the start date and time from which to start streaming journal data, and which cannot be in the future. If it is before the ledger's creation, QLDB defaults this value to the ledger's creation date time.
- KinesisConfiguration - the configuration settings for the destination Kinesis data stream, which specifies whether aggregation should be enabled and the ARN of the stream
- LedgerName - the name of the ledger
- RoleArn - the ARN of the IAM role that grants QLDB permission to write data to Kinesis
- StreamName - the name of the QLDB journal stream
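As a sketch, these required properties map directly onto the `StreamJournalToKinesis` call in the AWS SDK for JavaScript (v2). The helper name and all ARN values below are illustrative placeholders, not part of the demo application:

```javascript
// Sketch: building parameters for QLDB's StreamJournalToKinesis API.
// The function name and all ARN/name values are illustrative assumptions.
const buildStreamParams = (ledgerName, kinesisArn, roleArn, streamName) => ({
  LedgerName: ledgerName,
  StreamName: streamName,
  RoleArn: roleArn,
  // Must not be in the future; values before ledger creation are
  // defaulted by QLDB to the ledger's creation date and time
  InclusiveStartTime: new Date('2020-05-29T00:00:00Z'),
  KinesisConfiguration: {
    StreamArn: kinesisArn,
    AggregationEnabled: true,
  },
});

// Usage (requires AWS credentials, so shown commented out):
// const AWS = require('aws-sdk');
// const qldb = new AWS.QLDB();
// await qldb.streamJournalToKinesis(
//   buildStreamParams('qldb-guide-ledger', kinesisArn, roleArn, 'qldb-stream')
// ).promise();
```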
The following CloudFormation template will create:
- A QLDB Ledger
- A Kinesis Data Stream
- A QLDB Stream to publish changes in the ledger to the data stream
```yaml
AWSTemplateFormatVersion: 2010-09-09
Resources:
  QLDBLedger:
    Type: AWS::QLDB::Ledger
    Properties:
      Name: qldb-guide-ledger
      DeletionProtection: false
      PermissionsMode: ALLOW_ALL
  QLDBKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: qldb-kinesis-stream
      ShardCount: 1
  QLDBStreamRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - qldb.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - !Ref QLDBStreamManagedPolicy
  QLDBStreamManagedPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: QLDBGuideStreamPermissionsSID
            Effect: Allow
            Action:
              - 'kinesis:PutRecord*'
              - 'kinesis:DescribeStream'
              - 'kinesis:ListShards'
            Resource:
              Fn::GetAtt: [QLDBKinesisStream, Arn]
  QLDBStream:
    Type: AWS::QLDB::Stream
    Properties:
      InclusiveStartTime: "2020-05-29T00:00:00Z"
      KinesisConfiguration:
        AggregationEnabled: true
        StreamArn:
          Fn::GetAtt: [QLDBKinesisStream, Arn]
      LedgerName: !Ref QLDBLedger
      RoleArn:
        Fn::GetAtt: [QLDBStreamRole, Arn]
      StreamName: qldb-stream
```
This template can be run using the following command:
```bash
aws cloudformation deploy --template-file ./qldb-stream.yaml \
    --stack-name qldb-guide-demo --capabilities CAPABILITY_IAM
```
## QLDB Stream Record Types
There are three different types of records written by QLDB. All of them use a common top-level format consisting of the QLDB Stream ARN, the record type, and the payload:
```
{
  qldbStreamArn: string,
  recordType: CONTROL | BLOCK | REVISION_DETAILS,
  payload: {
    // data
  }
}
```
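A consumer can branch on this top-level record type. A minimal sketch (plain JavaScript objects stand in for decoded Ion values, and the function name is illustrative):

```javascript
// Minimal dispatch on the top-level record type of a QLDB stream record.
// Plain objects are used here in place of decoded Ion values.
const handleStreamRecord = (record) => {
  switch (record.recordType) {
    case 'CONTROL':
      return `control: ${record.payload.controlRecordType}`;
    case 'BLOCK':
      return 'block summary';
    case 'REVISION_DETAILS':
      return `revision for table ${record.payload.tableInfo.tableName}`;
    default:
      return 'unknown record type';
  }
};
```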
### CONTROL Record
A CONTROL record is the first record written to Kinesis, and the last record written when an end date/time is specified. The payload simply states whether this is the first event (CREATED) or the last event (COMPLETED).
```
{
  controlRecordType: "CREATED" | "COMPLETED"
}
```
### BLOCK Record
A BLOCK record represents the details of a block that has been committed to QLDB as part of a transaction. All interaction with QLDB takes place within a transaction. In the demo application, when a new Bicycle Licence is created, three steps are carried out:
- A lookup is made on the table to check the provided email address is unique
- A new licence record is created
- The licence record is updated to include the document ID generated and returned by QLDB in step 2
The resulting BLOCK record for this is shown below:
```
{
  blockAddress: {...},
  ...
  transactionInfo: {
    statements: [
      {
        statement: "SELECT Email FROM BicycleLicence AS b WHERE b.Email = ?",
        startTime: 2020-07-05T09:37:11.253Z,
        statementDigest: {{rXJNhQbB4tyQLAqYYCj6Ahcar2D45W3ySfxy1yTVTBY=}}
      },
      {
        statement: "INSERT INTO BicycleLicence ?",
        startTime: 2020-07-05T09:37:11.290Z,
        statementDigest: {{DnDQJXtKop/ap9RNk9iIyrJ0zKSFYVciscrxiOZypqk=}}
      },
      {
        statement: "UPDATE BicycleLicence as b SET b.GUID = ?, b.LicenceId = ? WHERE b.Email = ?",
        startTime: 2020-07-05T09:37:11.314Z,
        statementDigest: {{xxEkXzdXLX0/jmz+YFoBXZFFpUy1H803ph1OF2Lof0A=}}
      }
    ],
    documents: {...}
  },
  revisionSummaries: [{...}]
}
```
All of the PartiQL statements executed are included in the BLOCK record, including SELECT statements, as they form part of the same transaction. If multiple tables are used, then statements against all tables carried out in the same transaction will appear in the BLOCK record.
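As an illustration, a small hypothetical helper can pull out every PartiQL statement captured in a BLOCK record (again, a plain object stands in for the decoded Ion value):

```javascript
// Extract the PartiQL statements captured in a BLOCK record's
// transactionInfo section. The helper name is illustrative and a plain
// object stands in for the decoded Ion record.
const statementsFromBlock = (blockRecord) =>
  (blockRecord.payload.transactionInfo.statements || []).map((s) => s.statement);
```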
### REVISION_DETAILS Record
The REVISION_DETAILS record represents a document revision that is committed to the ledger. The payload contains the latest committed view, along with the associated table name and Id. If three tables are updated within one transaction, this will result in one BLOCK record and three REVISION_DETAILS records. An example of one of the records is shown below:
```
{
  tableInfo: {
    tableName: "Orders",
    tableId: "LY4HO2JU3bX99caTIXJonG"
  },
  revision: {
    blockAddress: {...},
    hash: {{hrhsCwsNPzLjCsOBHRtSkMCh2JGrB6q0eOGFswyQBPU=}},
    data: {
      OrderId: "12345",
      Item: "ABC12345",
      Quantity: 1
    },
    metadata: {
      id: "3Ax1in3Mt7L0YvVb6XhYyn",
      version: 0,
      txTime: 2020-07-05T18:22:14.019Z,
      txId: "84MQSpihZfxFzpQ4fGyXtX"
    }
  }
}
```
## Processing Events in AWS Lambda
By default, the QLDB Stream is configured to support record aggregation in Kinesis Data Streams. This allows QLDB to publish multiple stream records in a single Kinesis Data Stream record. This can greatly improve throughput and improve cost optimisation, as pricing for PUT requests is based on 25KB payload "chunks".
The demo application makes use of the Node.js Kinesis Aggregation and Deaggregation modules. A Kinesis record event consists of an array of Kinesis records in the structure below:
```
{
  Records: [
    {
      kinesis: {
        ...
        data: '...',
        approximateArrivalTimestamp: 1593728523.059
      },
      ...
    }
  ]
}
```
Inside the handler of the AWS Lambda function, each record passed in is processed using the `map()` function. For each record, the handler calls out to `promiseDeaggregate` and then to `processRecords`.
```javascript
await Promise.all(
  event.Records.map(async (kinesisRecord) => {
    const records = await promiseDeaggregate(kinesisRecord.kinesis);
    await processRecords(records);
  })
);
```
The `promiseDeaggregate` function uses the `deaggregateSync` interface, which handles the record deaggregation, with each deaggregated record being returned as a resolved `Promise`.
```javascript
const promiseDeaggregate = (record) =>
  new Promise((resolve, reject) => {
    deagg.deaggregateSync(record, computeChecksums, (err, responseObject) => {
      if (err) {
        // handle/report error
        return reject(err);
      }
      return resolve(responseObject);
    });
  });
```
Once returned, each record is processed. This involves decoding the base64-encoded data. The payload is the actual Ion binary record published by QLDB to the stream. It is loaded into memory using `ion-js`, and then any relevant processing can take place.
```javascript
async function processRecords(records) {
  await Promise.all(
    records.map(async (record) => {
      // Kinesis data is base64 encoded so decode here
      const payload = Buffer.from(record.data, "base64");
      // payload is the actual Ion binary record published by QLDB to the stream
      const ionRecord = ion.load(payload);
      ...
    })
  );
}
```
Ion values implement the strongly typed `dom.Value` interface, so you can use the DOM methods to work with them easily in JavaScript.
```javascript
// retrieve the version and id from the metadata section of the message
const version = ionRecord.payload.revision.metadata.version.numberValue();
const id = ionRecord.payload.revision.metadata.id.stringValue();
```
In the case of the demo, the only record type processed is `REVISION_DETAILS`; all others are skipped.
```javascript
// Only process records where the record type is REVISION_DETAILS
if (ionRecord.recordType.stringValue() !== REVISION_DETAILS) {
  console.log(`Skipping record of type ${ion.dumpPrettyText(ionRecord.recordType)}`);
} else {
  // process record
}
```
If a document has been deleted, there will be no `data` section present in the `REVISION_DETAILS` record. The `metadata` section will still be present, which allows the `id` to be retrieved.
```javascript
// Check to see if the data section exists.
if (!ionRecord.payload.revision.data) {
  Log.debug('No data section so handle as a delete');
  ...
} else {
  const points = ionRecord.payload.revision.data.penaltyPoints.numberValue();
  const postcode = ionRecord.payload.revision.data.postcode.stringValue();
  ...
}
```
## Handling Duplicate and Out-of-Order Records
The following statements about QLDB streams are set out in the AWS developer guide:
> QLDB streams provide an at-least-once delivery guarantee. Each data record that is produced by a QLDB stream is delivered to Kinesis Data Streams at least once. The same records can appear in a Kinesis data stream multiple times. So you must have deduplication logic in the consumer application layer if your use case requires it.
>
> There are also no ordering guarantees. In some circumstances, QLDB blocks and revisions can be produced in a Kinesis data stream out of order.
Each BLOCK record includes the `blockAddress`:
```
blockAddress: {
  strandId: "GJMmYanMuDRHevK9X6MX3h",
  sequenceNo: 3
}
```
This details the sequence number of the block within the ledger. As the QLDB journal is append-only, each new block is appended to the end of the journal with the next sequence number.
Each REVISION_DETAILS record includes the `version` number of the document in the `metadata` section. Each document uses an incrementing version number, with the creation of the record being version `0`.
If necessary, the use of one or both of these values can help to handle duplicate or out-of-order records.
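As an illustration only, version-based deduplication can be sketched in memory (a real consumer would persist the last-seen version somewhere durable; the function names here are assumptions):

```javascript
// In-memory sketch of version-based deduplication: only apply a
// revision if its version is newer than the highest version already
// seen for that document id.
const makeDeduplicator = () => {
  const latest = new Map(); // document id -> highest version applied
  return (id, version) => {
    const seen = latest.get(id);
    if (seen !== undefined && version <= seen) {
      return false; // duplicate or out-of-order revision: skip it
    }
    latest.set(id, version);
    return true;
  };
};
```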
## Streaming Data from QLDB to DynamoDB
The source code for streaming data from QLDB to DynamoDB can be found here. For more details about streaming data from QLDB to DynamoDB, see this blog post.
As the full document revision is sent each time, creates and updates to DynamoDB can both be handled by an upsert. To make this work, the `id` and `version` of the document contained in the `metadata` section are used: the `id` as the primary key, and the `version` as an attribute of the item. The upsert code is shown below:
```javascript
const updateLicence = async (id, points, postcode, version) => {
  const params = {
    TableName: TABLE_NAME,
    Key: { pk: id },
    UpdateExpression: 'set penaltyPoints=:points, postcode=:code, version=:version',
    ExpressionAttributeValues: {
      ':points': points,
      ':code': postcode,
      ':version': version,
    },
    ConditionExpression: 'attribute_not_exists(pk) OR version < :version',
  };
  try {
    await dynamodb.update(params).promise();
  } catch (err) {
    Log.error(`Unable to update licence: ${id}. Error: ${err}`);
  }
};
```
The critical part is the `ConditionExpression`. This specifies that the item will be written only if one of the following conditions is true:
- There is no existing item with this `id` as the primary key (to allow for creates), OR
- The incoming version is greater than the value of the current `version` attribute
If neither of these conditions is true, then no update takes place, and the following error is thrown:

```
ConditionalCheckFailedException: The conditional request failed
```
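Since duplicates and out-of-order revisions are expected, a consumer will typically treat this particular error as a benign skip rather than a failure. A hedged sketch, assuming the AWS SDK for JavaScript (v2) convention of exposing the error name as `err.code`:

```javascript
// Distinguish an expected conditional-check failure (a duplicate or
// out-of-order revision) from a genuine error. Assumes the AWS SDK v2
// error shape, where the error name is available on err.code.
const isConditionalCheckFailed = (err) =>
  Boolean(err) && err.code === 'ConditionalCheckFailedException';
```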
There is a challenge with deletes, because an older update revision of a document may arrive after the delete revision has been received. This may be the case when a QLDB stream is first created and streams all current document revisions to date. In the demo, we handled this using the concept of a 'soft delete' or 'tombstone': marking the item with an `isDeleted` attribute rather than actually deleting it. This means we use an `update` and not a `delete`.
```javascript
const params = {
  TableName: TABLE_NAME,
  Key: { pk: id },
  UpdateExpression: 'set version=:version, isDeleted=:isDeleted',
  ExpressionAttributeValues: {
    ':version': version,
    ':isDeleted': true,
  },
  ConditionExpression: 'attribute_not_exists(pk) OR version < :version',
};
```
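Putting the two cases together, the choice between an upsert and a tombstone update can be sketched with a hypothetical `applyRevision` helper that takes an injected update function (plain objects stand in for decoded Ion values):

```javascript
// Sketch: route a REVISION_DETAILS payload either to an upsert or to a
// soft-delete update, depending on whether the data section is present.
// applyRevision is a hypothetical helper; updateItem is an injected
// function (e.g. a wrapper around dynamodb.update).
const applyRevision = async (revision, updateItem) => {
  const { id, version } = revision.metadata;
  if (!revision.data) {
    // No data section: the document was deleted, so write a tombstone
    return updateItem({ id, version, isDeleted: true });
  }
  const { penaltyPoints, postcode } = revision.data;
  return updateItem({ id, version, penaltyPoints, postcode });
};
```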
## Streaming Data from QLDB to Elasticsearch
For more details about streaming data from QLDB to Elasticsearch, see this blog post.