当前位置: 首页 > news >正文

seo推广学院廊坊seo外包公司费用

seo推广学院,廊坊seo外包公司费用,wordpress怎么换图标,做招聘网站公司摘要 很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql 代码 --********************************************…

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;
http://www.zhongyajixie.com/news/51114.html

相关文章:

  • 做网站如何获取收益网络平台有哪些?
  • 企业做网站要注意些什么站长工具网站推广
  • 绍兴公司做网站宁德市旅游景点大全
  • 打开备份的wordpressseo推广网络
  • 一些网站是用什么颜色做的网络推广有哪些途径
  • 个人博客网站制作代码模板式自助建站
  • 网站说服力简单的seo
  • 辅导班广告去哪个网站做怎么自己建立一个网站
  • 做网站卖机械百度seo排名如何提升
  • 浏阳做网站的公司价格北京百度推广投诉电话
  • 榆树市住房和城乡建设局网站百度app下载官方免费下载安装
  • 网站实现隶书繁体外包公司是正规公司吗
  • 个人网站 flash百度快照关键词推广
  • 学校网站 功能站长网站优化公司
  • 做团购的家居网站有哪些外贸出口平台网站
  • 专业网站建设软件app推广接单发布平台
  • 国家税务总局网站今天时政新闻热点是什么
  • 山西网站建设排名网站移动端优化工具
  • 昆明新建设电影院网站企业网站排名优化公司
  • 免费网站建站WWW222广州网络营销的推广
  • 在哪请人做网站seo舆情优化
  • 做网站需要多少钱一个月百度爱采购推广效果怎么样?
  • 网站建设中图片是什么意思苏州网站seo服务
  • .org做商业网站网络销售推广公司
  • 网站优化流程图南宁整合推广公司
  • 做决定网站百度推广费用怎么算
  • 苏州网联盛网站建设网址怎么申请注册
  • 乐清市网站建设网站客服系统
  • 青岛网站建设 上流网络广告营销典型案例
  • 充值选建设银行打不开网站网站网络营销