GENERATE STREAM DDL

Syntax

GENERATE STREAM DDL stream_name WITH (stream_parameter = value [, ...]);

Description

This command generates a CREATE STREAM DDL statement from a given schema or data source (for example, a Data Store) that has corresponding Protocol Buffers and descriptors. It is especially helpful for creating DDL for complex descriptors.

You can generate DDL if the current role has USAGE privileges on the Data Store.

Stream Parameters

Parameter Name
Description

stream_name

The name of the new stream.

primary key

If you provide primary key names, they are added to the DDL. Otherwise, the primary key is left empty.

Required? Yes.

topic

Name of the Kafka topic whose assigned data schema is used (a Protobuf descriptor, or an Avro schema from the Schema Registry).

Required? Yes, for AVRO and PROTOBUF.

value.format

Format of the record value: AVRO, PROTOBUF, or JSON.

Required? Yes.

key.format

Format of the record key: AVRO, PROTOBUF, or JSON.

Required? No.

data.json.value.content

A string containing one or more JSON objects separated by commas. It serves as a data sample used to infer the schema of the value field in Kafka.

Required? Only for JSON.

data.json.key.content

A string containing one or more JSON objects separated by commas. It serves as a data sample used to infer the schema of the key in Kafka.

Required? Only for JSON.

postgresql.table.name

Name of the source table in the POSTGRESQL store.

Required? Only for POSTGRESQL store.

Type: String

postgresql.schema.name

Name of the schema in the POSTGRESQL store containing the source table.

Required? Only for POSTGRESQL store.

Type: String

postgresql.db.name

Name of the database in the POSTGRESQL store containing the source table.

Required? Only for POSTGRESQL store.

Type: String
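
The parameters above are passed as quoted key/value pairs in the WITH clause. As a minimal sketch of how a client script might assemble the statement, the following Python helper renders it from a dictionary. The function name `render_generate_stream_ddl` is hypothetical, and escaping single quotes by doubling them is an assumption based on standard SQL, not something this page confirms. The checks cover only two requirements from the table: `value.format` is always needed, and `topic` is needed for AVRO and PROTOBUF.

```python
def render_generate_stream_ddl(stream_name: str, params: dict[str, str]) -> str:
    """Render a GENERATE STREAM DDL statement from a parameter mapping."""
    value_format = params.get("value.format", "").lower()
    if not value_format:
        raise ValueError("'value.format' is required")
    if value_format in ("avro", "protobuf") and "topic" not in params:
        raise ValueError("'topic' is required for AVRO and PROTOBUF")

    def quote(text: str) -> str:
        # Assumption: single quotes are escaped by doubling, as in standard SQL.
        return "'" + text.replace("'", "''") + "'"

    # Parameters are emitted in the order they were inserted into the dict.
    pairs = ", ".join(f"{quote(k)}={quote(v)}" for k, v in params.items())
    return f"GENERATE STREAM DDL {stream_name} WITH ({pairs});"


statement = render_generate_stream_ddl(
    "myStream",
    {"topic": "pageviews_pb", "value.format": "protobuf", "key.format": "protobuf"},
)
print(statement)
```

The printed statement can then be submitted through whatever client you normally use to run commands.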

Generating "Create Stream" DDL examples

Generate a "CREATE STREAM" DDL for an entity in the current store that includes a Protobuf descriptor.

GENERATE STREAM DDL myStream WITH ('topic'='pageviews_pb', 'value.format'='protobuf', 'key.format'='protobuf');

The output would be:

CREATE STREAM myStream(
  "viewtime" BIGINT NOT NULL,
  "userid" VARCHAR NOT NULL,
  "pageid" VARCHAR NOT NULL
) WITH ('key.format'='protobuf', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_pb', 'value.format'='protobuf');

Generate a "CREATE STREAM" DDL for an entity in the current store whose Avro schema is registered in the Confluent Schema Registry.

GENERATE STREAM DDL myStream WITH ('topic'='pageviews_avro', 'value.format'='avro', 'key.format'='avro');

The output would be:

CREATE STREAM myStream(
  "viewtime" BIGINT,
  "userid" VARCHAR,
  "pageid" VARCHAR
) WITH ('key.format'='avro', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_avro', 'value.format'='avro');

Generate a "CREATE STREAM" DDL for an entity whose content is JSON.

GENERATE STREAM DDL myStream WITH ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}');

The output would be:

CREATE STREAM myStream(
  "viewtime" BIGINT,
  "userid" VARCHAR,
  "pageid" VARCHAR
) WITH ('value.format'='json');
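
The JSON example above passes its sample as a single string of comma-separated objects. If the sample records already exist in a program, a sketch like the following could build that string. It is written in Python, and `json_sample_content` is a hypothetical helper name. It assumes the sample values contain no single quotes, since any quote would need escaping before being placed inside the quoted parameter value.

```python
import json


def json_sample_content(records):
    """Serialize records as compact JSON objects joined by commas."""
    # separators=(",", ":") drops the spaces json.dumps inserts by default.
    return ",".join(json.dumps(record, separators=(",", ":")) for record in records)


records = [
    {"viewtime": 1629453600000, "userid": "user_123", "pageid": "page_1"},
    {"viewtime": 1629457200000, "userid": "user_456", "pageid": "page_2"},
]

content = json_sample_content(records)

# Assumption: the sample contains no single quotes that would need escaping.
statement = (
    "GENERATE STREAM DDL myStream WITH ("
    "'value.format'='json', "
    f"'data.json.value.content'='{content}');"
)
print(statement)
```

The same helper could produce a value for data.json.key.content from a list of key objects.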

Generate a "CREATE STREAM" DDL for a PostgreSQL table in a CDC pipeline.

GENERATE STREAM DDL myStream WITH ('value.format'='json', 'store'='pgStore', 'postgresql.table.name'='pgTb', 'postgresql.schema.name'='pgSchema', 'postgresql.db.name'='pgDb');

The output would be:

CREATE STREAM mystream(
  op VARCHAR,
  ts_ms BIGINT,
  "before" STRUCT<"id" BIGINT, "first_name" VARCHAR, "last_name" VARCHAR, "email" VARCHAR, "biography" VARCHAR>,
  "after" STRUCT<"id" BIGINT, "first_name" VARCHAR, "last_name" VARCHAR, "email" VARCHAR, "biography" VARCHAR>,
  "source" STRUCT<"db" VARCHAR, "schema" VARCHAR, "table" VARCHAR, "lsn" BIGINT>
) WITH ('postgresql.db.name'='pgDb', 'postgresql.schema.name'='pgSchema', 'postgresql.table.name'='pgTb', 'store'='pgStore');
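
In the PostgreSQL output above, the "before" and "after" columns share one STRUCT type that mirrors the source table's columns, alongside the operation, timestamp, and source metadata columns. The Python sketch below only illustrates that shape. It is not how the command derives the DDL, `struct_type` is a hypothetical helper, and the column list is read off the example rather than from a real table.

```python
def struct_type(columns):
    """Render a STRUCT<...> type from (name, type) pairs, quoting each name."""
    fields = ", ".join(f'"{name}" {sql_type}' for name, sql_type in columns)
    return f"STRUCT<{fields}>"


# Columns taken from the example output; a real table would supply its own.
table_columns = [
    ("id", "BIGINT"),
    ("first_name", "VARCHAR"),
    ("last_name", "VARCHAR"),
    ("email", "VARCHAR"),
    ("biography", "VARCHAR"),
]

row_type = struct_type(table_columns)
source_type = struct_type(
    [("db", "VARCHAR"), ("schema", "VARCHAR"), ("table", "VARCHAR"), ("lsn", "BIGINT")]
)

# "before" and "after" reuse the same row type; only the row image differs.
envelope = [
    "op VARCHAR",
    "ts_ms BIGINT",
    f'"before" {row_type}',
    f'"after" {row_type}',
    f'"source" {source_type}',
]
print(",\n".join(envelope))
```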
