如何使用IDEA开发Spark SQL程序(一文搞懂)

网友投稿 1086 2022-12-20

如何使用IDEA开发Spark SQL程序(一文搞懂)

如何使用IDEA开发Spark SQL程序(一文搞懂)

目录前言Spark SQL是什么1、使用IDEA开发Spark SQL 1.1、指定列名添加Schema1.2、通过StructType指定Schema1.3、反射推断Schema–掌握1.4、花式查询1.5、 相互转化1.6、Spark SQL完成WordCount(案例)1.6.1、SQL风格1.6.2、DQL风格

前言

大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。也会及时改正。

DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。

Spark SQL是什么

Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

1、使用IDEA开发Spark SQL

Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:

第1种:指定列名添加Schema

第2种:通过StructType指定Schema

第3种:编写样例类,利用反射机制推断Schema

1.1、指定列名添加Schema

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDFDS {

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")

val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))

val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))

//3.将RDD转成DF

//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换

import spark.implicits._

val personDF: DataFrame = rowRDD.toDF("id","name","age")

personDF.show(10)

personDF.printSchema()

sc.stop()

spark.stop()

}

}

1.2、通过StructType指定Schema

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.types._

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object CreateDFDS2 {

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")

val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))

val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))

//3.将RDD转成DF

//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换

//import spark.implicits._

val schema: StructType = StructType(Seq(

StructField("id", IntegerType, true),//允许为空

StructField("name", StringType, true),

StructField("age", IntegerType, true))

)

val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)

personDF.show(10)

personDF.printSchema()

sc.stop()

spark.stop()

}

}

1.3、反射推断Schema–掌握

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDFDS3 {

case class Person(id:Int,name:String,age:Int)

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")

.getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")

val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))

val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))

//3.将RDD转成DF

//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换

import spark.implicits._

//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息

//所以SparkSQL可以通过反射自动获取到并添加给DF

val personDF: DataFrame = rowRDD.toDF

personDF.show(10)

personDF.printSchema()

sc.stop()

spark.stop()

}

}

1.4、花式查询

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SparkSession}

object QueryDemo {

case class Person(id:Int,name:String,age:Int)

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")

.getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")

val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))

val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))

//3.将RDD转成DF

//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换

import spark.implicits._

//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息

//所以SparkSQL可以通过反射自动获取到并添加给DF

val personDF: DataFrame = rowRDD.toDF

personDF.show(10)

personDF.printSchema()

//=======================SQL方式查询=======================

//0.注册表

http://personDF.createOrReplaceTempView("t_person")

//1.查询所有数据

spark.sql("select * from t_person").show()

//2.查询age+1

spark.sql("select age,age+1 from t_person").show()

//3.查询age最大的两人

spark.sql("select name,age from t_person order by age desc limit 2").show()

//4.查询各个年龄的人数

spark.sql("select age,count(*) from t_person group by age").show()

//5.查询年龄大于30的

sparhttp://k.sql("select * from t_person where age > 30").show()

//=======================DSL方式查询=======================

//1.查询所有数据

personDF.select("name","age")

//2.查询age+1

personDF.select($"name",$"age" + 1)

//3.查询age最大的两人

personDF.sort($"age".desc).show(2)

//4.查询各个年龄的人数

personDF.groupBy("age").count().show()

//5.查询年龄大于30的

personDF.filter($"age" > 30).show()

sc.stop()

spark.stop()

}

}

1.5、 相互转化

RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:

1)使用RDD算子操作

2)使用DSL/SQL对表操作

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object TransformDemo {

case class Person(id:Int,name:String,age:Int)

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")

val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))

val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))

//3.将RDD转成DF

//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换

import spark.implicits._

//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息

//所以SparkSQL可以通过反射自动获取到并添加给DF

//=========================相互转换======================

//1.RDD-->DF

val personDF: DataFrame = personRDD.toDF

//2.DF-->RDD

val rdd: RDD[Row] = personDF.rdd

//3.RDD-->DS

val DS: Dataset[Person] = personRDD.toDS()

//4.DS-->RDD

val rdd2: RDD[Person] = DS.rdd

//5.DF-->DS

val DS2: Dataset[Person] = personDF.as[Person]

//6.DS-->DF

val DF: DataFrame = DS2.toDF()

sc.stop()

spark.stop()

}

}

1.6、Spark SQL完成WordCount(案例)

1.6.1、SQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object WordCount {

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")

val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")

//fileDF.show()

//fileDS.show()

//3.对每一行按照空格进行切分并压平

//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String

import spark.implicits._

val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String

//wordDS.show()

/*

+-----+

|value|

+-----+

|hello|

| me|

|hello|

| you|

...

*/

//4.对上面的数据进行WordCount

wordDS.createOrReplaceTempView("t_word")

val sql =

"""

|select value ,count(value) as count

|from t_word

|group by value

|order by count desc

""".stripMargin

spark.sql(sql).show()

sc.stop()

spark.stop()

}

}

1.6.2、DQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object WordCount2 {

def main(args: Array[String]): Unit = {

//1.创建SparkSession

val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

val sc: SparkContext = spark.sparkContext

sc.setLogLevel("WARN")

//2.读取文件

val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")

val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")

//fileDF.show()

//fileDS.show()

//3.对每一行按照空格进行切分并压平

//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String

import spark.implicits._

val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String

//wordDS.show()

/*

+-----+

|value|

+-----+

|hello|

| me|

|hello|

| you|

...

*/

//4.对上面的数据进行WordCount

wordDS.groupBy("value").count().orderBy($"count".desc).show()

sc.stop()

spark.stop()

}

}

好了,以上内容就到这里了。你学到了吗。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:一体化政务服务平台培训(自助政务一体机培训)
下一篇:一体化政务服务平台排名(国家政务一体化服务平台)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~