AWS S3

Amazon Web Services Simple Storage Service (AWS S3) is a highly scalable object storage service commonly used for business use cases involving very large amounts of data, such as data lakes, backup and restore, archival, and more.

This document walks you through setting up AWS S3 to use as a source data store in DeltaStream.

Before You Begin

Work with your internal engineering team to set up an AWS account with access to S3. You can start with the AWS S3 documentation.

For this setup guide, you must also have a stream defined in DeltaStream labeled pageviews, backed by a topic in an Apache Kafka store. For more details, see creating a stream in DeltaStream.
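If you have not yet created that stream, the statement below is a minimal sketch of what its definition might look like. It assumes your current default store is the Kafka store and that it already contains a pageviews topic with JSON records; adjust the names and format to match your setup.

-- Prerequisite stream backed by a Kafka topic (sketch; assumes a pageviews topic with JSON values)
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);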

Adding S3 as a DeltaStream Data Store

  1. Open DeltaStream. In the left-hand navigation, click Resources. The Resources page displays, with the Data Stores tab active.

  2. Click + Add Data Store. When the Choose a Data Store window opens, click S3.

  3. Click Next. The Add Data Store window opens, displaying the S3 fields you must complete:

  • Store Type – S3

  • Name – A name to identify your DeltaStream data store

  • Add One or More URIs To Connect – The URI of the S3 bucket or folder containing your data

  • AWS Region – The region where the bucket containing your data resides.

  • Assume IAM Role ARN – The ARN of the IAM role DeltaStream assumes to access your bucket

  • IAM Role External ID – The external ID associated with that IAM role

  • Static AWS Credentials (Not Recommended)

    • AWS Access Key

    • AWS Secret Access Key

  4. Click Add to create and save the data store. Your S3 store displays on the Resources page in your list of data stores.

Note: You can also use the DeltaStream CLI to create an S3 data store (called simply a "store" in the CLI):

CREATE STORE s3_store WITH (
  'type' = S3,
  'aws.access_key_id' = 'abc',
  'aws.secret_access_key' = 'xyz',
  'uris' = 's3://ctan-playground-data/',
  'aws.region' = 'AWS us-east-2'
);
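Because static credentials are not recommended, you will typically configure the IAM role option instead. The sketch below shows what that might look like; the 'aws.iam_role.arn' and 'aws.iam_role.external_id' property names, the role ARN, and the external ID are all assumptions made for illustration, so confirm the exact keys in the DeltaStream reference.

CREATE STORE s3_store WITH (
  'type' = S3,
  'uris' = 's3://ctan-playground-data/',
  'aws.region' = 'AWS us-east-2',
  -- The two property names below are hypothetical; check the DeltaStream reference for the exact keys
  'aws.iam_role.arn' = 'arn:aws:iam::123456789012:role/deltastream-s3-access',
  'aws.iam_role.external_id' = 'your-external-id'
);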

Define a DeltaStream Stream

In this step, you create a stream called pageviews_s3 that is backed by data in your S3 bucket.

In the DeltaStream workspace, run the following SQL statement:

CREATE STREAM pageviews_s3 (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 's3_store',
  's3.uri' = 's3://your_bucket_uri/jsonl/',
  's3.discovery.interval.seconds' = 15,
  'value.format' = 'jsonl'
);

Notes

value.format is optional.

s3.discovery.interval.seconds is optional. The default is 10 seconds.
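For example, a stream that relies on those defaults only needs the store and URI properties. The statement below is a sketch using the same bucket path as above; pageviews_s3_min is just an illustrative name.

-- Minimal S3-backed stream relying on the defaults described above
-- (default value.format and a 10-second discovery interval)
CREATE STREAM pageviews_s3_min (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 's3_store',
  's3.uri' = 's3://your_bucket_uri/jsonl/'
);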

Process Streaming Data and Sink to S3

For the steps below, assume you already have a stream called pageviews defined, backed by a topic in Kafka. Assume also there is an S3 store labeled S3_Test_Store. Now perform a simple filter on the pageviews stream and sink the results into S3, as sketched below.
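The statement below is a minimal sketch of such a sink. It assumes that the 'store', 's3.uri', and 'value.format' properties shown earlier for S3-backed streams can also be supplied when creating a new stream from a query; the pageviews_filtered name, the output path, and the filter condition are placeholders.

-- Filter pageviews and write the results to the S3 store
-- (sketch; sink properties assumed to mirror the source-stream properties above)
CREATE STREAM pageviews_filtered WITH (
  'store' = 'S3_Test_Store',
  's3.uri' = 's3://your_bucket_uri/filtered/',
  'value.format' = 'jsonl'
) AS
SELECT viewtime, userid, pageid
FROM pageviews
WHERE pageid = 'page_1';

You can also query an S3-backed stream directly, overriding its discovery interval for that query: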

SELECT * FROM pageviews_s3 WITH ('s3.discovery.interval.seconds'=5);

Notes:

  • Files are processed in alphanumeric order

  • New files are discovered based on the 's3.discovery.interval.seconds' property

Inspect the S3 Data Store

  1. In the left-hand navigation, click Resources. This displays a list of the existing stores.

  2. Click your S3 store to open the Data Store page and view its contents.
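You can also check from the CLI. The statement below assumes the LIST STORES command is available in your DeltaStream version; your new S3 store should appear in the output.

-- List all data stores visible to your organization (sketch)
LIST STORES;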
