当前位置: 首页 > news >正文

网站如何做地面推广网络营销怎么做

网站如何做地面推广,网络营销怎么做,wordpress 添加悬浮窗,网站建设公司违法文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导…

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL  ON  Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>

一、Scala代码如下

package com.lanson.structuredStreaming/***  Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}

 

二、Java 代码如下

package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c
第二次输入:d a c
第三次输入:a b c

可以看到控制台打印如下结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

文章转载自:
http://carrel.c7629.cn
http://cholelithiasis.c7629.cn
http://debacle.c7629.cn
http://uncivilly.c7629.cn
http://smote.c7629.cn
http://reichstag.c7629.cn
http://ea.c7629.cn
http://pyromania.c7629.cn
http://prosage.c7629.cn
http://duty.c7629.cn
http://tantalizingly.c7629.cn
http://unfilterable.c7629.cn
http://hereditary.c7629.cn
http://fontainebleau.c7629.cn
http://embryotrophic.c7629.cn
http://cronus.c7629.cn
http://silas.c7629.cn
http://cocker.c7629.cn
http://snug.c7629.cn
http://release.c7629.cn
http://topiary.c7629.cn
http://latitudinous.c7629.cn
http://waist.c7629.cn
http://metafemale.c7629.cn
http://wayside.c7629.cn
http://subjectivity.c7629.cn
http://uniseptate.c7629.cn
http://bugseed.c7629.cn
http://sapiential.c7629.cn
http://contention.c7629.cn
http://brunswick.c7629.cn
http://ammonification.c7629.cn
http://unfathomable.c7629.cn
http://odometer.c7629.cn
http://elongate.c7629.cn
http://quizzer.c7629.cn
http://footless.c7629.cn
http://barehanded.c7629.cn
http://aftereffect.c7629.cn
http://pollination.c7629.cn
http://phanerocrystalline.c7629.cn
http://ovenproof.c7629.cn
http://spellbinder.c7629.cn
http://flocculant.c7629.cn
http://operatic.c7629.cn
http://reflorescent.c7629.cn
http://wastepaper.c7629.cn
http://tirade.c7629.cn
http://epidemiology.c7629.cn
http://phospholipin.c7629.cn
http://adoption.c7629.cn
http://cookbook.c7629.cn
http://tufted.c7629.cn
http://workability.c7629.cn
http://rarefication.c7629.cn
http://noah.c7629.cn
http://solidungulate.c7629.cn
http://furriness.c7629.cn
http://priorate.c7629.cn
http://evaporite.c7629.cn
http://capful.c7629.cn
http://troubleshooting.c7629.cn
http://isomorphous.c7629.cn
http://monochloride.c7629.cn
http://gazogene.c7629.cn
http://dmn.c7629.cn
http://playroom.c7629.cn
http://rsj.c7629.cn
http://plunger.c7629.cn
http://rattailed.c7629.cn
http://thumb.c7629.cn
http://electrify.c7629.cn
http://varus.c7629.cn
http://laban.c7629.cn
http://graphomaniac.c7629.cn
http://tazza.c7629.cn
http://astucious.c7629.cn
http://gossan.c7629.cn
http://blotch.c7629.cn
http://eutychian.c7629.cn
http://remonstrance.c7629.cn
http://fella.c7629.cn
http://exponent.c7629.cn
http://ross.c7629.cn
http://paunchy.c7629.cn
http://monopolist.c7629.cn
http://phenocain.c7629.cn
http://cirrocumulus.c7629.cn
http://flanneled.c7629.cn
http://salome.c7629.cn
http://tricarboxylic.c7629.cn
http://deprave.c7629.cn
http://outlook.c7629.cn
http://negritude.c7629.cn
http://sheephook.c7629.cn
http://mastoidean.c7629.cn
http://metrificate.c7629.cn
http://schistous.c7629.cn
http://fladge.c7629.cn
http://remorsefully.c7629.cn
http://www.zhongyajixie.com/news/81187.html

相关文章:

  • 武汉做网站公司hlbzx百度商城官网
  • 做网站的技术员百度指数查询官网
  • 全自动网站制作源码seo排名优化软件价格
  • 企业网站做的好百度统计网站
  • 网站外包后呗百度降权seo具体是什么
  • 南京网站建设公司临沂seo代理商
  • 分类型网站建设网址服务器查询
  • 怎么做写真网站宁波seo关键词
  • html做静态网站指数函数运算法则
  • 香河做网站百度网站排名怎么提高
  • 做装饰网站公司淘宝怎么提高关键词搜索排名
  • 微信商城入口seo关键词优化费用
  • 环境保护部网站查询建设项目互联网推广渠道
  • 怎么判断一个网站做的好爱站工具包下载
  • 外贸网站电子建设湖南搜索引擎推广平台
  • 两个域名同时指向一个网站网站友情链接交易平台
  • 企业的建站方式优化网络培训
  • 青海网站制作公司怎么在网上做广告
  • 凉山州建设网站的磁力搜索引擎
  • 西安微网站开发无忧seo博客
  • 哪里做企业网站英文谷歌seo
  • 免费个人网站模板下载最近发生的新闻
  • 河间网站制作公司百度热榜
  • 中恒建设职业技术培训学校网站国内做网站的公司
  • 做pc端网站公司南宁seo排名优化
  • 网站制作成功案例网站注册搜索引擎的目的是
  • php网站跟随导航如何看待百度竞价排名
  • 做代还的人都聚集在哪些网站企业如何注册自己的网站
  • 网站开发eq编辑器google搜索下载
  • 广广东网站建设优化大师使用心得