Skip to content

Streaming Data to OpenSearch

In previous posts I described how to:

This was all created in a source code repository that anyone could try out for themselves. Recently, Sheen Brisals wrote a great post on "Why Serverless Teams should embrace Continuous Refactoring".

Given that, I thought I would go back and update the repository, in line with new features and changes over the past 12 months:

  • Updating the QLDB permissions mode to STANDARD
  • Implementing fine grained access control for all functions accessing QLDB
  • Updating to the latest version of Nodejs
  • Switching from webpack to esbuild for tree shaking
  • Migrating from ElasticSearch to OpenSearch
  • Configuring Amazon Cognito authentication for OpenSearch Dashboards
  • Implementing custom metrics

This blog post focuses on the last three updates.

Amazon OpenSearch Service

AWS announced on 8th Sept 2021 that they had renamed Amazon ElasticSearch Service to Amazon OpenSearch Service. This is an Apache 2.0-licensed fork of ElasticSearch that is community-driven and open source.

In the previous deployment, ElasticSearch was configured to run within your VPC. This is still the recommended approach from a security standpoint. However, to make things simpler for people to get up and running, I wanted to deploy OpenSearch with a public endpoint instead. In addition, I wanted to protect access to OpenSearch Dashboards using Amazon Cognito.

The full source code can be found at QLDB Simple Demo, but lets walk through the main steps involved.

Setting up Amazon Cognito

The first thing to setup in our serverless.yml file is the Cognito User Pool. This provides the user directory to control access to OpenSearch Dashboards. The setup below asks for name and email attributes at sign up, as well as a username and password. The email will be verified by entering a code that is sent to the specified email address.

    Type: AWS::Cognito::UserPool
        CaseSensitive: false
        - email
    UserPoolName: !Sub qldb-demo-user-pool
        - Name: email
        AttributeDataType: String
        Mutable: false
        Required: true
        - Name: name
        AttributeDataType: String
        Mutable: true
        Required: true

The next step is the UserPoolDomain. This provides a domain name to be used as part of the hosted UI.

    Type: AWS::Cognito::UserPoolDomain 
    UserPoolId: !Ref OSUserPool 
    Domain: "qldb-opensearch-demo"

After this, we define the Cognito Identity Pool. We use this to provide a way to grant temporary AWS credentials for users. This is necessary, as to support a public endpoint on the OpenSearch domain, we need to enable fine-grained access control or apply a restrictive access policy. We don't configure the CognitoIdentityProviders, as these will be created for us by the OpenSearch service.

    Type: AWS::Cognito::IdentityPool
    IdentityPoolName: qldb-demo-identity-pool
    AllowUnauthenticatedIdentities: true

Next we create two roles, one for an authenticated identity, and one for an unauthenticated identity. The one for the authenticated identity is shown below:

    Type: "AWS::IAM::Role"
        Version: "2012-10-17"
        - Effect: "Allow"
            Federated: ""
            - "sts:AssumeRoleWithWebIdentity"
                "": !Ref OSIdentityPool
                "": authenticated

The above is a trust policy for an authenticated role. It allows a federated user from (the issuer of the OpenID Connect token) to assume this role. It also places a condition, that restricts the aud of the token (the client ID of the relying party) to be the Cognito Identity Pool, and the amr of the token contains the value authenticated. When Amazon Cognito creates a token, it will set the amr of the token to be either unauthenticated or authenticated. There is no policy attached to this identity, as we are going to control access to OpenSearch through a resource policy attached to the OpenSearch domain.

After defining the two roles with the associated policies, we map them to the Identity Pool using an IdentityPoolRoleAttachment

    Type: "AWS::Cognito::IdentityPoolRoleAttachment"
    IdentityPoolId: !Ref OSIdentityPool
        authenticated: !GetAtt CognitoAuthorizedRole.Arn
        unauthenticated: !GetAtt CognitoUnAuthorizedRole.Arn

Then its time to define a role that the OpenSearch service can assume, that includes permissions to configure the Cognito user and identity pools and use them for authentication. This can be done using the AmazonOpenSearchServiceCognitoAccess AWS-managed policy as shown below:

    Type: 'AWS::IAM::Role'
    RoleName: 'CognitoAccessForAmazonOpenSearch'
        Version: '2012-10-17'
        - Effect: Allow
            - 'sts:AssumeRole'
    Path: "/"
        - arn:aws:iam::aws:policy/AmazonOpenSearchServiceCognitoAccess

Finally, its time to create the OpenSearch domain, with the CognitoOptions referencing the role defined above, as well as the Cognito user and identity pool.

    Type: AWS::OpenSearchService::Domain
    DomainName: "#{AWS::StackName}"
    EngineVersion: 'OpenSearch_1.0'
        Enabled: true
        IdentityPoolId: !Ref OSIdentityPool
        RoleArn: !GetAtt OSCognitoRole.Arn
        UserPoolId: !Ref OSUserPool
        Version: "2012-10-17"
        - Effect: Allow
                - !GetAtt CognitoAuthorizedRole.Arn
            - es:ESHttpGet
            - es:ESHttpPost
            Resource: arn:aws:es:${self:provider.region}:#{AWS::AccountId}:domain/#{AWS::StackName}/*
        - Effect: Allow
                AWS: "*"
            Action: es:ESHttp*
            Resource: arn:aws:es:${self:provider.region}:#{AWS::AccountId}:domain/#{AWS::StackName}/*
                    "aws:PrincipalArn": arn:aws:iam::#{AWS::AccountId}:role/qldb-streams-es-lambda-role-${self:provider.stage}

All access to the domain is controlled through the resource policy that is applied. The first statement allows the authenticated Cognito user to access the domain. The second statement allows access to the domain from the execution role attached to the AWS Lambda function. You might notice that this is defined in a different way. A circular dependency exists, as the Lambda function needs the OpenSearch domain endpoint which is set up as an environment variable. Using a condition and the aws:PrincipalArn key allows you to compare the ARN of the principal that made the request with the ARN specified in the policy at runtime, rather than at deployment time which otherwise failed.

Accessing OpenSearch Dashboard

Now the stack is deployed, we can access the OpenSearch Dashboard. The easiest place to start is by launching the Hosted UI. You can find the link in the Cognito User Pool under App Client Settings:

Cognito Launch HostedUI

This will allow you to sign up and verify your new account with a code sent to your email address. Once verified and signed in, you can click the heading to visualise and analyse your data.

OpenSearch Dashboards

From here, click the button to add your data:


Next, create an index pattern. If you are using the sample applications, then the index pattern is simply licence:


Now, you can go into Dev Tools and run queries, use metrics aggregation and combine filter and query contexts amongst other features. You can also create visualisations from the raw data in OpenSearch.

Custom CloudWatch Metrics

In a previous blog post on tips to prevent a serverless wreck, I strongly advocated the use of metrics to monitor an application. The CloudWatch Embedded Metric Format allows you to create custom metrics, that can be used for visualisations and alarming for real-time incident detection.

In this sample application, an AWS Lambda function is used to consume an aggregated set of records from a Kinesis Data Stream, and write any new records (inserts, updates or deletes) to OpenSearch. Each request to OpenSearch hits a REST API endpoint, and returns an HTTP Status Code. We can use the returned HTTP Status Code as a custom metric as follows:

    const { createMetricsLogger, Unit } = require('aws-embedded-metrics');

    const metrics = createMetricsLogger();
    metrics.setNamespace('OpenSearch Status Codes');
    metrics.putDimensions({ StatusCode: `HTTP_${statusCode}` });
    metrics.putMetric('ProcessedRecords', 1, Unit.Count);
    await metrics.flush();

This code sets the namespace to be 'OpenSearch Status Codes'. This is the name that will appear in custom namespaces in CloudWatch metrics. We use the returned status code as the dimension. A dimension is a name/value pair that is part of the identity of a metric. Each time we process a record, we use a count of 1 as the unit.

This results in a log entry in CloudWatch that looks similar to below:

    "LogGroup": "qldb-streams-es-dev",
    "ServiceName": "qldb-streams-es-dev",
    "ServiceType": "AWS::Lambda::Function",
    "StatusCode": "HTTP_200",
    "executionEnvironment": "AWS_Lambda_nodejs14.x",
    "memorySize": "512",
    "functionVersion": "$LATEST",
    "logStreamId": "2021/12/11/[$LATEST]6924daa324e8482099ebcad5c1168c9a",
    "_aws": {
        "Timestamp": 1639244821514,
        "CloudWatchMetrics": [
                "Dimensions": [
                "Metrics": [
                        "Name": "ProcessedRecords",
                        "Unit": "Count"
                "Namespace": "OpenSearch Status Codes"
    "ProcessedRecords": 1

When streaming records from Kinesis to OpenSearch, there were a handful of status codes commonly returned:

  • HTTP 200 - a successful update to an existing document with an HTTP POST
  • HTTP 201 - a successful insert of a new document, or completely overwriting an existing document with an HTTP PUT
  • HTTP 409 - an error where the version of the document attempted to be inserted is less than or equal to the one that already exists. This can happen as each data record produced by a QLDB stream is delivered to Kinesis at least once, the same records can appear multiple times, and there are no ordering guarantees. The sample application implements external versioning to resolve this.

After streaming a number of records to OpenSearch, we can create a simple CloudWatch Dashboard from the custom metric that looks as follows:



So there we have it. This post has shown how to quickly get up and running with OpenSearch, configuring Cognito to protect OpenSearch Dashboards, and set up a custom CloudWatch Metrics dashboard for monitoring the HTTP Status Codes that are returned. Please reach out if you have any feedback or suggestions.

To find out more, read the QLDB Guide, follow the curated list of articles, tools and resources at Awesome QLDB or try it out our online demo to see QLDB in action at QLDB Demo