FlinkSQL处理复杂JSON的思路_flink处理json数据

导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析起来比较麻烦,下面将讨论Flink SQL(1.10版本) 如何解析复杂JSON。

官网Demo

//JSON Format

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats

首先查看官网给出的一个例子,大致的解决思路为使用 format.json-schema,自定义一个format schema。

//官网例子
CREATE TABLE MyUserTable (
  ...
) WITH (
  'format.type' = 'json',                   -- required: specify the format type
  'format.fail-on-missing-field' = 'true'   -- optional: flag whether to fail if a field is missing or not, false by default

  'format.fields.0.name' = 'lon',           -- optional: define the schema explicitly using type information.
  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior that uses table's schema as format schema.
  'format.fields.1.name' = 'rideTime',
  'format.fields.1.data-type' = 'TIMESTAMP(3)',

  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
    '{                                      -- This also overrides the default behavior.
      "type": "object",
      "properties": {
        "lon": {
          "type": "number"
        },
        "rideTime": {
          "type": "string",
          "format": "date-time"
        }
      }
    }'
)

分析:Flink 在解析 JSON 的时候,对于复杂的 JSON 可以通过自定义format schema来支持。如果table schema 和 format schema相同,则可以自动派生 json 的 schema(但这种往往不适用于解析复杂JSON )。

实战例子

了解了官网的例子之后,我们手动试验一下。

1、从Kafka中获取复杂JSON用于测试,JSON 如下:

{"code":0,"data":{"request":{"name":"test","id":"ce1beb37-ed3e-4365-8e44-c3bd1d249cfc"}},"message":"SUCCESS","retryCount":1,"success":true}

2、编写format.json-schema

通过参考官网Demo,发现第一层的 retryCount 可直接就映射到字段上,而 data 是多层嵌套,所以定义data 的类型为object ,而properties则是其json的内层数据。我们的例子中为多层嵌套,那么简化对应的 json-schema 如下:

'format.json-schema' = '{
        "type": "object",
        "properties": {
            "retryCount": {type: "string"},
            "data":{type: "object",
                   "properties" : {
                      "request":{type: "object",
                         "properties" : {
                          "id" : {type:"string"}
                         }
                      }
                   }
             }
        }
    }'

3、定义table schema

从上面的 json schame 和 Flink SQL 的映射关系可以看出,data对应的table 字段的类型是ROW,所以 table schema 应是如下:

CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
  ......
  }

4、使用的时候,直接用 "."的方式即可

Table table = bsTableEnv.sqlQuery("SELECT data.request.id AS ID,retryCount FROM sourceTable");

5、Kafka SourceTable完整例子

CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'XXXX',
'connector.properties.zookeeper.connect' = 'XXXX:2181',
'connector.properties.bootstrap.servers' = 'XXXX:9092',
'connector.properties.group.id' = 'XXXXX',
'format.json-schema' = '{
        "type": "object",
        "properties": {
            "retryCount": {type: "string"},
            "data":{type: "object",
                   "properties" : {
                      "request":{type: "object",
                         "properties" : {
                          "id" : {type:"string"}
                         }
                      }
                   }
             }
        }
    }',
'format.type' = 'json');

最后

以上就是在FlinkSQL1.10中处理复杂JSON的一种方式,通过定义format.json schema实现。而在查看Flink中文邮件列表中也发现了其他的一些不错思路,如下:

通过在上游时将就转义成一个String放到JSON的一个field中,这样Flink解析出来就是一个String,
然后编写UDTF进行处理,感兴趣的朋友也可以尝试一下。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。