在Jupyter Notebook中使用pyspark操作Spark DataFrame基本功能

2021-9-26 15:49| 发布者: Fuller| 查看: 4636| 评论: 0

摘要: 1,背景说明之前我们发布过一篇Notebook模板:《集搜客分词结果表用Jupyter Notebook做统计分析—对应Excel功能》。该模板以GooSeeker分词和文本分析软件生成的数据表作为处理对象,在Python Pandas Dataframe中对这 ...

1,背景说明

之前我们发布过一篇Notebook模板:《集搜客分词结果表用Jupyter Notebook做统计分析—对应Excel功能》。该模板以GooSeeker分词和文本分析软件生成的数据表作为处理对象,在Python Pandas Dataframe中对这些数据表进行了类似excel的处理,通过该Notebook介绍了一系列数据表的基本操作方法,跟Excel的功能项逐一做对比。

最近有同学在GooSeeker论坛发布了Spark环境部署详细步骤的帖子,同时我们也分享了使用Spark GraphX做为研究工具的研究范例《基于社交关系和用户偏好的多样性图推荐方法》。有同学在技术交流QQ群和我们交流,问能否发布一些Spark的学习和实验用的Notebook模板。我们认为这是一个非常好的建议,同时也计划陆续发布有关Spark DataFrame, Spark GraphX等的学习记录和实验模板。

本Notebook基于Spark官网的Quick Start, 实验PySpark DataFrame的功能。

2,Spark简介

2.1 Spark架构图

1.大数据处理统一平台:Spark提供了一致的,可组合的统一API

2.计算引擎:Spark专注于计算引擎,不负责持久存储。你可以将多种持久化存储系统与Spark结合使用,包括云存储系统(如Azure存储和Amazon S3),分布式 文件系统(Apache Hadoop),键值存储系统(Apache Cassandra),消息队列系统(Apache Kafka)。

3.配套的软件库:Spark不仅支持引擎附带的标准库,也支持由开源社区以第三方包形式发布的大量外部库,包括Spark SQL, MLlib, (Structured)Streaming, 图分析(GraphX)库。spark-packages.org提供了外部库的索引。

2.2 Spark进程组成

Spark 运行架构上图所示,包括集群资源管理器(Cluster Manager)、多个运行作业任务的工作结点(Worker Node)、每个应用的任务控制结点(Driver)和每个工作结点上负责具体任务的执行进程(Executor)。

Driver 是运行 Spark Applicaion 的 main() 函数,它会创建 SparkContext。SparkContext 负责和 Cluster Manager 通信,进行资源申请、任务分配和监控等。

Cluster Manager 负责申请和管理在 Worker Node 上运行应用所需的资源,目前包括 Spark 原生的 Cluster Manager、Mesos Cluster Manager 和 Hadoop YARN Cluster Manager。

Executor 是 Application 运行在 Worker Node 上的一个进程,负责运行 Task(任务),并且负责将数据存在内存或者磁盘上,每个 Application 都有各自独立的一批 Executor。每个 Executor 则包含了一定数量的资源来运行分配给它的任务。

每个 Worker Node 上的 Executor 服务于不同的 Application,它们之间是不可以共享数据的。与 MapReduce 计算框架相比,Spark 采用的 Executor 具有两大优势。 Executor 利用多线程来执行具体任务,相比 MapReduce 的进程模型,使用的资源和启动开销要小很多。 Executor 中有一个 BlockManager 存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算的时候,可以将中间结果存储到这个存储模块里,供下次需要时直接使用,而不需要从磁盘中读取,从而有效减少 I/O 开销,在交互式查询场景下,可以预先将数据缓存到 BlockManager 存储模块上,从而提高读写 I/O 性能。

3,本Notebook实现的功能

本Notebook基于Spark官网的Quick Start, 使用测试数据,实验PySpark DataFrame的功能:创建,显示数据,选择和存取数据,数据分组,保存和读取,使用SQL

4,运行本Notebook需要的第3方库

运行本Notebook需要安装pyspark库,如果没有安装,打开Anaconda的command窗口,运行如下命令:

pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple/

4.1 解决版本配套问题

如果运行下面的创建dataframe的函数出现异常,一般是PySpark版本不配套造成的。那么尝试下面的过程

4.1.1 根据需要降低pySpark版本

1. 首先查看目前应该跟什么版本配套

2. 然后安装特定版本pySpark

import sys

sys.version

可能看到下面的输出结果(具体跟你安装的anaconda有关):

'3.8.5 (default, Sep  3 2020, 21:29:08) [MSC v.1916 64 bit (AMD64)]'

参照帖子Spark环境部署详细步骤介绍的方法,安装低版本的pyspark。为什么要降低pyspark的版本,似乎是很怪异。

4.1.2 第一次重装pyspark

我按照帖子的说明降低了版本,但是遇到了这个错误

ERROR: Could not install packages due to an EnvironmentError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。: 'c:\programdata\anaconda3\lib\site-packages\pyspark\jars\accessors-smart-1.2.jar' Consider using the --user option or check the permissions.

我打算停下这个notebook再尝试一下安装。我除了关闭了这个notebook网页,还把这个notebook从running状态stop了。

4.1.3 第二次重装pyspark

再次执行安装命令,没有看到安装错误,不知道是不是真的解决了,继续往下执行吧。

这次在import的时候就异常了:

ImportError: cannot import name '_parse_memory' from 'pyspark.util' (C:\ProgramData\Anaconda3\lib\site-packages\pyspark\util.py)

4.1.4 第n次重装pyspark

我估计是安装了两次pyspark,可能版本冲突了,尝试先用pip uninstall pyspark卸载以后再重新install,其实实验发现这是没有用的。

4.1.5 终于成功重装pyspark

最后我采用了这个顺序:首先uninstall,然后我用管理员身份打开anaconda prompt,再执行install 3.0.1版本的pyspark。这次成功了。

5,初始化-创建SparkSession对象

SparkSession对象是运行Spark代码的入口点。

基于Python或R运行Spark时,不用写显式的jvm指令,通过编写Python或R代码来调用SparkSession,Spark将它们转换为可以在JVM上运行的代码。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


6,Spark DataFrame创建

以下4种方法创建的DataFrame是相同的。

创建5个字段的dataframe,并且插入3条记录。

6.1 基于行列表创建DataFrame

from datetime import datetime, date

import pandas as pd

from pyspark.sql import Row

df = spark.createDataFrame([

    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),

    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),

    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))

])

# 显示前5行

df.show(5)


如果pyspark版本配套正常,那么应该输出下面的结果:

但是,我第一次执行上述代码段,遇到了以下异常:

Py4JJavaError: An error occurred while calling o36.showString.

: org.apache.spark.SparkException: Job aborted due to stage failure:

Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.105 executor driver):

org.apache.spark.SparkException: Python worker failed to connect back.

我又回到前面解决版本配套问题一节,按照说明重新安装了pyspark,然后再重新运行这个notebook


6.2 指定schema创建DataFrame

df = spark.createDataFrame([

    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),

    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),

    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))

], schema='a long, b double, c string, d date, e timestamp')

# 显示前5行

df.show(5)

6.3 从Pandas Dataframe创建Spark DataFrame

6.3.1 从Pandas DataFrame转成Spark DataFrame

先创建一个Pandas Dataframe, 然后直接基于Pandas Dataframe创建Spark DataFrame

pandas_df = pd.DataFrame({

    'a': [1, 2, 3],

    'b': [2., 3., 4.],

    'c': ['string1', 'string2', 'string3'],

    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],

    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]

})

df = spark.createDataFrame(pandas_df)

# 显示前5行

df.show(5)

6.3.2 从Spark DataFrame转换成Pandas DataFrame

笔者注:Spark DataFrame是分布式的,如果数据量比较大,那么转成Pandas DataFrame应该会有异常发生。有兴趣的同学可以测试下

pd_df = df.toPandas()

pd_df.head()

6.4 从RDD(弹性数据集)创建Spark DataFrame

rdd = spark.sparkContext.parallelize([

    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),

    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),

    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))

])

df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

# 显示前5行

df.show(5)

7,查看数据

可以用show() 方法来展示数据,show有以下几种不同的使用方式:

show():显示所有数据

show(n) :显示前n条数据

show(true): 最多显示20个字符,默认为true

show(false): 去除最多显示20个字符的限制

show(n, true):显示前n条并最多显示20个字符


7.1 查看第1行

# 查看第1行

df.show(1)

7.2 查看前10行

df.show(10)

7.3 一行显示一个字段

# 一行显示一个字段

df.show(1, vertical=True)

7.4 查看列名

df.columns

输出结果:

['a', 'b', 'c', 'd', 'e']


7.5 查看结构信息

df.printSchema()

输出结果:

root

 |-- a: long (nullable = true)

 |-- b: double (nullable = true)

 |-- c: string (nullable = true)

 |-- d: date (nullable = true)

 |-- e: timestamp (nullable = true)


7.6 查看摘要信息describe()

这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。

df.select("a", "b", "c").describe().show()

7.7 DataFrame.collect()

将分布式数据收集到驱动节点侧,作为 Python 中的本地数据。

请注意,不同于前面的show方法,这里的collect方法会将df中的所有数据都获取到,并返回一个Array对象。当数据集太大时,可能会引发内存溢出错误,因为它从所有的执行器把数据收集到驱动节点。

df.collect()

输出结果:

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),

 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),

 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]


7.8 转换成Padas Dataframe

当数据量太大时,有可能出现“内存溢出”错误

df.toPandas()

8,选择和访问数据

8.1 选择某列

from pyspark.sql import Column

from pyspark.sql.functions import upper

df.a

输出结果:

Column<b'a'>


8.2 选择并显示列数据

df.select(df.c).show()

8.3 可以传入DataFrame的某列对其进行计算

df.select(df.a + 1,df.b,df.c).show()

8.4 新增1列

使用withColumn来增加列

df_new = df.withColumn('upper_c', upper(df.c))

df.show()

df_new.show()

这里要注意,增加一列会生成一个新的DataFrame,这是因为底层实现是immutable的,不可能在当前DataFrame上in-place方式修改数据。

所以,要注意其他代码段,都是在执行一个修改数据表结构和内容的函数之后紧接着写.show(),其实这个show()函数是show了新生成的DataFrame。

8.5 重命名

使用withColumnRenamed重命名某列

df2 = df.withColumnRenamed('e', 'renamed_e')

df2.show()

8.6 使用drop删除某列

返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段

df2.drop('renamed_e').show()

9,筛选和过滤

9.1 使用where进行过滤

where(conditionExpr: String)过滤:SQL语言中where关键字后的条件 ,传入筛选条件表达式,可以用and和or,得到DataFrame类型的返回结果

df.where("a > 1 and e like '2000-01-03%'").show()

9.2 使用filter进行过滤

传入筛选条件表达式,得到DataFrame类型的返回结果。

df.filter(df.a == 1).show()

df.filter("a > 1 and e like '2000-01-03%'").show()

10,数据分组

PySpark 可以按特定条件对数据进行分组

10.1 创建测试Dataframe

df = spark.createDataFrame([

    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],

    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],

    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

df.show()

10.2 按颜色分组计算平均值

df.groupby('color').avg().show()

11,执行SQL

DataFrame和Spark SQL 共享相同的执行引擎,因此可以无缝互换使用。例如,您可以将DataFrame注册为表,并轻松运行 SQL

df.createOrReplaceTempView("tableA")

spark.sql("SELECT count(*) from tableA").show()

from pyspark.sql.functions import expr

df.select(expr('count(*)') > 0).show()

12,排序

orderBy或sort:按指定字段排序,默认为升序

df.show()

12.1 按字段降序序排列

df.orderBy('color',ascending=False).show()

12.2 按字段升序排列

df.sort('fruit',ascending=True).show()

13,下载本Notebook

Jupyter Notebook使用python实验Spark DataFrame基本功能.zip


鲜花

握手

雷人

路过

鸡蛋

最新评论

GMT+8, 2024-4-17 02:16