当前位置: 首页 > news >正文

百度公司网站怎么建设seo搜索引擎优化薪资

百度公司网站怎么建设,seo搜索引擎优化薪资,网页设计比较优秀的网站,网站建设使用工具1. 背景 公司最近在新建集群,全部采用开源的大数据框架,并且将之前使用的阿里云的所有服务进行下线,其中就涉及到了旧任务的迁移。 2. 任务 2.1. 简述 我接手到一个之前的 spark 任务,是读取阿里 LogStore 数据,然…

1. 背景

公司最近在新建集群,全部采用开源的大数据框架,并且将之前使用的阿里云的所有服务进行下线,其中就涉及到了旧任务的迁移。

2. 任务

2.1. 简述

我接手到一个之前的 spark 任务,是读取阿里 LogStore 数据,然后使用 spark streaming,将接收到的 LogStore 数据注册为表,之后运行 spark sql 进行分批处理,每 2 分钟一批,最后写入时序数据库。

2.2. 处理逻辑

spark sql 首先计算接收到的 2 分钟数据,对维度字段进行 group by,指标字段进行 sumcount 之类的聚合操作;然后将这两分钟的结果和之前从当天 0 点开始累积到上个 2 分钟的结果进行 union all,最后再次进行 group by 以及 sumcount 操作,最后将结果写出。

整体需求是,计算当天 0 点到每个 2 分钟的累加结果,类似于 flink sql 中的渐进式(或叫累计)窗口。

3. 改造方案

去掉从阿里的 LogStore 接收数据,而是从 kafka 接收数据,后面所有的处理逻辑都一样。

4. 出现的问题

将改造、重构后的代码部署到新建的大数据集群上运行,结果发现,计算的结果总是比之前的环境中大一些。

然后我们就开始进行代码级别的排查,一直以为是代码哪儿写错了。之前的代码接收 LogStore 的数据,而且是只接收了一个流的数据,但是改造之后,需要接收三个 kafka 主题的数据,在 spark 代码中,就变成了三个 InputDStream,然后分别将三个流注册为三张不同的表,最后再进行一个大的 sql 处理,示例代码见下面。

case class Table1(@BeanProperty var goods1: String, @BeanProperty var price1: Int) extends Serializable
case class Table2(@BeanProperty var goods2: String, @BeanProperty var price2: Int) extends Serializable
case class Table3(@BeanProperty var goods3: String, @BeanProperty var price3: Int) extends Serializable
object Stream extends Serializable {def main(args: Array[String]): Unit = {val masterUri = sys.props.getOrElse("spark.master", "local[4]")// 获取 spark 环境val conf = new SparkConf()val spark: SparkSession = SparkSession.builder().config(conf).master(masterUri).getOrCreate()val sparkContext = spark.sparkContextval ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(120))val sqlContext = spark.sqlContext// ------------------------------------------------------------------------------------------------------------------------------------------------------------val kafkaParams: mutable.Map[String, Object] = mutable.Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka01:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getCanonicalName,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getCanonicalName,ConsumerConfig.GROUP_ID_CONFIG -> "test-1")// 保存 offset,最后手动提交val offsetRangesList = mutable.ListBuffer[Array[OffsetRange]]()val topic1 = Array("topic1")val tableName1 = "table1"val inputDS1: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic1, kafkaParams))inputDS1.foreachRDD(rdd => {offsetRangesList += rdd.asInstanceOf[HasOffsetRanges].offsetRanges})inputDS1.map(_.value()).map(x => JSONUtil.toBean(x, classOf[Table1])).foreachRDD((rdd: RDD[Table1]) => {spark.createDataFrame(rdd).createOrReplaceTempView(tableName1)})val topic2 = Array("topic2")val tableName2 = "table2"val inputDS2: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic2, kafkaParams))inputDS2.foreachRDD(rdd => {offsetRangesList += rdd.asInstanceOf[HasOffsetRanges].offsetRanges})inputDS2.map(_.value()).map(x => JSONUtil.toBean(x, classOf[Table2])).foreachRDD((rdd: RDD[Table2]) => {spark.createDataFrame(rdd).createOrReplaceTempView(tableName2)})val topic3 = Array("topic3")val tableName3 = "table3"val inputDS3: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topic3, kafkaParams))inputDS3.foreachRDD(rdd => {offsetRangesList += rdd.asInstanceOf[HasOffsetRanges].offsetRanges})// 所有计算和结果写出都在下面维护inputDS3.map(_.value()).map(x => JSONUtil.toBean(x, classOf[Table3])).foreachRDD(foreachFunc = (rdd: RDD[Table3]) => {spark.createDataFrame(rdd).createOrReplaceTempView(tableName3)// 从 hdfs 读取上一批次的计算结果val lastDataDF: DataFrame = sqlContext.read.format("csv").option("header", "true").load("hdfs:///spark/latest-data")lastDataDF.createOrReplaceTempView("last_result")// 计算最新的结果val resultDF: DataFrame = spark.sql("真正要执行的 sql 语句")// 将结算结果进行输出,这里简单调用 show ,只是为了演示resultDF.show(10)// 将本批次结果写入 hdfs,供下次计算前初始化使用resultDF.write.option("header", "true").mode(SaveMode.Overwrite).csv("hdfs:///spark/latest-data")// 手动提交 offsetfor (offsetRanges <- offsetRangesList) {inputDS3.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}offsetRangesList.clear()// 清理掉内存中的结果数据resultDF.unpersist()})ssc.start()ssc.awaitTermination()}}

我的做法是,将三个主题对应的流分别处理,然后各自注册为表,并且在最后一个主题的 foreachRdd 函数中进行 sql 的执行和结果的输出。

注意第三个主题对应流里面的处理流程:

  1. 接收本批次数据,先从 hdfs 对应路径获取上批次结果,注册名为 last_result 的表。
  2. 执行真正的 sql 计算逻辑。
  3. 将结果写出,为了代码演示,只是简单的使用 show() 函数进行输出。
  4. 将本批次的计算结果保存到 hdfs,然后手动提交 offset。

由于我的逻辑中,每次处理,都需要将本批次的计算结果和 0 点到上一批次的计算进行合并处理,所以每次都会将本批次的计算结果写出到 hdfs,此时就出现了问题,最后算出来的每批次结果值都比正确结果多一些。

然后我们就把每批次的结果值,不但输出到 hdfs 进行保存,而且还把他们输出到 mysql,查看其详细的计算结果,看到底是哪一步出了问题。

通过观察 mysql 中每批次的详细计算结果,我们发现,同一个商品,在一个批次计算中,居然出现了相同时间的两条计算结果数据,但理论上应该是只有一条才对。此时我们才发现了问题所在:由于 spark 框架计算由于,某一批次的计算结果中,对 group by 中出现的字段,并没有做到真正的唯一聚合,而是出现了多条。而且我是把每批次的计算结果都写入到 hdfs,也没有对结果数据进行去重,所以下批次数据计算时,通过上批次写入到 hdfs 的结果进行 last_result 表的初始化后,last_result 表中对于同一个维度组合,就会出现多条数据,本批次聚合计算完之后,最终的结果值就多了。而且,这种情况只出现在设置 spark 任务为多并发时才会出现,如果提交时只给一个 executor,并且只给 1 核 CPU,就不会出现问题。

最后手动部署了 apache spark-2.3.2,替换掉之前使用的 CDH-6.3.2 内置的 spark-2.4.0,重新运行任务,就没问题了。

5. 总结

CDH-6.3.2 中内置的 spark-2.4.0 有 bug,在实时数据处理上,如果是多并发处理,遇到 group by 时,对于同一个维度组合,可能会出现多条数据。

至于 hive on spark 和 spark on hive 的方式使用 CDH 内置的这个版本的 spark 会不会出现问题,目前还没去做验证,不过我们还是决定重新部署线上使用的 spark,替换为 apache spark 的稳定版本。


文章转载自:
http://consulate.c7617.cn
http://unlid.c7617.cn
http://vespertilionid.c7617.cn
http://jackstaff.c7617.cn
http://thermos.c7617.cn
http://intertranslatable.c7617.cn
http://extravasation.c7617.cn
http://sinologue.c7617.cn
http://doomwatcher.c7617.cn
http://aerosinusitis.c7617.cn
http://shortlist.c7617.cn
http://pardon.c7617.cn
http://stillbirth.c7617.cn
http://lyceum.c7617.cn
http://banshee.c7617.cn
http://ciliiform.c7617.cn
http://aerograph.c7617.cn
http://wellingtonia.c7617.cn
http://salonika.c7617.cn
http://borderer.c7617.cn
http://conversational.c7617.cn
http://collunarium.c7617.cn
http://clipping.c7617.cn
http://galosh.c7617.cn
http://cerusite.c7617.cn
http://phillumenist.c7617.cn
http://klischograph.c7617.cn
http://inoxidize.c7617.cn
http://pavilion.c7617.cn
http://bunnia.c7617.cn
http://incurably.c7617.cn
http://cryoprotective.c7617.cn
http://kronos.c7617.cn
http://overwrap.c7617.cn
http://restenosis.c7617.cn
http://murky.c7617.cn
http://chessel.c7617.cn
http://resettle.c7617.cn
http://cameroonian.c7617.cn
http://endocardiac.c7617.cn
http://exhalent.c7617.cn
http://snackette.c7617.cn
http://perdue.c7617.cn
http://bestir.c7617.cn
http://anonym.c7617.cn
http://popped.c7617.cn
http://pungi.c7617.cn
http://cartage.c7617.cn
http://encephalograph.c7617.cn
http://athanasia.c7617.cn
http://orpiment.c7617.cn
http://sensually.c7617.cn
http://amendment.c7617.cn
http://reconciliatory.c7617.cn
http://prebiotic.c7617.cn
http://roadsigns.c7617.cn
http://cracked.c7617.cn
http://xenotime.c7617.cn
http://portulan.c7617.cn
http://tycoonship.c7617.cn
http://selah.c7617.cn
http://schizomycete.c7617.cn
http://sensitization.c7617.cn
http://racking.c7617.cn
http://ethnically.c7617.cn
http://allsorts.c7617.cn
http://alist.c7617.cn
http://vivat.c7617.cn
http://copy.c7617.cn
http://impermanence.c7617.cn
http://medicable.c7617.cn
http://therapeutics.c7617.cn
http://flecky.c7617.cn
http://beaky.c7617.cn
http://snakeroot.c7617.cn
http://zonkey.c7617.cn
http://plowshoe.c7617.cn
http://laminarin.c7617.cn
http://rhythmize.c7617.cn
http://talion.c7617.cn
http://leukocytosis.c7617.cn
http://revoice.c7617.cn
http://bringdown.c7617.cn
http://mamaguy.c7617.cn
http://misteach.c7617.cn
http://drouthy.c7617.cn
http://evaluable.c7617.cn
http://pinnatifid.c7617.cn
http://classifiable.c7617.cn
http://ferdus.c7617.cn
http://customary.c7617.cn
http://lowveld.c7617.cn
http://atkins.c7617.cn
http://subform.c7617.cn
http://lazily.c7617.cn
http://aging.c7617.cn
http://eligibility.c7617.cn
http://trophy.c7617.cn
http://engine.c7617.cn
http://seamster.c7617.cn
http://www.zhongyajixie.com/news/95971.html

相关文章:

  • 网站备案 代理网络推广怎么做效果好
  • 网站建设周期计划网络黄页推广软件
  • 多个域名指向同一个网站百度人工服务24小时电话
  • 服务器屏蔽网站seo快速排名软件网址
  • php如何制作网站百度外链查询工具
  • 合肥知名建站公司seo美式
  • wap网站报价抖音引流推广怎么做
  • 怎么做直播网站今日新闻网
  • 合肥企业建站程序今日新闻简报
  • php网站建设心得体会百度竞价有点击无转化
  • 网站换服务器怎么做做网站的平台
  • 广州荔湾做网站手机app免费制作平台
  • 管理软件有哪几种天津百度推广排名优化
  • 怎么打帮人 做网站开发的广告百度小说app下载
  • 个人如何做网站百度知道官网
  • 推广什么东莞seo外包公司
  • 专做淘宝的网站国家市场监管总局官网
  • 静态网页制作毕业论文海阳seo排名优化培训
  • 科技有限公司可以做网站建设吗?郑州网络营销学校
  • 焦作市住房和城乡建设局网站百度竞价优化排名
  • 做地方网站如何盈利志鸿优化设计电子版
  • 免费模板建站企业网站推广
  • 网站和app区别与联系seo优化多久能上排名
  • 惠州网站建设 惠州邦智能营销系统开发
  • 公司用在线客服系统网站关键词优化建议
  • 徐州市建设局网站网站seo优化工具
  • 欧模网室内设计网官网如何做网站推广及优化
  • 网站建设软件有哪些游戏代理300元一天
  • 暖通毕业设计代做网站深圳推广平台有哪些
  • 个人网站的名称企业域名查询