GENERATE STREAM DDL
Syntax
GENERATE STREAM DDL stream_name WITH (stream_parameter = value [, ...]);
Description
This command generates a CREATE STREAM DDL statement from an existing schema or data sample, for example an entity in a Data Store that has a corresponding Protocol Buffers descriptor. It is especially helpful for creating DDL from complex descriptors.
You can generate DDL if the current role has USAGE privileges on the Data Store.
Stream Parameters
stream_name
The name of the new stream.
primary key
If you provide primary key names, they are added to the DDL. Otherwise, this is empty.
Required? Yes.
topic
Name of the Kafka topic whose assigned data schema you're using (a Protobuf descriptor, or an Avro schema from the Schema Registry).
Required? Yes, for AVRO and PROTOBUF.
value.format
AVRO, PROTOBUF, JSON
Required? Yes.
key.format
AVRO, PROTOBUF, JSON
Required? No.
data.json.value.content
A string containing one or more JSON objects separated by commas. It serves as a data sample you use to infer the schema of the value field in Kafka.
Required? Only for JSON.
data.json.key.content
A string containing one or more JSON objects separated by commas. It serves as a data sample you use to infer the schema of the key in Kafka.
Required? Only for JSON.
postgresql.table.name
Name of the source table in the POSTGRESQL store.
Required? Only for the POSTGRESQL store.
Type: String
postgresql.schema.name
Name of the schema in the POSTGRESQL store containing the source table.
Required? Only for the POSTGRESQL store.
Type: String
postgresql.db.name
Name of the database in the POSTGRESQL store containing the source table.
Required? Only for the POSTGRESQL store.
Type: String
Generating "Create Stream" DDL examples
Generate a "create stream" DDL for an entity in the current store that has a Protobuf descriptor.
GENERATE STREAM DDL myStream WITH ('topic'='pageviews_pb', 'value.format'='protobuf', 'key.format'='protobuf');
The output would be:
CREATE STREAM myStream(
"viewtime" BIGINT NOT NULL,
"userid" VARCHAR NOT NULL,
"pageid" VARCHAR NOT NULL
) WITH ('key.format'='protobuf', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_pb', 'value.format'='protobuf');
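Because key.format is listed as optional, the same request can presumably be made without it. The following is a minimal sketch under that assumption, reusing the pageviews_pb topic from the example above. The generated DDL is not shown because it depends on the descriptor attached to the topic.

```sql
-- Sketch only: same topic as the example above, with the optional key.format omitted.
-- Whether the generated DDL still carries key properties is not confirmed here.
GENERATE STREAM DDL myStream WITH ('topic'='pageviews_pb', 'value.format'='protobuf');
```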
Generate a "create stream" DDL for an entity in the current store whose Avro schema is in the Confluent Schema Registry.
GENERATE STREAM DDL myStream WITH ('topic'='pageviews_avro', 'value.format'='avro', 'key.format'='avro');
The output would be:
CREATE STREAM myStream(
"viewtime" BIGINT ,
"userid" VARCHAR ,
"pageid" VARCHAR
) WITH ('key.format'='avro', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_avro', 'value.format'='avro');
Generate a "create stream" DDL for an entity whose content is JSON.
GENERATE STREAM DDL myStream WITH ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}');
The output would be:
CREATE STREAM myStream(
"viewtime" BIGINT,
"userid" VARCHAR,
"pageid" VARCHAR
) WITH ('value.format'='json');
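The JSON example above supplies only a value sample. When the Kafka key is also JSON, a key sample can be passed through data.json.key.content. The following is a sketch that uses only the parameters documented above. The key object and the choice of 'key.format'='json' are illustrative assumptions, and the resulting DDL is not shown because it depends on what is inferred from the samples.

```sql
-- Sketch only: the sample values below are illustrative, not taken from a real topic.
GENERATE STREAM DDL myStream WITH (
  'value.format'='json',
  'key.format'='json',
  -- Sample record used to infer the schema of the value.
  'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"}',
  -- Sample record used to infer the schema of the key.
  'data.json.key.content'='{"userid":"user_123"}'
);
```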
Generate a "create stream" DDL for a Postgres table in a CDC pipeline.
GENERATE STREAM DDL myStream WITH ('value.format'='json', 'store'='pgStore', 'postgresql.table.name'='pgTb', 'postgresql.schema.name'='pgSchema', 'postgresql.db.name'='pgDb');
The output would be:
CREATE STREAM mystream(
op VARCHAR,
ts_ms BIGINT,
"before" STRUCT<"id" BIGINT, "first_name" VARCHAR, "last_name" VARCHAR, "email" VARCHAR, "biography" VARCHAR>,
"after" STRUCT<"id" BIGINT, "first_name" VARCHAR, "last_name" VARCHAR, "email" VARCHAR, "biography" VARCHAR>,
"source" STRUCT<"db" VARCHAR, "schema" VARCHAR, "table" VARCHAR, "lsn" BIGINT>
) WITH ('postgresql.db.name'='pgDb', 'postgresql.schema.name'='pgSchema', 'postgresql.table.name'='pgTb', 'store'='pgStore');