AWS S3

Amazon Web Services Simple Storage Service (AWS S3) is a highly scalable object storage service commonly used for business use cases involving very large amounts of data, such as data lakes, backup and restore, and archival.

This document walks you through setting up AWS S3 to use as a source data store in DeltaStream.

Before You Begin

Work with your internal engineering team to set up an AWS S3 account. You can start with the AWS S3 documentation.

For this setup guide, you must also have created a stream in DeltaStream labeled pageviews, which is backed by a topic in an Apache Kafka Store. For more details, see creating a stream in DeltaStream.

Adding S3 as a DeltaStream Data Store

  1. Open DeltaStream. In the lefthand navigation, click Resources. The Resources page displays, with the Data Stores tab active.

  2. Click + Add Data Store, and when the Choose a Data Store window opens, click S3.

  3. Click Next. The Add Data Store window opens, displaying S3 fields you must complete:

  • Store Type – S3

  • Name – A name to identify your DeltaStream data store

  • Add One or More URIs To Connect – The URI for a bucket or folder

  • AWS Region – The region where the bucket containing your data resides

  • Assume IAM Role ARN

  • IAM Role External ID

  • Static AWS Credentials (Not Recommended)

    • AWS Access Key

    • AWS Secret Access Key

  4. Click Add to create and save the data store. Your S3 store displays on the Resources page in your list of data stores.

Note: You can also use the DeltaStream CLI to create an S3 data store (called a "store" in the CLI):

CREATE STORE s3_store WITH (
  'type' = S3,
  'aws.iam_role_arn' = 'arn:aws:iam::123456789012:role/S3StoreDeltaStreamRole',
  'aws.iam_external_id' = '12342f59e61ef7a34f9822a6226e5857',
  'uris' = 's3://s3-data-bucket/',
  'aws.region' = 'AWS us-east-2'
);

IAM Role Requirements for S3 Stores

To allow the DeltaStream platform to access your S3 bucket using an IAM Role, you must add a trust relationship with a DeltaStream IAM Role to the IAM Role you’ve configured for S3 access. This trust relationship allows DeltaStream to assume the role securely.

Contact DeltaStream Support to obtain the DeltaStream platform-specific IAM Role ARN for your organization. Once you have it, add the following trust policy statement to your IAM Role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TrustPolicy",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::{DELTASTREAM_PLATFORM_ACCOUNT_ID}:role/{DELTASTREAM_ACCOUNT_ROLE}"
                ]
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "12342f59e61ef7a34f9822a6226e5857"
                }
            }
        }
    ]
}

Note: Be sure to replace {DELTASTREAM_PLATFORM_ACCOUNT_ID} and {DELTASTREAM_ACCOUNT_ROLE} with the actual values provided by DeltaStream Support. Also, ensure that the value provided for the "sts:ExternalId" condition matches the one you pass as 'aws.iam_external_id' when creating the DeltaStream S3 store.
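If you prefer to generate the trust policy rather than edit it by hand, the following Python sketch shows one way to fill in the placeholders and confirm that the result is valid JSON. The function name build_trust_policy and the sample account ID and role name are hypothetical and for illustration only; use the values provided by DeltaStream Support.

```python
import json


def build_trust_policy(platform_account_id: str, account_role: str, external_id: str) -> dict:
    """Return a trust policy document mirroring the statement shown above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "TrustPolicy",
                "Effect": "Allow",
                "Principal": {
                    "AWS": [f"arn:aws:iam::{platform_account_id}:role/{account_role}"]
                },
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


# Placeholder values for illustration only (not real DeltaStream identifiers).
policy = build_trust_policy(
    "111111111111",
    "ExampleDeltaStreamRole",
    "12342f59e61ef7a34f9822a6226e5857",
)

# json.dumps never emits trailing commas, so the output is always valid JSON.
policy_json = json.dumps(policy, indent=4)
print(policy_json)
```

The external ID passed here should be the same string you later supply as 'aws.iam_external_id' in CREATE STORE.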

Recommended IAM Permissions for S3 Store

In addition to the trust policy, ensure that the IAM role specified in the CREATE STORE statement for an S3-type store has sufficient permissions to read from and write to the designated S3 bucket, for example:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket-name",
        "arn:aws:s3:::your-bucket-name/*"
      ]
    }
  ]
}
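Before attaching a policy to the role, it can be useful to check that it names all five actions listed above. The Python sketch below is a simplified check under stated assumptions: the helper missing_actions is hypothetical, it only compares explicit action names in Allow statements, and it does not expand wildcards such as s3:* or evaluate Resource or Deny rules.

```python
import json

# The five actions listed in the recommended policy above.
RECOMMENDED_ACTIONS = {
    "s3:ListBucket",
    "s3:GetObject",
    "s3:PutObject",
    "s3:DeleteObject",
    "s3:GetBucketLocation",
}


def missing_actions(policy: dict) -> set:
    """Return recommended actions that no Allow statement names explicitly."""
    allowed = set()
    for statement in policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        # IAM accepts either a single string or a list of strings for Action.
        if isinstance(actions, str):
            actions = [actions]
        allowed.update(actions)
    return RECOMMENDED_ACTIONS - allowed


# A hypothetical read-only policy that lacks the write and delete actions.
read_only_policy = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetObject", "s3:GetBucketLocation"],
      "Resource": ["arn:aws:s3:::your-bucket-name", "arn:aws:s3:::your-bucket-name/*"]
    }
  ]
}
""")

print(sorted(missing_actions(read_only_policy)))
# prints ['s3:DeleteObject', 's3:PutObject']
```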

Define a DeltaStream Stream

In this step, you create a stream called pageviews_s3 that is backed by data on S3.

The S3 source periodically discovers file splits under the configured paths and emits them for processing. The discovery interval controls how often the connector looks for new S3 objects. On each run, the connector lists keys in the service’s server-side order (for general-purpose buckets, lexicographic by key) and emits files to the job, up to a configurable maximum number of splits per interval. The connector tracks an internal watermark: the last accepted key for each configured prefix. Any key less than or equal to that watermark is considered already processed and is skipped on later runs, including after restarts.

To ensure new data is picked up reliably in long-running jobs, write files under key prefixes that increase lexicographically over time (for example, YYYY/MM/DD/HH/) and use zero-padded sequence parts (for example, part-000001.jsonl). Avoid backfilling into older, lexicographically smaller prefixes; if you must backfill, place the files under a new, higher-sorting path so they sort after the existing content. See S3-Specific Parameters for how to configure the discovery interval and the cap on file splits.
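The ordering rules above can be illustrated with a small Python simulation. This is a simplified model of the behavior described here, not the connector's implementation: the helpers object_key and discover and the max_splits parameter are hypothetical names, and Python string comparison matches S3 key order only for ASCII keys like these.

```python
from datetime import datetime, timezone


def object_key(prefix: str, ts: datetime, sequence: int) -> str:
    """Build a key whose lexicographic order follows time, then sequence."""
    return f"{prefix}{ts:%Y/%m/%d/%H}/part-{sequence:06d}.jsonl"


def discover(keys, watermark, max_splits):
    """Simulate one discovery run: skip keys <= watermark, cap the batch size."""
    fresh = [k for k in sorted(keys) if watermark is None or k > watermark]
    batch = fresh[:max_splits]
    new_watermark = batch[-1] if batch else watermark
    return batch, new_watermark


hour_10 = datetime(2024, 5, 1, 10, tzinfo=timezone.utc)
hour_11 = datetime(2024, 5, 1, 11, tzinfo=timezone.utc)
keys = [object_key("jsonl/", hour_10, n) for n in (1, 2)]

# First run: both files are emitted and the watermark moves to the last key.
batch, watermark = discover(keys, None, max_splits=10)

# A later file under a higher-sorting prefix is picked up on the next run...
keys.append(object_key("jsonl/", hour_11, 1))
# ...but a backfill into an older prefix sorts below the watermark and is skipped.
keys.append("jsonl/2024/04/30/23/part-000001.jsonl")
batch2, watermark2 = discover(keys, watermark, max_splits=10)
print(batch2)
# prints ['jsonl/2024/05/01/11/part-000001.jsonl']

# Unpadded sequence numbers break ordering: "part-10" sorts before "part-9".
print(sorted(["part-9.jsonl", "part-10.jsonl"]))
# prints ['part-10.jsonl', 'part-9.jsonl']
```

In this model the backfilled 2024/04/30 file is never emitted, which is why backfills belong under a path that sorts after the existing content.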

In the DeltaStream workspace, run the following SQL statement:

CREATE STREAM pageviews_s3 (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 's3_store',
  's3.uri' = 's3://your_bucket_uri/jsonl/',
  's3.discovery.interval.seconds' = 15,
  'value.format' = 'jsonl'
);

Notes

value.format can be jsonl or json.

s3.discovery.interval.seconds is optional. The default is 10 seconds.
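For reference, the Python sketch below produces sample file content for the jsonl format. It assumes that jsonl follows the common JSON Lines convention of one JSON object per line, which this page does not spell out; the record values are made up, and only the three field names come from the CREATE STREAM statement above.

```python
import json

# Hypothetical sample records using the three columns declared in CREATE STREAM.
records = [
    {"viewtime": 1700000000000, "userid": "User_1", "pageid": "Page_12"},
    {"viewtime": 1700000001000, "userid": "User_2", "pageid": "Page_7"},
]

# JSON Lines: one JSON object per line, each line newline-terminated.
jsonl_text = "".join(json.dumps(record) + "\n" for record in records)
print(jsonl_text, end="")

# Reading the text back line by line recovers the original records.
parsed = [json.loads(line) for line in jsonl_text.splitlines()]
```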

Process Streaming Data and Sink to S3

For the steps below, assume you have already defined the pageviews_s3 stream, which is backed by the S3 store created earlier. The following query reads from that stream and passes 's3.discovery.interval.seconds'=5 as a source property:

SELECT * FROM pageviews_s3 WITH ('s3.discovery.interval.seconds'=5);

Notes:

  • Files are processed in alphanumeric (lexicographic) order by key.

  • New files are discovered at the interval set by the 's3.discovery.interval.seconds' property.

Inspect the S3 Data Store

  1. In the lefthand navigation, click Resources. This displays a list of the existing stores.

  2. Click your S3 store to open the Data Store page and view its contents.
