GENERATE STREAM DDL and GENERATE CHANGELOG DDL

Syntax

GENERATE STREAM DDL stream_name WITH (stream_parameter = value [, ...]);

GENERATE CHANGELOG DDL changelog_name WITH (stream_parameter = value [, ...]);

GENERATE CHANGELOG DDL changelog_name PRIMARY KEY(pk_name) WITH (stream_parameter = value [, ...]);

Description

This command generates a CREATE DDL statement for a STREAM or CHANGELOG from a given schema or data source (for example, a Data Store) that has corresponding Protocol Buffers and descriptors. It is especially useful for creating DDL for complex descriptors.

You can generate DDL if the current role has USAGE privileges on the Data Store.

Arguments

stream_name

The name of the new stream.

PRIMARY KEY

Optional, for changelogs only. If the user provides primary key names, they are added to the generated DDL; otherwise the PRIMARY KEY clause is left empty.

topic

Name of the Kafka topic whose assigned data schema is used (a Protobuf descriptor, or an Avro schema from the Schema Registry).

Required for AVRO and PROTOBUF

value.format

AVRO, PROTOBUF, JSON

Required

key.format

AVRO, PROTOBUF, JSON

Optional

data.json.value.content

A string containing one or more JSON objects separated by commas. It serves as a sample of data used to infer the schema of the value field in Kafka.

Required only for JSON

data.json.key.content

A string containing one or more JSON objects separated by commas. It serves as a sample of data used to infer the schema of the key in Kafka.

Required only for JSON

Examples

Generate a "create stream ddl" for an entity in the current store that has a Protobuf descriptor.

generate stream ddl myStream with('topic'='pageviews_pb', 'value.format'='protobuf', 'key.format'='protobuf');

CREATE STREAM myStream(
  "viewtime" BIGINT NOT NULL,
  "userid" VARCHAR NOT NULL,
  "pageid" VARCHAR NOT NULL
) WITH ('key.format'='protobuf', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_pb', 'value.format'='protobuf');

Generate a "create stream ddl" for an entity in the current store that has an Avro schema in Confluent Schema Registry.

generate stream ddl myStream with('topic'='pageviews_avro', 'value.format'='avro', 'key.format'='avro');

CREATE STREAM myStream(
  "viewtime" BIGINT,
  "userid" VARCHAR,
  "pageid" VARCHAR
) WITH ('key.format'='avro', 'key.type'='STRUCT<"userid" VARCHAR>', 'topic'='pageviews_avro', 'value.format'='avro');

Generate a "create stream ddl" for an entity whose content is JSON.

generate stream ddl myStream with ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}' );

CREATE STREAM myStream(
  "viewtime" BIGINT,
  "userid" VARCHAR,
  "pageid" VARCHAR
) WITH ('value.format'='json');

Generate a "create changelog ddl" for an entity whose content is JSON.

generate changelog ddl myChangelog with ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"},{"viewtime":1629460800000,"userid":"user_789","pageid":"page_3"}' );

CREATE CHANGELOG myChangelog(
  "viewtime" BIGINT,
  "userid" VARCHAR,
  "pageid" VARCHAR,
  PRIMARY KEY()
) WITH ('value.format'='json');
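
The PRIMARY KEY form listed in the Syntax section can be used to supply the key for the generated changelog. The following is a minimal sketch, assuming a JSON sample like the ones used in the examples on this page. Choosing userid as the key column, and writing the column name unquoted, are illustrative assumptions that this page does not confirm.

```sql
-- Sketch only: follows the PRIMARY KEY(pk_name) syntax form from the Syntax section.
-- Using userid as the primary key is an assumption made for illustration.
generate changelog ddl myChangelog primary key(userid) with ('value.format'='json', 'data.json.value.content'='{"viewtime":1629453600000,"userid":"user_123","pageid":"page_1"},{"viewtime":1629457200000,"userid":"user_456","pageid":"page_2"}');
```

Per the PRIMARY KEY argument description, the supplied name should then appear in the PRIMARY KEY clause of the generated CREATE CHANGELOG statement, rather than the clause being left empty.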
