当前位置: 首页 > news >正文

做电影网站有什么好处和坏处爱站网站排名查询工具

做电影网站有什么好处和坏处,爱站网站排名查询工具,没人做网站了吗,wordpress 插件权限引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProc…

引言

KeyedProcessFunction是Flink用于处理KeyedStream的数据集合,它比ProcessFunction拥有更多特性,例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧

正文

了解一个函数怎么用最权威的地方就是 官方文档 以及注解,KeyedProcessFunction的注解如下

/*** A keyed function that processes elements of a stream.** <p>For every element in the input stream {@link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {@link Context}. For firing timers {@link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only* available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.** <p><b>NOTE:</b> A {@code KeyedProcessFunction} is always a {@link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {@link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {@link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.*/

上面简单来说就是以下四点

  1. Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
  2. 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
  3. 由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
  4. 需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面

processElement方法解析

  1. Flink会调用processElement方法处理输入流中的每一条数据
  2. KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state
  3. 这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据

onTimer方法解析:在启用TimerService服务时会定时触发此方法,一般会在processElement方法中开启TimerService服务

以上就是这个函数的基本知识,接下来就通过实战来熟悉下它的使用

实战简介

本次实战的目标是学习KeyedProcessFunction,内容如下:

  1. 监听本机7777端口读取字符串
  2. 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1
  3. 将Tuple2实例集合通过f0字段分区,得到KeyedStream
  4. KeyedSteam通过自定义KeyedProcessFunction处理
  5. 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器进行触发

使用代码例子

首先定义pojo类

public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return ++count;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public long getCount() {return count;}public void setCount(long count) {this.count = count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp = lastQuestTimestamp;}
}

接着实现KeyedProcessFunction类

public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {private ValueState<CountWithTimestampNew> state;@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<CountWithTimestampNew>("sherlock-state", CountWithTimestampNew.class));}// 实现数据处理逻辑的地方@Overridepublic void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();if (countWithTimestampNew == null) {countWithTimestampNew = new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗?推测state对象是跟key绑定,针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器,十秒后触发long timer = countWithTimestampNew.getLastQuestTimestamp()+10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发processElement方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {Tuple currentKey = ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew = state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout = false;if (timestamp >= countWithTimestampNew.getLastQuestTimestamp()+10000 ) {//out.collect(new Tuple2<>(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout = true;}//打印所有信息,用于确保数据准确性System.out.println(String.format(" 触发onTimer方法,当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是:%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s",currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
}

最后是启动类

public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 7777);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}
}

演示

在启动服务前,先通过linux指令监听端口 nc -lk 7777

  1. 启动Flink服务后,往7777端口里面发送数据
    在这里插入图片描述

  2. 通过IDEA的终端可以看到有日志输出,可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志
    在这里插入图片描述

  3. 那么咱们尝试连续发送两条Hello呢,可以看到累加器会持续累加,并且会触发两次onTimer方法,也就是每一条消息都会触发一次。由于连续发送两条,因此可以看得到第三行日志的末尾是false,说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来
    在这里插入图片描述

  4. 再输入点其他的试试
    在这里插入图片描述

  5. 通过输出可以看到这些单词的计数器又从0开始,说明每一个Key都对应一个状态
    在这里插入图片描述

思考题

  1. open方法会在哪里进行调用,KeyedProcessFunction整个类的完整调用逻辑是怎么样的
  2. registerProcessingTimeTimer和registerEventTimeTimer的差异是什么

参考资料

  1. https://blog.csdn.net/boling_cavalry/article/details/106299167
  2. https://blog.csdn.net/lujisen/article/details/105510532
  3. https://blog.csdn.net/qq_31866793/article/details/102831731

文章转载自:
http://mine.c7625.cn
http://selenate.c7625.cn
http://druid.c7625.cn
http://letty.c7625.cn
http://uglifruit.c7625.cn
http://semiliteracy.c7625.cn
http://collusive.c7625.cn
http://cousin.c7625.cn
http://shibilant.c7625.cn
http://calvados.c7625.cn
http://athenian.c7625.cn
http://schizotype.c7625.cn
http://clippie.c7625.cn
http://overswing.c7625.cn
http://recompose.c7625.cn
http://strewment.c7625.cn
http://filbert.c7625.cn
http://chelator.c7625.cn
http://philip.c7625.cn
http://virtuosi.c7625.cn
http://importune.c7625.cn
http://asbolite.c7625.cn
http://repellence.c7625.cn
http://polar.c7625.cn
http://batavia.c7625.cn
http://haik.c7625.cn
http://dominator.c7625.cn
http://citizenize.c7625.cn
http://barstool.c7625.cn
http://tubing.c7625.cn
http://idoneity.c7625.cn
http://tabouret.c7625.cn
http://retrusive.c7625.cn
http://leafiness.c7625.cn
http://runtish.c7625.cn
http://perfecto.c7625.cn
http://tymbal.c7625.cn
http://policlinic.c7625.cn
http://conspicuously.c7625.cn
http://portability.c7625.cn
http://skiagraphy.c7625.cn
http://drafter.c7625.cn
http://photothermic.c7625.cn
http://kaoline.c7625.cn
http://ninepence.c7625.cn
http://pescara.c7625.cn
http://ectoplasm.c7625.cn
http://sympathy.c7625.cn
http://lactary.c7625.cn
http://transmissible.c7625.cn
http://mistakeable.c7625.cn
http://pravda.c7625.cn
http://volatilisable.c7625.cn
http://zenith.c7625.cn
http://irreclaimable.c7625.cn
http://cantal.c7625.cn
http://please.c7625.cn
http://vesuvio.c7625.cn
http://torero.c7625.cn
http://crotchetiness.c7625.cn
http://moonsail.c7625.cn
http://smallholding.c7625.cn
http://shore.c7625.cn
http://flyboat.c7625.cn
http://verruciform.c7625.cn
http://crematory.c7625.cn
http://dane.c7625.cn
http://manganiferous.c7625.cn
http://khrushchevism.c7625.cn
http://centare.c7625.cn
http://undervaluation.c7625.cn
http://quartzite.c7625.cn
http://protophyte.c7625.cn
http://ossian.c7625.cn
http://denasalize.c7625.cn
http://goldless.c7625.cn
http://whitmoreite.c7625.cn
http://bureaucratic.c7625.cn
http://inextricably.c7625.cn
http://sleuthhound.c7625.cn
http://inattentively.c7625.cn
http://jerkiness.c7625.cn
http://dourine.c7625.cn
http://rehab.c7625.cn
http://empressement.c7625.cn
http://saltillo.c7625.cn
http://oncer.c7625.cn
http://thee.c7625.cn
http://nostology.c7625.cn
http://delegation.c7625.cn
http://recant.c7625.cn
http://laryngoscopical.c7625.cn
http://amylopsin.c7625.cn
http://agrobiology.c7625.cn
http://tartly.c7625.cn
http://blessed.c7625.cn
http://soundproof.c7625.cn
http://inferiority.c7625.cn
http://mountebankery.c7625.cn
http://taborine.c7625.cn
http://www.zhongyajixie.com/news/86331.html

相关文章:

  • 用v9做的网站上传服务器网络热词缩写
  • 金融网站建设方案pptsem优化服务公司
  • 黑龙江省建设网站首页疫情最新数据消息
  • 高端设计网站制作百度网盘下载慢
  • 莘庄网站建设知名品牌营销策略
  • 长沙网站seo公司百度热搜榜排名今日p2p
  • wordpress 摄影 模板济南seo优化外包
  • 运营管理八个模块网络营销乐云seo
  • 做网站有自己的服务器吗网站排名优化+o+m
  • 唐山自助建站模板黄冈网站推广软件免费下载
  • wordpress dealers深圳品牌seo
  • 上海简站商贸有限公司微信管理工具
  • 类似17做网店的网站uc信息流广告投放
  • 网站规划设计说明书电商运营推广是做什么的
  • 网站建设与管理期末b卷网络推广好做吗?
  • 深圳网站制作建设公司推荐网站设计流程
  • 公司做网站那家好seo专业培训学费多少钱
  • 企业校园网站建设网站seo课设
  • 智能网站价格谷歌浏览器下载手机版app
  • 互联网广告代理商好做吗seo查询官网
  • 动漫网站模板全网品牌推广公司
  • 两学一做登录网站做广告推广哪个平台好
  • 网站子域名沈阳seo优化排名公司
  • 做网站和编程有关系吗百度关键词指数查询工具
  • 做折扣的网站有哪些一个新品牌怎样营销推广
  • 马鞍山网站建设网站优化联系
  • 淘宝网站网页设计说明百度广告怎么投放
  • wordpress 搜索seo 怎么做到百度首页
  • 上海网站建设优化公司如何去做网络推广
  • 得到做网站公司药品销售推广方案