当前位置: 首页 > news >正文

开封网站制作公司优秀网站设计欣赏

开封网站制作公司,优秀网站设计欣赏,cm在线设计平台,修改wordpress图片外链学习了这么多python的知识,是时候来搞点真玩意儿了~~ 春风得意马蹄疾,一日看尽长安花 o(* ̄︶ ̄*)o 1.前言介绍 (1)什么是spark Apache Spark是一个开源的分布式计算框架,用于处理大规模数据集的…

学习了这么多python的知识,是时候来搞点真玩意儿了~~

春风得意马蹄疾,一日看尽长安花

o(* ̄︶ ̄*)o


 1.前言介绍

(1)什么是spark

        Apache Spark是一个开源的分布式计算框架,用于处理大规模数据集的计算任务。它提供了一种高性能、通用、易用的计算引擎,支持数据并行处理、内存计算、迭代计算等多种计算模式,并提供了丰富的API,比如Spark SQL、Spark Streaming、Mlib和Graphx等。Spark的基本单元是弹性分布式数据集(RDD),它是一种可分区、可并行计算的数据结构,可以在多个节点上进行操作。Spark可以运行在多种集群管理器上,包括Hadoop YARN、Apache Mesos和Standalone等。Spark的特点是速度快、易用、灵活、可扩展,已经成为了数据处理和数据科学领域的一个重要工具。

(2)什么是pyspark

        PySpark是指Spark的Python API,它是Spark的一部分,可以通过Python语言进行Spark编程。PySpark提供了Python与Spark之间的交互,使Python开发人员能够使用Python语言对Spark进行编程和操作。PySpark可以使用Python进行数据预处理和分析,同时扩展了Python语言的功能,能够同时使用Spark的分布式计算能力,加快了数据处理和分析的速度。与其他编程语言相比,Python在数据处理和科学计算方面有广泛的应用和支持,因此PySpark特别适合处理Python与大规模数据集交互的数据科学项目。

 


2.基础准备

(1)先下载  pyspark软件包

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ pyspark

(2)SparkContext入口对象 

初体验代码:

"""演示
"""
# 导包
from pyspark import SparkConf, SparkContext# 创建sparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

(3)PySpark编程模型

详细过程 ==>>


3.数据输入

(1)RDD对象

数据输入完成之后都会得到一个RDD对象

(2)数据容器转RDD对象

"""演示数据输入
"""
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 查看rdd里面的内容需要使用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())sc.stop()

(3)读取文件转化为RDD对象

代码示例


# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)rdd_file = sc.textFile("D:\\IOText\\b.txt")
print(rdd_file.collect())sc.stop()


4.数据计算

pyspark是通过RDD对象内丰富的:成员方法(算子)

(1)map方法

功能:将RDD的数据一条条处理(处理的逻辑 基于map算子中接收的处理函数),返回新的RDD

语法:

代码示例

"""演示map方法
"""
from pyspark import SparkConf, SparkContext
import os
#配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("map_test")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘以10
# (T) -> U
def func(data):return data * 10rdd2 = rdd.map(func)
print(rdd2.collect())# 链式调用,快速解决
rdd3 = rdd.map(lambda e: e * 100).map(lambda e: e + 6)
print(rdd3.collect())

(2)flatMap方法

和map差不多,多了一个消除嵌套的效果

功能:对rdd执行map操作,然后进行解除嵌套

代码示例

"""演示 flatmap 方法
"""
from pyspark import SparkConf, SparkContext
import os# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("flatmap_test")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["a b c", "e f g", "uuu ggg"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

(3)reduceByKey方法

功能:自动分组,组内聚合

代码示例

"""演示 reduceByKey 方法
"""
from pyspark import SparkConf, SparkContext
import os# 配置环境变量
os.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("reduceByKey_test")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和呃女生两个组的成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())

(4)练习案例1

目标,统计文件内出现单词的数量.

代码示例:

"""读取文件,统计文件内,单词出现的数量
"""from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("test_1")
sc = SparkContext(conf=conf)# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)print(res.collect())

我的文件数据是

word1
word2 word2
word3 word3 word3
word4 word4 word4 word4

(5)filter方法

功能:过滤想要的数据进行保留

filter返回值为TRUE则保留,不然会被过滤

语法:

代码示例:

"""filter方法演示
"""
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.filter(lambda e: e % 2 == 0)
print(rdd2.collect())

(6)distinct方法

功能:对RDD数据进行去重,返回新的RDD

语法:

rdd.distinct( )   # 这里无需传参

代码示例:

"""distinct方法演示
"""
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 2, 3, 4, 5])
# filter返回值为TRUE则保留,不然会被过滤
rdd2 = rdd.distinct()
print(rdd2.collect())

(7)sortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据

语法:

"""sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# 读取文件
file_rdd = sc.textFile("D:\\IOText\\b.txt")# 取出所有的单词
words = file_rdd.flatMap(lambda line: line.split(' '))
# print(words.collect())
# 将其转化为map类型,value设置为1,方便在之后分组统计
word_one = words.map(lambda w: (w, 1))
# 分组求和
res = word_one.reduceByKey(lambda a, b: a + b)
# print(res.collect())
# 按照出现次数从小到大排序
final_rdd = res.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
print(final_rdd.collect())

(8)练习案例2

需要的数据已经上传到博客

三个需求:

# TODO 需求一:城市销售额排名
# TODO 需求2:全部城市有哪些商品类别在售卖
# TODO 需求3:北京市有哪些商品类别在售卖
"""sortBy方法演示
"""
from pyspark import SparkConf, SparkContext
import json
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# TODO 需求一:城市销售额排名
# 1.1 读取文件
file_rdd = sc.textFile("D:\\IOText\\orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 取出城市和销售额数据
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result_rdd_1 = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result_rdd_1.collect())# TODO 需求2:全部城市有哪些商品类别在售卖
# 同时记得去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())# TODO 需求3:北京市有哪些商品类别在售卖
# 3.1 只要北京市的数据
rdd_beijing = dict_rdd.filter(lambda x: x["areaName"] == "北京")
# 3.2 取出全部商品类别并且去重
result_rdd_3 = rdd_beijing.map(lambda x: x['category']).distinct()
print("需求3的结果:", result_rdd_3.collect())


5.数据输出

(1)输出为Python对象:collect算子

功能:将RDD各个分区内的数据,统一手机到Driver中,形成一个List对象

用法:

rdd.collect( )   # 返回值是一个list

(2)输出为Python对象:reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

(3)输出为Python对象:take算子

功能:取RDD的前N个元素,组合成list返回给你

语法:

(4)输出为Python对象:count算子

功能:计算RDD有多少条数据,返回值是一个数字

语法:

(5)上述1-4算子代码示例

"""collect算子
"""from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(type(rdd_list))
print(rdd_list)# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)# take算子,取出RDD前N个元素,组成list返回
take_list_3 = rdd.take(3)
print(take_list_3)# count算子,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"有{num_count}个元素")

 

(6)输出到文件中:saveAsTextFile算子

功能:将RDD的数据写入文本文件中

支持  本地写出,hdfs等文件系统

语法:

注意事项:

 代码示例:

"""saveAsTextFile
"""from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
sc = SparkContext(conf=conf)# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 准备RDD2
rdd2 = sc.parallelize([("hello", 3), ("spark", 5), ("java", 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])# 输出到文件之中
rdd1.saveAsTextFile("D:\\IOText\\saveAsTextFile测试1")
rdd2.saveAsTextFile("D:\\IOText\\saveAsTextFile测试2")
rdd3.saveAsTextFile("D:\\IOText\\saveAsTextFile测试3")

上述方法会使文件内有对应你CPU核心数量的分区

修改RDD分区为1个

方式一:SparkConf对象设置属性为全局并行度为1

 

方式二:创建RDD的时候设置(parallelize方法传入numSlices的参数为1)


6.综合案例

搜索引擎日志分析

实现代码:
 

"""综合案例
"""from pyspark import SparkConf, SparkContext
import os
import jsonos.environ['PYSPARK_PYTHON'] = "D:\\Python310\\dev\\python\\python3.10.4\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\CodeSoft\\Hadoop\\hadoop-3.0.0"conf = SparkConf().setMaster("local[*]").setAppName("filter_test")
conf.set("spark.default.parallelism", "1")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转换成RDD
file_rdd = sc.textFile("D:\\IOText\\search_log.txt")# TODO 需求一:热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并且转换为小时
# 1.2 转换为(小时,1)的二元组
# 1.3 key分组聚合value
# 1.4 排序(降序)
# 1.5 取前三
res1 = (file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(3))
print(f"需求一的结果:{res1}")# TODO 需求二:热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词,1)二元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 top3
res2 = (file_rdd.map(lambda x: (x.split("\t")[2], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(3))
print(f"需求二的结果:{res2}")# TODO 需求三:统计黑马程序员关键字在什么时间段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时,1)的二元组
# 3.3 分组聚合
# 3.4 排序
# 3.5 取前一
res3 = (file_rdd.map(lambda x: x.split("\t")).filter(lambda x: x[2] == "黑马程序员").map(lambda x: (x[0][:2], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(1))
print(f"需求三的结果:{res3}")# TODO 需求四:将数据转化为JSON格式,写出到文件之中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
(file_rdd.map(lambda x: x.split("\t")).map(lambda x: {"time": x[0],"user_id": x[1],"key_word": x[2],"rank1": x[3],"rank2": x[4],"url": x[5]}).saveAsTextFile("D:\\IOText\\综合案例"))

结语

有啥好说的。呃呃呃呃呃呃呃没啥好说的

下班

┏(^0^)┛


文章转载自:
http://dissect.c7497.cn
http://chateau.c7497.cn
http://luxurious.c7497.cn
http://timpani.c7497.cn
http://versene.c7497.cn
http://reconsolidate.c7497.cn
http://ruritanian.c7497.cn
http://cardiotoxic.c7497.cn
http://counterbattery.c7497.cn
http://chloramine.c7497.cn
http://alike.c7497.cn
http://beja.c7497.cn
http://monorheme.c7497.cn
http://pastoralism.c7497.cn
http://contravallation.c7497.cn
http://ploughhead.c7497.cn
http://unattainable.c7497.cn
http://grovy.c7497.cn
http://fornix.c7497.cn
http://absorb.c7497.cn
http://holomorphism.c7497.cn
http://cholon.c7497.cn
http://pyophthalmia.c7497.cn
http://heliogravure.c7497.cn
http://craniometrical.c7497.cn
http://exfacto.c7497.cn
http://unbeseeming.c7497.cn
http://gracias.c7497.cn
http://unispiral.c7497.cn
http://angiosarcoma.c7497.cn
http://kaka.c7497.cn
http://curarize.c7497.cn
http://skiscooter.c7497.cn
http://wireworm.c7497.cn
http://valuable.c7497.cn
http://shoot.c7497.cn
http://tribeswoman.c7497.cn
http://discoverist.c7497.cn
http://nucellus.c7497.cn
http://hifi.c7497.cn
http://dance.c7497.cn
http://sniffle.c7497.cn
http://booze.c7497.cn
http://grader.c7497.cn
http://yoking.c7497.cn
http://obstructionist.c7497.cn
http://oracy.c7497.cn
http://swivet.c7497.cn
http://marmolite.c7497.cn
http://kenyan.c7497.cn
http://accessorily.c7497.cn
http://devilishness.c7497.cn
http://carrel.c7497.cn
http://desman.c7497.cn
http://wittingly.c7497.cn
http://terebinthine.c7497.cn
http://originator.c7497.cn
http://preestablish.c7497.cn
http://armillary.c7497.cn
http://inapparent.c7497.cn
http://argumentatively.c7497.cn
http://hermitry.c7497.cn
http://warrison.c7497.cn
http://turista.c7497.cn
http://unqualified.c7497.cn
http://registral.c7497.cn
http://exequial.c7497.cn
http://muddle.c7497.cn
http://cytophotometer.c7497.cn
http://broaden.c7497.cn
http://erf.c7497.cn
http://puffingly.c7497.cn
http://acgb.c7497.cn
http://precordium.c7497.cn
http://hemocytoblast.c7497.cn
http://kemp.c7497.cn
http://cocobolo.c7497.cn
http://demurely.c7497.cn
http://nitrometer.c7497.cn
http://patagonian.c7497.cn
http://strip.c7497.cn
http://combo.c7497.cn
http://genteelly.c7497.cn
http://stanvac.c7497.cn
http://tectrix.c7497.cn
http://singletree.c7497.cn
http://striated.c7497.cn
http://woodnote.c7497.cn
http://mwa.c7497.cn
http://artist.c7497.cn
http://shadchan.c7497.cn
http://horoscopy.c7497.cn
http://demivolt.c7497.cn
http://sopite.c7497.cn
http://thrifty.c7497.cn
http://oom.c7497.cn
http://microtasking.c7497.cn
http://kopis.c7497.cn
http://bastile.c7497.cn
http://malaita.c7497.cn
http://www.zhongyajixie.com/news/91053.html

相关文章:

  • 网站建设的费用包括百度网页推广
  • 广州派出所门户网站直通车推广技巧
  • 新疆生产建设兵团水利局网站百度搜索风云榜小说总榜
  • 建设独立网站需要什么时候搜索引擎关键词优化有哪些技巧
  • 网站建设dbd3vi设计
  • 手机网站赏析网络营销是什么意思
  • 东莞设计网seo是指什么职位
  • 商城网站设计制作网站的seo
  • wordpress无法上传exe手机关键词seo排名优化
  • 新手如何做网站运营seo的基本步骤包括哪些
  • 装潢设计是干嘛的东莞网站关键词优化排名
  • angular 做网站外贸建站与推广
  • php手机网站如何制作网络营销的内容
  • 赣州企业做网站代发关键词排名包收录
  • 网站建设内部下单流程图资深seo顾问
  • 广告策划书word模板知乎推广优化
  • 海外营销是干什么的百度视频seo
  • 网络平台管理制度关键词seo服务
  • 品牌网站设计流程重庆百度推广开户
  • 资深的家居行业网站开发免费b站推广网站不
  • 开启wordpress upwnseo 优化 服务
  • 建什么网站赚钱网络推广外包哪家好
  • 做网站书籍网络口碑营销案例分析
  • 建站之星管理中心优化品牌seo关键词
  • 福田祥菱v3报价及图片邯郸网站seo
  • 网络营销论文文献360优化大师官方免费下载
  • 广东建设职业技术学院官方网站大数据分析营销平台
  • 开淘宝店做网站开发互联网营销师考证多少钱
  • 重庆酉阳网站设计公司广州google推广
  • 网站建设管理是seo免费自学的网站