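Contents

1. Background

2. Source analysis of the coGroup operator

2.1 The complete coGroup call chain

2.2 The coGroup method entry point

2.3 The CoGroupedStreams object

2.4 The WithWindow inner class

2.5 The CoGroupWindowFunction function

3. Modifying the source to expose late data as a side output

3.1 Copy CoGroupedStreams

3.2 Add the WithWindow.sideOutputLateData method

3.3 Add a WithWindow constructor

3.4 Modify the apply method

3.5 Make the UnionTypeInfo class public

3.7 Check in the project that Maven has picked up the new code

4. Test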


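1. Background

Once a window opened by the coGroup operator has reached its end time and closed, late records cannot be retrieved through a side output. The intervalJoin operator provides an API for this, but the join operator is built on top of coGroup, so join has the same limitation.

Flink version: v1.17.1

2. Source analysis of the coGroup operator

2.1 The complete coGroup call chain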

    input1.coGroup(input2)
        .where(keySelector1)
        .equalTo(keySelector2)
        .window(windowAssigner)
        .trigger(trigger)
        .evictor(evictor)
        .allowedLateness(allowedLateness)
        .apply(cgroupFunction)

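As the code above shows, the chain has no sideOutputLateData method for extracting records that arrive after the window has closed.

2.2 The coGroup method entry point

The method simply creates a CoGroupedStreams object: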

    /**
     * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
     * window can be specified.
     */
    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

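2.3 The CoGroupedStreams object

CoGroupedStreams can be thought of as the Builder class of the builder design pattern. The where method configures the KeySelector of the first stream and returns Where, an inner class of CoGroupedStreams. The equalTo method then configures the KeySelector of the second stream and returns the EqualTo inner class. The window method configures the window assigner and returns the WithWindow inner class. Everything after that configures the window: trigger, evictor and allowedLateness set the window parameters, and finally apply is called to pass in the user's business function.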
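The staged-builder idea described above can be sketched without Flink. This is only an illustrative model: the class StagedCoGroupSketch, the sameKey and describe helpers, and the use of plain long values for the window size and lateness are assumptions made for the example, not Flink's API. It shows how each stage returns a new type that exposes only the next legal step.

```java
import java.util.function.Function;

// Hypothetical, Flink-free model of the staged builder; not Flink's real classes.
final class StagedCoGroupSketch<T1, T2> {

    // Stage 1: fix the key selector of the first input.
    <K> Where<T1, T2, K> where(Function<T1, K> keySelector1) {
        return new Where<>(keySelector1);
    }

    static final class Where<T1, T2, K> {
        private final Function<T1, K> keySelector1;

        Where(Function<T1, K> keySelector1) {
            this.keySelector1 = keySelector1;
        }

        // Stage 2: fix the key selector of the second input.
        EqualTo<T1, T2, K> equalTo(Function<T2, K> keySelector2) {
            return new EqualTo<>(keySelector1, keySelector2);
        }
    }

    static final class EqualTo<T1, T2, K> {
        private final Function<T1, K> keySelector1;
        private final Function<T2, K> keySelector2;

        EqualTo(Function<T1, K> keySelector1, Function<T2, K> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        // Stage 3: choose the window size; allowed lateness defaults to 0.
        WithWindow<T1, T2, K> window(long windowSizeMillis) {
            return new WithWindow<>(keySelector1, keySelector2, windowSizeMillis, 0L);
        }
    }

    static final class WithWindow<T1, T2, K> {
        private final Function<T1, K> keySelector1;
        private final Function<T2, K> keySelector2;
        private final long windowSizeMillis;
        private final long allowedLatenessMillis;

        WithWindow(Function<T1, K> keySelector1, Function<T2, K> keySelector2,
                   long windowSizeMillis, long allowedLatenessMillis) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.windowSizeMillis = windowSizeMillis;
            this.allowedLatenessMillis = allowedLatenessMillis;
        }

        // Each setter returns a fresh, fully configured stage instead of mutating this one.
        WithWindow<T1, T2, K> allowedLateness(long millis) {
            return new WithWindow<>(keySelector1, keySelector2, windowSizeMillis, millis);
        }

        // True when both records would land in the same co-group.
        boolean sameKey(T1 left, T2 right) {
            return keySelector1.apply(left).equals(keySelector2.apply(right));
        }

        String describe() {
            return "window=" + windowSizeMillis + "ms, allowedLateness="
                    + allowedLatenessMillis + "ms";
        }
    }
}
```

Because where, equalTo and window each return a different type, a call such as window(...) before equalTo(...) does not compile, which is the same guarantee the real inner classes give.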

2.4 The WithWindow inner class

WithWindow is the inner class that ultimately holds all the configuration: the two streams, the window settings and the key selectors. The user eventually calls apply to trigger the CoGroup logic. Inside apply, the two streams are combined with union, converted to a KeyedStream with keyBy, and windowed with window. Finally the windowed stream's apply method is called with a WindowFunction that carries out both the CoGroup logic and the user's logic.

The code is shown below, with comments added:

    /**
     * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
     * well as a {@link WindowAssigner}.
     *
     * @param <T1> Type of the elements from the first input
     * @param <T2> Type of the elements from the second input
     * @param <KEY> Type of the key. This must be the same for both inputs
     * @param <W> Type of {@link Window} on which the co-group operation works.
     */
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        // first stream
        private final DataStream<T1> input1;
        // second stream
        private final DataStream<T2> input2;
        // key selector of the first stream
        private final KeySelector<T1, KEY> keySelector1;
        // key selector of the second stream
        private final KeySelector<T2, KEY> keySelector2;
        // type of the key
        private final TypeInformation<KEY> keyType;
        // window assigner
        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
        // window trigger
        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        // the constructor assigns the fields above
        protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;
            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;
            this.allowedLateness = allowedLateness;
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            // common TypeInfo for the merged stream; the records of input1 and input2
            // are converted to this type by the map operators below
            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            // KeySelector that works on the union type
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            // map the records of input1 to TaggedUnion<T1, T2> (typed by UnionTypeInfo<T1, T2>)
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);

            // map the records of input2 to TaggedUnion<T1, T2> (typed by UnionTypeInfo<T1, T2>)
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);

            // union the two streams
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // keyBy and open the window
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);

            // configure the window trigger
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            // configure the evictor
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            // configure allowedLateness
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            // create the CoGroupWindowFunction and hand it the user function
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
         * TypeInformation)} method has the wrong return type and hence does not allow one to set an
         * operator-specific parallelism
         *
         * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
         *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
         */
        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        // Returns the wrapped windowed stream. It is package-private and marked
        // @VisibleForTesting, so user code cannot call it. If it were callable, the
        // late-data side output could be obtained directly from this windowed stream.
        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }

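2.5 The CoGroupWindowFunction function

CoGroupWindowFunction is also an inner class of CoGroupedStreams. It performs the CoGroup logic itself and then forwards the prepared data to the user function (the cgroupFunction passed to apply in 2.1).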

    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {
            // buffer for the records of stream 1 in the current window
            List<T1> oneValues = new ArrayList<>();
            // buffer for the records of stream 2 in the current window
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            // hand both buffers to the user function
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
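To make the tag-then-split step concrete, here is a minimal Flink-free sketch. The Tagged class is a simplified stand-in for TaggedUnion, and the coGroup helper returns a string purely for illustration; both names are assumptions for this example rather than Flink code.

```java
import java.util.ArrayList;
import java.util.List;

final class TaggedUnionSketch {

    // Simplified stand-in for CoGroupedStreams.TaggedUnion: exactly one side is set.
    static final class Tagged<A, B> {
        private final A one;
        private final B two;
        private final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value, false);
        }

        boolean isOne() {
            return isOne;
        }

        A getOne() {
            return one;
        }

        B getTwo() {
            return two;
        }
    }

    // Same partitioning idea as CoGroupWindowFunction.apply: one pass, two buffers.
    static <A, B> String coGroup(List<Tagged<A, B>> windowContents) {
        List<A> ones = new ArrayList<>();
        List<B> twos = new ArrayList<>();
        for (Tagged<A, B> value : windowContents) {
            if (value.isOne()) {
                ones.add(value.getOne());
            } else {
                twos.add(value.getTwo());
            }
        }
        // A real user function would receive both lists; here we just render them.
        return ones + "======" + twos;
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window = new ArrayList<>();
        window.add(Tagged.one("a"));
        window.add(Tagged.two(1));
        window.add(Tagged.one("b"));
        System.out.println(coGroup(window)); // prints [a, b]======[1]
    }
}
```

The point of the tag is that a single windowed stream can only carry one element type, so both inputs are wrapped in the same type and separated again only when the window fires.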

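3. Modifying the source to expose late data as a side output

Approach: copy CoGroupedStreams into a new class, NewCoGroupedStreams, and add a sideOutputLateData method to its WithWindow class. The user passes in an outputTag, which is then used to retrieve late records as a side output after the window has closed.

3.1 Copy CoGroupedStreams

3.2 Add the WithWindow.sideOutputLateData method

Add this method, which takes the outputTag. The WithWindow constructor it calls is the one added in 3.3.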

    @PublicEvolving
    public WithWindow<T1, T2, KEY, W> sideOutputLateData(
            OutputTag<TaggedUnion<T1, T2>> outputTag) {
        return new WithWindow<>(
                input1,
                input2,
                keySelector1,
                keySelector2,
                keyType,
                windowAssigner,
                trigger,
                evictor,
                allowedLateness,
                outputTag);
    }

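3.3 Add a WithWindow constructor

Add a field lateDataOutputTag that stores the laterOutputTag passed to the constructor. The field cannot be declared final, because it is assigned after the delegating this(...) call.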

    protected WithWindow(
            DataStream<T1> input1,
            DataStream<T2> input2,
            KeySelector<T1, KEY> keySelector1,
            KeySelector<T2, KEY> keySelector2,
            TypeInformation<KEY> keyType,
            WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
            Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
            Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
            Time allowedLateness,
            OutputTag<TaggedUnion<T1, T2>> laterOutputTag) {
        this(
                input1,
                input2,
                keySelector1,
                keySelector2,
                keyType,
                windowAssigner,
                trigger,
                evictor,
                allowedLateness);
        this.lateDataOutputTag = laterOutputTag;
    }
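One consequence of this copy-on-configure style is worth checking in your copy of the class. If trigger, evictor and allowedLateness are left unchanged and still rebuild WithWindow through the original nine-argument constructor (an assumption to verify against your source), then calling any of them after sideOutputLateData silently drops the tag. The Flink-free sketch below models that behaviour; the class WithWindowOrderSketch, the String tag and the allowedLatenessKeepingTag variant are hypothetical names used only for illustration.

```java
// Hypothetical model of WithWindow's copy-on-configure style; not Flink code.
final class WithWindowOrderSketch {
    final long allowedLatenessMillis;
    final String lateDataOutputTag; // null means "no side output configured"

    // Original-style constructor: knows nothing about the tag.
    WithWindowOrderSketch(long allowedLatenessMillis) {
        this(allowedLatenessMillis, null);
    }

    // Added constructor that also carries the tag.
    WithWindowOrderSketch(long allowedLatenessMillis, String lateDataOutputTag) {
        this.allowedLatenessMillis = allowedLatenessMillis;
        this.lateDataOutputTag = lateDataOutputTag;
    }

    // Unmodified setter: rebuilds via the old constructor, so the tag is lost.
    WithWindowOrderSketch allowedLateness(long millis) {
        return new WithWindowOrderSketch(millis);
    }

    // Possible fix (assumption): forward the tag when rebuilding.
    WithWindowOrderSketch allowedLatenessKeepingTag(long millis) {
        return new WithWindowOrderSketch(millis, lateDataOutputTag);
    }

    // New setter: keeps the existing configuration and adds the tag.
    WithWindowOrderSketch sideOutputLateData(String tag) {
        return new WithWindowOrderSketch(allowedLatenessMillis, tag);
    }

    public static void main(String[] args) {
        WithWindowOrderSketch base = new WithWindowOrderSketch(0L);
        // Tag set last: it survives.
        System.out.println(base.allowedLateness(3000L).sideOutputLateData("later").lateDataOutputTag);
        // Tag set first, then an unmodified setter: it is dropped.
        System.out.println(base.sideOutputLateData("later").allowedLateness(3000L).lateDataOutputTag);
    }
}
```

Under that assumption, either call sideOutputLateData last in the chain, or update the other setters to pass the tag through.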

3.4 Modify the apply method

Check whether lateDataOutputTag is null. If it is not null, call sideOutputLateData on the windowedStream to set the tag for late data.

    /**
     * Completes the co-group operation with the user function that is executed for windowed
     * groups.
     *
     * <p>Note: This method's return type does not support setting an operator-specific
     * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
     * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
     * parallelism.
     */
    public <T> DataStream<T> apply(
            CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        // clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        UnionTypeInfo<T1, T2> unionType =
                new UnionTypeInfo<>(input1.getType(), input2.getType());
        UnionKeySelector<T1, T2, KEY> unionKeySelector =
                new UnionKeySelector<>(keySelector1, keySelector2);

        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                input1.map(new Input1Tagger<T1, T2>());
        taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
        taggedInput1.returns(unionType);

        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                input2.map(new Input2Tagger<T1, T2>());
        taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
        taggedInput2.returns(unionType);

        DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

        // we explicitly create the keyed stream to manually pass the key type information in
        windowedStream =
                new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);

        if (trigger != null) {
            windowedStream.trigger(trigger);
        }
        if (evictor != null) {
            windowedStream.evictor(evictor);
        }
        if (allowedLateness != null) {
            windowedStream.allowedLateness(allowedLateness);
        }
        // If lateDataOutputTag is not null, pass it to windowedStream.sideOutputLateData
        // so that late records are emitted to the side output.
        if (lateDataOutputTag != null) {
            windowedStream.sideOutputLateData(lateDataOutputTag);
        }

        return windowedStream.apply(
                new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
    }
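To illustrate what setting the tag changes at runtime, the following is a simplified, Flink-free model of how a record is routed for a tumbling event-time window. It assumes a window counts as expired once the watermark reaches the window's last timestamp plus the allowed lateness, which is my reading of the window operator and should be checked against the Flink version you build. The class LateRoutingSketch and its three lists are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of late-record routing for tumbling windows; not Flink code.
final class LateRoutingSketch {
    private final long windowSizeMillis;
    private final long allowedLatenessMillis;
    private final boolean sideOutputConfigured;

    final List<Long> windowed = new ArrayList<>();   // records added to a live window
    final List<Long> sideOutput = new ArrayList<>(); // late records sent to the tag
    final List<Long> dropped = new ArrayList<>();    // late records silently discarded

    LateRoutingSketch(long windowSizeMillis, long allowedLatenessMillis,
                      boolean sideOutputConfigured) {
        this.windowSizeMillis = windowSizeMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
        this.sideOutputConfigured = sideOutputConfigured;
    }

    // Last timestamp that still belongs to the tumbling window containing `timestamp`.
    long windowMaxTimestamp(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSizeMillis);
        return start + windowSizeMillis - 1;
    }

    // Routes one record given the watermark at the moment it arrives.
    void process(long timestamp, long currentWatermark) {
        boolean windowExpired =
                windowMaxTimestamp(timestamp) + allowedLatenessMillis <= currentWatermark;
        if (!windowExpired) {
            windowed.add(timestamp);
        } else if (sideOutputConfigured) {
            sideOutput.add(timestamp);
        } else {
            dropped.add(timestamp);
        }
    }
}
```

With a 10 s window and 3 s of allowed lateness, a record with timestamp 1000 ms is still accepted at watermark 12998 and is treated as late from watermark 12999 onward in this model. Without the tag it is dropped; with the tag it is kept for the side output.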

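3.5 Make the UnionTypeInfo class public

This class is the type information of the common type produced by the union: oneType is the data type of the input1 stream and twoType is the data type of the input2 stream.

Change to the flink-streaming-java directory on disk and run the following command to build:

    mvn clean install -DskipTests -Dfast

The build succeeds.

3.7 Check in the project that Maven has picked up the new code

After the build, the Maven artifact imported into the project contains the new NewCoGroupedStreams class. Note that the Flink version in the project's Maven dependencies must match the version of the source you built; otherwise the new class will not be picked up.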

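4. Test

Create two streams and build the object with new NewCoGroupedStreams. After allowedLateness, call sideOutputLateData to set the outputTag, then trigger the logic with the with method. Under the hood, with also calls apply; it just casts the returned stream to SingleOutputStreamOperator, which is the type needed to extract a side output. Finally, with.getSideOutput(outputTag) retrieves the side output, and a map converts it to Tuple2<Integer, WaterSensor> for printing.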

    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag =
            new OutputTag<>(
                    "later",
                    new NewCoGroupedStreams.UnionTypeInfo<>(
                            Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));

    NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream =
            new NewCoGroupedStreams<>(ds1, ds2);

    SingleOutputStreamOperator<String> with =
            newCgroupStream
                    .where((x) -> x.getId())
                    .equalTo(x -> x.getId())
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .allowedLateness(Time.seconds(3))
                    .sideOutputLateData(outputTag)
                    .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
                        @Override
                        public void coGroup(
                                Iterable<WaterSensor> first,
                                Iterable<WaterSensor> second,
                                Collector<String> out) throws Exception {
                            out.collect(first.toString() + "======" + second.toString());
                        }
                    });

    with.print();

    with.getSideOutput(outputTag)
            .map(new MapFunction<
                    NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>,
                    Tuple2<Integer, WaterSensor>>() {
                @Override
                public Tuple2<Integer, WaterSensor> map(
                        NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value)
                        throws Exception {
                    return value.isOne()
                            ? Tuple2.of(1, value.getOne())
                            : Tuple2.of(2, value.getTwo());
                }
            })
            .print();

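In the run's output, ts is the timestamp. The first line printed comes from the RichCoGroupFunction and indicates that the 1~10 s window has closed. When we then send WaterSensor{id='a', ts=1, vc=1} again, it is printed as a 2-tuple through the side output.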

