How to Parse JSON Data with Flink SQL
Published: 2019-06-26


1. Flink version: 1.7.2

2. Dependencies

The project is built with Maven, so add the following dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.8</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.10.1</version>
</dependency>

3. Google Protobuf message definition

3.1 Message definition

The response.proto file:

syntax = "proto3";

package com.google.protos;

// search response
message SearchResponse {
    uint64 search_time = 1;
    uint32 code = 2;
    Result results = 3;
}

// search result
message Result {
    string id = 1;
    repeated Item items = 2;
}

// search result item
message Item {
    string id = 1;
    string name = 2;
    string title = 3;
    string url = 4;
    uint64 publish_time = 5;
    float score = 6;  // recommendation or similarity weighting score
}
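The producer in the next section imports com.google.protos.GoogleProtobuf.*, which suggests the proto file also carries option java_outer_classname = "GoogleProtobuf"; (an assumption; that option is not shown above). Under that assumption, the Java message classes can be generated with the standard protoc compiler:

protoc --java_out=src/main/java response.proto

The JSON strings themselves are then produced at runtime with protobuf-java-format's JsonFormat, as the producer code below shows.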

A sample message, containing the nested object results and the array object items:

{    "search_time":1553650604,    "code":200,    "results":{        "id":"449",        "items":[            {                "id":"47",                "name":"name47",                "title":"标题47",                "url":"https://www.google.com.hk/item-47",                "publish_time":1552884870,                "score":96.03            },            {                "id":"2",                "name":"name2",                "title":"标题2",                "url":"https://www.google.com.hk/item-2",                "publish_time":1552978902,                "score":16.06            },            {                "id":"60",                "name":"name60",                "title":"标题60",                "url":"https://www.google.com.hk/item-60",                "publish_time":1553444982,                "score":62.58            },            {                "id":"67",                "name":"name67",                "title":"标题67",                "url":"https://www.google.com.hk/item-67",                "publish_time":1553522957,                "score":12.17            },            {                "id":"15",                "name":"name15",                "title":"标题15",                "url":"https://www.google.com.hk/item-15",                "publish_time":1553525421,                "score":32.36            },            {                "id":"53",                "name":"name53",                "title":"标题53",                "url":"https://www.google.com.hk/item-53",                "publish_time":1553109227,                "score":52.13            },            {                "id":"70",                "name":"name70",                "title":"标题70",                "url":"https://www.google.com.hk/item-70",                "publish_time":1552781921,                "score":1.72            },            {                "id":"53",                "name":"name53",                "title":"标题53",                "url":"https://www.google.com.hk/item-53",                "publish_time":1553229003,                "score":5.31            },            {                "id":"30",                "name":"name30",                "title":"标题30",                "url":"https://www.google.com.hk/item-30",                "publish_time":1553282629,                "score":26.51            },            {                "id":"36",                "name":"name36",                "title":"标题36",                "url":"https://www.google.com.hk/item-36",                "publish_time":1552665833,                "score":48.76            }        ]    }}

3.2 Kafka producer publishing random response JSON strings

import com.google.protos.GoogleProtobuf.*;
import com.googlecode.protobuf.format.JsonFormat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Publishes randomly generated SearchResponse messages, serialized as JSON, to a Kafka topic.
 *
 * @author lynn
 * @ClassName com.lynn.kafka.SearchResponsePublisher
 * @Date 19-3-26
 * @Version 1.0
 */
public class SearchResponsePublisher {

    private static final Logger LOG = LoggerFactory.getLogger(SearchResponsePublisher.class);

    /** Builds a SearchResponse with the given number of random items and returns it as a JSON string. */
    public String randomMessage(int results) {
        Random random = new Random();
        DecimalFormat fmt = new DecimalFormat("##0.00");
        SearchResponse.Builder response = SearchResponse.newBuilder();
        response.setSearchTime(Instant.now().getEpochSecond())
                .setCode(random.nextBoolean() ? 200 : 404);
        Result.Builder result = Result.newBuilder()
                .setId("" + random.nextInt(1000));
        for (int i = 0; i < results; i++) {
            int number = random.nextInt(100);
            Item.Builder builder = Item.newBuilder()
                    .setId(number + "")
                    .setName("name" + number)
                    .setTitle("标题" + number)
                    .setUrl("https://www.google.com.hk/item-" + number)
                    .setPublishTime(Instant.now().getEpochSecond() - random.nextInt(1000000))
                    .setScore(Float.parseFloat(fmt.format(random.nextInt(99) + random.nextFloat())));
            result.addItems(builder.build());
        }
        response.setResults(result.build());
        return new JsonFormat().printToString(response.build());
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length < 3) {
            System.err.println("Please input broker.servers and topic and records number!");
            System.exit(-1);
        }
        String brokers = args[0];
        String topic = args[1];
        int recordsNumber = Integer.parseInt(args[2]);
        LOG.info("I will publish {} records...", recordsNumber);
        SearchResponsePublisher publisher = new SearchResponsePublisher();
        // System.out.println(publisher.randomMessage(10));
        // if (recordsNumber == 1000) return;

        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        // "all" is equivalent to -1
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int count = 0;
        while (count++ < recordsNumber) {
            producer.send(new ProducerRecord<>(topic,
                    String.valueOf(Instant.now().toEpochMilli()),
                    publisher.randomMessage(10)));
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // producer.flush();
        producer.close();
    }
}
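Assuming the class is packaged into a runnable fat jar (the jar name, broker address, and topic below are hypothetical), publishing 1000 messages at roughly 10 per second would look like:

java -cp target/search-publisher-jar-with-dependencies.jar \
    com.lynn.kafka.SearchResponsePublisher localhost:9092 search_response 1000

The three arguments map to the broker list, the topic, and the number of records, matching the argument parsing in main above.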

4. Java source code

4.1 Imports

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.PrintTableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

4.2 Source code:

// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Kafka kafka = new Kafka().version("0.11")
        .topic(sourceTopic)
        .startFromEarliest()
        // .startFromLatest()
        .property("bootstrap.servers", brokers)
        .property("group.id", "res")
        .property("session.timeout.ms", "30000")
        .sinkPartitionerFixed();

tableEnv.connect(kafka)
        .withFormat(new Json()
                .failOnMissingField(false)
                .deriveSchema())
        .withSchema(new Schema()
                .field("search_time", Types.LONG())
                .field("code", Types.INT())
                .field("results", Types.ROW(
                        new String[]{"id", "items"},
                        new TypeInformation[]{
                                Types.STRING(),
                                ObjectArrayTypeInfo.getInfoFor(
                                        Row[].class, // or Array.newInstance(Row.class, 10).getClass()
                                        Types.ROW(
                                                new String[]{"id", "name", "title", "url", "publish_time", "score"},
                                                new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.FLOAT()}))
                        })))
        .inAppendMode()
        .registerTableSource("tb_json");

// item[1] ... item[10]: array subscripts start at 1
String sql4 = "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n"
        + "from tb_json";
Table table4 = tableEnv.sqlQuery(sql4);
tableEnv.registerTable("tb_item_2", table4);
LOG.info("------------------print {} schema------------------", "tb_item_2");
table4.printSchema();

tableEnv.registerTableSink("console4",
        new String[]{"f0", "f1", "f2", "f3", "f4"},
        new TypeInformation[]{
                Types.LONG(), Types.INT(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING()
        },
        new PrintTableSink());
table4.insertInto("console4");

// execute program
env.execute("Flink Table Json Engine");

4.3 SQL statement

select
    search_time,
    code,
    results.id as result_id,       -- nested JSON sub-field
    items[1].name as item_1_name,  -- array object sub-field; subscripts start at 1
    items[2].id as item_2_id
from tb_json

Nested fields are read directly by joining names with a dot, and array elements with [index]; subscripts start at 1, unlike Java array subscripts, which start at 0.
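As a further illustration, a variation on the query above could drill into other sub-fields and take the array's length. This sketch is not part of the original program, and CARDINALITY support for arrays in this Flink version is an assumption:

select
    results.id as result_id,
    items[1].score as item_1_score,   -- numeric sub-field of an array element
    cardinality(items) as item_count  -- array length, assuming CARDINALITY is available
from tb_json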

4.4 Schema definition

Define the schema following the JSON object's own nesting and array structure; there is no need to flatten every field. Declare nested fields as Row types and array fields as ObjectArrayTypeInfo or BasicArrayTypeInfo. The first parameter of ObjectArrayTypeInfo is the array class, obtained as in the example via Row[].class or Array.newInstance(Row.class, 10).getClass().
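As a minimal standalone sketch of this rule (using the imports from section 4.1; the id/tags field names are illustrative, not from the article):

// Models a hypothetical {"id": "...", "tags": [{"name": "..."}]} object:
// nested object -> Types.ROW, array of objects -> ObjectArrayTypeInfo over a ROW.
TypeInformation<Row> userType = Types.ROW(
        new String[]{"id", "tags"},
        new TypeInformation[]{
                Types.STRING(),
                ObjectArrayTypeInfo.getInfoFor(
                        Row[].class,  // the array class itself
                        Types.ROW(
                                new String[]{"name"},
                                new TypeInformation[]{Types.STRING()}))
        });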

4.5 A code issue found in flink-json*.jar during testing:

The type checks in the convert methods compare TypeInformation values with ==. Possibly due to the Flink version, the == operator is not overloaded; in Java it is plain reference equality, so the comparison can fail even when the types match. The fix replaces == with the .equals() method.
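A minimal sketch of the symptom. The serialization round trip below is one plausible way distinct TypeInformation instances can arise (for example when a job graph is shipped to the cluster); the article does not pin down the exact cause:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class EqualsVsIdentity {
    public static void main(String[] args) throws Exception {
        TypeInformation<?> original = Types.LONG();

        // Round-trip through Java serialization to obtain a distinct instance.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(original);
        TypeInformation<?> copy = (TypeInformation<?>) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(original == copy);      // false: reference equality fails
        System.out.println(original.equals(copy)); // true: value equality holds
    }
}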

JsonRowDeserializationSchema.java

private Object convert(JsonNode node, TypeInformation<?> info) {
    if (Types.VOID.equals(info) || node.isNull()) {
        return null;
    } else if (Types.BOOLEAN.equals(info)) {
        return node.asBoolean();
    } else if (Types.STRING.equals(info)) {
        return node.asText();
    } else if (Types.BIG_DEC.equals(info)) {
        return node.decimalValue();
    } else if (Types.BIG_INT.equals(info)) {
        return node.bigIntegerValue();
    } else if (Types.LONG.equals(info)) {
        return node.longValue();
    } else if (Types.INT.equals(info)) {
        return node.intValue();
    } else if (Types.FLOAT.equals(info)) {
        return node.floatValue();
    } else if (Types.DOUBLE.equals(info)) {
        return node.doubleValue();
    } else if (Types.SQL_DATE.equals(info)) {
        return Date.valueOf(node.asText());
    } else if (Types.SQL_TIME.equals(info)) {
        // according to RFC 3339 every full-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String time = node.asText();
        if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
            throw new IllegalStateException(
                "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
                    "Format: HH:mm:ss'Z'");
        }
        return Time.valueOf(time.substring(0, time.length() - 1));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        // according to RFC 3339 every date-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String timestamp = node.asText();
        if (timestamp.indexOf('Z') < 0) {
            throw new IllegalStateException(
                "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
                    "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        }
        return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
    } else if (info instanceof RowTypeInfo) {
        return convertRow(node, (RowTypeInfo) info);
    } else if (info instanceof ObjectArrayTypeInfo) {
        return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof BasicArrayTypeInfo) {
        return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return convertByteArray(node);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return objectMapper.treeToValue(node, info.getTypeClass());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
        }
    }
}

JsonRowSerializationSchema.java

private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
    if (Types.VOID.equals(info) || object == null) {
        return container.nullNode();
    } else if (Types.BOOLEAN.equals(info)) {
        return container.booleanNode((Boolean) object);
    } else if (Types.STRING.equals(info)) {
        return container.textNode((String) object);
    } else if (Types.BIG_DEC.equals(info)) {
        // convert decimal if necessary
        if (object instanceof BigDecimal) {
            return container.numberNode((BigDecimal) object);
        }
        return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
    } else if (Types.BIG_INT.equals(info)) {
        // convert integer if necessary
        if (object instanceof BigInteger) {
            return container.numberNode((BigInteger) object);
        }
        return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
    } else if (Types.LONG.equals(info)) {
        if (object instanceof Long) {
            return container.numberNode((Long) object);
        }
        return container.numberNode(Long.valueOf(((Number) object).longValue()));
    } else if (Types.INT.equals(info)) {
        if (object instanceof Integer) {
            return container.numberNode((Integer) object);
        }
        return container.numberNode(Integer.valueOf(((Number) object).intValue()));
    } else if (Types.FLOAT.equals(info)) {
        if (object instanceof Float) {
            return container.numberNode((Float) object);
        }
        return container.numberNode(Float.valueOf(((Number) object).floatValue()));
    } else if (Types.DOUBLE.equals(info)) {
        if (object instanceof Double) {
            return container.numberNode((Double) object);
        }
        return container.numberNode(Double.valueOf(((Number) object).doubleValue()));
    } else if (Types.SQL_DATE.equals(info)) {
        return container.textNode(object.toString());
    } else if (Types.SQL_TIME.equals(info)) {
        final Time time = (Time) object;
        // strip milliseconds if possible
        if (time.getTime() % 1000 > 0) {
            return container.textNode(timeFormatWithMillis.format(time));
        }
        return container.textNode(timeFormat.format(time));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        return container.textNode(timestampFormat.format((Timestamp) object));
    } else if (info instanceof RowTypeInfo) {
        if (reuse != null && reuse instanceof ObjectNode) {
            return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
        } else {
            return convertRow(null, (RowTypeInfo) info, (Row) object);
        }
    } else if (info instanceof ObjectArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof BasicArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return container.binaryNode((byte[]) object);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return mapper.valueToTree(object);
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
        }
    }
}

4.6 Submitting the jar to run on the cluster

Add the file:

resources/META-INF/services/org.apache.flink.table.factories.TableFactory

org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory

When the project is packaged, the identically named service files in the kafka-connector jar and the json jar would otherwise overwrite each other, so the contents of both files must be kept (hence the two lines above).
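If the fat jar is built with the Maven Shade plugin, the standard way to append META-INF/services files instead of overwriting them is the ServicesResourceTransformer; this is general Shade plugin behavior, not something specific to this article:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <transformers>
                    <!-- concatenates same-named service files instead of overwriting them -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>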

5. Appendix: PrintTableSink source code

Adapted from Alibaba's blink branch.

Scala:
BatchCompatibleStreamTableSink.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}

/** Defines an external [[TableSink]] to emit a batch [[Table]],
  * for compatibility with stream connector plugins.
  */
trait BatchCompatibleStreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitBoundedStream(boundedStream: DataStream[T]): DataStreamSink[_]
}

PrintTableSink.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import java.lang.{Boolean => JBool}
import java.util.TimeZone
import java.util.{Date => JDate}
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.runtime.functions.DateTimeFunctions
import org.apache.flink.util.StringUtils

/**
  * A simple [[TableSink]] to output data to the console.
  */
class PrintTableSink()
  extends TableSinkBase[JTuple2[JBool, Row]]
    with BatchCompatibleStreamTableSink[JTuple2[JBool, Row]]
    with UpsertStreamTableSink[Row] {

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    dataStream.addSink(sink).name(sink.toString)
  }

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintTableSink()

  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  // override def getRecordType: DataType = DataTypes.createRowType(getFieldTypes, getFieldNames)
  override def getRecordType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes, getFieldNames)
  }

  /** Emits the DataStream. */
  override def emitBoundedStream(boundedStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    boundedStream.addSink(sink).name(sink.toString)
  }
}

/**
  * Implementation of the SinkFunction writing every tuple to the standard output.
  */
class PrintSinkFunction() extends RichSinkFunction[JTuple2[JBool, Row]] {
  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    prefix = "task-" + (context.getIndexOfThisSubtask + 1) + "> "
  }

  override def invoke(in: JTuple2[JBool, Row]): Unit = {
    val sb = new StringBuilder
    val row = in.f1
    for (i <- 0 until row.getArity) {
      if (i > 0) sb.append(",")
      val f = row.getField(i)
      if (f.isInstanceOf[Date]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd"))
      } else if (f.isInstanceOf[Time]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss"))
      } else if (f.isInstanceOf[Timestamp]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime,
          "yyyy-MM-dd HH:mm:ss.SSS"))
      } else {
        sb.append(StringUtils.arrayAwareToString(f))
      }
    }
    if (in.f0) {
      System.out.println(prefix + "(+)" + sb.toString())
    } else {
      System.out.println(prefix + "(-)" + sb.toString())
    }
  }

  override def close(): Unit = {
    this.prefix = ""
  }

  override def toString: String = "Print to System.out"
}

Reposted from: https://blog.51cto.com/1196740/2369735
