
JOIN

Syntax

FROM relation_reference
        [WITH(source_property = value [, ...])]
    join_type relation_reference
        [WITH(source_property = value [, ...])]
    [WITHIN size time_unit]
    ON join_criteria

Description

JOIN combines records from two Relations according to a join criterion. The join criterion, specified in the ON clause, must be a Boolean expression that tests the equality of a pair of columns, one from each Relation, e.g. left.col = right.col. You can specify an alias for each Relation in the join clause and use it to refer to that Relation in other clauses of the query, such as SELECT or WHERE. Aliases are useful when the joined Relations have columns with the same name.

DeltaStream supports LEFT JOIN, INNER JOIN, and FULL JOIN. Currently, two kinds of join are supported: interval join and temporal join.

Arguments

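Join Types

+------------------+---------------------------------------------------------------------------------------------+
| Join Type        | Description                                                                                 |
+------------------+---------------------------------------------------------------------------------------------+
| JOIN             | Returns records that have matching values in both Relations.                                |
| INNER JOIN       | Same as JOIN.                                                                               |
| FULL JOIN        | Returns all records when there is a match in either the left or right Relation.             |
| FULL OUTER JOIN  | Same as FULL JOIN.                                                                          |
| LEFT JOIN        | Returns all records from the left Relation and the matched records from the right Relation. |
| LEFT OUTER JOIN  | Same as LEFT JOIN.                                                                          |
| RIGHT JOIN       | Not supported.                                                                              |
| RIGHT OUTER JOIN | Not supported.                                                                              |
+------------------+---------------------------------------------------------------------------------------------+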
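
To make the difference between the supported join types concrete, here is a minimal Python sketch. It is not DeltaStream code: it applies standard SQL join semantics to two small in-memory lists and ignores the time dimension that interval and temporal joins add. The function name join, the how parameter, and the sample rows are all illustrative assumptions.

```python
def join(left, right, key, how="inner"):
    """Equality join over lists of dicts; a missing side is represented as None."""
    rows = []
    matched_right = set()
    for lrow in left:
        hits = [i for i, rrow in enumerate(right) if rrow[key] == lrow[key]]
        for i in hits:
            matched_right.add(i)
            rows.append((lrow, right[i]))
        # LEFT and FULL keep left records that found no match.
        if not hits and how in ("left", "full"):
            rows.append((lrow, None))
    # FULL additionally keeps right records that found no match.
    if how == "full":
        rows += [(None, rrow) for i, rrow in enumerate(right) if i not in matched_right]
    return rows


left = [{"id": 1}, {"id": 2}]
right = [{"id": 2}, {"id": 3}]

inner_rows = join(left, right, "id", "inner")  # only id=2
left_rows = join(left, right, "id", "left")    # id=1 (no match) and id=2
full_rows = join(left, right, "id", "full")    # id=1, id=2, and id=3 (no match)
```

In a streaming join the same rules apply per pair of candidate records, with the additional time constraints described in the sections below.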

Interval Join (Stream/Stream)

An interval join combines records from two Streams according to a join condition and a time dimension. The ON clause defines the join condition as the equality of two columns, one from each source Stream. The WITHIN clause defines the time dimension as a time interval. Two records match if they have the same value for the columns referenced in the join criteria and the difference between their timestamps falls within the interval given in the WITHIN clause. For interval joins, INNER, LEFT, and FULL join types are supported.

Examples

Join two Streams

Given the orders and shipments Streams with the following example records:

           orders                               shipments
+-----------+--------+--------+  +--------------+---------+-----------+
| ordertime | itemid | price  |  | shipmenttime | orderid |      city |
+-----------+--------+--------+  +--------------+---------+-----------+
|      2000 |      2 |  18.23 |  |         6000 |       2 | Palo Alto |
|      3000 |      5 | 187.88 |  |        14000 |       5 |   Chicago |
|     10000 |      2 |  52.32 |  |        15000 |       2 |  New York |
|     12000 |      5 |  74.99 |  +--------------+---------+-----------+
|     13000 |      4 | 892.54 |
|     18000 |      2 | 123.90 |
|     23000 |      5 |  40.44 |
|     26000 |      2 |  12.88 |
+-----------+--------+--------+

When the following SELECT statement is run:

SELECT 
  o.ordertime,
  o.itemid AS oid, 
  s.orderid AS sid, 
  o.price, 
  s.city AS shipment_city 
FROM 
  orders o WITH ('timestamp'='ordertime')
  JOIN shipments s WITH ('timestamp'='shipmenttime')
  WITHIN 10 seconds ON o.itemid = s.orderid;

We get the following output:

Output:
+-----------+-----+-----+--------+---------------+
| ordertime | oid | sid | price  | shipment_city |
+-----------+-----+-----+--------+---------------+
|      2000 |   2 |   2 |  18.23 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |      New York |
|     12000 |   5 |   5 |  74.99 |       Chicago |
|     18000 |   2 |   2 | 123.90 |      New York |
|     23000 |   5 |   5 |  40.44 |       Chicago |
+-----------+-----+-----+--------+---------------+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of the output for this join query. Since this is a streaming query, more records are emitted as data arrives in the sources. Notice that not every record from orders is joined and emitted. This is because the join type is JOIN (an inner join), and the join condition requires that the orders record’s itemid matches the shipments record’s orderid and that the shipmenttime is within 10 seconds of the orders record’s ordertime. For example, the second orders record has an ordertime of 3000, but the only shipments record with the corresponding orderid has a shipmenttime of 14000, which is 11 seconds later and therefore outside the interval. Similarly, the orders record with itemid=2 and ordertime=26000 is not within 10 seconds of either shipments record with orderid=2. However, the orders record with itemid=2 and ordertime=10000 is within 10 seconds of both shipments records with orderid=2 (shipmenttime 6000 and 15000), so it joins with both and produces 2 records in the output, one for each successful join.
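
The matching rule can be checked by hand on the sample records. The following Python sketch is not DeltaStream code; it replays the orders and shipments tables above through a plain nested loop. It assumes that timestamps are in milliseconds, that the WITHIN interval applies on both sides of ordertime, and that the boundary is inclusive (the sample data contains no boundary case, so this last point is untested). The name interval_join is illustrative.

```python
# (ordertime, itemid, price)
orders = [
    (2000, 2, 18.23), (3000, 5, 187.88), (10000, 2, 52.32), (12000, 5, 74.99),
    (13000, 4, 892.54), (18000, 2, 123.90), (23000, 5, 40.44), (26000, 2, 12.88),
]
# (shipmenttime, orderid, city)
shipments = [(6000, 2, "Palo Alto"), (14000, 5, "Chicago"), (15000, 2, "New York")]

WITHIN_MS = 10_000  # WITHIN 10 seconds


def interval_join(orders, shipments, within_ms):
    """Inner interval join: equal keys and timestamps at most within_ms apart."""
    rows = []
    for ordertime, itemid, price in orders:
        for shipmenttime, orderid, city in shipments:
            if itemid == orderid and abs(shipmenttime - ordertime) <= within_ms:
                rows.append((ordertime, itemid, orderid, price, city))
    return rows


result = interval_join(orders, shipments, WITHIN_MS)
for row in result:
    print(row)
```

On this data the sketch yields the same six rows as the output table above. A real streaming engine evaluates the same condition incrementally as records arrive, which is why late data can change the result.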

Interval self-join

The following query performs an interval join of the pageviews Stream with itself, matching records whose pageid values are equal and whose userid values differ, within 5 minutes of the left Relation’s timestamp. Essentially, the query finds all pairs of distinct users that visited the same pageid within 5 minutes of each other.

SELECT 
  a.userid AS userid1, 
  b.userid AS userid2, 
  a.pageid AS pgid 
FROM
  pageviews a WITH ('timestamp' = 'viewtime')
  JOIN pageviews b WITH ('timestamp' = 'viewtime')
  WITHIN 5 minutes ON a.pageid = b.pageid
WHERE a.userid != b.userid;
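
As a rough illustration of what this self-join computes, the Python sketch below applies the same three conditions to a handful of made-up pageviews records. The records, the millisecond timestamps, and the symmetric, inclusive treatment of the 5-minute interval are assumptions for the sake of the example, not DeltaStream behavior taken from this page.

```python
# Hypothetical (viewtime_ms, userid, pageid) records.
pageviews = [
    (0, "User_1", "Page_10"),
    (60_000, "User_2", "Page_10"),
    (400_000, "User_3", "Page_10"),
    (90_000, "User_1", "Page_20"),
]

WITHIN_MS = 5 * 60 * 1000  # WITHIN 5 minutes

pairs = [
    (a[1], b[1], a[2])                    # userid1, userid2, pgid
    for a in pageviews                    # left side of the self-join
    for b in pageviews                    # right side of the self-join
    if a[2] == b[2]                       # ON a.pageid = b.pageid
    and abs(a[0] - b[0]) <= WITHIN_MS     # WITHIN 5 minutes
    and a[1] != b[1]                      # WHERE a.userid != b.userid
]
print(pairs)
```

Because both sides of the join read the same records, each qualifying pair of users appears twice in this sketch, once in each order. User_3 is not paired with anyone because its view of Page_10 is more than 5 minutes away from the other two.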

Temporal Join (Stream/Changelog)

A temporal join combines records from a Stream with records from a Changelog according to a join condition and the records’ timestamps. The join condition is defined in the ON clause and specifies the equality of a pair of columns, one from the Stream and one from the Changelog. Any column or field can be picked from the Stream, as long as it is defined with the NOT NULL constraint. The column from the Changelog must be its PRIMARY KEY column. Records’ timestamps are used to correlate each record from the Stream side with the relevant version of a record from the Changelog side. For temporal joins, only INNER and LEFT join types are supported.

Examples

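Join a Stream and Changelog

Given the pageviews Stream and the users_log Changelog, which has a PRIMARY KEY of userid, with the following example records: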

         pageviews                                   users_log
+----------+--------+---------+  +--------------+--------+----------+-----------------+
| viewtime | userid | pageid  |  | registertime | userid | regionid |    interests    |
+----------+--------+---------+  +--------------+--------+----------+-----------------+
|     2000 | User_2 | Page_34 |  |         6000 | User_2 | Region_1 | [News, Movies]  |
|     3000 | User_5 | Page_67 |  |        14000 | User_5 | Region_1 | [Games, Sports] |
|    10000 | User_2 | Page_90 |  |        15000 | User_2 | Region_1 | [Sports, News]  |
|    12000 | User_5 | Page_34 |  +--------------+--------+----------+-----------------+
|    13000 | User_4 | Page_89 |
|    18000 | User_2 | Page_21 |
|    23000 | User_5 | Page_76 |
|    26000 | User_2 | Page_23 |
+----------+--------+---------+

When the following SELECT statement is run:

SELECT 
  p.viewtime,
  p.userid AS pvid,
  u.userid AS uid,
  p.pageid,
  u.interests[1] AS top_interest
FROM 
  pageviews p WITH ('timestamp'='viewtime')
  JOIN users_log u WITH ('timestamp'='registertime')
  ON u.userid = p.userid;

We get the following output:

Output:
+----------+--------+--------+---------+--------------+
| viewtime |  pvid  |  uid   | pageid  | top_interest |
+----------+--------+--------+---------+--------------+
|    10000 | User_2 | User_2 | Page_90 | News         |
|    18000 | User_2 | User_2 | Page_21 | Sports       |
|    23000 | User_5 | User_5 | Page_76 | Games        |
|    26000 | User_2 | User_2 | Page_23 | Sports       |
+----------+--------+--------+---------+--------------+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of the output for this join query. Since this is a streaming query, more records are emitted as data arrives in the sources. Notice that not every record from pageviews is joined and emitted. This is because the join type is JOIN (an inner join), and the join condition requires that the pageviews record’s userid matches the userid of a users_log record that is current at the event timestamp. For example, the first pageviews record has a viewtime of 2000, but there are no users_log records yet with registertime <= 2000. Thus, there is no corresponding record to join with and nothing is returned.

Notice that the pageviews record with userid=User_2 and viewtime=10000 matches the users_log record with userid=User_2 and registertime=6000, while the pageviews record with userid=User_2 and viewtime=18000 matches the users_log record with userid=User_2 and registertime=15000. Think of the users_log Changelog as a constantly updating state of the world with userid as the PRIMARY KEY. For each pageviews record, the join looks up the information in the users_log Changelog that is current for the corresponding userid at that record’s timestamp. At timestamp 10000, User_2’s registered top_interest is News. At timestamp 15000, User_2’s top_interest changes to Sports, so pageviews records with userid=User_2 and a later timestamp are joined with Sports instead of News.
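
The version lookup described above can be sketched in a few lines of Python. This is an illustration of the semantics on the sample records, not DeltaStream code: for each pageviews record it picks the users_log record with the same userid and the largest registertime that is not later than viewtime. The name temporal_join is illustrative, and treating registertime = viewtime as a match is an assumption. Note that interests[1] in the SQL query refers to the first array element, which is index 0 in Python.

```python
# (viewtime, userid, pageid)
pageviews = [
    (2000, "User_2", "Page_34"), (3000, "User_5", "Page_67"),
    (10000, "User_2", "Page_90"), (12000, "User_5", "Page_34"),
    (13000, "User_4", "Page_89"), (18000, "User_2", "Page_21"),
    (23000, "User_5", "Page_76"), (26000, "User_2", "Page_23"),
]
# (registertime, userid, regionid, interests)
users_log = [
    (6000, "User_2", "Region_1", ["News", "Movies"]),
    (14000, "User_5", "Region_1", ["Games", "Sports"]),
    (15000, "User_2", "Region_1", ["Sports", "News"]),
]


def temporal_join(stream, changelog):
    """Inner temporal join: pair each stream record with the latest changelog
    version for its key whose timestamp is not after the stream record's."""
    rows = []
    for viewtime, userid, pageid in stream:
        versions = [c for c in changelog if c[1] == userid and c[0] <= viewtime]
        if not versions:
            continue  # no version of this user exists yet, so nothing is emitted
        latest = max(versions, key=lambda c: c[0])
        rows.append((viewtime, userid, latest[1], pageid, latest[3][0]))
    return rows


result = temporal_join(pageviews, users_log)
for row in result:
    print(row)
```

On this data the sketch yields the same four rows as the output table above.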

Temporal join with separate source properties

The following query performs a temporal join between the pageviews Stream and the Users Changelog on equal userid values. The Stream on the left side of the join starts ingesting data from the epoch timestamp 1675225325286 (in milliseconds), and the Changelog on the right side starts ingesting data from the earliest available records.

SELECT 
  p.userid AS "PvID", 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p
  WITH (
    'starting.position' = 'timestamp', 
    'starting.position.timestamp.ms' = '1675225325286',
    'timestamp' = 'viewtime'
  ) 
JOIN "Users" u WITH ( 'starting.position' = 'earliest' )
ON u.userid = p.userid;

Temporal join on columns with non-primitive data types

The following shows an example of a temporal join whose join criteria is defined on a pair of columns whose data type is STRUCT (i.e. non-primitive). The example includes the DDL statements for the Changelog and the Stream, along with the temporal join query. Note that the contactinfo column in the users Stream is defined with the NOT NULL constraint. This is required for a temporal join, as both columns involved in the join criteria must not contain NULL values.

-- DDL for the Changelog
CREATE CHANGELOG users_log (
  registertime BIGINT, 
  userid VARCHAR, 
  interests ARRAY<VARCHAR>, 
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, 
  PRIMARY KEY(contactinfo)
) WITH (
  'topic'='users_log', 
  'value.format'='json');

-- DDL for the Stream
CREATE STREAM "users" (
  registertime BIGINT,
  userid VARCHAR, 
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL)
WITH (
  'topic'='users', 
  'key.format'='json', 
  'key.type'='STRUCT<userid VARCHAR>', 
  'value.format'='json');
  
-- Temporal Join Query
SELECT
  l.userid AS uid, 
  l.contactinfo AS contact,
  r.interests AS hobbies
FROM users l JOIN users_log r 
ON l.contactinfo = r.contactinfo;

Temporal join with idle Changelog side

The following example performs a temporal join between the pageviews stream and the users_log Changelog. The source idleness property is set to 120 seconds for the Changelog side. This means that if users_log stops receiving updates, it will be marked as an idle source after 2 minutes and excluded from the watermark computation in the JOIN operator.

SELECT 
  p.userid,
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p
  JOIN users_log u WITH('source.idle.timeout.millis'=120000)
  ON u.userid = p.userid;

If the Changelog side of the join (i.e. the enrichment side) does not continuously ingest updates, its watermark stops advancing and it becomes an idle source. The join operator then stops producing results, even while the Stream side keeps receiving new records, because the operator waits for the watermarks on both sides to advance. One way to unblock the join operator is to mark the Changelog side as an idle source and exclude it from the watermark computation. You can do this when writing the query by setting the source property source.idle.timeout.millis for the Changelog side, as in the example above. See Source Properties for more details on this property.
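
The effect of marking a source as idle can be pictured with a small Python sketch. It is a simplified model, not DeltaStream's implementation: it assumes the common convention that an operator's watermark is the minimum of the watermarks of its non-idle inputs, and that a source counts as idle once no record has arrived for at least its idle timeout. The function name operator_watermark, the dictionary fields, and all numbers are hypothetical.

```python
def operator_watermark(sources, now_ms):
    """Minimum watermark over the sources that are not considered idle."""
    active = []
    for src in sources:
        timeout = src.get("idle_timeout_ms")  # None means idleness is not configured
        idle = timeout is not None and now_ms - src["last_record_wallclock_ms"] >= timeout
        if not idle:
            active.append(src["watermark_ms"])
    return min(active) if active else None


# The Stream side keeps receiving records; the Changelog side went quiet 400 seconds ago.
pageviews = {"watermark_ms": 500_000, "last_record_wallclock_ms": 999_000}
users_log = {"watermark_ms": 100_000, "last_record_wallclock_ms": 600_000}
now_ms = 1_000_000

# Without an idle timeout, the quiet Changelog holds the join's watermark back.
stalled = operator_watermark([pageviews, users_log], now_ms)

# With a 120-second timeout, the Changelog is excluded and the watermark advances.
users_log_idle = dict(users_log, idle_timeout_ms=120_000)
unblocked = operator_watermark([pageviews, users_log_idle], now_ms)

print(stalled, unblocked)
```

In this model the join's watermark stays at the Changelog's last watermark until the timeout is configured, after which it follows the Stream side alone.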