# Create a Function

A [function](https://docs.deltastream.io/overview/core-concepts/function) is custom code used to extend DeltaStream's processing capabilities. There are multiple [built-in functions](https://docs.deltastream.io/reference/sql-syntax/query/functions/built-in-functions) available for you to use in your queries, but you can create and implement more functions and add them to DeltaStream.

This article demonstrates how to write a function, add it to DeltaStream, and use it in DeltaStream queries.

{% hint style="info" %}
**Note** Functions are only valid for approved organizations. Please contact us if you wish to enable functions in your organization.
{% endhint %}

## Writing a Function

DeltaStream queries are powered by Apache Flink. To write functions, you leverage Flink's APIs (please see [Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/udfs/#user-defined-functions) for more details).

DeltaStream supports Flink's [Scalar Functions](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/udfs/#scalar-functions) and [Aggregate Functions](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/functions/udfs/#aggregate-functions).

To help you write these functions, we've provided this repository that includes [examples](https://github.com/deltastreaminc/deltastream-examples/tree/main/udf/java/examples) and [templates](https://github.com/deltastreaminc/deltastream-examples/tree/main/udf/java/templates). You can fork or copy the template to get started.

To write functions, you need:

* Java (preferably Java 17)
* Maven

This article assumes a simple use case wherein you wish to reverse a string such that you have the following behavior:

* Query: `SELECT reverse(col1) AS res FROM my_source_stream;`
* Input: `{ "col1": "ReverseMe" }`
* Output: `{ "res": "eMesreveR" }`

Under the `src/main/java` directory, you can add a new class called `MyStringReverser`. This class can also exist in other packages under `src/main/java`, such as `src/main/java/x/y/z/MyStringReverser.java`.

The `MyStringReverser` implementation looks as follows:

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class MyStringReverser extends ScalarFunction {

    @Override
    public void open(FunctionContext context) {
        // Initialization logic
    }

    public String eval(String s) {
        // Processing logic
        return new StringBuilder(s).reverse().toString();
    }
}
```

In this example, your `MyStringReverser` class `extends ScalarFunction`, the API used to define a scalar function-typed UDF. As there's a one-to-one mapping of inputs to outputs (that is, for every input string you output a single reversed version of that string), a scalar function is appropriate. To define an aggregate function instead, your class would extend Flink's `AggregateFunction` class ([find an example here](https://github.com/deltastreaminc/deltastream-examples/blob/main/udf/java/examples/src/main/java/examples/aggr/WeightedAvg.java)).

### Important Components for Functions that `extends ScalarFunction`:

* **Function `open(FunctionContext context)`:**\
  Any initialization logic exists in the `open()` method. You can use FunctionContext to add metrics to this function. [More on adding custom metrics](https://docs.deltastream.io/reference/metrics/custom-metrics-in-functions).
* **Function `eval(T val)`:**\
  Functions that extend `ScalarFunction` must have an `eval()` method. This method is where the processing logic lies. The method signature should match the input and output types of your function. In this case, the input and output values are both strings. However, functions can have multiple input values with different types, and the output type does not need to match the input type. Furthermore, the same function can have multiple `eval()` methods defined.

### Important Components for Functions that `extends AggregateFunction`:

* **Function `open(FunctionContext context)`:**\
  Any initialization logic exists in the `open()` method. You can use FunctionContext to add metrics to this function. [More on adding custom metrics](https://docs.deltastream.io/reference/metrics/custom-metrics-in-functions).
* **Function `createAccumulator()`:**\
  This method creates and initializes the accumulator for this function. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.
* **Function `getValue(MyAccumulator acc)`:**\
  This method is called every time an aggregation result is materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.
* **Function `accumulate(MyAccumulator acc, T val)`:**\
  This method is called on each input row. It contains the logic for updating the accumulator. The method signature should match the input types of your function, with the accumulator as the first parameter. The function can have multiple `accumulate()` methods defined with different custom types and arguments.
* **(Optional) Function `retract(MyAccumulator acc, T val)`:**\
  You must implement this method when you use it in queries with `OVER` windows. The logic in this method should retract the input values from the accumulator instance. The function can have multiple `retract()` methods defined with different custom types and arguments.
* **(Optional) Function `merge(MyAccumulator acc, Iterable<MyAccumulator> it)`:**\
  You must implement this method when you use it in queries with `SESSION` windows and bounded aggregations. The logic in this method merges a group of accumulator instances into a single accumulator instance.

After you write your function, build a `.jar` file. If you used the DeltaStream [template](https://github.com/deltastreaminc/deltastream-examples/tree/main/udf/java/templates), you can review instructions on how to build the `.jar` file using Maven.

## Creating a Function in a DeltaStream Organization

When you have written and built a function, you can add it in DeltaStream. This consists of 2 steps:

1. Uploading the `.jar` file as a Function Source
2. Defining a Function from the Function Source

### Adding a Function Source

Using a role with the [`CREATE_FUNCTION_SOURCE` privilege](https://docs.deltastream.io/overview/core-concepts/access-control#available-privileges), create a new function source using the [`CREATE FUNCTION SOURCE`](https://docs.deltastream.io/reference/sql-syntax/ddl/create-function_source) statement.

```sql
CREATE FUNCTION_SOURCE my_function_source WITH ( 
    'file' = '/path/to/my/jar/file/my-udf-1.0-SNAPSHOT.jar',
    'description' = 'Function source with method to reverse Strings'
);
```

Now you can [list your function sources](https://docs.deltastream.io/reference/sql-syntax/command/list-function_sources) to verify it was created:

```
my_db.public/my_store# LIST FUNCTION_SOURCES;
         Name        |          Description           | Language |  Owner   |      Created at      |      Updated at      | Status | Messages
---------------------+--------------------------------+----------+----------+----------------------+----------------------+--------+-----------
  my_function_source | Function source with method to | java     | sysadmin | 2024-05-15T15:10:36Z | 2024-05-15T15:11:10Z | ready  |
                     | reverse Strings                |          |          |                      |                      |        |
```

### Adding a Function from a Function Source

Using a role with the [`CREATE_FUNCTION`](https://docs.deltastream.io/overview/core-concepts/access-control#available-privileges) privilege, create a new function using the [`CREATE FUNCTION`](https://docs.deltastream.io/reference/sql-syntax/ddl/create-function) statement. Note that in this statement's `WITH` clause you specify the function source added in the previous step, plus the class name for the function you wrote in the previous section.

```sql
CREATE FUNCTION reverse(s VARCHAR) RETURNS VARCHAR LANGUAGE JAVA WITH(
    'source.name'='my_function_source',
    'class.name'='x.y.z.MyStringReverser' -- Assuming MyStringReverser is in package x.y.z
);
```

Now [list your Function](https://docs.deltastream.io/reference/sql-syntax/command/list-functions) to verify it was created:

```
my_db.public/my_store# LIST FUNCTIONS;
       Source        |      Class       |          Signature          |  Owner   | Properties |      Created at      |      Updated at
---------------------+------------------+-----------------------------+----------+------------+----------------------+-----------------------
  my_function_source | MyStringReverser | reverse(s VARCHAR) VARCHAR  | sysadmin | {}         | 2024-05-15T15:12:56Z | 2024-05-15T15:13:21Z
```

## Writing SQL Queries with the New Function

Now that you have added your function to DeltaStream, the final step is to actually use the function in a SQL query. To do this, simply call the function in using the method signature defined when you ran `CREATE FUNCTION`.

```sql
SELECT viewtime, userid, pageid, reverse(pageid) AS pid_reversed FROM pageviews;
 | {"viewtime":1715786783663,"userid":"User_4","pageid":"Page_24","pid_reversed":"42_egaP"}
 | {"viewtime":1715786784667,"userid":"User_6","pageid":"Page_54","pid_reversed":"45_egaP"}
 | {"viewtime":1715786785673,"userid":"User_5","pageid":"Page_98","pid_reversed":"89_egaP"}
...
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.deltastream.io/how-do-i.../creating-a-function.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
