当前位置: 首页 > news >正文

做网站为什么先交定金搭建一个网站平台需要多少钱

做网站为什么先交定金,搭建一个网站平台需要多少钱,数字营销云,苏州住房和城乡建设局网站时间机制 Flink中的时间机制主要用在判断是否触发时间窗口window的计算。 在Flink中有三种时间概念:ProcessTime、IngestionTime、EventTime。 ProcessTime:是在数据抵达算子产生的时间(Flink默认使用ProcessTime) IngestionT…

时间机制

Flink中的时间机制主要用在判断是否触发时间窗口window的计算。

在Flink中有三种时间概念:ProcessTime、IngestionTime、EventTime。

ProcessTime:是在数据抵达算子产生的时间(Flink默认使用ProcessTime)

IngestionTime:是在DataSource生成数据产生的时间

EventTime:是数据本身携带的时间,具有实际业务含义,不是Flink框架产生的时间

水位机制

由于网络原因、故障等原因,数据的EventTIme并不是单调递增的,是乱序的,有时与当前实际时间相差很大。

水位(watermark)用在EventTime语义的窗口计算,可以当作当前计算节点的时间。当水位超过窗口的endtime,表示事件时间t <= T的数据都**已经到达,**这个窗口就会触发WindowFunction计算。当水位超过窗口的endtime+允许迟到的时间,窗口就会消亡。本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。

  1. 队列中是乱序的数据,流入长度3s的窗口。2s的数据进入[0,4)的窗口中
  2. 2s、3s、1s的数据进入[0,4)的窗口,7s的数据分配到[4,8)的窗口中
  3. 水印4s到达,代表4s以前的数据都已经到达。触发[0,4)的窗口计算,[4,8)的窗口等待数据
  4. 水印9s到达,[4,8)的窗口触发

多并行情况下,不同的watermark流到算子,取最小的wartermark当作当前算子的watermark。

如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

生成水位

首先设置env为事件时间

使用 DataStream API 实现 Flink 任务时,Watermark Assigner 能靠近 Source 节点就靠近 Source 节点,能前置尽量前置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//测试数据,间隔1s发送
DataStreamSource<Tuple2<String, Long>> source = env.addSource(new SourceFunction<Tuple2<String, Long>>() {@Overridepublic void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {ctx.collect(Tuple2.of("aa", 1681909200000L));//2023-04-19 21:00:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909500000L));//2023-04-19 21:05:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681909800000L));//2023-04-19 21:10:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910100000L));//2023-04-19 21:15:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910400000L));//2023-04-19 21:20:00Thread.sleep(1000);ctx.collect(Tuple2.of("aa", 1681910700000L));//2023-04-19 21:25:00Thread.sleep(Long.MAX_VALUE);}@Overridepublic void cancel() {}});

抽取EventTime、生成Watermark

周期性水位–AssignerWithPeriodicWatermarks�(常用)

周期性生成水位。周期默认的时间是 200ms.

源码如下:

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}

自定义实现AssignerWithPeriodicWatermarks,代码如下:

source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {return new Watermark(currentTimestamp);}@Override//获取事件时间public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

运行结果如下:

水位时间到达2023-04-19 21:10:00触发窗口2023-04-19 21:00:00到2023-04-19 21:10:00,窗口中的数据为2023-04-19 21:00:00和2023-04-19 21:05:00

水位时间到达2023-04-19 21:20:00触发窗口2023-04-19 21:10:00到2023-04-19 21:20:00,窗口中的数据为2023-04-19 21:10:00和2023-04-19 21:15:00

长时间等待后,2023-04-19 21:20:00到2023-04-19 21:30:00是存在一个2023-04-19 21:25:00的数据,一直没有触发。这是因为没有新的数据进入,周期性生成的watermark一直是2023-04-19 21:20:00。所以后面窗口即使有数据也没有触发计算。

BoundedOutOfOrdernessTimestampExtractor�

BoundedOutOfOrdernessTimestampExtractor实现了AssignerWithPeriodicWatermarks接口,是flink内置的实现类。

主要源码如下:

public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() < 0) {throw new RuntimeException("Tried to set the maximum allowed " +"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");}this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;}public abstract long extractTimestamp(T element);@Overridepublic final Watermark getCurrentWatermark() {long potentialWM = currentMaxTimestamp - maxOutOfOrderness;if (potentialWM >= lastEmittedWatermark) {lastEmittedWatermark = potentialWM;}return new Watermark(lastEmittedWatermark);}@Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp = extractTimestamp(element);if (timestamp > currentMaxTimestamp) {currentMaxTimestamp = timestamp;}return timestamp;}

BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

设置maxOutOfOrderness为5min,代码如下:

source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

运行结果如下:

看起来和我们自定义实现结果一样。但是10min的水位时间是来自数据15min减去延迟时间5min得来的。

同理20min的水位时间是来自数据25min减去延迟时间5min得来的。

我们可以设置延迟时间为10min,看一下结果。最后一条数据是25min,那么最后的水位线就是25min-10min=15min。只会触发00-10的窗口。

同样的,由于没有后续数据导致后面的窗口没有触发。

AscendingTimestampExtractor

AscendingTimestampExtractor要求生成的时间戳和水印都是单调递增的。用户实现从数据中获取自增的时间戳extractAscendingTimestamp与上一次时间戳比较。如果出现减少,则打印warn日志。

源码如下:

    public abstract long extractAscendingTimestamp(T element);@Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp = extractAscendingTimestamp(element);if (newTimestamp >= this.currentTimestamp) {this.currentTimestamp = newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}@Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}

间断性水位线

适用于根据接收到的消息判断是否需要产生水位线的情况,用这种水印生成的方式并不多见。

举例如下,数据为15min的时候生成水位。

source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {DateTime date = DateUtil.date(lastElement.f1);return date.minute()==15?new Watermark(lastElement.f1):null;}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

结果如下:

15min的数据生成了15min的水位,只触发了00-10的窗口。

窗口处理迟到的数据

allowedLateness�

Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:"+ DateUtil.date());System.out.println("当前水位时间:"+DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:"+DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:"+DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:"+DateUtil.date(element.f1)));}}).print();

side output

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

// 侧输出的OutputTagOutputTag<Tuple2<String, Long>> lateOutputTag = new OutputTag<>("late_data_output_tag");SingleOutputStreamOperator<Object> process = source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.minutes(5)) {@Overridepublic long extractTimestamp(Tuple2<String, Long> element) {return element.f1;}}).keyBy(value -> value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).sideOutputLateData(lateOutputTag).process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {System.out.println("----------------");System.out.println("系统当前时间:" + DateUtil.date());System.out.println("当前水位时间:" + DateUtil.date(context.currentWatermark()));System.out.println("窗口开始时间:" + DateUtil.date(context.window().getStart()));System.out.println("窗口结束时间:" + DateUtil.date(context.window().getEnd()));elements.forEach(element -> System.out.println("数据携带时间:" + DateUtil.date(element.f1)));}});//处理侧输出数据
//        process.getSideOutput(lateOutputTag).addSink()

最后的window不触发解决方法

自定义自增水位

周期性获取watermark时,自定义增加水位

source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {private long currentTimestamp;@Nullable@Override// 生成watermarkpublic Watermark getCurrentWatermark() {currentTimestamp+=60000;return new Watermark(currentTimestamp);}@Override//获取事件时间public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {if (element.f1>=currentTimestamp){currentTimestamp = element.f1;}return element.f1;}})

结果如下:

自定义trigger

当watermark不能满足关窗条件时,我们给注册一个晚于事件时间的处理时间定时器使它一定能达到关窗条件。

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class MyTrigger extends Trigger<Object, TimeWindow> {@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());ctx.registerProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());return TriggerResult.FIRE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (time == window.maxTimestamp()) {ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());ctx.deleteProcessingTimeTimer(window.maxTimestamp() + 30000L);}
}

Test.java

参考链接:

https://www.jianshu.com/p/c612e95a5028

https://blog.csdn.net/lixinkuan328/article/details/104129671

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/

https://cloud.tencent.com/developer/article/1573079

https://blog.csdn.net/m0_73707775/article/details/129560540?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3&utm_relevant_index=10

http://www.zhongyajixie.com/news/16459.html

相关文章:

  • 如何在公司建网站系统百度游戏
  • 网站架构方案企业查询网站
  • 赣州网站优化郴州网站建设
  • 网站建设方案概述网络优化工作应该怎么做
  • 做外贸没有网站需要注意什么条件打开百度
  • 公安机关做网站备案吗百度快照优化的优势是什么
  • 2008r2做网站苏州seo排名公司
  • 长城建设投资有限公司网站免费优化推广网站的软件
  • 南宁做网站比较好的公司有哪些郑州新闻发布
  • 合肥外贸网站建设公司价格百度助手
  • 专业做招聘网站百度官网登录入口手机版
  • 河南教育平台网站建设app推广项目从哪接一手
  • 招聘网站的SEO怎么做百度快照是什么
  • 微信公众商城网站开发实时积分榜
  • 做物流行业网站网络推广工作内容
  • 可以做外包的网站网络营销首先要做什么
  • 奎文营销型网站建设市场营销方案怎么写
  • 辽宁千山科技做网站怎么样合肥百度搜索排名优化
  • wordpress视频播放器插件北京seo业务员
  • 网站建设寮步成都seo外包
  • 网站文档怎么加图片不显示不出来信息流广告是什么意思
  • 破解wordpress隐藏内容seo是什么意思的缩写
  • 网站后台怎么做alt标签做个网页需要多少钱?
  • 办公室设计费一般多少钱一平端点seo博客
  • 可以做微信游戏的网站有哪些百度手机助手应用商店
  • 南京网站建设制作郑州网站建设哪家好
  • 有哪些做网站好的公司外贸网站推广方法之一
  • 淘宝客推广网站怎么做网络营销的六大特征
  • 做网站千篇一律手机制作网站的软件
  • 网站建设后台什么意思郑州网站推广多少钱