
AWS S3



Amazon Web Services Simple Storage Service (AWS S3) is a highly scalable object storage service commonly used for business use cases involving very large amounts of data, such as data lakes, backup and restore, and archival.

This document walks you through setting up AWS S3 as a data store in DeltaStream, both to read from and to write to.

Before You Begin

Work with your internal engineering team to set up an AWS S3 account. You can start with the AWS S3 documentation.

For this setup guide you must also have created a stream in DeltaStream labeled pageviews, which is backed by a topic in an Apache Kafka store. More details on creating a stream in DeltaStream are in the reference.
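If you have not yet created that prerequisite stream, here is a minimal sketch of what it might look like, assuming a Kafka data store is already registered as your default store and contains a pageviews topic (the topic name and serialization format here are illustrative):

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  -- illustrative: assumes a 'pageviews' topic exists in your default Kafka store
  'topic' = 'pageviews',
  'value.format' = 'json'
);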

Adding S3 as a DeltaStream Data Store

  1. Open DeltaStream. In the lefthand navigation, click Resources. The Resources page displays, with the Data Stores tab active.

  2. Click + Add Data Store, and when the Choose a Data Store window opens, click S3.

  3. Click Next. The Add Data Store window opens, displaying S3 fields you must complete:

  • Store Type – S3

  • Name – A name to identify your DeltaStream data store

  • Add One or More URIs To Connect – The URI for a bucket or folder

  • AWS Region – The region where the bucket containing your data resides.

  • Assume IAM Role ARN

  • IAM Role External ID

  • Static AWS Credentials (Not Recommended)

    • AWS Access Key

    • AWS Secret Access Key

  4. Click Add to create and save the data store. Your S3 store displays on the Resources page in your list of data stores.

Note: You can also use the DeltaStream CLI to create an S3 data store (just called a "store" in the CLI):

CREATE STORE s3_store WITH (
  'type' = S3,
  -- replace these placeholder credentials with your own
  'aws.access_key_id' = 'abc',
  'aws.secret_access_key' = 'xyz',
  -- the URI of the bucket or folder to connect
  'uris' = 's3://ctan-playground-data/',
  'aws.region' = 'AWS us-east-2'
);

Define a DeltaStream Stream

In this step, you create a stream called pageviews_s3 that is backed by the data in your S3 bucket.

In the DeltaStream workspace, run the following SQL statement:

CREATE STREAM pageviews_s3 (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 's3_store',
  's3.uri' = 's3://your_bucket_uri/jsonl/',
  's3.discovery.interval.seconds' = 15,
  'value.format' = 'jsonl'
);

Notes:

  • value.format is optional.

  • s3.discovery.interval.seconds is optional. The default is 10 seconds.

Process Streaming Data and Sink to S3

For the steps below, assume you already have a stream called pageviews defined, which is backed by a topic in Kafka, and an S3 data store labelled S3_Test_Store. First, read from the S3-backed pageviews_s3 stream, overriding the file discovery interval for this query only (a sink example follows the notes below):

SELECT * FROM pageviews_s3 WITH ('s3.discovery.interval.seconds' = 5);

Notes:

  • Files are processed in alphanumeric order

  • New files are discovered based on the 's3.discovery.interval.seconds' property
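To sink into S3 as this section's heading describes, you can write filtered results with a CREATE STREAM AS SELECT statement. The sketch below is illustrative rather than authoritative: it assumes the pageviews stream and S3_Test_Store store described above, and the output URI, stream name, and filter predicate are placeholders; see CREATE STREAM AS SELECT in the SQL reference for the exact sink properties.

CREATE STREAM pageviews_filtered WITH (
  -- illustrative sink properties; verify against the CSAS reference
  'store' = 'S3_Test_Store',
  's3.uri' = 's3://your_bucket_uri/filtered/',
  'value.format' = 'jsonl'
) AS
SELECT viewtime, userid, pageid
FROM pageviews
WHERE pageid = 'Page_1';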

Inspect the S3 Data Store

  1. In the lefthand navigation, click Resources. This displays a list of your existing data stores.

  2. Click your S3 store to open the Data Store page and view its contents.
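Note: You can also inspect data stores from the CLI with commands from the SQL reference, for example:

LIST STORES;
DESCRIBE STORE s3_store;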
