Creating a Function
A function is custom code used to extend DeltaStream's processing capabilities. There are multiple built-in functions available for you to use in your queries, but you can also create and implement additional functions and add them to DeltaStream.
This tutorial demonstrates how to write a function, add it to DeltaStream, and use it in DeltaStream queries.
DeltaStream queries are powered by Apache Flink. To write functions, you use Flink's APIs.
DeltaStream supports Flink's scalar functions and aggregate functions.
To help you write these functions, we've provided a repository that includes a template. You can fork or copy the template to get started.
To write functions, you need:
Java (preferably Java 17)
Maven
This tutorial uses a simple example: reversing a string, with the following behavior:
Query: SELECT reverse(col1) AS res FROM my_source_stream;
Input: { "col1": "ReverseMe" }
Output: { "res": "eMesreveR" }
Under the src/main/java directory, you can add a new class called MyStringReverser. This class can also exist in other packages under src/main/java, such as src/main/java/x/y/z/MyStringReverser.java.
The MyStringReverser class extends ScalarFunction and implements the reversal logic in an eval() method.
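As a minimal sketch, such a class might look like the following. To keep the example compilable on its own, it declares a local stand-in for Flink's ScalarFunction; that stand-in, the null handling, and the omitted public modifier are assumptions made for illustration and are not taken from the DeltaStream template.

```java
// Stand-in for org.apache.flink.table.functions.ScalarFunction so that this
// sketch compiles without Flink on the classpath. In a real project, delete
// this class and import Flink's ScalarFunction instead.
abstract class ScalarFunction {}

// Flink requires function classes to be public. The modifier is omitted here
// only so the sketch fits in a single source file; add it in a real project.
class MyStringReverser extends ScalarFunction {

    // Called once per input row: takes one string and returns it reversed.
    // Returning null for null input is an assumption of this sketch.
    public String eval(String s) {
        if (s == null) {
            return null;
        }
        return new StringBuilder(s).reverse().toString();
    }
}
```

Calling eval("ReverseMe") on an instance of this class returns "eMesreveR", matching the behavior described above.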
extends ScalarFunction:
Function eval(T val):
Functions that extend ScalarFunction must have an eval() method. This method is where the processing logic lies. The method signature should match the input and output types of your function. In this case, the input and output values are both strings. However, functions can have multiple input values with different types, and the output type does not need to match the input type. Furthermore, the same function can have multiple eval() methods defined.
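To illustrate multiple eval() methods and an output type that differs from the input type, here is a small sketch. The class MyStringLength and its trim parameter are hypothetical, and the ScalarFunction class is again a local stand-in for Flink's class so the example compiles on its own.

```java
// Stand-in for org.apache.flink.table.functions.ScalarFunction; in a real
// project, import Flink's class instead of declaring this one.
abstract class ScalarFunction {}

// Hypothetical function (not part of DeltaStream): returns a string's length.
// Declare the class public in a real project, as Flink requires.
class MyStringLength extends ScalarFunction {

    // One string argument in, an integer out: the output type differs
    // from the input type.
    public Integer eval(String s) {
        return s == null ? null : s.length();
    }

    // Second overload: two arguments of different types. When trim is true,
    // leading and trailing whitespace is not counted.
    public Integer eval(String s, Boolean trim) {
        if (s == null) {
            return null;
        }
        boolean doTrim = trim != null && trim;
        return doTrim ? s.trim().length() : s.length();
    }
}
```

Which overload runs depends on the argument types used in the query, so each eval() signature should correspond to a call shape you intend to support.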
extends AggregateFunction:
Function createAccumulator():
This method creates and initializes the accumulator for this function. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.
Function getValue(MyAccumulator acc):
This method is called every time an aggregation result is materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.
Function accumulate(MyAccumulator acc, T val):
This method is called on each input row. It contains the logic for updating the accumulator. The method signature should match the input types of your function, with the accumulator as the first parameter. The function can have multiple accumulate() methods defined with different custom types and arguments.
(Optional) Function retract(MyAccumulator acc, T val):
You must implement this method when you use the function in queries with OVER windows. The logic in this method should retract the input values from the accumulator instance. The function can have multiple retract() methods defined with different custom types and arguments.
(Optional) Function merge(MyAccumulator acc, Iterable<MyAccumulator> it):
You must implement this method when you use the function in queries with SESSION windows and bounded aggregations. The logic in this method should merge a group of accumulator instances into a single accumulator instance.
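The aggregate function methods described above can be sketched together as follows. The names MyLongSum and SumAccumulator are hypothetical, and the AggregateFunction class here is a local stand-in that mirrors only the two abstract methods of Flink's class, so that the example compiles without Flink on the classpath.

```java
// Stand-in for org.apache.flink.table.functions.AggregateFunction<T, ACC>;
// in a real project, import Flink's class instead of declaring this one.
abstract class AggregateFunction<T, ACC> {
    public abstract ACC createAccumulator();
    public abstract T getValue(ACC accumulator);
}

// Hypothetical accumulator: holds the running sum between input rows.
class SumAccumulator {
    public long sum = 0L;
}

// Hypothetical aggregate function that sums long values.
// Declare the class public in a real project, as Flink requires.
class MyLongSum extends AggregateFunction<Long, SumAccumulator> {

    // Creates and initializes an empty accumulator.
    @Override
    public SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    // Materializes the current (possibly intermediate) result.
    @Override
    public Long getValue(SumAccumulator acc) {
        return acc.sum;
    }

    // Called for each input row; null inputs are ignored in this sketch.
    public void accumulate(SumAccumulator acc, Long val) {
        if (val != null) {
            acc.sum += val;
        }
    }

    // Optional: undoes a previous accumulate() call (needed for OVER windows).
    public void retract(SumAccumulator acc, Long val) {
        if (val != null) {
            acc.sum -= val;
        }
    }

    // Optional: folds other accumulators into acc (needed for SESSION windows).
    public void merge(SumAccumulator acc, Iterable<SumAccumulator> it) {
        for (SumAccumulator other : it) {
            acc.sum += other.sum;
        }
    }
}
```

In this sketch, accumulating 1, 2, and 3 yields 6 from getValue(); retracting 2 then yields 4. The accumulate(), retract(), and merge() methods are not declared on the base class because Flink locates them by name and signature.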
When you have written and built a function, you can add it to DeltaStream. This consists of two steps:
Uploading the .jar file as a Function Source
Defining a Function from the Function Source
Now that you have added your function to DeltaStream, the final step is to use the function in SQL queries. To do this, call the function using the method signature you defined in the CREATE FUNCTION statement.
In this example, your MyStringReverser class extends ScalarFunction, the API used to define a scalar user-defined function (UDF). As there's a one-to-one mapping of inputs to outputs (that is, for every input string you output a single reversed version of that string), a scalar function is appropriate. To define an aggregate function instead, your class would extend Flink's AggregateFunction class.
Function open(FunctionContext context):
Put any initialization logic in the open() method. You can use FunctionContext to add metrics to the function. This applies to both scalar and aggregate functions.
After you write your function, build a .jar file. If you used the DeltaStream template, you can review its instructions on how to build the .jar file using Maven.
Using a role with the required privilege, create a new function source from the .jar file.
You can then list your function sources to verify that it was created.
Using a role with the required privilege, create a new function using the CREATE FUNCTION statement. Note that in this statement's WITH clause, you specify the function source added in the previous step and the class name of the function you wrote earlier.
You can then list your functions to verify that it was created.