Define KSQL STRUCT on JSON valued topic with different types

(Edit: slight edits to better reflect intention, but large edit due to progress made.)



A topic "t_raw" receives messages of multiple types, all of which contain a common "type" key:



{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}


Ultimately, I need to split this into other streams where the data will be chopped/aggregated/processed. I'd like to be able to use STRUCT for everything, but my current effort has me doing this:



create stream raw (type varchar, data varchar) 
with (kafka_topic='t_raw', value_format='JSON');


for the first level, then



create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as
  select extractjsonfield(data, '$.ts') as ts,
         extractjsonfield(data, '$.a') as a,
         extractjsonfield(data, '$.b') as b
  from raw where type='key1';

create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as
  select extractjsonfield(data, '$.ts') as ts,
         extractjsonfield(data, '$.a') as a,
         extractjsonfield(data, '$.c') as c,
         extractjsonfield(data, '$.d') as d
  from raw where type='key2';


This seems to work, but with the recent addition of STRUCT, is there a way to use it in lieu of extractjsonfield as done above?



ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2


If not with STRUCT, is there a straightforward way to do this with vanilla Kafka Streams (vice ksql, ergo the apache-kafka-streams tag)?
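
For the Kafka Streams side of the question, one option is a small topology that inspects the top-level "type" field and routes each record to a per-type topic. The following is only a sketch, not a tested solution: it assumes Jackson for JSON parsing, String serdes, and made-up output topic names (t_key1, t_key2) and connection settings. KStream#branch takes one predicate per output stream and sends each record to the first predicate that matches.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SplitByType {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Pull the top-level "type" field out of the raw JSON value;
    // returns null (and so matches no branch) on malformed input.
    private static String typeOf(String value) {
        try {
            return MAPPER.readTree(value).path("type").asText(null);
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Values are the raw JSON strings from t_raw
        KStream<String, String> raw = builder.stream("t_raw");

        // One output stream per message type
        KStream<String, String>[] branches = raw.branch(
            (k, v) -> "key1".equals(typeOf(v)),
            (k, v) -> "key2".equals(typeOf(v))
        );

        branches[0].to("t_key1");  // hypothetical per-type output topics
        branches[1].to("t_key2");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "split-by-type");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}

Note that downstream consumers of t_key1/t_key2 would still have to parse the nested "data" object themselves; the KSQL STRUCT approach in the answer below avoids that extra step.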



Is there a more kafka-esque/efficient/elegant way to parse this?
I cannot define it as an empty STRUCT<>:



ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> ) 
WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}


There is some (not-so-recent) discussion on being able to do something like



CREATE STREAM key1 ( a INT, b VARCHAR ) AS 
SELECT data->* from some_input where type = 'key1';


FYI: the above solution will not work in confluent-5.0.0; a recent patch fixed the extractjsonfield bug and enabled this solution.



The real data has several more similar message types. They all contain "type" and "data" keys (and no others at the top level), and almost all have a "ts" timestamp equivalent nested within "data".

apache-kafka apache-kafka-streams ksql

asked Nov 22 '18 at 21:59 by r2evans, edited Nov 23 '18 at 9:55 by Robin Moffatt

  • I know this is possible with Avro, and making that c field in the data have some otherwise default value in the Avro schema, but not sure if JSON can work this way

    – cricket_007
    Nov 23 '18 at 2:23

  • @MatthiasJ.Sax, the reason I had (and have kept) the apache-kafka-streams tag is that, since this is a stream and not (yet) a table, it might be solved via a non-ksql mechanism. I'm open to it, even though it isn't a proficiency of mine. Thanks for the edit, though.

    – r2evans
    Nov 23 '18 at 5:16

  • @cricket_007, I didn't see anything in the avro spec that allowed me to have a conditional "data" definition there, nor in the def within create stream ..., so that doesn't surprise me wholly. Thanks.

    – r2evans
    Nov 23 '18 at 5:17

  • For this case, you should mention this in the question :)

    – Matthias J. Sax
    Nov 23 '18 at 5:22

  • Working on an edit as we type ...

    – r2evans
    Nov 23 '18 at 5:25


1 Answer

Yes, you can do this: KSQL doesn't mind if a column doesn't exist in a given message; you just get a null value.



Test data setup



Populate some test data into the topic:



kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF


Dump the topic to KSQL console for inspection:



ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>


Model the source stream of data



Create a stream over it. Note the use of STRUCT and the declaration of every possible column:



CREATE STREAM T (TYPE VARCHAR,
                 DATA STRUCT<
                       TS VARCHAR,
                       A INT,
                       B VARCHAR,
                       C INT,
                       D VARCHAR>)
  WITH (KAFKA_TOPIC='t_raw',
        VALUE_FORMAT='JSON');


Set offset to earliest so that we query the whole topic, and then use KSQL to access the full stream:



ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated


Query the types individually, using the -> operator to access the nested elements:



ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1'  LIMIT 2;
1 | hello
2 | hello2

ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2


Persist the data in separate Kafka topics:



Populate target topics with the separated data:



ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';

Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';

Message
----------------------------
Stream created and running
----------------------------


Schema for the new streams:



ksql> DESCRIBE TYPE_1;

Name : TYPE_1
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT (system)
 ROWKEY   | VARCHAR(STRING) (system)
 DATA__TS | VARCHAR(STRING)
 DATA__A  | INTEGER
 DATA__B  | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

ksql> DESCRIBE TYPE_2;

Name : TYPE_2
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT (system)
 ROWKEY   | VARCHAR(STRING) (system)
 DATA__TS | VARCHAR(STRING)
 DATA__A  | INTEGER
 DATA__C  | INTEGER
 DATA__D  | VARCHAR(STRING)
--------------------------------------


Topics underpin each KSQL stream:



ksql> LIST TOPICS;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------
 t_raw       | true       | 1          | 1                  | 2         | 2
 TYPE_1      | true       | 4          | 1                  | 0         | 0
 TYPE_2      | true       | 4          | 1                  | 0         | 0
------------------------------------------------------------------------------------------





answered Nov 23 '18 at 9:54 by Robin Moffatt

  • Thanks! Nice walk-through. I don't know yet how best to benchmark ... do you know if there is any "significant" difference (in speed, robustness, etc) between using this union'ed struct versus the extractjsonfield method?

    – r2evans
    Nov 23 '18 at 20:34










