PysparkNote100---Common DataFrame Operations


Creating the data

Construct two datasets:

df1: basic user attributes: age, sex, registration time
df2: user transaction attributes: transaction time, payment amount

import pyspark.sql.functions

from pyspark.sql import SparkSession
# Create a SparkSession object via the .builder class.
# .appName() gives the application a name; .getOrCreate() creates a
# SparkSession or returns one that already exists.
spark = SparkSession.builder.appName("pyspark").getOrCreate()

df1 = spark.createDataFrame([(1, "male", "18", "2019-01-01 11:45:50"), (2, "female", "37", "2019-01-02 11:55:50"), (3, "male", "21", "2019-01-21 11:45:50"), (4, "female", "44", "2019-02-01 12:45:50"), (5, "male", "39", "2019-01-15 10:40:50")], ["id", "sex", "age", "createtime_str"])

df2 = spark.createDataFrame([(1, "2019-04-01 11:45:50", 11.15), (2, "2019-05-02 11:56:50", 10.37), (3, "2019-07-21 12:45:50", 12.11), (4, "2019-08-01 12:40:50", 14.50), (5, "2019-01-06 10:00:50", 16.39)], ["id", "buytime_str", "payamount"])

df1.show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+

df2.show()

+---+-------------------+---------+
| id|        buytime_str|payamount|
+---+-------------------+---------+
|  1|2019-04-01 11:45:50|    11.15|
|  2|2019-05-02 11:56:50|    10.37|
|  3|2019-07-21 12:45:50|    12.11|
|  4|2019-08-01 12:40:50|     14.5|
|  5|2019-01-06 10:00:50|    16.39|
+---+-------------------+---------+

DataFrame overview

Check the type

type(df1)

pyspark.sql.dataframe.DataFrame

Print the column types

df1.printSchema()

root
 |-- id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- createtime_str: string (nullable = true)

Descriptive statistics

df1.describe().show()

+-------+------------------+------+------------------+-------------------+
|summary|                id|   sex|               age|     createtime_str|
+-------+------------------+------+------------------+-------------------+
|  count|                 5|     5|                 5|                  5|
|   mean|               3.0|  null|              31.8|               null|
| stddev|1.5811388300841898|  null|11.562871615649808|               null|
|    min|                 1|female|                18|2019-01-01 11:45:50|
|    max|                 5|  male|                44|2019-02-01 12:45:50|
+-------+------------------+------+------------------+-------------------+

Count rows and columns

df1.count()

5

len(df1.columns)

4

print("Rows: " + str(df1.count()) + "\n" + "Columns: " + str(len(df1.columns)))

Rows: 5
Columns: 4

Subsetting

Get the value at a specific position

df1.collect()[1][1]

'female'

Take the first few rows

df1.limit(3).show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
+---+------+---+-------------------+

Select certain columns

df1.select("id","sex").show(3)

+---+------+
| id|   sex|
+---+------+
|  1|  male|
|  2|female|
|  3|  male|
+---+------+
only showing top 3 rows

Filter by condition

df1.filter("id>2").show(2)

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
+---+------+---+-------------------+
only showing top 2 rows

df1.where("id = 2").show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
+---+------+---+-------------------+

df1.filter(df1.id > 3).show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+

Column operations

Number of columns & column names

len(df1.columns)

4

df1.columns

['id', 'sex', 'age', 'createtime_str']

Renaming columns

str(df1)

'DataFrame[id: bigint, sex: string, age: string, createtime_str: string]'

Rename one column

df1.withColumnRenamed("id","idNew").show(3)

+-----+------+---+-------------------+
|idNew|   sex|age|     createtime_str|
+-----+------+---+-------------------+
|    1|  male| 18|2019-01-01 11:45:50|
|    2|female| 37|2019-01-02 11:55:50|
|    3|  male| 21|2019-01-21 11:45:50|
+-----+------+---+-------------------+
only showing top 3 rows

Rename multiple columns

df1.withColumnRenamed("id", "idNew").withColumnRenamed("age", "ageNew").show(3)

+-----+------+------+-------------------+
|idNew|   sex|ageNew|     createtime_str|
+-----+------+------+-------------------+
|    1|  male|    18|2019-01-01 11:45:50|
|    2|female|    37|2019-01-02 11:55:50|
|    3|  male|    21|2019-01-21 11:45:50|
+-----+------+------+-------------------+
only showing top 3 rows

Rename all columns

newCol = ["id_new", "sex_new", "age_new", "createtime_str_new"]
df1.toDF(*newCol).show(3)

+------+-------+-------+-------------------+
|id_new|sex_new|age_new| createtime_str_new|
+------+-------+-------+-------------------+
|     1|   male|     18|2019-01-01 11:45:50|
|     2| female|     37|2019-01-02 11:55:50|
|     3|   male|     21|2019-01-21 11:45:50|
+------+-------+-------+-------------------+
only showing top 3 rows

Dropping columns

Drop one column

df1.drop("id").show(2)

+------+---+-------------------+
|   sex|age|     createtime_str|
+------+---+-------------------+
|  male| 18|2019-01-01 11:45:50|
|female| 37|2019-01-02 11:55:50|
+------+---+-------------------+
only showing top 2 rows

Drop multiple columns

df1.drop("sex","age").show(2)

+---+-------------------+
| id|     createtime_str|
+---+-------------------+
|  1|2019-01-01 11:45:50|
|  2|2019-01-02 11:55:50|
+---+-------------------+
only showing top 2 rows

Selecting columns

df1.select("sex","age").show(2)

+------+---+
|   sex|age|
+------+---+
|  male| 18|
|female| 37|
+------+---+
only showing top 2 rows

Adding columns

Add a constant column

from pyspark.sql import functions as f
df1.withColumn("idNew", f.lit(1)).show()

+---+------+---+-------------------+-----+
| id|   sex|age|     createtime_str|idNew|
+---+------+---+-------------------+-----+
|  1|  male| 18|2019-01-01 11:45:50|    1|
|  2|female| 37|2019-01-02 11:55:50|    1|
|  3|  male| 21|2019-01-21 11:45:50|    1|
|  4|female| 44|2019-02-01 12:45:50|    1|
|  5|  male| 39|2019-01-15 10:40:50|    1|
+---+------+---+-------------------+-----+

Add a column via a calculation

df1.withColumn("idNew",df1.id * 2).show(3)

+---+------+---+-------------------+-----+
| id|   sex|age|     createtime_str|idNew|
+---+------+---+-------------------+-----+
|  1|  male| 18|2019-01-01 11:45:50|    2|
|  2|female| 37|2019-01-02 11:55:50|    4|
|  3|  male| 21|2019-01-21 11:45:50|    6|
+---+------+---+-------------------+-----+
only showing top 3 rows

Add a column from a vector or list

You need to convert the list into a DataFrame first and then join it back, as sketched below.
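A minimal sketch of that approach, assuming a hypothetical list of (id, score) pairs and a made-up "score" column; the shared "id" key is used for the join:

# Build a DataFrame from a plain Python list (the scores are invented for
# illustration), then attach it to df1 with a left join on the shared "id" key.
scores = [(1, 0.5), (2, 0.7), (3, 0.1), (4, 0.9), (5, 0.3)]
score_df = spark.createDataFrame(scores, ["id", "score"])
df1.join(score_df, on="id", how="left").show()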

Column type conversion

Convert a few columns

df1.select(df1.id.cast("int"),"sex",df1.age.cast("double"),df1.createtime_str.cast("Timestamp")).show()

+---+------+----+-------------------+
| id|   sex| age|     createtime_str|
+---+------+----+-------------------+
|  1|  male|18.0|2019-01-01 11:45:50|
|  2|female|37.0|2019-01-02 11:55:50|
|  3|  male|21.0|2019-01-21 11:45:50|
|  4|female|44.0|2019-02-01 12:45:50|
|  5|  male|39.0|2019-01-15 10:40:50|
+---+------+----+-------------------+

df1.select(df1.id.cast("int"),"sex",df1.age.cast("double"),df1.createtime_str.cast("Timestamp")).printSchema()

root
 |-- id: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- createtime_str: timestamp (nullable = true)

Batch conversion of multiple columns

I haven't found a single call that casts many columns in one batch yet; leaving this for now and will add it when I come across one.
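One possible workaround, sketched here rather than taken from the original text, is to loop over a mapping of column names to target types and cast each column in turn (the mapping below is an assumption):

# Hypothetical batch cast: iterate over a {column: target_type} dict and apply
# .cast() column by column via withColumn.
type_map = {"id": "int", "age": "double", "createtime_str": "timestamp"}
df_casted = df1
for col_name, col_type in type_map.items():
    df_casted = df_casted.withColumn(col_name, df_casted[col_name].cast(col_type))
df_casted.printSchema()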

Operations between columns

df1.withColumn("id_x",f.lit(3)).withColumn("greatest",f.greatest("id","id_x")).withColumn("least",f.least("id","id_x")).show()

+---+------+---+-------------------+----+--------+-----+
| id|   sex|age|     createtime_str|id_x|greatest|least|
+---+------+---+-------------------+----+--------+-----+
|  1|  male| 18|2019-01-01 11:45:50|   3|       3|    1|
|  2|female| 37|2019-01-02 11:55:50|   3|       3|    2|
|  3|  male| 21|2019-01-21 11:45:50|   3|       3|    3|
|  4|female| 44|2019-02-01 12:45:50|   3|       4|    3|
|  5|  male| 39|2019-01-15 10:40:50|   3|       5|    3|
+---+------+---+-------------------+----+--------+-----+

Row operations

Count rows

df1.count()

5

Filter rows

Just use filter, as in the condition-filter examples above; a quick illustration follows.
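As a small illustration, reusing the column-expression style of filter shown earlier (the condition is arbitrary):

# Keep only the rows where sex equals "male" (illustrative condition).
df1.filter(df1.sex == "male").show()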

Remove duplicate rows

df1.union(spark.createDataFrame([(1, "male", "18", "2019-01-01 11:45:50")])).show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
|  1|  male| 18|2019-01-01 11:45:50|
+---+------+---+-------------------+

df1.union(spark.createDataFrame([(1, "male", "18", "2019-01-01 11:45:50")])).dropDuplicates().show()

+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+

Aggregation with groupBy

data = [("a", 2, 2), ("a", 1, 3), ("b", 3, 4), ("b", 3, 4)]
df = spark.createDataFrame(data, schema=['user', 'x1', 'x2'])

from pyspark.sql.functions import *

df.groupBy("user").agg((count("x1") - 1).alias("num")).show()

+----+---+
|user|num|
+----+---+
|   b|  1|
|   a|  1|
+----+---+

df.distinct().show()

+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  2|  2|
|   b|  3|  4|
|   a|  1|  3|
+----+---+---+

Sorting

df.show()

+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  2|  2|
|   a|  1|  3|
|   b|  3|  4|
|   b|  3|  4|
+----+---+---+

df.orderBy(["x1", "x2"], ascending=[1, 0]).show()

+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  1|  3|
|   a|  2|  2|
|   b|  3|  4|
|   b|  3|  4|
+----+---+---+
