Using an AWS S3 Store as a Source to Feed an MSK Topic
This article examines how you can load data from AWS S3 into DeltaStream, use it to enrich other DeltaStream objects, and then write the final data into any other supported data store, such as an Amazon MSK topic.
Before you begin
You must already have an Amazon Web Services account, and you must have created an S3 store in DeltaStream that points at your bucket (named yellow-taxi-s3 in the examples below).
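If the bucket is not yet attached as a store, you can do so with a CREATE STORE statement. The sketch below is illustrative only: the property names ('type', 'access_region', and the credential properties) are assumptions about the general shape of DeltaStream store definitions, so check them against the CREATE STORE reference for your version.

-- Illustrative sketch of attaching an S3 bucket as a DeltaStream store.
-- Property names and the region value are assumptions; the credentials
-- are placeholders.
CREATE STORE "yellow-taxi-s3"
WITH (
  'type' = S3,
  'access_region' = "AWS us-east-1",
  'aws.access_key_id' = '<access key id>',
  'aws.secret_access_key' = '<secret access key>');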
Creating the stream
Create a stream over the files in the S3 bucket; in a later step you'll write this stream out to the Kafka topic:
CREATE STREAM bronze_taxi_json (
  "VendorID" BIGINT,
  tpep_pickup_datetime BIGINT, -- epoch seconds
  tpep_dropoff_datetime BIGINT,
  passenger_count DOUBLE,
  trip_distance DOUBLE,
  "RatecodeID" DOUBLE,
  store_and_fwd_flag STRING,
  "PULocationID" BIGINT,
  "DOLocationID" BIGINT,
  payment_type BIGINT,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE,
  congestion_surcharge DOUBLE,
  airport_fee DOUBLE)
WITH (
  'store' = 'yellow-taxi-s3',
  'timestamp' = 'tpep_pickup_datetime',
  'value.format' = 'JSONL',
  's3.uri' = 's3://s3-demo-bucket/yellow-taxi');
Tip You do not need to create a new stream for each S3 file. Instead, define the stream over a folder, as in the example above: the stream reads all the existing files in that folder and then waits for new files as they arrive. This is the default behavior when the s3.uri points to a folder.
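If you do need a stream over one specific object, the same statement works with the s3.uri pointing directly at that file. The sketch below uses a hypothetical file name and an abbreviated column list for brevity:

-- Hypothetical single-file variant; the object key is a placeholder.
CREATE STREAM yt_single_file (
  trip_distance DOUBLE,
  total_amount DOUBLE)
WITH (
  'store' = 'yellow-taxi-s3',
  'value.format' = 'JSONL',
  's3.uri' = 's3://s3-demo-bucket/yellow-taxi/yellow_tripdata_2023-01.jsonl');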
From this point forward, you can treat this stream as you would any other stream in DeltaStream.
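For example, you can run an interactive query against it to sanity-check the data; the columns come from the stream definition above, and the filter is arbitrary:

-- Ad hoc query over the S3-backed stream; the predicate is just an example.
SELECT
  tpep_pickup_datetime,
  trip_distance,
  total_amount
FROM bronze_taxi_json
WHERE total_amount > 50;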
To feed the MSK topic, create a new stream from the S3-backed one with a CREATE STREAM AS SELECT (CSAS) query. The topic.partitions and topic.replicas properties let you adjust the size and replication of the Kafka topic that backs the new stream:
CREATE STREAM yt_2023_01
WITH (
  'value.format' = 'json',
  'store' = 'demo-data',
  'topic.partitions' = 1,
  'topic.replicas' = 3)
AS SELECT *
FROM bronze_taxi_json;
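Once the CSAS query is running, the new stream is backed by a topic in the Kafka store, so you can query it, or join and enrich it further, like any other DeltaStream stream. A minimal sketch, with an arbitrary predicate:

-- Sanity-check the Kafka-backed stream; the filter is just an example.
SELECT *
FROM yt_2023_01
WHERE passenger_count > 2;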