2022-09-02
PysparkNote100 --- Common DataFrame Operations
Creating DataFrames
Construct two datasets:
df1: basic user attributes (age, sex, registration time)
df2: user transaction attributes (transaction time, payment amount)
import pyspark.sql.functions
from pyspark.sql import SparkSession

# Create a SparkSession object via the .builder class:
# .appName() gives the application a name;
# .getOrCreate() creates a new SparkSession or returns one that already exists.
spark = SparkSession.builder.appName("pysaprk").getOrCreate()
df1 = spark.createDataFrame(
    [(1, "male", "18", "2019-01-01 11:45:50"),
     (2, "female", "37", "2019-01-02 11:55:50"),
     (3, "male", "21", "2019-01-21 11:45:50"),
     (4, "female", "44", "2019-02-01 12:45:50"),
     (5, "male", "39", "2019-01-15 10:40:50")],
    ["id", "sex", "age", "createtime_str"])
df2 = spark.createDataFrame(
    [(1, "2019-04-01 11:45:50", 11.15),
     (2, "2019-05-02 11:56:50", 10.37),
     (3, "2019-07-21 12:45:50", 12.11),
     (4, "2019-08-01 12:40:50", 14.50),
     (5, "2019-01-06 10:00:50", 16.39)],
    ["id", "buytime_str", "payamount"])
df1.show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
df2.show()
+---+-------------------+---------+
| id|        buytime_str|payamount|
+---+-------------------+---------+
|  1|2019-04-01 11:45:50|    11.15|
|  2|2019-05-02 11:56:50|    10.37|
|  3|2019-07-21 12:45:50|    12.11|
|  4|2019-08-01 12:40:50|     14.5|
|  5|2019-01-06 10:00:50|    16.39|
+---+-------------------+---------+
DataFrame overview
Check the type
type(df1)
pyspark.sql.dataframe.DataFrame
Print the column types
df1.printSchema()
root
 |-- id: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- createtime_str: string (nullable = true)
Descriptive statistics
df1.describe().show()
+-------+------------------+------+------------------+-------------------+
|summary|                id|   sex|               age|     createtime_str|
+-------+------------------+------+------------------+-------------------+
|  count|                 5|     5|                 5|                  5|
|   mean|               3.0|  null|              31.8|               null|
| stddev|1.5811388300841898|  null|11.562871615649808|               null|
|    min|                 1|female|                18|2019-01-01 11:45:50|
|    max|                 5|  male|                44|2019-02-01 12:45:50|
+-------+------------------+------+------------------+-------------------+
Count rows and columns
df1.count()
5
len(df1.columns)
4
print("行数:"+str(df1.count())+"\n"+"列数:"+str(len(df1.columns)))
行数:5列数:4
Taking subsets
Get the value at a specific position
df1.collect()[1][1]
'female'
Take the first few rows
df1.limit(3).show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
+---+------+---+-------------------+
Select certain columns
df1.select("id","sex").show(3)
+---+------+
| id|   sex|
+---+------+
|  1|  male|
|  2|female|
|  3|  male|
+---+------+
only showing top 3 rows
Filter by condition
df1.filter("id>2").show(2)
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
+---+------+---+-------------------+
only showing top 2 rows
.where("id = 2").show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
+---+------+---+-------------------+
df1.filter(df1.id > 3).show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
Column operations
Number of columns & column names
len(df1.columns)
4
df1.columns
['id', 'sex', 'age', 'createtime_str']
Renaming columns
str(df1)
'DataFrame[id: bigint, sex: string, age: string, createtime_str: string]'
Rename a single column
df1.withColumnRenamed("id","idNew").show(3)
+-----+------+---+-------------------+
|idNew|   sex|age|     createtime_str|
+-----+------+---+-------------------+
|    1|  male| 18|2019-01-01 11:45:50|
|    2|female| 37|2019-01-02 11:55:50|
|    3|  male| 21|2019-01-21 11:45:50|
+-----+------+---+-------------------+
only showing top 3 rows
Rename multiple columns
df1.withColumnRenamed("id", "idNew").withColumnRenamed("age", "ageNew").show(3)
+-----+------+------+-------------------+
|idNew|   sex|ageNew|     createtime_str|
+-----+------+------+-------------------+
|    1|  male|    18|2019-01-01 11:45:50|
|    2|female|    37|2019-01-02 11:55:50|
|    3|  male|    21|2019-01-21 11:45:50|
+-----+------+------+-------------------+
only showing top 3 rows
Rename all columns
newCol = ["id_new", "sex_new", "age_new", "createtime_str_new"]
df1.toDF(*newCol).show(3)
+------+-------+-------+-------------------+
|id_new|sex_new|age_new| createtime_str_new|
+------+-------+-------+-------------------+
|     1|   male|     18|2019-01-01 11:45:50|
|     2| female|     37|2019-01-02 11:55:50|
|     3|   male|     21|2019-01-21 11:45:50|
+------+-------+-------+-------------------+
only showing top 3 rows
Dropping columns
Drop a single column
df1.drop("id").show(2)
+------+---+-------------------+
|   sex|age|     createtime_str|
+------+---+-------------------+
|  male| 18|2019-01-01 11:45:50|
|female| 37|2019-01-02 11:55:50|
+------+---+-------------------+
only showing top 2 rows
Drop multiple columns
df1.drop("sex","age").show(2)
+---+-------------------+
| id|     createtime_str|
+---+-------------------+
|  1|2019-01-01 11:45:50|
|  2|2019-01-02 11:55:50|
+---+-------------------+
only showing top 2 rows
Selecting columns
df1.select("sex","age").show(2)
+------+---+
|   sex|age|
+------+---+
|  male| 18|
|female| 37|
+------+---+
only showing top 2 rows
Adding columns
Add a constant column
from pyspark.sql import functions as f
df1.withColumn("idNew", f.lit(1)).show()
+---+------+---+-------------------+-----+
| id|   sex|age|     createtime_str|idNew|
+---+------+---+-------------------+-----+
|  1|  male| 18|2019-01-01 11:45:50|    1|
|  2|female| 37|2019-01-02 11:55:50|    1|
|  3|  male| 21|2019-01-21 11:45:50|    1|
|  4|female| 44|2019-02-01 12:45:50|    1|
|  5|  male| 39|2019-01-15 10:40:50|    1|
+---+------+---+-------------------+-----+
Add a column via a computation
df1.withColumn("idNew",df1.id * 2).show(3)
+---+------+---+-------------------+-----+
| id|   sex|age|     createtime_str|idNew|
+---+------+---+-------------------+-----+
|  1|  male| 18|2019-01-01 11:45:50|    2|
|  2|female| 37|2019-01-02 11:55:50|    4|
|  3|  male| 21|2019-01-21 11:45:50|    6|
+---+------+---+-------------------+-----+
only showing top 3 rows
Add a column from a vector or list
You need to convert the list into a DataFrame first and then join it back to the original; a sketch follows below.
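A minimal sketch, assuming the new values sit in a plain Python list that can be keyed by the existing id column (the score column and its values are invented for illustration):

# Turn the list into a small DataFrame keyed on "id", then join it back to df1
scores = [(1, 0.5), (2, 0.7), (3, 0.1), (4, 0.9), (5, 0.3)]
score_df = spark.createDataFrame(scores, ["id", "score"])
df1.join(score_df, on="id", how="left").show()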
Converting column types
Convert the types of a few columns
df1.select(df1.id.cast("int"),"sex",df1.age.cast("double"),df1.createtime_str.cast("Timestamp")).show()
+---+------+----+-------------------+
| id|   sex| age|     createtime_str|
+---+------+----+-------------------+
|  1|  male|18.0|2019-01-01 11:45:50|
|  2|female|37.0|2019-01-02 11:55:50|
|  3|  male|21.0|2019-01-21 11:45:50|
|  4|female|44.0|2019-02-01 12:45:50|
|  5|  male|39.0|2019-01-15 10:40:50|
+---+------+----+-------------------+
df1.select(df1.id.cast("int"),"sex",df1.age.cast("double"),df1.createtime_str.cast("Timestamp")).printSchema()
root
 |-- id: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- createtime_str: timestamp (nullable = true)
Batch conversion of multiple columns
I haven't found a built-in way to cast many columns in one call yet; leaving it like this for now and adding more if I come across one. A possible workaround is sketched below.
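One possible workaround (my own sketch, not part of the original note) is to drive withColumn plus cast from a column-to-type mapping:

# Cast several columns in a loop; the contents of type_map are illustrative
type_map = {"id": "int", "age": "double", "createtime_str": "timestamp"}
df1_cast = df1
for col_name, col_type in type_map.items():
    df1_cast = df1_cast.withColumn(col_name, df1_cast[col_name].cast(col_type))
df1_cast.printSchema()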
Operations between columns
df1.withColumn("id_x",f.lit(3)).withColumn("greatest",f.greatest("id","id_x")).withColumn("least",f.least("id","id_x")).show()
+---+------+---+-------------------+----+--------+-----+
| id|   sex|age|     createtime_str|id_x|greatest|least|
+---+------+---+-------------------+----+--------+-----+
|  1|  male| 18|2019-01-01 11:45:50|   3|       3|    1|
|  2|female| 37|2019-01-02 11:55:50|   3|       3|    2|
|  3|  male| 21|2019-01-21 11:45:50|   3|       3|    3|
|  4|female| 44|2019-02-01 12:45:50|   3|       4|    3|
|  5|  male| 39|2019-01-15 10:40:50|   3|       5|    3|
+---+------+---+-------------------+----+--------+-----+
Row operations
Count rows
df1.count()
5
Row filtering
Just use filter, the same as in the condition-filtering section above; a quick example follows.
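For example (the same filter API as above; the condition itself is just for illustration):

df1.filter(df1.sex == "male").show()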
Removing duplicate rows
df1.union(spark.createDataFrame([(1, "male", "18", "2019-01-01 11:45:50")])).show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  1|  male| 18|2019-01-01 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  4|female| 44|2019-02-01 12:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
|  1|  male| 18|2019-01-01 11:45:50|
+---+------+---+-------------------+
df1.union(spark.createDataFrame([(1, "male", "18", "2019-01-01 11:45:50")])).dropDuplicates().show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  2|female| 37|2019-01-02 11:55:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
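As a side note (my own sketch, not from the original): dropDuplicates also accepts a list of columns, so you can deduplicate on a subset of columns only.

# Keep only the first row encountered for each value of "sex" (illustrative subset)
df1.dropDuplicates(["sex"]).show()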
Aggregation with groupBy
data = [("a", 2, 2), ("a", 1, 3), ("b", 3, 4), ("b", 3, 4)]
df = spark.createDataFrame(data, schema=['user', 'x1', 'x2'])
from pyspark.sql.functions import *
df.groupBy("user").agg((count("x1") - 1).alias("num")).show()
+----+---+
|user|num|
+----+---+
|   b|  1|
|   a|  1|
+----+---+
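For reference, a sketch (not from the original) of running several aggregations in a single agg call; the aliases are arbitrary:

# sum/avg/countDistinct all come from the pyspark.sql.functions star import above
df.groupBy("user").agg(
    sum("x1").alias("x1_sum"),
    avg("x2").alias("x2_avg"),
    countDistinct("x1").alias("x1_distinct")
).show()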
df.distinct().show()
+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  2|  2|
|   b|  3|  4|
|   a|  1|  3|
+----+---+---+
Sorting
df.show()
+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  2|  2|
|   a|  1|  3|
|   b|  3|  4|
|   b|  3|  4|
+----+---+---+
df.orderBy(["x1", "x2"], ascending=[1, 0]).show()
+----+---+---+
|user| x1| x2|
+----+---+---+
|   a|  1|  3|
|   a|  2|  2|
|   b|  3|  4|
|   b|  3|  4|
+----+---+---+
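An equivalent form using column expressions instead of the ascending flags (a sketch, not from the original):

# col()/asc()/desc() come from the pyspark.sql.functions star import above
df.orderBy(col("x1").asc(), col("x2").desc()).show()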