Scala102 - Working with HDFS
First build a DataFrame, then save the data to HDFS.
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
Intitializing Scala interpreter ...
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577952043881)
SparkSession available as 'spark'
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
val builder = SparkSession
  .builder()
  .appName("learningScala")
  .config("spark.executor.heartbeatInterval", "60s")
  .config("spark.network.timeout", "120s")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "512m")
  .config("spark.dynamicAllocation.enabled", false)
  .config("spark.sql.inMemoryColumnarStorage.compressed", true)
  .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000)
  .config("spark.sql.broadcastTimeout", 600)
  .config("spark.sql.autoBroadcastJoinThreshold", -1)
  .config("spark.sql.crossJoin.enabled", true)
  .master("local[*]")
val spark = builder.appName("OperateHdfs").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
builder: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@679eb75a
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@551aa019
val df1 = Seq(
  (1, "male", "18", "2019-01-01 11:45:50"),
  (2, "female", "37", "2019-01-02 11:55:50"),
  (3, "male", "21", "2019-01-21 11:45:50"),
  (4, "female", "44", "2019-02-01 12:45:50"),
  (5, "male", "39", "2019-01-15 10:40:50")
).toDF("id", "sex", "age", "createtime_str")
df1: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
val df=df1.withColumn("ds",date_format($"createtime_str","yyyyMMdd"))
df: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 3 more fields]
df.show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  1|  male| 18|2019-01-01 11:45:50|20190101|
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  3|  male| 21|2019-01-21 11:45:50|20190121|
|  4|female| 44|2019-02-01 12:45:50|20190201|
|  5|  male| 39|2019-01-15 10:40:50|20190115|
+---+------+---+-------------------+--------+
Inspecting HDFS files
Check whether a directory exists
Import the relevant classes
import org.apache.hadoop.fs.{FileSystem, Path,FileStatus,FileUtil}
import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
Get the Hadoop configuration
var path = "../Data"
val hadoopConf = spark.sparkContext.hadoopConfiguration
path: String = ../Data
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, __spark_hadoop_conf__.xml
val hdfs = FileSystem.get(hadoopConf)
hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@25866b28
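Note that FileSystem.get(hadoopConf) returns the default file system from the configuration, which is why a LocalFileSystem shows up here when running locally. As a hedged sketch (not part of the original run), a specific cluster can be targeted explicitly; the hdfs://namenode:8020 address below is purely hypothetical:

// sketch: get the FileSystem of a specific namenode instead of the configured default
import java.net.URI
val remoteHdfs = FileSystem.get(new URI("hdfs://namenode:8020"), hadoopConf)  // hypothetical address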
Set the path
val outputPath = new Path("../Data")
outputPath: org.apache.hadoop.fs.Path = ../Data
Does this path exist on HDFS?
hdfs.exists(outputPath)
res2: Boolean = true
Does this file exist on HDFS?
hdfs.exists(new Path("../Data/test.txt"))
res3: Boolean = true
Is this path a directory?
hdfs.getFileStatus(outputPath).isDirectory()
res4: Boolean = true
Is this path a file?
hdfs.getFileStatus(outputPath).isFile()
res5: Boolean = false
hdfs.getFileStatus(new Path("../Data/test.txt")).isFile()
res6: Boolean = true
List all entries under a path
val allFiles = FileUtil.stat2Paths(hdfs.listStatus(outputPath))
allFiles: Array[org.apache.hadoop.fs.Path] = Array(file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv, file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt)
Print the first-level directory and file names
allFiles.foreach(println)
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
Print the first-level directory names only
Logic:
List all entries under the path, loop over them, check whether each one is a directory, and print it if so.
allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only the directories
  .foreach(println)                                  // print the first-level directory names
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
Print the files under the path
allFiles.filter(hdfs.getFileStatus(_).isFile())      // keep only the files
  .foreach(println)                                  // print the file names
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
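listStatus only looks one level deep. As a hedged sketch (not part of the original run), FileSystem.listFiles can walk the tree recursively, though it returns files only, not directories:

// sketch: recursive listing; true means recurse into subdirectories
val it = hdfs.listFiles(outputPath, true)
while (it.hasNext) {
  println(it.next().getPath)   // print the path of each file found anywhere under outputPath
}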
Wrapping it in an object
object HdfsCheckPath {
  import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
  import org.apache.spark.sql.SparkSession

  /**
   * @param spark SparkSession
   * @param path  the path to check, as a string
   */
  def isPathExist(spark: SparkSession, path: String) = {
    // get the HDFS configuration
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = FileSystem.get(hadoopConf)
    // build the path to check
    val outputPath = new Path(path)
    if (hdfs.exists(outputPath)) {
      println(s"This path(${path}) Already exist!")
    } else {
      println(s"This path(${path}) don't exist!")
    }
    hdfs.exists(outputPath)
  }

  /**
   * @param spark      SparkSession
   * @param path       the path to inspect
   * @param printlevel what to print; one of: directory, file, total
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val hdfs = FileSystem.get(hadoopConf)
    val isExists = hdfs.exists(new Path(path))
    // if the path does not exist there is nothing more to do
    if (isExists) {
      println("This path Already exist!")
      // if the path exists, print its first-level directories and files
      val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
      if (printlevel == "directory") {
        println("-----Directory:")
        allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only the directories
          .foreach(println)                                  // print the first-level directory names
      } else if (printlevel == "file") {
        println("-----File:")
        allFiles.filter(hdfs.getFileStatus(_).isFile())      // keep only the files
          .foreach(println)                                  // print the file names
      } else if (printlevel == "total") {
        println("-----Total:")
        allFiles.foreach(println)
      }
    } else {
      println("This path don't exist!")
    }
  }
}
defined object HdfsCheckPath
HdfsCheckPath.isPathExist(spark,"../Data")
This path(../Data) Already exist!
res10: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt

HdfsCheckPath.printPathDetail(spark,"../Data","directory")

This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
HdfsCheckPath.printPathDetail(spark,"../Data","file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
Saving files
For now only the Parquet format is covered.
Non-partitioned save
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
The mode parameter specifies the behavior when the data or table already exists. Options include:
overwrite: overwrite the existing data
append: append the data to the existing data
ignore: ignore the operation (i.e. no-op); the write is silently skipped if the target exists, which is rarely what you want
error or errorifexists: the default option; throw an exception at runtime if the output directory already exists
errorifexists is usually the safest choice, since it prevents accidentally writing data into a directory that already exists. The saveMode argument accepts either a String or a SaveMode value (the latter requires import org.apache.spark.sql.SaveMode, as done above).
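For illustration, the same write expressed with the SaveMode enum instead of the string form used below; this is a sketch, both calls are equivalent:

// sketch: SaveMode.ErrorIfExists is the enum counterpart of the "errorifexists" string
df1.write.mode(SaveMode.ErrorIfExists).parquet("../Data/hdfsSaveNoPartition/")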
df1.write.mode(saveMode="errorifexists").parquet(path="../Data/hdfsSaveNoPartition/")
[Figure: …/Picture/Pic01.jpg]
df1.write.mode(saveMode=SaveMode.Overwrite).parquet("../Data/hdfsSaveNoPartition/")
Partitioned save
val partitionColNames = Array("ds")
df.write.partitionBy(partitionColNames: _*).mode(saveMode="errorifexists").parquet("../Data/hdfsSavePartition/")
partitionColNames: Array[String] = Array(ds)
[Figure: …/Picture/Pic02.jpg]
Likewise, if the corresponding partition already exists, an error is thrown.
Reading
Non-partitioned read
val df1New = spark.read.parquet("../Data/hdfsSaveNoPartition")
df1New: org.apache.spark.sql.DataFrame = [id: int, sex: string ... 2 more fields]
df1New.show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
Partitioned read
Read one specific partition
spark.read.option("basePath", "../Data/hdfsSavePartition").parquet("../Data/hdfsSavePartition/ds=20190101").show()
+---+----+---+-------------------+--------+
| id| sex|age|     createtime_str|      ds|
+---+----+---+-------------------+--------+
|  1|male| 18|2019-01-01 11:45:50|20190101|
+---+----+---+-------------------+--------+
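The basePath option is what lets Spark recover the ds value from the directory name as a column. As a sketch based on Spark's standard partition discovery (not part of the original run), reading the partition directory without basePath yields a schema with no ds column:

// sketch: read the partition directory as a plain Parquet folder; the printed schema
// should only contain id, sex, age and createtime_str, without ds
spark.read.parquet("../Data/hdfsSavePartition/ds=20190101").printSchema()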
Read multiple specified partitions
spark.read.option("basePath", "../Data/hdfsSavePartition").parquet("../Data/hdfsSavePartition/ds=20190101","../Data/hdfsSavePartition/ds=20190102").show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
A shorter way to specify multiple partitions
The : _* annotation, taken as a whole, tells the compiler to treat the argument as a sequence of arguments (varargs). For example, val s = sum(1 to 5: _*) passes the range 1 to 5 as a sequence of arguments.
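A minimal, self-contained sketch of this expansion; sum here is a hypothetical helper, not a library function:

// sketch: a varargs method and the : _* expansion
def sum(xs: Int*): Int = xs.sum
val s = sum(1 to 5: _*)   // the Range 1 to 5 is passed as the varargs sequence; s == 15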
val partitionPathArray = Array(20190101,20190102).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102)
partitionPathArray.mkString("\n")
res20: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
spark.read.option("basePath", "../Data/hdfsSavePartition").parquet(partitionPathArray:_*).show()
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
When some of the specified partitions are missing
val partitionPathArray1 = Array(20190101,20190102,20191231).map(x=>"../Data/hdfsSavePartition/" + s"ds=${x}")
partitionPathArray1: Array[String] = Array(../Data/hdfsSavePartition/ds=20190101, ../Data/hdfsSavePartition/ds=20190102, ../Data/hdfsSavePartition/ds=20191231)
partitionPathArray1.mkString("\n")
res22: String =
../Data/hdfsSavePartition/ds=20190101
../Data/hdfsSavePartition/ds=20190102
../Data/hdfsSavePartition/ds=20191231
partitionPathArray1.map(x=>HdfsCheckPath.isPathExist(spark,x))
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
res23: Array[Boolean] = Array(true, true, false)
spark.read.option("basePath", "../Data/hdfsSavePartition").parquet(partitionPathArray1.filter(x=>HdfsCheckPath.isPathExist(spark,x)):_*).show()
This path(../Data/hdfsSavePartition/ds=20190101) Already exist!
This path(../Data/hdfsSavePartition/ds=20190102) Already exist!
This path(../Data/hdfsSavePartition/ds=20191231) don't exist!
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
Deleting files
Deletion here is not about individual files; it removes the whole directory.
Deleting non-partitioned data
The second argument means recursive delete, i.e. the path itself together with all of its files and subdirectories is removed.
If the path does not exist, it returns false.
hdfs.delete(new Path("../Data/hdfsSaveNoPartition555/"),true)
res25: Boolean = false
If the path exists, it returns true.
hdfs.delete(new Path("../Data/hdfsSaveNoPartition/"),true)
res26: Boolean = true
Deleting partitioned data
The logic for deleting partitions is simple: build the full path of each partition to be deleted and remove them one by one in a loop, as in the sketch below.
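A minimal sketch of that loop; the partition values are illustrative and it is not executed as part of this transcript:

// sketch: build the full ds=YYYYMMDD paths and delete each one recursively
Array("20190101", "20190102")
  .map(ds => new Path(s"../Data/hdfsSavePartition/ds=$ds"))
  .foreach(p => hdfs.delete(p, true))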
Inspect the partition data
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
Delete one specific partition
hdfs.delete(new Path("../Data/hdfsSavePartition/ds=20190101"),true)
res28: Boolean = true
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition","directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
Delete multiple partitions
val partitionPathArray2 = Array("20190102","20200101").map(x=>"../Data/hdfsSavePartition"+"/"+x)
partitionPathArray2: Array[String] = Array(../Data/hdfsSavePartition/20190102, ../Data/hdfsSavePartition/20200101)
partitionPathArray2.mkString("\n")
res30: String =
../Data/hdfsSavePartition/20190102
../Data/hdfsSavePartition/20200101
partitionPathArray.foreach(x=>hdfs.delete(new Path(x),true))
Looking at the directory again, the two partitions above have been deleted.
HdfsCheckPath.printPathDetail(spark,"../Data/hdfsSavePartition/","total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
As shown above, if a partition does not exist the program does not throw an error; you can add a print statement in the code as a hint, for example:
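A hedged sketch of that check; it reuses the partitionPathArray and hdfs values defined earlier and is not part of the original run:

// sketch: check existence before each delete and print what happened
partitionPathArray.foreach { p =>
  val target = new Path(p)
  if (hdfs.exists(target)) {
    hdfs.delete(target, true)
    println("Deleted: " + p)
  } else {
    println("Partition path not found, skipped: " + p)
  }
}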
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res33: Boolean = true
Wrapping everything up
Object-based wrapper
Purpose:
Wrap the commonly used HDFS operations so they are easy to call; organizing them as an object also makes it easy for the functions to call one another internally: object scala2Hdfs.
object scala2Hdfs {
  import org.apache.hadoop.fs.{FileSystem, Path, FileStatus, FileUtil}
  import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, Dataset}

  /**
   * Get the file system object.
   *
   * @param spark SparkSession
   * @return the FileSystem built from the Spark Hadoop configuration
   */
  def getFileSystem(spark: SparkSession): FileSystem = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    FileSystem.get(hadoopConf)
  }

  /**
   * @param spark SparkSession
   * @param path  the path to check, as a string
   */
  def isPathExist(spark: SparkSession, path: String) = {
    // get the HDFS file system
    val hdfs = getFileSystem(spark)
    // build the path to check
    val outputPath = new Path(path)
    hdfs.exists(outputPath)
    // if (hdfs.exists(outputPath)) {
    //   println("This path Already exist!")
    // } else {
    //   println("This path don't exist!")
    // }
  }

  /**
   * Check that the path exists and is a directory.
   *
   * @param spark SparkSession
   * @param path  the path to check, as a string
   * @return
   */
  def isExistsDirectory(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    if (!(hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory)) {
      println(s"This path(${path}) don't exist!")
    }
    hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isDirectory
  }

  /**
   * Check that the path exists and is a file.
   *
   * @param spark SparkSession
   * @param path  the path to check, as a string
   * @return
   */
  def isExistsFile(spark: SparkSession, path: String): Boolean = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    hdfs.exists(outputPath) && hdfs.getFileStatus(outputPath).isFile
  }

  /**
   * @param spark      SparkSession
   * @param path       the path to inspect
   * @param printlevel what to print; one of: directory, file, total
   */
  def printPathDetail(spark: SparkSession, path: String, printlevel: String = "total"): Unit = {
    val hdfs = getFileSystem(spark)
    val isExists = isPathExist(spark, path)
    // if the path does not exist there is nothing more to do
    if (isExists) {
      println("This path Already exist!")
      // if the path exists, print its first-level directories and files
      val allFiles = FileUtil.stat2Paths(hdfs.listStatus(new Path(path)))
      if (printlevel == "directory") {
        println("-----Directory:")
        allFiles.filter(hdfs.getFileStatus(_).isDirectory()) // keep only the directories
          .foreach(println)                                  // print the first-level directory names
      } else if (printlevel == "file") {
        println("-----File:")
        allFiles.filter(hdfs.getFileStatus(_).isFile())      // keep only the files
          .foreach(println)                                  // print the file names
      } else if (printlevel == "total") {
        println("-----Total:")
        allFiles.foreach(println)
      }
    } else {
      println("This path don't exist!")
    }
  }

  /**
   * @param spark SparkSession
   * @param path  the path to delete
   *              If the path exists, check for subdirectories: if there are any, skip; otherwise delete.
   */
  def deleteSinglePath(spark: SparkSession, path: String): Unit = {
    val hdfs = getFileSystem(spark)
    val outputPath = new Path(path)
    // only delete if the path actually exists
    if (hdfs.exists(outputPath)) {
      // check whether the path contains subdirectories
      val isDirectoryExists = hdfs.listStatus(outputPath).exists(_.isDirectory)
      // no subdirectories: delete the data
      if (!isDirectoryExists) {
        // recursive delete
        hdfs.delete(outputPath, true)
        println("Clean this path: " + path)
      } else {
        // subdirectories present: skip, to avoid deleting data by mistake
        println("Contains sub path, Skip clean")
      }
    } else {
      println(s"This path(${path}) don't exist!")
    }
  }

  /**
   * @param spark           SparkSession
   * @param path            the base path, without a trailing /
   * @param partitionsArray the partition values, in the same format as the partition column in the table
   * @param isPartition     whether the data is partitioned
   */
  def deletePartitionPath(spark: SparkSession, path: String, partitionsArray: Array[String], isPartition: Boolean = true): Unit = {
    if (!isPartition) {
      // not partitioned: delete the path directly
      deleteSinglePath(spark, path)
    } else {
      if (partitionsArray.length == 0) {
        println("partitionsArray has no items")
      } else {
        // build the full partition paths
        val partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")
        // delete each partition in a loop; missing paths are reported by deleteSinglePath
        partitionPathArray.foreach(x => deleteSinglePath(spark, x))
      }
    }
  }

  /**
   * Save to a single path.
   *
   * @param df          the DataFrame to save
   * @param path        the target path
   * @param coalesceNum number of partitions after coalesce, intended to improve downstream efficiency; exact tuning not covered here
   * @param saveType    storage format
   * @param saveMode    save mode
   */
  def saveSinglePath(df: DataFrame, path: String, coalesceNum: Int = 0
                     , saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    var tempDf = df
    if (coalesceNum >= 1) {
      tempDf = df.coalesce(coalesceNum)
    }
    val write = tempDf.write.mode(saveMode)
    saveType match {
      case "csv" => write.option("header", "true").csv(path); println("Save this path: " + path)
      case "parquet" => write.parquet(path); println("Save this path: " + path)
      case _ => println(s"Not Support this savetype:${saveType}")
    }
  }

  /**
   * @param dataFrameWithDs   the DataFrame to save, including the partition column(s)
   * @param path              the target path
   * @param saveMode          save mode, append by default
   * @param coalesceNum       number of partitions after coalesce
   * @param partitionColNames partition column names, as an Array; once the partition column is present, just save
   */
  def savePartitionPath(dataFrameWithDs: DataFrame, path: String, saveType: String = "parquet"
                        , saveMode: String = "append", coalesceNum: Int = 0
                        , partitionColNames: Array[String] = Array("ds")): Unit = {
    var tempDf = dataFrameWithDs
    if (coalesceNum >= 1) {
      tempDf = dataFrameWithDs.coalesce(coalesceNum)
    }
    val write = tempDf.write.partitionBy(partitionColNames: _*).mode(saveMode)
    saveType match {
      case "csv" => write.option("header", "true").csv(path); println("Save this path: " + path)
      case _ => write.parquet(path); println("Save this path: " + path)
    }
  }

  /**
   * Clean the path first, then save.
   *
   * @param df          the DataFrame to save
   * @param path        the target path
   * @param coalesceNum number of partitions after coalesce
   * @param saveType    storage format
   * @param saveMode    save mode
   */
  def cleanAndSaveSinglePath(spark: SparkSession, df: DataFrame, path: String, coalesceNum: Int = 0
                             , saveType: String = "parquet", saveMode: String = "errorifexists"): Unit = {
    // delete the target path first
    deleteSinglePath(spark, path)
    // then save the data
    saveSinglePath(df, path, coalesceNum, saveType, saveMode)
  }

  /**
   * @param dataFrameWithDs   the DataFrame to save
   * @param path              the target path
   * @param partitionsArray   the list of partitions to replace
   * @param coalesceNum       number of partitions after coalesce
   * @param partitionColNames partition column names
   */
  def cleanAndSavePartitionPath(dataFrameWithDs: DataFrame, path: String, saveMode: String = "append"
                                , partitionsArray: Array[String], coalesceNum: Int = 0
                                , partitionColNames: Array[String] = Array("ds")): Unit = {
    val spark = dataFrameWithDs.sparkSession
    // delete the specified partitions first
    deletePartitionPath(spark, path, partitionsArray)
    // then save the data into those partitions
    savePartitionPath(dataFrameWithDs = dataFrameWithDs, path = path, saveMode = saveMode, partitionColNames = partitionColNames)
  }

  /**
   * Read a single (non-partitioned) path.
   */
  def readSinglePath(spark: SparkSession, path: String): DataFrame = {
    if (isExistsDirectory(spark, path)) {
      spark.read.parquet(path)
    } else {
      println("This path don't exist!")
      // return an empty DataFrame; not sure whether there is a better way!!
      spark.emptyDataFrame
    }
  }

  /**
   * @param spark           SparkSession
   * @param path            the base path
   * @param readType        file format; only parquet is supported for now
   * @param partitionsArray the partitions to read
   * @return
   */
  def readPartitionPath(spark: SparkSession, path: String, readType: String = "parquet"
                        , partitionsArray: Array[String]) = {
    if (readType != "parquet") {
      println(s"Not Support this readType:${readType}")
      spark.emptyDataFrame
    } else {
      if (partitionsArray.length == 0) {
        println("PartitionsArray is null")
        spark.emptyDataFrame
      } else {
        val partitionPathArray = partitionsArray.map(x => path + "/" + s"ds=${x}")
        // filter out partition directories that do not exist
        spark.read.option("basePath", path).parquet(partitionPathArray.filter(x => isExistsDirectory(spark, x)): _*)
      }
    }
  }
}
defined object scala2Hdfs
Inspection operations
Check whether a path exists and whether it is a file or a directory.
var path ="../Data"
path: String = ../Data
scala2Hdfs.isExistsDirectory(spark,path)
res34: Boolean = true
scala2Hdfs.isPathExist(spark,path)
res35: Boolean = true
scala2Hdfs.isExistsFile(spark,path)
res36: Boolean = false
scala2Hdfs.isExistsFile(spark,"../Data/test.txt")
res37: Boolean = true
scala2Hdfs.isPathExist(spark,"../Data/test.txt")
res38: Boolean = true
Print path details
scala2Hdfs.printPathDetail(spark,path,"total")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
scala2Hdfs.printPathDetail(spark,path,"directory")
This path Already exist!
-----Directory:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
scala2Hdfs.printPathDetail(spark,path,"file")
This path Already exist!
-----File:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
Save operations
Non-partitioned save
saveMode="errorifexists" is effectively create-then-save: if the path already exists, an error is thrown. In that case you can either write with overwrite, or use cleanAndSaveSinglePath, which deletes the path first and then writes.
scala2Hdfs.saveSinglePath(df=df1,path="../Data/hdfsSaveNoPartition",saveMode="errorifexists")
Save this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.cleanAndSaveSinglePath(spark=spark,df=df1,path="../Data/hdfsSaveNoPartition",saveMode="overwrite")
Clean this path: ../Data/hdfsSaveNoPartition
Save this path: ../Data/hdfsSaveNoPartition
An unsupported save format
scala2Hdfs.saveSinglePath(df=df,path="../Data/hdfsSaveTest",saveMode="append",saveType="dd")
Not Support this savetype:dd
Partitioned save
scala2Hdfs.savePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition",partitionColNames=Array("ds"))
Save this path: ../Data/hdfsSavePartition
Inspect the result of the partitioned save
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
cleanAndSavePartitionPath deletes the specified partitions first and then saves. In this case the partitions other than 20190101 end up with duplicate data, because the whole DataFrame is appended again. The usual scenario is writing data into a new partition each time, so adapt this to the problem at hand.
scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs=df,path="../Data/hdfsSavePartition" ,partitionsArray=Array("20190101"),partitionColNames=Array("ds"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Save this path: ../Data/hdfsSavePartition
scala2Hdfs.printPathDetail(spark=spark,path="../Data/hdfsSavePartition/ds=20190101")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101/part-00000-5771f2d8-7713-4fc3-9b5d-06df065bd3b8.c000.snappy.parquet
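Given the duplicate rows noted above when the full DataFrame is appended again, one hedged workaround (a sketch, not part of the original run) is to filter the DataFrame down to just the partitions being replaced before calling cleanAndSavePartitionPath:

// sketch: only write back the rows belonging to the partition being replaced
val onlyOnePartition = df.filter($"ds" === "20190101")
scala2Hdfs.cleanAndSavePartitionPath(dataFrameWithDs = onlyOnePartition,
  path = "../Data/hdfsSavePartition",
  partitionsArray = Array("20190101"),
  partitionColNames = Array("ds"))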
Read operations
Reading non-partitioned data
spark.read.parquet("../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
scala2Hdfs.readSinglePath(spark, "../Data/hdfsSaveNoPartition").show()
+---+------+---+-------------------+
| id|   sex|age|     createtime_str|
+---+------+---+-------------------+
|  2|female| 37|2019-01-02 11:55:50|
|  4|female| 44|2019-02-01 12:45:50|
|  1|  male| 18|2019-01-01 11:45:50|
|  3|  male| 21|2019-01-21 11:45:50|
|  5|  male| 39|2019-01-15 10:40:50|
+---+------+---+-------------------+
Reading partitioned data
scala2Hdfs.readPartitionPath(spark=spark, path="../Data/hdfsSavePartition",partitionsArray=Array("20191101","20190101","20190102")).show()
This path(../Data/hdfsSavePartition/ds=20191101) don't exist!
+---+------+---+-------------------+--------+
| id|   sex|age|     createtime_str|      ds|
+---+------+---+-------------------+--------+
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  2|female| 37|2019-01-02 11:55:50|20190102|
|  1|  male| 18|2019-01-01 11:45:50|20190101|
+---+------+---+-------------------+--------+
Delete operations
Deleting a path that does not exist
scala2Hdfs.deleteSinglePath(spark,"../data1")
This path(../data1) don't exist!
Deleting non-partitioned data
scala2Hdfs.deleteSinglePath(spark,"../Data/hdfsSaveNoPartition")
Clean this path: ../Data/hdfsSaveNoPartition
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSaveNoPartition")
This path don't exist!
Deleting partitioned data
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190101
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190102
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
scala2Hdfs.deletePartitionPath(spark=spark,path="../Data/hdfsSavePartition",partitionsArray=Array("20190101","20190102","20200101"))
Clean this path: ../Data/hdfsSavePartition/ds=20190101
Clean this path: ../Data/hdfsSavePartition/ds=20190102
This path(../Data/hdfsSavePartition/ds=20200101) don't exist!
scala2Hdfs.printPathDetail(spark,"../Data/hdfsSavePartition")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190115
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190121
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/ds=20190201
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsSavePartition/_SUCCESS
Clean up all the data
hdfs.delete(new Path("../Data/hdfsSavePartition"),true)
res58: Boolean = true
scala2Hdfs.printPathDetail(spark,"../Data")
This path Already exist!
-----Total:
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/hdfsTest
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.csv
file:/D:/ThereIsNoEndToLearning/Git01-LifeIsLimited/A02-Learning_Scala/ScalaNote/Hdfs/Data/test.txt
2020-01-02, Jiulonghu, Jiangning District, Nanjing