导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析起来比较麻烦,下面将讨论Flink SQL(1.10版本) 如何解析复杂JSON。
官网Demo
//JSON Format
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats
首先查看官网给出的一个例子,大致的解决思路为使用 format.json-schema,自定义一个format schema。
//官网例子
CREATE TABLE MyUserTable (
...
) WITH (
'format.type' = 'json', -- required: specify the format type
'format.fail-on-missing-field' = 'true' -- optional: flag whether to fail if a field is missing or not, false by default
'format.fields.0.name' = 'lon', -- optional: define the schema explicitly using type information.
'format.fields.0.data-type' = 'FLOAT', -- This overrides default behavior that uses table's schema as format schema.
'format.fields.1.name' = 'rideTime',
'format.fields.1.data-type' = 'TIMESTAMP(3)',
'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
'{ -- This also overrides the default behavior.
"type": "object",
"properties": {
"lon": {
"type": "number"
},
"rideTime": {
"type": "string",
"format": "date-time"
}
}
}'
)
分析:Flink 在解析 JSON 的时候,对于复杂的 JSON 可以通过自定义format schema来支持。如果table schema 和 format schema相同,则可以自动派生 json 的 schema(但这种往往不适用于解析复杂JSON )。
实战例子
了解了官网的例子之后,我们手动试验一下。
1、从Kafka中获取复杂JSON用于测试,JSON 如下:
{"code":0,"data":{"request":{"name":"test","id":"ce1beb37-ed3e-4365-8e44-c3bd1d249cfc"}},"message":"SUCCESS","retryCount":1,"success":true}
2、编写format.json-schema
通过参考官网Demo,发现第一层的 retryCount 可直接就映射到字段上,而 data 是多层嵌套,所以定义data 的类型为object ,而properties则是其json的内层数据。我们的例子中为多层嵌套,那么简化对应的 json-schema 如下:
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}'
3、定义table schema
从上面的 json schame 和 Flink SQL 的映射关系可以看出,data对应的table 字段的类型是ROW,所以 table schema 应是如下:
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
......
}
4、使用的时候,直接用 "."的方式即可
Table table = bsTableEnv.sqlQuery("SELECT data.request.id AS ID,retryCount FROM sourceTable");
5、Kafka SourceTable完整例子
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'XXXX',
'connector.properties.zookeeper.connect' = 'XXXX:2181',
'connector.properties.bootstrap.servers' = 'XXXX:9092',
'connector.properties.group.id' = 'XXXXX',
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}',
'format.type' = 'json');
最后
以上就是在FlinkSQL1.10中处理复杂JSON的一种方式,通过定义format.json schema实现。而在查看Flink中文邮件列表中也发现了其他的一些不错思路,如下:
通过在上游时将就转义成一个String放到JSON的一个field中,这样Flink解析出来就是一个String,
然后编写UDTF进行处理,感兴趣的朋友也可以尝试一下。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。