Define KSQL STRUCT on JSON valued topic with different types
(Edit: slight edits to better reflect intention, but large edit due to progress made.)
A topic "t_raw"
is given messages of multiple types, where they all contain a common "type"
key:
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
Ultimately, I need to split this into other streams where they will be chopped/aggregated/processed. I'd like to be able to use STRUCT for everything, but my current effort has me doing this:
create stream raw (type varchar, data varchar)
with (kafka_topic='t_raw', value_format='JSON');
for the first level, then
create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as
select
extractjsonfield(data, '$.ts') as ts,
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b
from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as
select
extractjsonfield(data, '$.ts') as ts,
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c,
extractjsonfield(data, '$.d') as d
from raw where type='key2';
This seems to work, but with the recent addition of STRUCT, is there a way to use it in lieu of extractjsonfield as done above?
ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
If not with STRUCT, is there a straightforward way to do this with vanilla kafka-streams (vice ksql, ergo the apache-kafka-streams tag)? Is there a more kafka-esque/efficient/elegant way to parse this?
I cannot define it as an empty STRUCT<>:
ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> )
WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}
There is some (not-so-recent) discussion on being able to do something like
CREATE STREAM key1 ( a INT, b VARCHAR ) AS
SELECT data->* from some_input where type = 'key1';
FYI: the above extractjsonfield solution will not work in confluent-5.0.0; a recent patch fixed the extractjsonfield bug and enabled this solution.
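Until something like data->* exists, the nearest working form seems to be naming each nested field explicitly. A sketch, assuming some_input declares data as a fully-specified STRUCT (the field names here are taken from the examples above):
-- Sketch: assumes SOME_INPUT declares DATA as STRUCT<ts VARCHAR, a INT, b VARCHAR, ...>;
-- since data->* is not supported, each field is selected explicitly.
CREATE STREAM key1 AS
  SELECT data->ts AS ts, data->a AS a, data->b AS b
  FROM some_input
  WHERE type = 'key1';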
The real data has several more similar message types. They all contain "type" and "data" keys (and no others at the top level), and almost all have the "ts" timestamp equivalent nested within "data".
apache-kafka apache-kafka-streams ksql
edited Nov 23 '18 at 9:55 by Robin Moffatt
asked Nov 22 '18 at 21:59 by r2evans
I know this is possible with Avro, and making that c field in the data have some otherwise default value in the Avro schema, but not sure if JSON can work this way
– cricket_007
Nov 23 '18 at 2:23
@MatthiasJ.Sax, the reason I had (and have kept) the apache-kafka-streams tag is that, since this is a stream and not (yet) a table, it might be solved via a non-ksql mechanism. I'm open to it, even though it isn't a proficiency of mine. Thanks for the edit, though.
– r2evans
Nov 23 '18 at 5:16
@cricket_007, I didn't see anything in the avro spec that allowed me to have a conditional "data" definition there, nor in the def within create stream ..., so that doesn't surprise me wholly. Thanks.
– r2evans
Nov 23 '18 at 5:17
For this case, you should mention this in the question :)
– Matthias J. Sax
Nov 23 '18 at 5:22
Working on an edit as we type ...
– r2evans
Nov 23 '18 at 5:25
1 Answer
Yes, you can do this - KSQL doesn't mind if a column doesn't exist, you just get a null value.
Test data setup
Populate some test data into the topic:
kafkacat -b kafka:29092 -t t_raw -P <<EOF
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
EOF
Dump the topic to KSQL console for inspection:
ksql> PRINT 't_raw' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"ROWTIME":1542965737436,"ROWKEY":"null","type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"ROWTIME":1542965737437,"ROWKEY":"null","type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
^CTopic printing ceased
ksql>
Model the source stream of data
Create a stream over it. Note the use of STRUCT, declaring every possible column:
CREATE STREAM T (TYPE VARCHAR,
DATA STRUCT<
TS VARCHAR,
A INT,
B VARCHAR,
C INT,
D VARCHAR>)
WITH (KAFKA_TOPIC='t_raw',
VALUE_FORMAT='JSON');
Set offset to earliest so that we query the whole topic, and then use KSQL to access the full stream:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
ksql> SELECT * FROM T;
1542965737436 | null | key1 | {TS=2018-11-20 19:20:21.1, A=1, B=hello, C=null, D=null}
1542965737436 | null | key2 | {TS=2018-11-20 19:20:22.2, A=1, B=null, C=11, D=goodbye}
1542965737436 | null | key1 | {TS=2018-11-20 19:20:23.3, A=2, B=hello2, C=null, D=null}
1542965737437 | null | key2 | {TS=2018-11-20 19:20:24.4, A=3, B=null, C=22, D=goodbye2}
^CQuery terminated
Query the types individually, using the -> operator to access the nested elements:
ksql> SELECT DATA->A,DATA->B FROM T WHERE TYPE='key1' LIMIT 2;
1 | hello
2 | hello2
ksql> SELECT DATA->A,DATA->C,DATA->D FROM T WHERE TYPE='key2' LIMIT 2;
1 | 11 | goodbye
3 | 22 | goodbye2
Persist the data in separate Kafka topics:
Populate target topics with the separated data:
ksql> CREATE STREAM TYPE_1 AS SELECT DATA->TS, DATA->A, DATA->B FROM T WHERE TYPE='key1';
Message
----------------------------
Stream created and running
----------------------------
ksql> CREATE STREAM TYPE_2 AS SELECT DATA->TS, DATA->A, DATA->C, DATA->D FROM T WHERE TYPE='key2';
Message
----------------------------
Stream created and running
----------------------------
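The question's original streams also set the event time from the nested ts field; the same TIMESTAMP and TIMESTAMP_FORMAT properties should combine with the struct accessor here too. A sketch, reusing the question's format string; the stream name TYPE_1_TS and the TS alias are illustrative, not from the answer:
-- Sketch: carry the nested timestamp through as the stream's event time.
-- TYPE_1_TS and the TS alias are illustrative names.
CREATE STREAM TYPE_1_TS
  WITH (TIMESTAMP='TS', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss.S') AS
  SELECT DATA->TS AS TS, DATA->A AS A, DATA->B AS B
  FROM T
  WHERE TYPE='key1';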
Schema for the new streams:
ksql> DESCRIBE TYPE_1;
Name : TYPE_1
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__B | VARCHAR(STRING)
--------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> DESCRIBE TYPE_2;
Name : TYPE_2
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__TS | VARCHAR(STRING)
DATA__A | INTEGER
DATA__C | INTEGER
DATA__D | VARCHAR(STRING)
--------------------------------------
Topics underpin each KSQL stream:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
t_raw | true | 1 | 1 | 2 | 2
TYPE_1 | true | 4 | 1 | 0 | 0
TYPE_2 | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------
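Because each new stream is backed by a real Kafka topic, downstream jobs can consume TYPE_1/TYPE_2 directly as topics or keep working in KSQL; for example (a usage sketch, output omitted):
ksql> SELECT * FROM TYPE_1 LIMIT 2;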
answered Nov 23 '18 at 9:54 by Robin Moffatt
Thanks! Nice walk-through. I don't know yet how best to benchmark ... do you know if there is any "significant" difference (in speed, robustness, etc) between using this union'ed struct versus the extractjsonfield method?
– r2evans
Nov 23 '18 at 20:34