GENERATE STREAM DDL and GENERATE CHANGELOG DDL
Syntax
GENERATE STREAM DDL stream_name WITH (stream_parameter = value [, ...]);
GENERATE CHANGELOG DDL changelog_name WITH (stream_parameter = value [, ...]);
GENERATE CHANGELOG DDL changelog_name PRIMARY KEY(pk_name) WITH (stream_parameter = value [, ...]);
Description
This command generates the CREATE DDL statement for a STREAM or CHANGELOG from a given schema or from sample data, for example an entity in a Data Store that has corresponding Protocol Buffers descriptors. It is especially useful for creating DDL from complex descriptors.
You can generate DDL if the current role has USAGE privileges on the Data Store.
Arguments
stream_name
The name of the new stream.
PRIMARY KEY
Optional; applies to changelogs only. If you provide primary key names, they are added to the generated DDL; otherwise the PRIMARY KEY clause is left empty.
topic
Name of the Kafka topic whose assigned data schema is used (the Protobuf descriptor, or the Avro schema from Schema Registry).
Required for AVRO and PROTOBUF
value.format
AVRO, PROTOBUF, JSON
Required
key.format
AVRO, PROTOBUF, JSON
Optional
data.json.value.content
A string containing one or more JSON objects separated by commas. It serves as a sample of data used to infer the schema of the value field in Kafka.
Required only for JSON
data.json.key.content
A string containing one or more JSON objects separated by commas. It serves as a sample of data used to infer the schema of the key in Kafka.
Required only for JSON
Examples
Generate a "create stream ddl" for an entity in the current store that has a Protobuf descriptor
generate stream ddl myStream with('topic'='pageviews_pb', 'value.format'='protobuf', 'key.format'='protobuf');
CREATE STREAM myStream(
"viewtime" BIGINT NOT NULL,
"userid" VARCHAR NOT NULL,
"pageid" VARCHAR NOT NULL
) WITH ('key.format'='protobuf', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_pb', 'value.format'='protobuf');
Generate a "create stream ddl" for an entity in the current store that has an Avro schema in Confluent Schema Registry
generate stream ddl myStream with('topic'='pageviews_avro', 'value.format'='avro', 'key.format'='avro');
CREATE STREAM myStream(
"viewtime" BIGINT ,
"userid" VARCHAR ,
"pageid" VARCHAR
) WITH ('key.format'='avro', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_avro', 'value.format'='avro');
Generate a "create stream ddl" for an entity whose content is JSON
generate stream ddl myStream with ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}' );
CREATE STREAM myStream(
"viewtime" BIGINT,
"userid" VARCHAR,
"pageid" VARCHAR
) WITH ('value.format'='json');
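When the key is also JSON, the argument list above suggests that a key sample can be supplied alongside the value sample. The following is a sketch only: it assumes that 'key.format'='json' is combined with 'data.json.key.content', and the single-object key sample is a made-up illustration. The generated DDL is not shown here because it depends on the key schema the command infers.

```sql
-- Sketch: supply JSON samples for both the value and the key.
-- Assumption: 'data.json.key.content' is used together with 'key.format'='json'.
-- The key sample below is hypothetical and only illustrates the expected shape.
generate stream ddl myStream with (
  'value.format'='json',
  'key.format'='json',
  'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"}',
  'data.json.key.content'='{"userid":"user_123"}'
);
```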
Generate a "create changelog ddl" for an entity whose content is JSON
generate changelog ddl myChangelog with ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}' );
CREATE CHANGELOG myChangelog(
"viewtime" BIGINT,
"userid" VARCHAR,
"pageid" VARCHAR,
PRIMARY KEY()
) WITH ('value.format'='json');
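The PRIMARY KEY clause from the Syntax section can be added to the same command so that the generated DDL does not end up with an empty key. This is a sketch, assuming userid is the intended key column. Whether the column name must be quoted is not stated on this page, so treat the exact spelling as an assumption.

```sql
-- Sketch: pass the primary key name so it is added to the generated DDL.
-- Assumption: userid is the key column; quoting rules for pk_name are not confirmed here.
generate changelog ddl myChangelog primary key(userid) with (
  'value.format'='json',
  'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"}'
);
```

Per the PRIMARY KEY argument description, the names supplied this way are added to the PRIMARY KEY clause of the generated CREATE CHANGELOG statement.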