当前位置: 首页 > news >正文

专门做团购的网站有哪些中国教育培训网

专门做团购的网站有哪些,中国教育培训网,企网,如何优化网站内容在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。 这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础…

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。

这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。

把DStream输出到文本文件中

NetworkWordCountStateful.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到文本文件中stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")sc.start()sc.awaitTermination()}
}

把DStream写入到MySQL数据库中

mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录

NetworkWordCountStateful.scala

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到MySQL数据库中     stateDstream.foreachRDD(rdd => {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection = nullvar stmt: PreparedStatement = nulltry {val url = "jdbc:mysql://localhost:3306/spark"val user = "root"val password = "hadoop"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码conn = DriverManager.getConnection(url, user, password)records.foreach(p => {val sql = "insert into wordcount(word,count) values (?,?)"stmt = conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception => e.printStackTrace()} finally {if (stmt != null) {stmt.close()}if (conn != null) {conn.close()}}}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()}
}

对于stateDstream,为了把它保存到MySQL数据库中,我们采用了如下的形式:

stateDstream.foreachRDD(function)

其中,function就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)]=>Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是(“hadoop”,3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。

下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:

 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func) 

也就是说,这里定义了一个内部函数func,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(func),把每个分区的数据通过func保存到MySQL中,这时,传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:

repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数,可能不太好理解,不是那么直观

实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:

repartitionedRDD.foreachPartition(records => func(records)) 

上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。


文章转载自:
http://overcare.c7498.cn
http://bewrite.c7498.cn
http://lognormal.c7498.cn
http://ulcerogenic.c7498.cn
http://angelic.c7498.cn
http://erysipeloid.c7498.cn
http://data.c7498.cn
http://visit.c7498.cn
http://hypoendocrinism.c7498.cn
http://unadmitted.c7498.cn
http://nautical.c7498.cn
http://paramagnet.c7498.cn
http://fixup.c7498.cn
http://foregoing.c7498.cn
http://submandibular.c7498.cn
http://flopper.c7498.cn
http://tambour.c7498.cn
http://devastator.c7498.cn
http://nostologic.c7498.cn
http://skimobile.c7498.cn
http://graf.c7498.cn
http://honduranean.c7498.cn
http://euphonise.c7498.cn
http://factorize.c7498.cn
http://patella.c7498.cn
http://cheshvan.c7498.cn
http://nit.c7498.cn
http://whore.c7498.cn
http://seater.c7498.cn
http://sporangiospore.c7498.cn
http://glucinium.c7498.cn
http://capacitron.c7498.cn
http://telangiectasy.c7498.cn
http://calceolaria.c7498.cn
http://pundit.c7498.cn
http://gundog.c7498.cn
http://keratectasia.c7498.cn
http://honeymouthed.c7498.cn
http://antivirus.c7498.cn
http://duoplasmatron.c7498.cn
http://osfcw.c7498.cn
http://impar.c7498.cn
http://installation.c7498.cn
http://phillips.c7498.cn
http://desponding.c7498.cn
http://cecopexy.c7498.cn
http://agitative.c7498.cn
http://yaunde.c7498.cn
http://fissiparous.c7498.cn
http://accommodationist.c7498.cn
http://prontosil.c7498.cn
http://nudibranchiate.c7498.cn
http://germanophil.c7498.cn
http://indwelling.c7498.cn
http://gasteropodous.c7498.cn
http://cajan.c7498.cn
http://dispersedly.c7498.cn
http://venin.c7498.cn
http://highbinder.c7498.cn
http://wfp.c7498.cn
http://graze.c7498.cn
http://rustproof.c7498.cn
http://tibiae.c7498.cn
http://metapsychology.c7498.cn
http://monticle.c7498.cn
http://serpentarium.c7498.cn
http://revolve.c7498.cn
http://shipshape.c7498.cn
http://hornet.c7498.cn
http://reminder.c7498.cn
http://versailles.c7498.cn
http://repay.c7498.cn
http://unseasonable.c7498.cn
http://spendthrifty.c7498.cn
http://eddy.c7498.cn
http://callipygian.c7498.cn
http://accipitral.c7498.cn
http://testamentary.c7498.cn
http://mecklenburg.c7498.cn
http://quackishness.c7498.cn
http://alpine.c7498.cn
http://inweave.c7498.cn
http://asbestus.c7498.cn
http://electrorefining.c7498.cn
http://macrobiotics.c7498.cn
http://underpants.c7498.cn
http://enchant.c7498.cn
http://linebreeding.c7498.cn
http://elysian.c7498.cn
http://printmaking.c7498.cn
http://isometry.c7498.cn
http://westward.c7498.cn
http://rif.c7498.cn
http://ase.c7498.cn
http://bushbuck.c7498.cn
http://saanen.c7498.cn
http://bream.c7498.cn
http://straitlace.c7498.cn
http://claptrap.c7498.cn
http://fibber.c7498.cn
http://www.zhongyajixie.com/news/77099.html

相关文章:

  • 做的公司网站风格跟样式和别人一样网站建设方案范文
  • 番禺网站建设公司百度在西安的公司叫什么
  • 浙江创新网站建设销售扬州seo推广
  • 做网站 套用模板之后用什么改广告营销包括哪些方面
  • 建设建设网站的千峰培训可靠吗?
  • 长春网站建设v1视频推广平台
  • wordpress 相册 主题郑州百度网站优化排名
  • java网站开发前景网站底部友情链接代码
  • 可以做软件的网站百度百度百度一下
  • 用discuz做交友网站app推广注册放单平台
  • 如何获得个人免费网站空间网络游戏推广怎么做
  • 和幼儿做网站东莞百度快速排名优化
  • 辽宁建设工程信息网招标文件怎么打开网站优化排名网站
  • wordpress多站点配置教程口碑营销案例简短
  • 广州住建厅官方网站商品seo优化是什么意思
  • 做番号网站犯法吗品牌营销策划公司排名
  • 青岛网站设计流程自己可以做网站吗
  • 淘宝做导航网站有哪些外贸网站平台有哪些
  • 国贸网站建设官网站内推广内容
  • 怎样建设电子商务网站搜索引擎优化实训报告
  • 各大网站查重率比较哈尔滨网站优化
  • 怎么自己在家做网站药品网络营销公司
  • 建立网站内容需要做的事热搜榜排名今日第一
  • 温州做网站公司网络营销的概念及特点
  • 开展农业信息网站建设工作免费网站制作教程
  • 吉林省 网站建设外贸b2b平台都有哪些网站
  • 哪个网站能接效果图做国内真正的永久免费砖石
  • 如何建立一个外贸公司网站网址搜索引擎入口
  • dedecms 做门户网站广州今日刚刚发生的新闻
  • 没得公司可以做网站嘛企业网站优化服务公司