Scala102-操作Hdfs

网友投稿 611 2022-10-08

Scala102-操作Hdfs

Scala102-操作Hdfs

Info

先生成DataFrame,再把数据储存在HDFS上。

import org.apache.spark.sql.functions._import spark.implicits._import org.apache.spark.ml.feature.VectorAssemblerimport org.apache.spark.ml.linalg.{Vector, Vectors}import org.apache.spark.sql.{DataFrame, Row, SparkSession}

Intitializing Scala interpreter ...SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577952043881)SparkSession available as 'spark'import org.apache.spark.sql.functions._import spark.implicits._import org.apache.spark.ml.feature.VectorAssemblerimport org.apache.spark.ml.linalg.{Vector, Vectors}import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Configure a local SparkSession for the HDFS examples in this note.
val builder = SparkSession
  .builder()
  .appName("learningScala")
  .config("spark.executor.heartbeatInterval", "60s")
  // The network timeout has to stay larger than the executor heartbeat interval above.
  .config("spark.network.timeout", "120s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.dynamicAllocation.enabled", false)
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
  .config("spark.sql.broadcastTimeout", 600)
  // -1 disables automatic broadcast joins.
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .config("spark.sql.crossJoin.enabled", true)
  .master("local[*]")

// The application name given here replaces "learningScala" set on the builder above.
val spark = builder.appName("OperateHdfs").getOrCreate()

// Only show errors so the cell outputs below stay readable.
spark.sparkContext.setLogLevel("ERROR")

builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@679eb75aspark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@551aa019

val df1 = Seq( (1, "male", "18" ,"2019-01-01 11:45:50"), (2, "female", "37" ,"2019-01-02 11:55:50"), (3, "male", "21" ,"2019-01-21 11:45:50"), (4, "female", "44" ,"2019-02-01 12:45:50"), (5, "male", "39" ,"2019-01-15 10:40:50") ).toDF("id","sex","age", "createtime_str")

df1: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]

val df=df1.withColumn("ds",date_format($"createtime_str","yyyyMMdd"))

df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 3 more fields]

df.show()

+---+------+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+------+---+-------------------+--------+| 1| male| 18|2019-01-01 11:45:50|20190101|| 2|female| 37|2019-01-02 11:55:50|20190102|| 3| male| 21|2019-01-21 11:45:50|20190121|| 4|female| 44|2019-02-01 12:45:50|20190201|| 5| male| 39|2019-01-15 10:40:50|20190115|+---+------+---+-------------------+--------+

查看hdfs文件

是否存在对应目录

import相关方法

import org.apache.hadoop.fs.{FileSystem, Path,FileStatus,FileUtil}

import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}

获取配置信息

// Root directory used by the examples below.
val path = "../Data"

// Hadoop configuration carried by the active SparkContext; it decides which file system is used.
val hadoopConf = spark.sparkContext.hadoopConfiguration

path: String = ../DatahadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, __spark_hadoop_conf__.xml

val hdfs = FileSystem.get(hadoopConf)

hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@25866b28

设置路径

val outputPath = new Path("../Data")

outputPath: org.apache.hadoop.fs.Path = ../Data

hdfs上是否存在这个路径

hdfs.exists(outputPath)

res2: Boolean = true

hdfs上是否存在这个文件

hdfs.exists(new Path("../Data/test.txt"))

res3: Boolean = true

判断该path是否为文件夹?

hdfs.getFileStatus(outputPath).isDirectory()

res4: Boolean = true

判断该path是否为文件?

hdfs.getFileStatus(outputPath).isFile()

res5: Boolean = false

hdfs.getFileStatus(new Path("../Data/test.txt")).isFile()

res6: Boolean = true

获取路径下所有文件

val allFiles = FileUtil.stat2Paths(hdfs.listStatus(outputPath))

allFiles: Array[org.apache.hadoop.fs.Path] = Array(file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt)

打印一级目录名和文件名

allFiles.foreach(println)

file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTestfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

打印一级目录名

逻辑:

1. 获取路径下所有文件;2. 循环遍历,判断每一个文件是否为Directory;3. 是则打印出来。

allFiles
  .filter(hdfs.getFileStatus(_).isDirectory()) // keep only the entries that are directories
  .foreach(println)                            // print each first-level directory

file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest

打印对应路径下文件

allFiles
  .filter(hdfs.getFileStatus(_).isFile()) // keep only the entries that are regular files
  .foreach(println)                       // print each first-level file

file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

封装成Object

/** Helpers for checking and listing paths on the file system configured for a SparkSession. */
object HdfsCheckPath {

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
  import org.apache.spark.sql.SparkSession

  /**
   * Reports whether `path` exists and prints a one-line message with the answer.
   *
   * @param spark SparkSession whose Hadoop configuration selects the file system
   * @param path  path to check, given as a string
   * @return true when the path exists, false otherwise
   */
  def isPathExist(spark: SparkSession, path: String): Boolean = {
    val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    // Ask the file system once and reuse the answer for both the message and the result.
    val exists = hdfs.exists(new Path(path))
    if (exists) println(s"This path(${path}) Already exist!")
    else println(s"This path(${path}) don't exist!")
    exists
  }

  /**
   * Prints the first-level entries found under `path`.
   *
   * @param spark      SparkSession whose Hadoop configuration selects the file system
   * @param path       path to list
   * @param printlevel what to print: "directory" (sub-directories only), "file" (files only)
   *                   or "total" (everything); any other value prints a notice and no entries
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val target = new Path(path)

    if (!hdfs.exists(target)) {
      println("This path don't exist!")
    } else {
      println("This path Already exist!")
      // listStatus already returns the status of every child, so no extra lookup per child is needed.
      val children: Array[FileStatus] = hdfs.listStatus(target)
      printlevel match {
        case "directory" =>
          println("-----Directory:")
          children.filter(_.isDirectory).map(_.getPath).foreach(println)
        case "file" =>
          println("-----File:")
          children.filter(_.isFile).map(_.getPath).foreach(println)
        case "total" =>
          println("-----Total:")
          children.map(_.getPath).foreach(println)
        case other =>
          // Previously an unknown level printed nothing at all; say so explicitly instead.
          println(s"Not Support this printlevel:${other}")
      }
    }
  }
}

defined object HdfsCheckPath

HdfsCheckPath.isPathExist(spark,"../Data")

This path(../Data) Already exist!res10: Boolean = true

HdfsCheckPath.printPathDetail(spark,"../Data","total")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTestfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

HdfsCheckPath.printPathDetail(spark,"../Data","directory")

This path Already exist!-----Directory:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest

HdfsCheckPath.printPathDetail(spark,"../Data","file")

This path Already exist!-----File:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

储存文件

储存暂时只介绍parquet格式

非分区储存

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

mode参数 Specifies the behavior when data or table already exists. Options include:

- overwrite: overwrite the existing data // 覆盖写入
- append: append the data // 追加写入
- ignore: ignore the operation (i.e. no-op) // 目标已存在时什么都不做(既不写入也不报错),适用于"只在数据不存在时才写入"的场景
- error or errorifexists: default option, throw an exception at runtime // 如果写入目录已存在则报错

一般选择errorifexists,防止错误地把数据插入在已经存在的目录。参数saveMode的值可以是String或者SaveMode(需要import org.apache.spark.sql.SaveMode)

df1.write.mode(saveMode="errorifexists").parquet(path="../Data/hdfsSaveNoPartition/")

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3zNCZ30J-1577952595406)(…/Picture/Pic01.jpg)]

df1.write.mode(saveMode=SaveMode.Overwrite).parquet("../Data/hdfsSaveNoPartition/")

分区储存

// Columns used to partition the output: one sub-directory ds=<value> is written per distinct value.
val partitionColNames = Array("ds")

df.write
  .partitionBy(partitionColNames: _*)
  .mode(saveMode = "errorifexists")
  .parquet("../Data/hdfsSavePartition/")

partitionColNames: Array[String] = Array(ds)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yu50KUcn-1577952595407)(…/Picture/Pic02.jpg)]

同样的,如果目标路径(../Data/hdfsSavePartition/)已经存在,也会报错:errorifexists检查的是整个输出路径,而不是单个分区。

读取

非分区读取

val df1New = spark.read.parquet("../Data/hdfsSaveNoPartition")

df1New: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]

df1New.show()

+---+------+---+-------------------+| id| sex|age| createtime_str|+---+------+---+-------------------+| 2|female| 37|2019-01-02 11:55:50|| 4|female| 44|2019-02-01 12:45:50|| 1| male| 18|2019-01-01 11:45:50|| 3| male| 21|2019-01-21 11:45:50|| 5| male| 39|2019-01-15 10:40:50|+---+------+---+-------------------+

分区读取

指定某一个固定的分区

spark.read.option("basePath", "../Data/hdfsSavePartition").parquet("../Data/hdfsSavePartition/ds=20190101").show()

+---+----+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+----+---+-------------------+--------+| 1|male| 18|2019-01-01 11:45:50|20190101|+---+----+---+-------------------+--------+

指定多个分区

spark.read.option("basePath", "../Data/hdfsSavePartition").parquet("../Data/hdfsSavePartition/ds=20190101","../Data/hdfsSavePartition/ds=20190102").show()

+---+------+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+------+---+-------------------+--------+| 2|female| 37|2019-01-02 11:55:50|20190102|| 1| male| 18|2019-01-01 11:45:50|20190101|+---+------+---+-------------------+--------+

指定多分区的简略写法

:_* 作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如 val s = sum(1 to 5: _*) 就是将 1 to 5 当作参数序列处理

val partitionPathArray = Array(20190101,20190102).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")

partitionPathArray: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102)

partitionPathArray.mkString("\n")

res20: String =../Data/hdfsSavePartition/ds=20190101../Data/hdfsSavePartition/ds=20190102

spark.read.option("basePath", "../Data/hdfsSavePartition").parquet(partitionPathArray:_*).show()

+---+------+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+------+---+-------------------+--------+| 2|female| 37|2019-01-02 11:55:50|20190102|| 1| male| 18|2019-01-01 11:45:50|20190101|+---+------+---+-------------------+--------+

如果指定分区中部分缺失

val partitionPathArray1 = Array(20190101,20190102,20191231).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")

partitionPathArray1: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20191231)

partitionPathArray1.mkString("\n")

res22: String =../Data/hdfsSavePartition/ds=20190101../Data/hdfsSavePartition/ds=20190102../Data/hdfsSavePartition/ds=20191231

partitionPathArray1.map(x=>HdfsCheckPath.isPathExist(spark,x))

This path(../Data/hdfsSavePartition/ds=20190101) Already exist!This path(../Data/hdfsSavePartition/ds=20190102) Already exist!This path(../Data/hdfsSavePartition/ds=20191231) don't exist!res23: Array[Boolean] = Array(true, true, false)

spark.read.option("basePath", "../Data/hdfsSavePartition").parquet(partitionPathArray1.filter(x=>HdfsCheckPath.isPathExist(spark,x)):_*).show()

This path(../Data/hdfsSavePartition/ds=20190101) Already exist!This path(../Data/hdfsSavePartition/ds=20190102) Already exist!This path(../Data/hdfsSavePartition/ds=20191231) don't exist!+---+------+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+------+---+-------------------+--------+| 2|female| 37|2019-01-02 11:55:50|20190102|| 1| male| 18|2019-01-01 11:45:50|20190101|+---+------+---+-------------------+--------+

删除文件

删除与文件本身无关,是对整个目录的删除操作

删除非分区数据

第二个参数表示递归删除,即删除当前路径以及其子文件和子文件夹

路径不存在时,返回false

hdfs.delete(new Path("../Data/hdfsSaveNoPartition555/"),true)

res25: Boolean = false

路径存在返回true

hdfs.delete(new Path("../Data/hdfsSaveNoPartition/"),true)

res26: Boolean = true

删除分区数据

删除分区的逻辑比较简单,即把需删除分区的完整路径循环删除即可

查看分区数据

HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")

This path Already exist!-----Directory:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201

删除指定某个分区

hdfs.delete(new Path("../Data/hdfsSavePartition/ds=20190101"),true)

res28: Boolean = true

HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")

This path Already exist!-----Directory:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201

删除多个分区

// Full paths of the partitions to delete; partition directories are named ds=<value>.
val partitionPathArray2 = Array("20190102", "20200101").map(x => s"../Data/hdfsSavePartition/ds=${x}")

partitionPathArray2: Array[String] = Array(../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20200101)

partitionPathArray2.mkString("\n")

res30: String =../Data/hdfsSavePartition/ds=20190102../Data/hdfsSavePartition/ds=20200101

// delete() returns false for a path that does not exist (ds=20200101 here) instead of throwing.
partitionPathArray2.foreach(x => hdfs.delete(new Path(x), true))

查看当前目录,上述两个分区已经被删除

HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition/","total")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

如上所示,如果有的分区不存在,程序并不报错,可以在代码里加上print语句,作为提示。

hdfs.delete(new Path("../Data/hdfsSavePartition"),true)

res33: Boolean = true

封装

object封装

目的:

1. 封装常用的hdfs操作,方便调用;2. 以object方式组织,方便函数的内部调用;3. object名称:scala2Hdfs。

/**
 * Convenience wrappers around the Hadoop `FileSystem` API and Spark's DataFrame reader/writer:
 * existence checks, first-level listing, guarded deletion, and (partitioned) save / read.
 */
object scala2Hdfs {

  import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
  import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

  /**
   * Returns the file system selected by the Hadoop configuration of `spark`.
   *
   * @param spark active SparkSession
   * @return the corresponding FileSystem handle
   */
  def getFileSystem(spark: SparkSession): FileSystem =
    FileSystem.get(spark.sparkContext.hadoopConfiguration)

  /**
   * Checks whether `path` exists (file or directory). Prints nothing.
   *
   * @param spark SparkSession
   * @param path  path to check, given as a string
   * @return true when the path exists
   */
  def isPathExist(spark: SparkSession, path: String): Boolean =
    getFileSystem(spark).exists(new Path(path))

  /**
   * Checks whether `path` exists and is a directory; prints a notice when it is not.
   *
   * @param spark SparkSession
   * @param path  path to check, given as a string
   * @return true only when the path exists and is a directory
   */
  def isExistsDirectory(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val target = new Path(path)
    // Evaluate once; the same answer drives both the message and the return value.
    val isDirectory = hdfs.exists(target) && hdfs.getFileStatus(target).isDirectory
    if (!isDirectory) println(s"This path(${path}) don't exist!")
    isDirectory
  }

  /**
   * Checks whether `path` exists and is a regular file.
   *
   * @param spark SparkSession
   * @param path  path to check, given as a string
   * @return true only when the path exists and is a file
   */
  def isExistsFile(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val target = new Path(path)
    hdfs.exists(target) && hdfs.getFileStatus(target).isFile
  }

  /**
   * Prints the first-level entries found under `path`.
   *
   * @param spark      SparkSession
   * @param path       path to list
   * @param printlevel what to print: "directory", "file" or "total"; any other value prints a
   *                   notice and no entries
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hdfs = getFileSystem(spark)
    if (!isPathExist(spark, path)) {
      println("This path don't exist!")
    } else {
      println("This path Already exist!")
      // listStatus already carries the status of every child, so no per-child lookup is needed.
      val children: Array[FileStatus] = hdfs.listStatus(new Path(path))
      printlevel match {
        case "directory" =>
          println("-----Directory:")
          children.filter(_.isDirectory).map(_.getPath).foreach(println)
        case "file" =>
          println("-----File:")
          children.filter(_.isFile).map(_.getPath).foreach(println)
        case "total" =>
          println("-----Total:")
          children.map(_.getPath).foreach(println)
        case other =>
          // Previously an unknown level printed nothing at all; say so explicitly instead.
          println(s"Not Support this printlevel:${other}")
      }
    }
  }

  /**
   * Deletes `path` recursively, but only when it contains no sub-directory.
   * A missing path, or a path that still has sub-directories, is reported and left untouched.
   *
   * @param spark SparkSession
   * @param path  path to delete
   */
  def deleteSinglePath(spark: SparkSession, path: String): Unit = {
    val hdfs = getFileSystem(spark)
    val target = new Path(path)
    if (!hdfs.exists(target)) {
      println(s"This path(${path}) don't exist!")
    } else if (hdfs.listStatus(target).exists(_.isDirectory)) {
      // Safety net against wiping a whole tree by mistake.
      println("Contains sub path, Skip clean")
    } else {
      hdfs.delete(target, true)
      println("Clean this path: " + path)
    }
  }

  /**
   * Deletes the listed partitions under `path` (or `path` itself for non-partitioned data).
   * Partitions that do not exist are reported by [[deleteSinglePath]] and skipped.
   *
   * @param spark            SparkSession
   * @param path             base path, without a trailing "/"
   * @param partitionsArray  partition values, formatted as they appear in the directory names
   * @param isPartition      false to treat `path` as non-partitioned data and delete it directly
   * @param partitionColName name of the partition column used in the directory names
   */
  def deletePartitionPath(spark: SparkSession, path: String, partitionsArray: Array[String],
                          isPartition: Boolean = true, partitionColName: String = "ds"): Unit = {
    if (!isPartition) {
      deleteSinglePath(spark, path)
    } else if (partitionsArray.isEmpty) {
      println("partitionsArray has no items")
    } else {
      partitionsArray
        .map(value => s"${path}/${partitionColName}=${value}")
        .foreach(partitionPath => deleteSinglePath(spark, partitionPath))
    }
  }

  /**
   * Saves `df` to `path` without partitioning.
   *
   * @param df          DataFrame to save
   * @param path        output path
   * @param coalesceNum when >= 1, coalesce to this many partitions before writing
   * @param saveType    "csv" (written with a header) or "parquet"; anything else is reported
   *                    and nothing is written
   * @param saveMode    Spark save mode, e.g. "errorifexists", "overwrite", "append"
   */
  def saveSinglePath(df: DataFrame, path: String, coalesceNum: Int = 0,
                     saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    val toWrite = if (coalesceNum >= 1) df.coalesce(coalesceNum) else df
    val writer = toWrite.write.mode(saveMode)
    saveType match {
      case "csv" =>
        writer.option("header", "true").csv(path)
        println("Save this path: " + path)
      case "parquet" =>
        writer.parquet(path)
        println("Save this path: " + path)
      case _ =>
        println(s"Not Support this savetype:${saveType}")
    }
  }

  /**
   * Saves a DataFrame partitioned by `partitionColNames`.
   *
   * @param dataFrameWithDs   DataFrame that contains the partition column(s)
   * @param path              output base path
   * @param saveType          "csv" (written with a header); every other value is written as parquet
   * @param saveMode          Spark save mode, "append" by default
   * @param coalesceNum       when >= 1, coalesce to this many partitions before writing
   * @param partitionColNames partition column names
   */
  def savePartitionPath(dataFrameWithDs: DataFrame, path: String, saveType: String = "parquet",
                        saveMode: String = "append", coalesceNum: Int = 0,
                        partitionColNames: Array[String] = Array("ds")): Unit = {
    val toWrite = if (coalesceNum >= 1) dataFrameWithDs.coalesce(coalesceNum) else dataFrameWithDs
    val writer = toWrite.write.partitionBy(partitionColNames: _*).mode(saveMode)
    saveType match {
      case "csv" =>
        writer.option("header", "true").csv(path)
        println("Save this path: " + path)
      case _ =>
        writer.parquet(path)
        println("Save this path: " + path)
    }
  }

  /**
   * Deletes `path` (subject to the sub-directory guard of [[deleteSinglePath]]) and then saves `df`.
   *
   * @param spark       SparkSession
   * @param df          DataFrame to save
   * @param path        output path
   * @param coalesceNum when >= 1, coalesce to this many partitions before writing
   * @param saveType    storage type, see [[saveSinglePath]]
   * @param saveMode    Spark save mode
   */
  def cleanAndSaveSinglePath(spark: SparkSession, df: DataFrame, path: String, coalesceNum: Int = 0,
                             saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    deleteSinglePath(spark, path)
    saveSinglePath(df, path, coalesceNum, saveType, saveMode)
  }

  /**
   * Deletes the listed partitions and then saves the DataFrame partitioned by `partitionColNames`.
   * With the default "append" mode, partitions that were not deleted receive the new rows in
   * addition to what they already hold.
   *
   * @param dataFrameWithDs   DataFrame to save
   * @param path              output base path
   * @param saveMode          Spark save mode, "append" by default
   * @param partitionsArray   values of the first partition column whose directories are cleaned first
   * @param coalesceNum       when >= 1, coalesce to this many partitions before writing
   * @param partitionColNames partition column names
   */
  def cleanAndSavePartitionPath(dataFrameWithDs: DataFrame, path: String, saveMode: String = "append",
                                partitionsArray: Array[String], coalesceNum: Int = 0,
                                partitionColNames: Array[String] = Array("ds")): Unit = {
    val spark = dataFrameWithDs.sparkSession
    // Directories directly under `path` are named after the first partition column.
    deletePartitionPath(spark, path, partitionsArray,
      partitionColName = partitionColNames.headOption.getOrElse("ds"))
    // coalesceNum is forwarded so that the caller's setting is actually applied.
    savePartitionPath(dataFrameWithDs = dataFrameWithDs, path = path, saveMode = saveMode,
      coalesceNum = coalesceNum, partitionColNames = partitionColNames)
  }

  /**
   * Reads a non-partitioned parquet directory.
   *
   * @param spark SparkSession
   * @param path  directory to read
   * @return the data, or an empty DataFrame when `path` is not an existing directory
   */
  def readSinglePath(spark: SparkSession, path: String): DataFrame = {
    if (isExistsDirectory(spark, path)) {
      spark.read.parquet(path)
    } else {
      println("This path don't exist!")
      spark.emptyDataFrame
    }
  }

  /**
   * Reads the listed partitions of a partitioned parquet data set; partitions whose directory
   * does not exist are skipped.
   *
   * @param spark            SparkSession
   * @param path             base path of the data set
   * @param readType         file type; only "parquet" is supported
   * @param partitionsArray  partition values to read
   * @param partitionColName name of the partition column used in the directory names
   * @return the selected partitions, or an empty DataFrame when the type is unsupported, no
   *         partition was requested, or none of the requested partitions exists
   */
  def readPartitionPath(spark: SparkSession, path: String, readType: String = "parquet",
                        partitionsArray: Array[String], partitionColName: String = "ds"): DataFrame = {
    if (readType != "parquet") {
      println(s"Not Support this readType:${readType}")
      spark.emptyDataFrame
    } else if (partitionsArray.isEmpty) {
      println("PartitionsArray is null")
      spark.emptyDataFrame
    } else {
      val existingPartitionPaths = partitionsArray
        .map(value => s"${path}/${partitionColName}=${value}")
        .filter(partitionPath => isExistsDirectory(spark, partitionPath))
      if (existingPartitionPaths.isEmpty) {
        // Handing the reader zero paths leaves it nothing to infer a schema from, so stop here.
        println(s"No existing partition under path(${path})")
        spark.emptyDataFrame
      } else {
        // basePath keeps the partition column in the result even though sub-directories are read.
        spark.read.option("basePath", path).parquet(existingPartitionPaths: _*)
      }
    }
  }
}

defined object scala2Hdfs

查看操作

查看路径是否存在,是否为文件、为目录

var path ="../Data"

path: String = ../Data

scala2Hdfs.isExistsDirectory(spark,path)

res34: Boolean = true

scala2Hdfs.isPathExist(spark,path)

res35: Boolean = true

scala2Hdfs.isExistsFile(spark,path)

res36: Boolean = false

scala2Hdfs.isExistsFile(spark,"../Data/test.txt")

res37: Boolean = true

scala2Hdfs.isPathExist(spark,"../Data/test.txt")

res38: Boolean = true

打印路径信息

scala2Hdfs.printPathDetail(spark,path,"total")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTestfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

scala2Hdfs.printPathDetail(spark,path,"directory")

This path Already exist!-----Directory:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest

scala2Hdfs.printPathDetail(spark,path,"file")

This path Already exist!-----File:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

保存操作

非分区保存

saveMode="errorifexists"相当于直接新建,再保存。如果路径存在,则会报错。 此时可以采用覆盖写入的方式或者用cleanAndSaveSinglePath方法,先删除再写入

scala2Hdfs.saveSinglePath(df=df1,path="../Data/hdfsSaveNoPartition",saveMode="errorifexists")

Save this path: ../Data/hdfsSaveNoPartition

scala2Hdfs.cleanAndSaveSinglePath(spark=spark,df=df1,path="../Data/hdfsSaveNoPartition",saveMode="overwrite")

Clean this path: ../Data/hdfsSaveNoPartitionSave this path: ../Data/hdfsSaveNoPartition

错误的储存格式

scala2Hdfs.saveSinglePath(df=df,path="../Data/hdfsSaveTest",saveMode="append",saveType="dd")

Not Support this savetype:dd

分区储存

scala2Hdfs.savePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionColNames=Array("ds"))

Save this path: ../Data/hdfsSavePartition

查看分区保存的结果

scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

cleanAndSavePartitionPath,可以先对分区删除,再保存。由于默认以append方式写入整个DataFrame,此时非20190101分区会有重复数据。常规的应用场景,都是往新分区里插数据,可具体问题具体分析

scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition" ,partitionsArray=Array("20190101"),partitionColNames=Array("ds"))

Clean this path: ../Data/hdfsSavePartition/ds=20190101Save this path: ../Data/hdfsSavePartition

scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition/ds=20190101")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101/part-00000-5771f2d8-7713-4fc3-9b5d-06df065bd3b8.c000.snappy.parquet

读取操作

非分区数据读取

spark.read.parquet("../Data/hdfsSaveNoPartition").show()

+---+------+---+-------------------+| id| sex|age| createtime_str|+---+------+---+-------------------+| 2|female| 37|2019-01-02 11:55:50|| 4|female| 44|2019-02-01 12:45:50|| 1| male| 18|2019-01-01 11:45:50|| 3| male| 21|2019-01-21 11:45:50|| 5| male| 39|2019-01-15 10:40:50|+---+------+---+-------------------+

scala2Hdfs.readSinglePath(spark, "../Data/hdfsSaveNoPartition").show()

+---+------+---+-------------------+| id| sex|age| createtime_str|+---+------+---+-------------------+| 2|female| 37|2019-01-02 11:55:50|| 4|female| 44|2019-02-01 12:45:50|| 1| male| 18|2019-01-01 11:45:50|| 3| male| 21|2019-01-21 11:45:50|| 5| male| 39|2019-01-15 10:40:50|+---+------+---+-------------------+

读取分区数据

scala2Hdfs.readPartitionPath(spark=spark, path="../Data/hdfsSavePartition",partitionsArray=Array("20191101","20190101","20190102")).show()

This path(../Data/hdfsSavePartition/ds=20191101) don't exist!+---+------+---+-------------------+--------+| id| sex|age| createtime_str| ds|+---+------+---+-------------------+--------+| 2|female| 37|2019-01-02 11:55:50|20190102|| 2|female| 37|2019-01-02 11:55:50|20190102|| 1| male| 18|2019-01-01 11:45:50|20190101|+---+------+---+-------------------+--------+

删除操作

删除操作错误路径

scala2Hdfs.deleteSinglePath(spark,"../data1")

This path(../data1) don't exist!

删除非分区数据

scala2Hdfs.deleteSinglePath(spark,"../Data/hdfsSaveNoPartition")

Clean this path: ../Data/hdfsSaveNoPartition

scala2Hdfs.printPathDetail(spark,"../Data/hdfsSaveNoPartition")

This path don't exist!

删除分区数据

scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

scala2Hdfs.deletePartitionPath(spark=spark,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101","20190102","20200101"))

Clean this path: ../Data/hdfsSavePartition/ds=20190101Clean this path: ../Data/hdfsSavePartition/ds=20190102This path(../Data/hdfsSavePartition/ds=20200101) don't exist!

scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS

清空所有数据

hdfs.delete(new Path("../Data/hdfsSavePartition"),true)

res58: Boolean = true

scala2Hdfs.printPathDetail(spark,"../Data")

This path Already exist!-----Total:file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTestfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csvfile:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

2020-01-02 于南京江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序Page中data数据操作和函数调用详细介绍(微信小程序page中的data)
下一篇:微信小程序Page中data数据操作和函数调用详解(小程序 page)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~