# JOIN

## Syntax

```sql
FROM relation_reference
        [WITH(source_property = value [, ...])]
    join_type relation_reference
        [WITH(source_property = value [, ...])]
    [WITHIN size time_unit]
    ON join_criteria]
```

## Description

`JOIN` is the SQL operator to combine records from two Relations according to a given join criteria. Join criteria, specified as the `ON` clause, can only be a Boolean comparison expression that declares equality of a pair of matching columns, one from each Relation, e.g. `left.col = right.col`. A Relation alias can be specified for each Relation in the join clause. The alias can then be used to refer to the Relation in other clauses in the query, such as `SELECT` or `WHERE` clauses. This is useful in case referred Relations have duplicate column names. DeltaStream has support for `LEFT JOIN`, `INNER JOIN`, and `FULL JOIN`. Currently, two types of join are supported: interval join and temporal join.

### Arguments

#### Join Types

| Join Type          | Description                                                                                 |
| ------------------ | ------------------------------------------------------------------------------------------- |
| `JOIN`             | Returns records that have matching values in both Relations.                                |
| `INNER JOIN`       | Same as `JOIN`.                                                                             |
| `FULL JOIN`        | Returns all records when there is a match in either the left or right Relation.             |
| `FULL OUTER JOIN`  | Same as `FULL JOIN`.                                                                        |
| `LEFT JOIN`        | Returns all records from the left Relation and the matched records from the right Relation. |
| `LEFT OUTER JOIN`  | Same as `LEFT JOIN`.                                                                        |
| `RIGHT JOIN`       | Not supported.                                                                              |
| `RIGHT OUTER JOIN` | Not supported.                                                                              |

## Interval Join (Stream/Stream)

Interval join combines records coming from two Streams according to a join condition and a time dimension. The `ON` clause defines the join condition in the form of equality of two columns from the source Streams. The time dimension is specified using either the `WITHIN` clause, which defines a time interval, or via predicates in the `ON` clause that bounds the event time on both sides. Two records are matching if they have the same value for referred columns in the join criteria and the difference between their timestamps is valid according to the time interval specified in the `WITHIN` clause or event time bounding predicates in the `ON` clause. For interval joins, `INNER`, `LEFT`, and `FULL` join types are supported.

### Examples

#### Join two Streams

Given `orders` and `shipments` [#\_stream](https://docs.deltastream.io/overview/core-concepts/databases#_stream "mention")s with the following example records:

```
           orders                               shipments
+-----------+--------+--------+  +--------------+---------+-----------+
| ordertime | itemid | price  |  | shipmenttime | orderid |      city |
+-----------+--------+--------+  +--------------+---------+-----------+
|      2000 |      2 |  18.23 |  |         6000 |       2 | Palo Alto |
|      3000 |      5 | 187.88 |  |        14000 |       5 |   Chicago |
|     10000 |      2 |  52.32 |  |        15000 |       2 |  New York |
|     12000 |      5 |  74.99 |  +--------------+---------+-----------+
|     13000 |      4 | 892.54 |
|     18000 |      2 | 123.90 |
|     23000 |      5 |  40.44 |
|     26000 |      2 |  12.88 |
+-----------+--------+--------+
```

When the following `SELECT` statement is run:

```sql
SELECT 
  o.ordertime,
  o.itemid AS oid, 
  s.orderid AS sid, 
  o.price, 
  s.city AS shipment_city 
FROM 
  orders o WITH ('timestamp'='ordertime')
  JOIN shipments s WITH ('timestamp'='shipmenttime')
  WITHIN 10 seconds ON o.itemid = s.orderid;
```

We get the following output:

```
Output:
+-----------+-----+-----+--------+---------------+
| ordertime | oid | sid | price  | shipment_city |
+-----------+-----+-----+--------+---------------+
|      2000 |   2 |   2 |  18.23 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |      New York |
|     12000 |   5 |   5 |  74.99 |       Chicago |
|     18000 |   2 |   2 | 123.90 |      New York |
|     23000 |   5 |   5 |  40.44 |       Chicago |
+-----------+-----+-----+--------+---------------+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this join query. Since this is a streaming query, more records will be outputted as data arrives in the source. Notice that not every record from `orders` is joined and outputted. The reason for this is because the join type is `JOIN` and the join condition is that the `orders` record’s `itemid` matches the `shipments` record’s `orderid` within 10 seconds of the `orders` record’s `ordertime`. For example, the second `orders` record has an `ordertime` of `3000`, but the only `shipments` record with the corresponding `orderid` has a `shipmenttime` of `14000`, which is not within 10 seconds of the `ordertime`. Similarly, the `orders` record with `itemid=2` and `ordertime=26000` is not within 10 seconds of either of the `shipments` records with the corresponding `orderid`. However, the `orders` record with `itemid=2` and `ordertime=10000` is within 10 seconds of both `shipments` records with a corresponding `orderid`, so the `orders` record joins on both `shipments` records and produces 2 records in the output, one for each successful join.

#### Interval self-join

The following query performs a temporal join, joining the `pageviews` Stream with itself when the `pageid` values match and the `userid` values don’t match within 5 minutes of the left Relation’s timestamp. Essentially, the query finds all the pairs of unique users that visited the same `pageid` within 5 minutes of the event’s timestamp.

```sql
SELECT 
  a.userid AS userid1, 
  b.userid AS userid2, 
  a.pageid AS pgid 
FROM
  pageviews a WITH ('timestamp' = 'viewtime')
  JOIN pageviews b WITH ('timestamp' = 'viewtime')
  WITHIN 5 minutes ON a.pageid = b.pageid
WHERE a.userid != b.userid;
```

## Temporal Join (Stream/Changelog)

A temporal join combines records coming from a Stream with a Changelog according to a join condition on records’ timestamps. A join condition is defined using the `ON` clause and specifies the equality of a pair of columns, one from the Stream side and one from the Changelog. Any column or field can be picked from the Stream, as long as it is defined with the `NOT NULL` constraint. The column from the Changelog has to be the primary column. Records’ timestamps are used to correlate records from the Stream side with the relevant version of a record from the Changelog side. For temporal joins, only `INNER` and `LEFT` join types are supported.

### Examples

#### Join a Stream and Changelog

Given `pageviews` [#stream](https://docs.deltastream.io/overview/core-concepts/databases#stream "mention") and `users_log` [#changelog](https://docs.deltastream.io/overview/core-concepts/databases#changelog "mention") with a `PRIMARY KEY` of `userid` with the following example records:

```
         pageviews                                   users_log
+----------+--------+---------+  +--------------+--------+----------+-----------------+
| viewtime | userid | pageid  |  | registertime | userid | regionid |    interests    |
+----------+--------+---------+  +--------------+--------+----------+-----------------+
|     2000 | User_2 | Page_34 |  |         6000 | User_2 | Region_1 | [News, Movies]  |
|     3000 | User_5 | Page_67 |  |        14000 | User_5 | Region_1 | [Games, Sports] |
|    10000 | User_2 | Page_90 |  |        15000 | User_2 | Region_1 | [Sports, News]  |
|    12000 | User_5 | Page_34 |  +--------------+--------+----------+-----------------+
|    13000 | User_4 | Page_89 |
|    18000 | User_2 | Page_21 |
|    23000 | User_5 | Page_76 |
|    26000 | User_2 | Page_23 |
+----------+--------+---------+
```

When the following `SELECT` statement is run:

```sql
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p WITH ('timestamp'='viewtime')
  JOIN users_log u WITH ('timestamp'='registertime')
  ON u.userid = p.userid;
```

We get the following output:

```
Output:
+----------+--------+--------+---------+--------------+
| viewtime |  pvid  |  uid   | pageid  | top_interest |
+----------+--------+--------+---------+--------------+
|    10000 | User_2 | User_2 | Page_90 | News         |
|    18000 | User_2 | User_2 | Page_21 | Sports       |
|    23000 | User_5 | User_5 | Page_76 | Games        |
|    26000 | User_2 | User_2 | Page_23 | Sports       |
+----------+--------+--------+---------+--------------+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this join query. Since this is a streaming query, more records will be outputted as data arrives in the source. Notice that not every record from `pageviews` is joined and outputted. The reason for this is because the join type is `JOIN` and the join condition is that the `pageviews` record’s `userid` matches the `users_log` record’s `userid` at the event timestamp. For example, the first `pageviews` record has a `viewtime` of `2000`, but there has yet to be any `user_logs` records with `registertime <= 2000`. Thus, there is no corresponding record to join with and nothing is returned. Notice the `pageviews` record with `userid=User_2` and `viewtime=10000` matches with the `users_log` record with `userid=User_2` and `registertime=6000`, while the `pageviews` record with `userid=User_2` and `viewtime=18000` matches with the `users_log` record with `userid=User_2` and `registertime=15000`. Think of the `users_log` Changelog as a constantly updating state of the world with a `userid` as the `PRIMARY KEY`. In this example, for each `pageviews` record, we are looking up the current information in the `users_log` Changelog for the corresponding `userid`. At timestamp `10000`, `User_2` has a registered `top_interest` as `News`. At timestamp `15000`, `User_2`’s `top_interest` changes to `Sports`, so for `pageviews` records with `userid=User_2` with a timestamp > `15000`, the joined top interest will report `Sports` instead of `News`.

#### Temporal join with separate source properties

The following performs a temporal join between Stream `pageviews` and Changelog `Users` on equal `userid` values. The Stream on the left side of the join will start ingesting data starting from the epoch timestamp `1675225325286`, and the right side of the join will start ingesting data from the earliest available records.

```sql
SELECT 
  p.userid AS "PvID", 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p
  WITH (
    'starting.position' = 'timestamp', 
    'starting.position.timestamp.ms' = '1675225325286',
    'timestamp' = 'viewtime'
  ) 
JOIN "Users" u WITH ( 'starting.position' = 'earliest' )
ON u.userid = p.userid;
```

#### Temporal join on columns with non-primitive data types

The following shows an example of temporal join using a join criteria defined on a pair of columns whose data type is `STRUCT` (i.e. non-primitive). The example includes the DDL statements for the Changelog and Stream relations, along with the temporal join's query. Note that the `contactinfo` column in the `users` Stream is defined with the NOT NULL constraint. This requirement is essential for temporal join, as both columns involved in the join criteria must not contain NULL values.

```sql
-- DDL for the Changelog
CREATE CHANGELOG users_log (
  registertime BIGINT, 
  userid VARCHAR, 
  interests ARRAY<VARCHAR>, 
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, 
  PRIMARY KEY(contactinfo)
) WITH (
  'topic'='users_log', 
  'value.format'='json');

-- DDL for the Stream
CREATE STREAM "users" (
  registertime BIGINT,
  userid VARCHAR, 
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL)
WITH (
  'topic'='users', 
  'key.format'='json', 
  'key.type'='STRUCT<userid VARCHAR>', 
  'value.format'='json');
  
-- Temporal Join Query
SELECT
  l.userid AS uid, 
  l.contactinfo AS contact,
  r.interests AS hobbies
FROM users l JOIN users_log r 
ON l.contactinfo = r.contactinfo;
```

#### Temporal join with idle Changelog side

The following example performs a temporal join between the `pageviews` stream and the `users_log` Changelog. The source idleness property is set to 120 seconds for the Changelog side. This means that if `users_log` stops receiving updates, it will be marked as an idle source after 2 minutes and excluded from the watermark computation in the JOIN operator.

```sql
SELECT 
  p.userid,
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p
  JOIN users_log u WITH ('source.idle.timeout.millis' = 120000)
  ON u.userid = p.userid;
```

## Regular Equi-Join (Stream/Stream)

A regular equi-join combines records from two Streams based on an equality condition defined in the `ON` clause. The `ON` clause specifies the join condition as an equality between one column from each input Stream. In a regular join, any new record or update on either side of the join is reflected in the entire join result. Since both input Streams are fully maintained in the job’s state, the required state size to compute the join result can grow indefinitely as more records are ingested into the input entities or produced as intermediate results. You can configure a time-to-live (TTL) for the query state using the `state.ttl.millis` property (Check [#query\_properties](https://docs.deltastream.io/reference/sql-syntax/query/select/..#query_properties "mention")). However, be aware that enabling a state TTL may affect the correctness or completeness of the query results. You can also use the source property `flink.sql.state.ttl` to set the idle state retention time for each source relation. This allows fine-grained control over state retention when sides of a join have different activity patterns. (Check [#source\_properties](https://docs.deltastream.io/reference/sql-syntax/query/from#source_properties "mention")). Regular equi-join supports the following join types: **INNER**, **LEFT OUTER**, and **FULL OUTER**.

### Examples

#### Regular join between two streams

The following query joins the `pageviews` Stream with the `users` Stream based on the equality of the `userid` column from each side. It enriches each `pageview` record with the corresponding user’s city information retrieved from the `users` Stream.

```sql
SELECT 
  p.viewtime,
  p.userid AS uid, 
  p.pageid, 
  u.contactinfo->city AS city
FROM 
  pageviews p JOIN users u 
  ON p.userid = u.userid;
```

#### Regular join with source TTL

In the following query, the source state TTL is used to set the idle state retention time for each source relation (Check [#source\_properties](https://docs.deltastream.io/reference/sql-syntax/query/from#source_properties "mention")). The join state for the `pageviews` is retained for 300 seconds, while the join state for the `users` side is retained for 2 hours.

```sql
SELECT *
FROM pageviews p WITH ('flink.sql.state.ttl' = '300s')
JOIN users u WITH ('flink.sql.state.ttl' = '2h')
WITHIN 5 MINUTES
ON p.userid = u.userid;
```

## Changelog Join (Changelog/Changelog)

A Changelog join combines records from two [upsert mode Changelogs](https://docs.deltastream.io/reference/ddl/create-changelog#changelog_parameters) based on an equality condition defined in the `ON` clause. Both sides of the join must be persisted upsert mode Changelogs: a changelog created with `enable.upsert.mode` set to `true` and backed by a physical entity (not a subquery or virtual relation). When either side receives an update to an existing primary key, the join result is recomputed for that key. Note that the join semantics follows the regular join semantics and not temporal join.\
The output of a Changelog join is itself an upsert mode Changelog. The PRIMARY KEY of the result is composed of the primary key columns from both sides that are included in the `SELECT` projection.

**Constraints**

* Both sides must be Changelog with `enable.upsert.mode` set to `true`.
* Both sides must be primitive (persisted) Changelogs - subqueries and virtual changelogs are not supported as join inputs.
* Only INNER JOIN is supported. LEFT JOIN, FULL JOIN, and other outer join types are not allowed.
* The WITHIN clause must not be specified. Changelog joins are not windowed.
* At least one side of the join must have all of its PRIMARY KEY columns referenced in the ON clause.
* All primary key columns must be included in the SELECT projection.
* Changelog joins are not supported in multi-way joins (3 or more relations).

### Examples

Given two upsert mode changelogs, `orders` and `customers`, defined as:

```sql
CREATE CHANGELOG orders (
    ts BIGINT,
    orderid VARCHAR,
    userid VARCHAR,
    price INTEGER,
    PRIMARY KEY(orderid)
) WITH (
    'value.format' = 'json',
    'enable.upsert.mode' = true
);
CREATE CHANGELOG customers (
    customer_since BIGINT,
    id VARCHAR,
    name VARCHAR,
    city VARCHAR,
    PRIMARY KEY(id)
) WITH (
    'value.format' = 'json',
    'enable.upsert.mode' = true
);
```

The following query joins `orders` with `customers` on the customer id. The resulting Changelog has `PRIMARY KEY(id, oid)` and is automatically created in upsert mode:

```sql
CREATE CHANGELOG enriched_orders AS
SELECT c.id, orderid AS oid, price, c.city
FROM orders o JOIN customers c
ON o.userid = c.id;
```

## Lookup Join (Stream/Table)

A lookup join combines records from a Stream with a Table based on an equality condition defined in the `ON` clause. The join condition must specify one column from each side that determines how records are matched. The lookup join operates using processing time when retrieving matching records. This means that as records arrive on the Stream side, their corresponding rows from the Table are fetched based on the Table’s contents at that exact moment in time.

Currently, lookup joins are supported for tables created on PostgreSQL entities.

### Examples

Lets assume the `users_info` Table, backed by a Postgres entity, is defined as:

```sql
CREATE TABLE users_info (
  userid VARCHAR,
  city VARCHAR, 
  score INTEGER) 
WITH ('store'='pgstore', 
      'postgresql.db.name'='info', 
      'postgresql.schema.name'='public', 
      'postgresql.table.name'='users_info');
```

The following query joins the `pageviews` Stream with the `users_info` Table based on the equality of the `userid` column from each side. It enriches each `pageview` record with the corresponding user’s city and the score information retrieved from the Postgres Table.

```sql
SELECT p.userid,
       p.pageid, 
       u.city, 
       u.score 
FROM pageviews p LEFT JOIN users_info u 
ON p.userid = u.userid;
```

## Multi-Way Join

A multi-way join combines records from three or more Relations in a single query statement by chaining multiple JOIN clauses. Instead of creating intermediate Streams for each two-way join, a multi-way join lets you express the entire enrichment or correlation logic in one statement. Multi-way joins support two semantics, determined by the types of the right-side Relations:

* **Temporal multi-way join**: A Stream joined to two or more Changelogs. Each Stream record is enriched with the latest matching row from each Changelog, using the same temporal join semantics as a two-way Temporal Join (Check [#temporal-join-stream-changelog](#temporal-join-stream-changelog "mention")).
* **Interval multi-way join**: A Stream joined to two or more Streams. Records are correlated across Streams within specified time windows, using the same interval join semantics as a two-way Interval Join (Check [#interval-join-stream-stream](#interval-join-stream-stream "mention")).

### Syntax

```sql
FROM stream_reference
        [WITH(source_property = value [, ...])]
    join_type relation_reference_1
        [WITH(source_property = value [, ...])]
    [WITHIN size time_unit]
    ON join_criteria_1
    join_type relation_reference_2
        [WITH(source_property = value [, ...])]
    [WITHIN size time_unit]
    ON join_criteria_2
    ...
```

### Constraints

* The left-most Relation must be a Stream (or a subquery over a Stream).
* All right-side Relations must be persisted Relations (not computed relations such as subqueries).
* All right-side Relations must be the same type: either all Streams (interval join) or all Changelogs (temporal join). Mixing Streams and Changelogs on the right side is not supported.
* Only `INNER JOIN` and `LEFT JOIN` are supported. FULL JOIN and RIGHT JOIN are not allowed in multi-way joins.
* Upsert-mode Changelogs (`'enable.upsert.mode' = true`) are not supported on the right side of a multi-way join.
* Changelog-to-Changelog joins are not supported in multi-way joins.
* Lookup joins (right side is a Table) are not supported in multi-way joins.
* Each join pair's `ON` clause must reference at least one column from the left-most (base) Relation and at least one column from the corresponding right-side Relation. The `ON` clause may additionally reference columns from previously joined Relations.
* For temporal join pairs, all primary key columns of the Changelog side must be included in the `ON` clause, following the same rules as a two-way Temporal Join [#temporal-join-stream-changelog](#temporal-join-stream-changelog "mention").
* The `WITHIN` clause is optional on each interval join pair; time-bounding may alternatively be expressed via predicates in the ON clause, following the same rules as a two-way Interval Join [#interval-join-stream-stream](#interval-join-stream-stream "mention").
* The output of a multi-way join is always a Stream.

### Examples

#### Temporal multi-way join: Stream with two Changelogs

The following query enriches each order from the `orders` Stream with the customer's region from the `users` Changelog and the product's name and price from the `products` Changelog. Each order is matched against the latest version of the corresponding user and product at the time of the order.

Given `orders` Stream, `users` Changelog with `PRIMARY KEY(userid)`, and `products` Changelog with `PRIMARY KEY(productid)` with the following example records:

```sql
             orders                                  users                           products
+------+---------+--------+-----------+-----+  +------+--------+----------+  +------+-----------+----------+-------+
|   ts | orderid | userid | productid | qty |  |   ts | userid |   region |  |   ts | productid |    pname | price |
+------+---------+--------+-----------+-----+  +------+--------+----------+  +------+-----------+----------+-------+
| 2000 | Order_1 | User_1 | Prod_1    |   3 |  | 1000 | User_1 | Region_A |  | 1000 | Prod_1    | Widget   |    10 |
| 3500 | Order_2 | User_1 | Prod_1    |   2 |  | 2000 | User_2 | Region_B |  | 1500 | Prod_2    | Gadget   |    20 |
| 5000 | Order_3 | User_1 | Prod_1    |   4 |  | 4000 | User_1 | Region_C |  | 3000 | Prod_1    | Widget_2 |    15 |
| 6000 | Order_4 | User_2 | Prod_2    |   2 |  +------+--------+----------+  +------+-----------+----------+-------+
+------+---------+--------+-----------+-----+
```

When the following query runs:

```sql
SELECT
  o.orderid,
  o.userid,
  u.region,
  p.productid,
  p.pname,
  (o.qty * p.price) AS total_price
FROM
  orders o WITH ('timestamp'='ts')
  JOIN users u WITH ('timestamp'='ts') ON o.userid = u.userid
  JOIN products p WITH ('timestamp'='ts') ON o.productid = p.productid;
```

We get the following output:

```
+---------+--------+----------+-----------+----------+-------------+
| orderid | userid |   region | productid |    pname | total_price |
+---------+--------+----------+-----------+----------+-------------+
| Order_1 | User_1 | Region_A | Prod_1    | Widget   |          30 |
| Order_2 | User_1 | Region_A | Prod_1    | Widget_2 |          30 |
| Order_3 | User_1 | Region_C | Prod_1    | Widget_2 |          60 |
| Order_4 | User_2 | Region_B | Prod_2    | Gadget   |          40 |
+---------+--------+----------+-----------+----------+-------------+
```

**NOTE**: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

Each order is enriched with the user and product information that was current at the time of the order. For example, `Order_2` at timestamp `3500` sees the updated product name `Widget_2` (updated at timestamp `3000`) but still sees `User_1` in `Region_A` because the region update to `Region_C` does not occur until timestamp `4000`. By `Order_3` at timestamp `5000`, both the product and user updates have taken effect.

#### Interval multi-way join: three Streams with WITHIN

The following query correlates user activity across three Streams: `clicks`, `impressions`, and `conversions` , matching events from the same user within a 10-second window on each join pair.

Given `clicks`, `impressions`, and `conversions` Streams with the following example records:

```sql
        clicks                  impressions                 conversions
+------+--------+--------+  +------+--------+------+  +------+--------+------+--------+
|   ts | userid | pageid |  |   ts | userid | adid |  |   ts | userid | adid | amount |
+------+--------+--------+  +------+--------+------+  +------+--------+------+--------+
| 5000 | User_1 | Page_A |  | 8000 | User_1 | Ad_X |  |12000 | User_1 | Ad_X |     50 |
|20000 | User_2 | Page_B |  |23000 | User_2 | Ad_Y |  |26000 | User_2 | Ad_Y |     75 |
|40000 | User_3 | Page_C |  +------+--------+------+  +------+--------+------+--------+
+------+--------+--------+
```

When the following query runs:

```sql
SELECT
  c.userid,
  c.pageid,
  i.adid,
  v.amount
FROM
  clicks c WITH ('timestamp'='ts')
  JOIN impressions i WITH ('timestamp'='ts')
    WITHIN 10 SECONDS ON c.userid = i.userid
  JOIN conversions v WITH ('timestamp'='ts')
    WITHIN 10 SECONDS ON c.userid = v.userid;
```

We get the following output:

```
+--------+--------+------+--------+
| userid | pageid | adid | amount |
+--------+--------+------+--------+
| User_1 | Page_A | Ad_X |     50 |
| User_2 | Page_B | Ad_Y |     75 |
+--------+--------+------+--------+
```

**NOTE**: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

`User_1`'s click at timestamp `5000` matches the impression at `8000` (within `10` seconds) and the conversion at `12000` (within `10` seconds of the click). `User_2`'s events also fall within the window. `User_3`'s click at `40000` has no matching impression or conversion within `10` seconds, so it does not appear in the output.

## CROSS JOIN UNNEST (Collection Expansion)

### Syntax

```sql
FROM relation_reference
CROSS JOIN UNNEST(collection_expression) [WITH ORDINALITY]
AS alias(column_name [, ...])
```

`CROSS JOIN UNNEST` expands an `ARRAY` or `MAP` column into individual rows, producing one output row for every element in the collection. Each row from the left-side relation is combined with each expanded element from the collection, similar to a cross join. If the collection is empty or `NULL`, the corresponding row from the left side is dropped (no output is produced for that row).\
`UNNEST` supports the following variations:

| Variation                     | Alias Columns     | Description                                                                      |
| ----------------------------- | ----------------- | -------------------------------------------------------------------------------- |
| `UNNEST(array_col)`           | `(element)`       | Expands each array element into a separate row.                                  |
| `UNNEST(map_col)`             | `(key, value)`    | Expands each map entry into a row with a key column and a value column.          |
| `UNNEST(...) WITH ORDINALITY` | `(..., position)` | Adds a 1-based position column indicating the element's index in the collection. |

### Constraints

* Only `CROSS JOIN` is supported with `UNNEST`. Other join types (`LEFT JOIN`, `INNER JOIN`, etc.) are not allowed.
* `CROSS JOIN` can only be used with `UNNEST` on the right side. Joining two regular relations with `CROSS JOIN` is not supported.
* An alias with column names is required: `AS alias(col1, ...)`. Omitting the alias or the column list is not allowed.
* The number of column aliases must match the expected output: 1 for an array, 2 for a map, plus 1 additional column if `WITH ORDINALITY` is used.
* The left-side Relation must be a Stream or Changelog.
* Chaining multiple `CROSS JOIN UNNEST` clauses or combining `CROSS JOIN UNNEST` with other joins in a multi-way join is not supported. To work around this, wrap the `CROSS JOIN UNNEST` in a subquery and join the result with other Relations.
* The expression inside `UNNEST(...)` must evaluate to an `ARRAY` or `MAP` type. Passing a scalar column (e.g., `VARCHAR`, `INTEGER`) results in an error.

### Examples

#### Expand an array column&#x20;

The following query expands the `interests` array column from the `users` Stream into individual rows: one row per interest per user.

```sql
SELECT u.userid,
       t.hobby 
FROM users u 
  CROSS JOIN UNNEST(u.interests) AS t(hobby);
```

If a users record has `userid = 'User_1'` and `interests = ['News', 'Sports', 'Games']`, the query produces three output rows:&#x20;

```
+--------+--------+
| userid |  hobby |
+--------+--------+
| User_1 |   News |
| User_1 | Sports |
| User_1 |  Games |
+--------+--------+
```

#### Expand a map column&#x20;

The following query expands the `contactinfo` map column into key-value rows.&#x20;

```sql
SELECT u.userid, 
       t.info_key, 
       t.info_val 
FROM users u 
  CROSS JOIN UNNEST(u.contactinfo) AS t(info_key, info_val);
```

#### Array expansion with ordinality&#x20;

The following query expands the `interests` array and includes the 1-based position of each element.&#x20;

```sql
SELECT u.userid,
       t.hobby,
       t.pos
FROM users u
  CROSS JOIN UNNEST(u.interests) WITH ORDINALITY AS t(hobby, pos);
```

If a users record has `interests = ['News', 'Sports']`, the output includes:

```
+--------+--------+-----+
| userid |  hobby | pos |
+--------+--------+-----+
| User_1 |   News |   1 |
| User_1 | Sports |   2 |
+--------+--------+-----+
```

#### UNNEST inside a subquery joined to another Stream&#x20;

To combine an unnested collection with another Stream, wrap the `CROSS JOIN UNNEST` in a subquery and use a standard interval or temporal join on the outer query.&#x20;

```sql
SELECT p.userid,
       sub.hobby 
FROM pageviews p WITH ('timestamp' = 'viewtime') 
  JOIN ( SELECT rowtime(u) AS rt, 
                u.userid,
                t.hobby 
         FROM users u 
           CROSS JOIN UNNEST(u.interests) AS t(hobby)
       ) sub WITH ('timestamp' = 'rt')
  WITHIN 1 MINUTE 
  ON p.userid = sub.userid;
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.deltastream.io/reference/sql-syntax/query/select/join.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
