Using an AWS S3 Store as a Source to Feed an MSK Topic

This article shows how to load data from AWS S3 into DeltaStream, where you can use it to enrich other DeltaStream objects before writing the final results to any other supported data store.

Before you begin

This walkthrough assumes you have already defined an S3 store (here, yellow-taxi-s3) and a Kafka store (here, demo-data) in DeltaStream, and that your source files are available in the S3 bucket.

Creating the stream

  1. Create a stream over the files in your S3 bucket (a later step writes this stream to the Kafka topic):

CREATE STREAM bronze_taxi_json (
    "VendorID"             BIGINT,
    tpep_pickup_datetime    BIGINT,   -- epoch-seconds
    tpep_dropoff_datetime   BIGINT,
    passenger_count         DOUBLE,
    trip_distance           DOUBLE,
    "RatecodeID"            DOUBLE,
    store_and_fwd_flag      STRING,
    "PULocationID"          BIGINT,
    "DOLocationID"          BIGINT,
    payment_type            BIGINT,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    congestion_surcharge    DOUBLE,
    airport_fee             DOUBLE)
WITH (
    'store' = 'yellow-taxi-s3',
    'timestamp' = 'tpep_pickup_datetime',
    'value.format' = 'JSONL',
    's3.uri' = 's3://s3-demo-bucket/yellow-taxi');

From this point forward, you can treat this stream as you would any other stream in DeltaStream.
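
For example, you can run an ad hoc query against it. The following is a minimal sketch using the columns defined above; the 10-mile threshold is purely illustrative:

SELECT
    "VendorID",
    trip_distance,
    total_amount
FROM bronze_taxi_json
WHERE trip_distance > 10;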

If your system experiences memory issues reading the S3 files directly, you can materialize the stream into a Kafka topic, using the topic.partitions and topic.replicas properties to size the topic, and have downstream queries read from that topic instead:

CREATE STREAM yt_2023_01
WITH (
    'value.format' = 'json',
    'store' = 'demo-data',      -- the Kafka (MSK) store that will host the new topic
    'topic.partitions' = 1,
    'topic.replicas' = 3)
AS SELECT *
FROM bronze_taxi_json;
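
Downstream queries can now read from the Kafka-backed stream rather than rescanning the S3 files. A minimal sketch, with an illustrative passenger-count filter:

SELECT *
FROM yt_2023_01
WHERE passenger_count > 2;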