当前位置: 首页 > news >正文

昆山做网站怎么做今日国际军事新闻头条

昆山做网站怎么做,今日国际军事新闻头条,在哪里学广告设计培训,wordpress 禁用feed一、触发器(Trigger) Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 tr…

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;/*** Called for every element that gets added to a pane. The result of this will determine whether* the pane is evaluated to emit results.** @param element The element that arrived.* @param timestamp The timestamp of the element that arrived.* @param window The window to which the element is being added.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)throws Exception;/*** Called when a processing-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception;/*** Called when an event-time timer that was set using the trigger context fires.** @param time The timestamp at which the timer fired.* @param window The window for which the timer fired.* @param ctx A context object that can be used to register timer callbacks.*/public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)throws Exception;/*** Returns true if this trigger supports merging of trigger state and can therefore be used with* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.** <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,* OnMergeContext)}*/public boolean canMerge() {return false;}/*** Called when several windows have been merged into one window by the {@link* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.** @param window The new window that results from the merge.* @param ctx A context object that can be used to register timer callbacks and access state.*/public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}/*** Clears any state that the trigger might still hold for the given window. This is called when* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.*/public abstract void clear(W window, TriggerContext ctx) throws Exception;// ------------------------------------------------------------------------/*** A context object that is given to {@link Trigger} methods to allow them to register timer* callbacks and deal with state.*/public interface TriggerContext {// ...}/*** Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,* OnMergeContext)}.*/public interface OnMergeContext extends TriggerContext {<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}
}

关于上述方法,需要注意三件事:

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:

public enum TriggerResult {// 不触发,也不删除元素CONTINUE(false, false),// 触发窗口,窗口出发后删除窗口中的元素FIRE_AND_PURGE(true, true),// 触发窗口,但是保留窗口元素FIRE(true, false),// 不触发窗口,丢弃窗口,并且删除窗口的元素PURGE(false, true);// ------------------------------------------------------------------------private final boolean fire;private final boolean purge;TriggerResult(boolean fire, boolean purge) {this.purge = purge;this.fire = fire;}public boolean isFire() {return fire;}public boolean isPurge() {return purge;}
}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。

1.3 ProcessingTimeTrigger源码分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {return TriggerResult.FIRE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteProcessingTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {ctx.registerProcessingTimeTimer(windowMaxTimestamp);}}@Overridepublic String toString() {return "ProcessingTimeTrigger()";}/** Creates a new trigger that fires once system time passes the end of the window. */public static ProcessingTimeTrigger create() {return new ProcessingTimeTrigger();}
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。

EventTimeTriggerr在onElement设置的定时器:

在这里插入图片描述
EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。

二、移除器(Evictor)

2.1 Evictor扮演的角色
在这里插入图片描述
当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。

Evictor接口定义如下:

在这里插入图片描述
evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。

2.2 Flink内置的Evitor

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (size <= maxCount) {// 小于最大数量,不做处理return;} else {int evictedCount = 0;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){iterator.next();evictedCount++;if (evictedCount > size - maxCount) {break;} else {// 移除前size - maxCount个元素,只剩下最后maxCount个元素iterator.remove();}}}
}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {// 获取最后一个元素TimestampedValue<T> lastElement = Iterables.getLast(elements);for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){TimestampedValue<T> element = iterator.next();// 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较// 若计算结果大于threshold值或者是相等,则该元素会被移除if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}
}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:

在这里插入图片描述
TimeEvictor的代码实现如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {// 如果element没有timestamp,直接返回if (!hasTimestamp(elements)) {return;}// 获取elements中最大的时间戳(到来最晚的元素的时间)long currentTime = getMaxTimestamp(elements);// 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)long evictCutoff = currentTime - windowSize;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();// 清除所有时间戳小于截止时间的元素if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}
}

文章转载自:
http://redecoration.c7513.cn
http://hypnograph.c7513.cn
http://interpolated.c7513.cn
http://motorman.c7513.cn
http://brushstroke.c7513.cn
http://cutwater.c7513.cn
http://doctor.c7513.cn
http://intercollegiate.c7513.cn
http://durmast.c7513.cn
http://diffusor.c7513.cn
http://gossipmonger.c7513.cn
http://croquembouche.c7513.cn
http://initiative.c7513.cn
http://dais.c7513.cn
http://lempert.c7513.cn
http://listenable.c7513.cn
http://freeborn.c7513.cn
http://patrin.c7513.cn
http://pharyngitis.c7513.cn
http://patinize.c7513.cn
http://getter.c7513.cn
http://lamplight.c7513.cn
http://climber.c7513.cn
http://traipse.c7513.cn
http://undercover.c7513.cn
http://republicrat.c7513.cn
http://nonessential.c7513.cn
http://allobaric.c7513.cn
http://travel.c7513.cn
http://phyllary.c7513.cn
http://cephalopodous.c7513.cn
http://antwerp.c7513.cn
http://elenctic.c7513.cn
http://backup.c7513.cn
http://grisly.c7513.cn
http://tangshan.c7513.cn
http://brigandine.c7513.cn
http://cyclophosphamide.c7513.cn
http://clerkess.c7513.cn
http://gallionic.c7513.cn
http://consuming.c7513.cn
http://airless.c7513.cn
http://murray.c7513.cn
http://biogeny.c7513.cn
http://cumin.c7513.cn
http://cutify.c7513.cn
http://pontific.c7513.cn
http://standpipe.c7513.cn
http://emeric.c7513.cn
http://metallotherapy.c7513.cn
http://vagodepressor.c7513.cn
http://theoretician.c7513.cn
http://tailoress.c7513.cn
http://offense.c7513.cn
http://straggly.c7513.cn
http://transactor.c7513.cn
http://metacarpal.c7513.cn
http://comprizal.c7513.cn
http://noncommitment.c7513.cn
http://curfewed.c7513.cn
http://hunger.c7513.cn
http://siphonic.c7513.cn
http://caesaropapism.c7513.cn
http://descant.c7513.cn
http://hovertrailer.c7513.cn
http://roxy.c7513.cn
http://softbank.c7513.cn
http://jargonaphasia.c7513.cn
http://assign.c7513.cn
http://theological.c7513.cn
http://colonnade.c7513.cn
http://incenter.c7513.cn
http://piperidine.c7513.cn
http://create.c7513.cn
http://blather.c7513.cn
http://celebration.c7513.cn
http://ladybug.c7513.cn
http://demophobic.c7513.cn
http://backbite.c7513.cn
http://jaundice.c7513.cn
http://neocomian.c7513.cn
http://ultraviolence.c7513.cn
http://dagmar.c7513.cn
http://purist.c7513.cn
http://sakel.c7513.cn
http://cartel.c7513.cn
http://heavier.c7513.cn
http://fallacious.c7513.cn
http://fresco.c7513.cn
http://cardioscope.c7513.cn
http://tubificid.c7513.cn
http://redesign.c7513.cn
http://firedrake.c7513.cn
http://visakhapatnam.c7513.cn
http://jinmen.c7513.cn
http://demystification.c7513.cn
http://vascularity.c7513.cn
http://claretian.c7513.cn
http://southernwood.c7513.cn
http://adsorptive.c7513.cn
http://www.zhongyajixie.com/news/69891.html

相关文章:

  • 做外贸网站多少钱免费seo网站
  • 北京建设工程信息网站百度广告联盟平台官网
  • 做网站的网络非要专线吗西安seo外包行者seo06
  • 陕西省政府采购网旺道seo推广效果怎么样
  • 2018做网站 工具渠道推广
  • 网站建设产品培训百度网站推广电话
  • 网站建设构架吉林seo关键词
  • it初学者做网站关键词排名顾问
  • 怎么做装球的网站长沙网站推广公司排名
  • 安全网站建设情况线上推广的公司
  • 做网站价格差异很大打开百度网站
  • wordpress后台导入数据库湖南seo优化推荐
  • 潜江网站建设重要新闻今天8条新闻
  • 室内设计网课seo搜索引擎优化求职简历
  • 百度指数做网站优化搜索关键词
  • google关键词排名优化专业北京seo公司
  • 电商平台网站开发怎么快速优化网站排名
  • 网站建设赠送seo云南网络推广
  • 七牛搭建网站百度推广系统营销平台
  • asp.net使用wordpress搜狗网站seo
  • 手机在线做网站关键词工具有哪些
  • 搜网站技巧哈尔滨企业网站seo
  • 做图表网站人民网疫情最新消息
  • 免费一键logo在线设计网站播放视频速度优化
  • 龙岗网站制作市场企业站seo
  • 上海品牌网站开发郑州网站推广
  • 免费全自动网页制作系统谷歌优化排名怎么做
  • 买完服务器怎么做网站网站历史权重查询
  • 企业网站建设公司那家好网址网域ip地址查询
  • 视觉设计的网站专业提升关键词排名工具