Create a Function

A function is custom code used to extend DeltaStream's processing capabilities. There are multiple built-in functions available for you to use in your queries, but you can also create and implement your own functions and add them to DeltaStream.

This article demonstrates how to write a function, add it to DeltaStream, and use it in DeltaStream queries.

Note: Functions are available only for approved organizations. Please contact us if you wish to enable functions in your organization.

Writing a Function

DeltaStream queries are powered by Apache Flink. To write functions, you leverage Flink's APIs (see the Flink documentation for more details).

DeltaStream supports Flink's Scalar Functions and Aggregate Functions.

To help you write these functions, we've provided a repository that includes examples and templates. You can fork or copy a template to get started.

To write functions, you need:

  • Java (preferably Java 17)

  • Maven

This article assumes a simple use case wherein you wish to reverse a string such that you have the following behavior:

  • Query: SELECT reverse(col1) AS res FROM my_source_stream;

  • Input: { "col1": "ReverseMe" }

  • Output: { "res": "eMesreveR" }

Under the src/main/java directory, you can add a new class called MyStringReverser. This class can also exist in other packages under src/main/java, such as src/main/java/x/y/z/MyStringReverser.java.

The MyStringReverser implementation looks as follows:

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class MyStringReverser extends ScalarFunction {

    @Override
    public void open(FunctionContext context) {
        // Initialization logic
    }

    public String eval(String s) {
        // Guard against null input to avoid a NullPointerException
        if (s == null) {
            return null;
        }
        // Reverse the input string
        return new StringBuilder(s).reverse().toString();
    }
}

In this example, your MyStringReverser class extends ScalarFunction, the API used to define a scalar function-typed UDF. As there's a one-to-one mapping of inputs to outputs (that is, for every input string you output a single reversed version of that string), a scalar function is appropriate. To define an aggregate function instead, your class would extend Flink's AggregateFunction class (an example is included in the examples repository mentioned above).

Important Components for Functions that extend ScalarFunction:

  • Function open(FunctionContext context): Any initialization logic exists in the open() method. You can use FunctionContext to add custom metrics to this function.

  • Function eval(T val): Functions that extend ScalarFunction must have an eval() method. This method is where the processing logic lies. The method signature should match the input and output types of your function. In this case, the input and output values are both strings. However, functions can have multiple input values with different types, and the output type does not need to match the input type. Furthermore, the same function can have multiple eval() methods defined (see the sketch after this list).
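
To make these two points concrete, below is a minimal, hypothetical sketch (the MyConcatenator class and its invocations metric are illustrative, not part of this article's example) of a scalar function that registers a custom metric in open() and defines two eval() methods with different input types:

import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class MyConcatenator extends ScalarFunction {

    private transient Counter invocations;

    @Override
    public void open(FunctionContext context) {
        // Register a custom counter metric through the function context
        invocations = context.getMetricGroup().counter("invocations");
    }

    // Concatenate two strings
    public String eval(String a, String b) {
        invocations.inc();
        return a + b;
    }

    // Overload with a different second input type
    public String eval(String a, Integer b) {
        invocations.inc();
        return a + b;
    }
}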

Important Components for Functions that extend AggregateFunction:

  • Function createAccumulator(): This method creates and initializes the accumulator for this function. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

  • Function getValue(MyAccumulator acc): This method is called every time an aggregation result is materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.

  • Function accumulate(MyAccumulator acc, T val): This method is called on each input row. It contains the logic for updating the accumulator. The method signature should match the input types of your function, with the accumulator as the first parameter. The function can have multiple accumulate() methods defined with different custom types and arguments.

  • (Optional) Function retract(MyAccumulator acc, T val): You must implement this method when the function is used in queries with OVER windows. The logic in this method should retract the input values from the accumulator instance. The function can have multiple retract() methods defined with different custom types and arguments.

  • (Optional) Function merge(MyAccumulator acc, Iterable<MyAccumulator> it): You must implement this method when the function is used in queries with SESSION windows and bounded aggregations. The logic in this method merges a group of accumulator instances into a single accumulator instance. A sketch that wires these methods together follows this list.
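
As a minimal, hypothetical sketch (the MyAverage and MyAvgAccumulator names are illustrative, not part of this article's example), an aggregate function that averages BIGINT values might look as follows:

// MyAvgAccumulator.java: the intermediate data structure holding the running sum and count
public class MyAvgAccumulator {
    public long sum = 0;
    public long count = 0;
}

// MyAverage.java: an aggregate function computing the average of its inputs
import org.apache.flink.table.functions.AggregateFunction;

public class MyAverage extends AggregateFunction<Long, MyAvgAccumulator> {

    @Override
    public MyAvgAccumulator createAccumulator() {
        // Create and initialize the accumulator
        return new MyAvgAccumulator();
    }

    @Override
    public Long getValue(MyAvgAccumulator acc) {
        // Materialize the (possibly intermediate) aggregation result;
        // integer division keeps the sketch simple
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    public void accumulate(MyAvgAccumulator acc, Long value) {
        // Called on each input row to update the accumulator
        acc.sum += value;
        acc.count += 1;
    }

    public void retract(MyAvgAccumulator acc, Long value) {
        // Required for OVER windows: remove a previously accumulated value
        acc.sum -= value;
        acc.count -= 1;
    }

    public void merge(MyAvgAccumulator acc, Iterable<MyAvgAccumulator> it) {
        // Required for SESSION windows and bounded aggregations: combine accumulators
        for (MyAvgAccumulator other : it) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }
}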

Creating a Function in a DeltaStream Organization

When you have written and built a function, you can add it to DeltaStream. This consists of two steps:

  1. Uploading the .jar file as a Function Source

  2. Defining a Function from the Function Source

After you write your function, build a .jar file (with Maven, typically by running mvn clean package). If you used the DeltaStream template, you can review its instructions on how to build the .jar file using Maven.

Adding a Function Source

Using a role with the CREATE_FUNCTION_SOURCE privilege, create a new function source using the CREATE FUNCTION_SOURCE statement:

CREATE FUNCTION_SOURCE my_function_source WITH ( 
    'file' = '/path/to/my/jar/file/my-udf-1.0-SNAPSHOT.jar',
    'description' = 'Function source with method to reverse Strings'
);
Now you can list your function sources to verify it was created:

my_db.public/my_store# LIST FUNCTION_SOURCES;
         Name        |          Description           | Language |  Owner   |      Created at      |      Updated at      | Status | Messages
---------------------+--------------------------------+----------+----------+----------------------+----------------------+--------+-----------
  my_function_source | Function source with method to | java     | sysadmin | 2024-05-15T15:10:36Z | 2024-05-15T15:11:10Z | ready  |
                     | reverse Strings                |          |          |                      |                      |        |

Adding a Function from a Function Source

Using a role with the CREATE_FUNCTION privilege, create a new function using the CREATE FUNCTION statement. Note that in this statement's WITH clause you specify the function source added in the previous step, plus the class name for the function you wrote in the previous section.

CREATE FUNCTION reverse(s VARCHAR) RETURNS VARCHAR LANGUAGE JAVA WITH(
    'source.name'='my_function_source',
    'class.name'='x.y.z.MyStringReverser' -- Assuming MyStringReverser is in package x.y.z
);
Now list your functions to verify it was created:

my_db.public/my_store# LIST FUNCTIONS;
       Source        |      Class       |          Signature          |  Owner   | Properties |      Created at      |      Updated at
---------------------+------------------+-----------------------------+----------+------------+----------------------+-----------------------
  my_function_source | MyStringReverser | reverse(s VARCHAR) VARCHAR  | sysadmin | {}         | 2024-05-15T15:12:56Z | 2024-05-15T15:13:21Z

Writing SQL Queries with the New Function

Now that you have added your function to DeltaStream, the final step is to use the function in a SQL query. To do this, simply call the function using the method signature defined when you ran CREATE FUNCTION.

SELECT viewtime, userid, pageid, reverse(pageid) AS pid_reversed FROM pageviews;
 | {"viewtime":1715786783663,"userid":"User_4","pageid":"Page_24","pid_reversed":"42_egaP"}
 | {"viewtime":1715786784667,"userid":"User_6","pageid":"Page_54","pid_reversed":"45_egaP"}
 | {"viewtime":1715786785673,"userid":"User_5","pageid":"Page_98","pid_reversed":"89_egaP"}
...
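
Your new function can also power persistent queries. As a hypothetical sketch (the pageviews_reversed stream name is illustrative), the same call could feed a new stream via CREATE STREAM AS SELECT:

CREATE STREAM pageviews_reversed AS
  SELECT viewtime, userid, pageid, reverse(pageid) AS pid_reversed
  FROM pageviews;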
