Creating a Function

A Function is custom code that extends the processing capabilities of DeltaStream. DeltaStream already provides a number of built-in functions for use in queries, and users can add their own by implementing them and adding them to DeltaStream.

This tutorial will demonstrate how to write a Function, add it to DeltaStream, and use it in DeltaStream queries.

Functions are available only to approved Organizations. Please reach out to us if you want to enable Functions in your Organization.

Writing a Function

DeltaStream queries are powered by Apache Flink, so Functions are written with Flink's user-defined function APIs (see the Flink documentation). Currently, DeltaStream supports Flink's Scalar Functions and Aggregate Functions.

To help with writing these Functions, we've provided this repository of examples and templates; users can fork or copy the template to get started. Writing Functions requires:

  • Java (preferably Java 17)

  • Maven

For this tutorial, assume a simple use case: we want to reverse a String, with the following behavior:

  • Query: SELECT reverse(col1) AS res FROM my_source_stream;

  • Input: { "col1": "ReverseMe" }

  • Output: { "res": "eMesreveR" }

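Under the src/main/java directory, add a new class called MyStringReverser. The class can also live in a package under src/main/java, e.g. src/main/java/x/y/z/MyStringReverser.java; in that case the file must begin with the declaration package x.y.z; and the class is later referenced by its fully qualified name, x.y.z.MyStringReverser.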

Here's what the MyStringReverser implementation may look like:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class MyStringReverser extends ScalarFunction {

    @Override
    public void open(FunctionContext context) {
        // Initialization logic (runs before any eval() call)
    }

    public String eval(String s) {
        // Processing logic: a SQL NULL input arrives as a Java null, so return NULL
        if (s == null) {
            return null;
        }
        return new StringBuilder(s).reverse().toString();
    }
}
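The open() method above is left empty. As a hedged sketch of typical initialization, the hypothetical class below (MyCountingStringReverser and the metric name nullInputs are illustrative, not part of the DeltaStream template) registers a custom Flink counter through FunctionContext.getMetricGroup() in open() and increments it in eval() whenever the input is NULL.

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical variant of MyStringReverser that also counts NULL inputs.
public class MyCountingStringReverser extends ScalarFunction {

    // transient: the counter is created at runtime in open(), not serialized with the function
    private transient Counter nullInputs;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Register a custom counter in this function's metric group
        nullInputs = context.getMetricGroup().counter("nullInputs");
    }

    public String eval(String s) {
        if (s == null) {
            // Guard for when open() has not run, e.g. when eval() is called directly in a unit test
            if (nullInputs != null) {
                nullInputs.inc();
            }
            return null;
        }
        return new StringBuilder(s).reverse().toString();
    }
}
```

Because eval() only touches the counter after open() has created it, the class can still be exercised locally by calling eval() directly, without a Flink runtime.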

In our example, MyStringReverser extends ScalarFunction, Flink's base class for Scalar Function UDFs. A Scalar Function is appropriate here because inputs map one-to-one to outputs: for every input String, we output a single reversed version of that String. To define an Aggregate Function, the class would extend Flink's AggregateFunction class instead (an example can be found here).

Important Components for Functions that extend ScalarFunction:

  • Function open(FunctionContext context): Put any initialization logic in the open() method. This method is also where custom metrics can be registered, using the FunctionContext (docs on adding custom metrics).

  • Function eval(T val): A Function that extends ScalarFunction must define a public eval() method, which holds the processing logic. The method signature should match the input and output types of your Function. In our case, the input and output values are both Strings, but Functions can take multiple input values of different types, and the output type does not need to match the input type (this example shows how a Function can be defined for Struct input and output types). The same Function can also define multiple overloaded eval() methods.
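To illustrate multiple eval() methods, here is a hypothetical MyOverloadedReverser (not part of the DeltaStream template). It has three overloads: one matching MyStringReverser, one taking two inputs of different types, and one whose output type differs from its input type. Flink's type inference picks the overload from the argument types; whether and how DeltaStream lets one class back several CREATE FUNCTION signatures is an assumption not covered here.

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical example class showing overloaded eval() methods.
public class MyOverloadedReverser extends ScalarFunction {

    // One STRING input, STRING output: reverse('abc') -> 'cba'
    public String eval(String s) {
        if (s == null) {
            return null;
        }
        return new StringBuilder(s).reverse().toString();
    }

    // Two inputs of different types: reverse only the first n characters.
    // reverse('abcdef', 3) -> 'cbadef'
    public String eval(String s, Integer n) {
        if (s == null || n == null) {
            return null;
        }
        // Clamp n into [0, s.length()] so substring() never throws
        int k = Math.max(0, Math.min(n, s.length()));
        return new StringBuilder(s.substring(0, k)).reverse() + s.substring(k);
    }

    // Output type differs from input type: BIGINT in, STRING out.
    // reverse(1200) -> '0021'
    public String eval(Long value) {
        if (value == null) {
            return null;
        }
        return new StringBuilder(Long.toString(value)).reverse().toString();
    }
}
```

Each overload handles SQL NULL explicitly, since Flink passes NULL arguments to boxed parameters such as String, Integer, and Long as Java null.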

Important Components for Functions that extend AggregateFunction:

  • Function open(FunctionContext context): Put any initialization logic in the open() method. This method is also where custom metrics can be registered, using the FunctionContext (docs on adding custom metrics).

  • Function createAccumulator(): This method creates and initializes the accumulator for this Function. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

  • Function getValue(MyAccumulator acc): This method is called every time an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.

  • Function accumulate(MyAccumulator acc, T val): This method is called on each input Row and contains the logic for updating the accumulator. The method signature should match the input types of your Function, with the accumulator being the first parameter. The Function can have multiple accumulate() methods defined with different custom types and arguments.

  • (Optional) Function retract(MyAccumulator acc, T val): This method must be implemented when the Function is used in queries with OVER windows. The logic in this method should retract the input values from the accumulator instance. The Function can have multiple retract() methods defined with different custom types and arguments.

  • (Optional) Function merge(MyAccumulator acc, Iterable<MyAccumulator> it): This method must be implemented when the Function is used in queries with SESSION windows or in bounded aggregations. The logic in this method should merge a group of accumulator instances into a single accumulator instance.
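As a hedged sketch of how these methods fit together, here is a hypothetical MyAverage Function (not the example linked above, and not part of the DeltaStream template) that averages BIGINT values and returns a DOUBLE. It follows Flink's AggregateFunction<T, ACC> contract; the class name and the AvgAccumulator type are illustrative assumptions.

```java
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical example: average of BIGINT values, returned as DOUBLE.
public class MyAverage extends AggregateFunction<Double, MyAverage.AvgAccumulator> {

    // Accumulator: a public POJO holding the running sum and row count
    public static class AvgAccumulator {
        public long sum = 0L;
        public long count = 0L;
    }

    @Override
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // Called for each input row; the accumulator is always the first parameter
    public void accumulate(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum += value;
            acc.count += 1;
        }
    }

    // Undoes accumulate(); needed when used with OVER windows
    public void retract(AvgAccumulator acc, Long value) {
        if (value != null) {
            acc.sum -= value;
            acc.count -= 1;
        }
    }

    // Folds other accumulators into acc; needed for SESSION windows and bounded aggregations
    public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> others) {
        for (AvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    @Override
    public Double getValue(AvgAccumulator acc) {
        // No rows (or all rows retracted): return SQL NULL instead of dividing by zero
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }
}
```

Keeping a sum and a count, rather than a running average, is what makes retract() and merge() exact: both reduce to adding or subtracting integers, and the division happens only in getValue().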

After writing your Function, package it as a .jar file. If you started from our template, it includes instructions for building the .jar file with Maven.

Creating a Function in a DeltaStream Organization

Once a Function is written and built, the next step is to add it to DeltaStream. This consists of two steps:

  1. Uploading the .jar file as a Function Source

  2. Defining a Function from the Function Source

Adding a Function Source

Using a role with the CREATE_FUNCTION_SOURCE privilege, create a new Function Source with the CREATE FUNCTION_SOURCE statement.

CREATE FUNCTION_SOURCE my_function_source WITH ( 
    'file' = '/path/to/my/jar/file/my-udf-1.0-SNAPSHOT.jar',
    'description' = 'Function source with method to reverse Strings'
);

Now, we can list our Function Sources to verify it was created:

my_db.public/my_store# LIST FUNCTION_SOURCES;
         Name        |          Description           | Language |  Owner   |      Created at      |      Updated at      | Status | Messages
---------------------+--------------------------------+----------+----------+----------------------+----------------------+--------+-----------
  my_function_source | Function source with method to | java     | sysadmin | 2024-05-15T15:10:36Z | 2024-05-15T15:11:10Z | ready  |
                     | reverse Strings                |          |          |                      |                      |        |

Adding a Function from a Function Source

Using a role with the CREATE_FUNCTION privilege, create a new Function with the CREATE FUNCTION statement. Note that the statement's WITH clause specifies the Function Source added in the previous step as well as the fully qualified class name of the Function written in the previous section.

CREATE FUNCTION reverse(s VARCHAR) RETURNS VARCHAR LANGUAGE JAVA WITH(
    'source.name'='my_function_source',
    'class.name'='x.y.z.MyStringReverser' -- Assuming MyStringReverser is in package x.y.z
);

Now, we can list our Functions to verify it was created:

my_db.public/my_store# LIST FUNCTIONS;
       Source        |      Class       |          Signature          |  Owner   | Properties |      Created at      |      Updated at
---------------------+------------------+-----------------------------+----------+------------+----------------------+-----------------------
  my_function_source | MyStringReverser | reverse(s VARCHAR) VARCHAR  | sysadmin | {}         | 2024-05-15T15:12:56Z | 2024-05-15T15:13:21Z

Writing SQL Queries with the new Function

Now that we have added our Function to DeltaStream, the final step is to use it in SQL queries. We can call the Function using the signature we defined in the CREATE FUNCTION statement.

SELECT viewtime, userid, pageid, reverse(pageid) AS pid_reversed FROM pageviews;
 | {"viewtime":1715786783663,"userid":"User_4","pageid":"Page_24","pid_reversed":"42_egaP"}
 | {"viewtime":1715786784667,"userid":"User_6","pageid":"Page_54","pid_reversed":"45_egaP"}
 | {"viewtime":1715786785673,"userid":"User_5","pageid":"Page_98","pid_reversed":"89_egaP"}
...
