洞察掌握android电视app开发中的安全与合规策略,提升企业运营效率
802
2022-09-09
Flink定义时间属性Demo
案例来自尚硅谷…
概述
基于时间的操作(比如Table API和SQL中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
时间属性,可以是每个表schema的一部分。一旦定义了时间属性,它就可以作为一个字段引用,并且可以在基于时间的操作中使用。
时间属性的行为类似于常规时间戳,可以访问,并且进行计算。
sensor.txt
sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,37.2sensor_1,1547718212,33.5sensor_1,1547718215,38.1
代码
import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.table.api.{EnvironmentSettings, Table}import org.apache.flink.table.api.scala._import org.apache.flink.types.Rowobject TimeAndWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 读取数据创建DataStream val inputStream: DataStream[String] = env.readTextFile("D:\\20-Flink\\FlinkTutorial\\src\\main\\resources\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) // 创建表环境 val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts, 'temperature, 'pt.proctime) sensorTable.printSchema() // 定义一行的数据结构 sensorTable.toAppendStream[Row].print() env.execute("time and window test") }}
打印结果
root |-- id: STRING |-- ts: BIGINT |-- temperature: DOUBLE |-- pt: TIMESTAMP(3) *PROCTIME*sensor_1,1547718199,35.8,2021-07-13 08:11:09.802sensor_6,1547718201,15.4,2021-07-13 08:11:09.816sensor_7,1547718202,6.7,2021-07-13 08:11:09.816sensor_10,1547718205,38.1,2021-07-13 08:11:09.818sensor_1,1547718207,37.2,2021-07-13 08:11:09.818sensor_1,1547718212,33.5,2021-07-13 08:11:09.818sensor_1,1547718215,38.1,2021-07-13 08:11:09.819
可以看到表架构多了个pt: TIMESTAMP(3) PROCTIME这个东西,
然后输出结果每行最后都多了个时间字段
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~