IDEA 开发配置SparkSQL及简单使用案例代码

网友投稿 1039 2022-12-20

IDEA 开发配置SparkSQL及简单使用案例代码

IDEA 开发配置SparkSQL及简单使用案例代码

1.添加依赖

在idea项目的pom.xml中添加依赖。

org.apache.spark

spark-sql_2.12

3.0.0

2.案例代码

package com.zf.bigdata.spark.sql

import orgMALboqgKfb.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSql_Basic {

def main(args: Array[String]): Unit = {

//创建上下文环境配置对象

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")

//创建 SparkSession 对象

val spark = SparkSession.builder().config(sparkConf).getOrCreate()

// DataFrame

val df: DataFrame = spark.read.json("datas/user.json")

//df.show()

// DataFrame => Sql

//df.createOrReplaceTempView("user")

//spark.sql("select * from user").show()

//spark.sql("select age from user").show()

//spark.sql("select avg(age) from user").show()

//DataFrame => Dsl

//如果涉及到转换操作,转换需要引入隐式转换规则,否则无法转换,比如使用$提取数据的值

//spark 不是包名,是上下文环境对象名

import spark.implicits._

//df.select("age","username").show()

//df.select($"age"+1).show()

//df.select('age+1).show()

// DataSet

//val seq = Seq(1,2,3,4)

//val ds: Dataset[Int] = seq.toDS()

// ds.show()

// RDD <=> DataFrame

val rdd = spark.sparkContext.makeRDD(List((1,"张三",10),(2,"李四",20)))

val df1: DataFrame = rdd.toDF("id", "name", "age")

val rdd1: RDD[Row] = df1.rdd

// DataFrame <=> DataSet

val ds: Dataset[User] = df1.as[User]

val df2: DataFrame = ds.toDF()

// RDD <=> DataSet

val ds1: Dataset[User] = rdd.map {

case (id, name, age) => {

User(id, name = name, age = age)

}

}.toDS()

val rdd2: RDD[User] = ds1.rdd

spark.stop()

}

case class User(id:Int,name:String,age:Int)

}

PS:下面看下在IDEA中开发Spark SQL程序

IDEA 中程序的打包和运行方式都和 SparkCore 类似,Maven 依赖中需要添加新的依赖项:

org.apache.spark

spark-sql_2.11

2.1.1

一、指定Schema格式

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.Row

object Demo1 {

def main(args: Array[String]): Unit = {

//使用Spark Session 创建表

val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

//从指定地址创建RDD

val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

//通过StructType声明Schema

val schema = StructType(

List(

StructField("id", IntegerType),

StructField("name", StringType),

StructField("age", IntegerType)))

//把RDD映射到rowRDD

val rowRDD = personRDD.map(p=>Row(p(0).toInt,p(1),p(2).toInt))

val personDF = spark.createDataFrame(rowRDD, schema)

//注册表

personDF.createOrReplaceTempView("t_person")

//执行SQL

val df = spark.sql("select * from t_person order by age desc limit 4")

df.show()

spark.stop()

}

}

二、使用case class

import org.apache.spark.sql.SparkSession

//使用case class

object Demo2 {

def main(args: Array[String]): Unit = {

//创建SparkSession

val spark = SparkSession.buildMALboqgKfber().master("local").appName("CaseClassDemo").getOrCreate()

//从指定的文件中读取数据,生成对应的RDD

val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

//将RDD和case class 关联

val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt))

//生成 DataFrame,通过RDD 生成DF,导入隐式转换

import spark.sqlContext.implicits._

val studentDF = studentRDD.toDF

//注册表 视图

studentDF.createOrReplaceTempView("student")

//执行SQL

spark.sql("select * from student").show()

spark.stop()

}

}

//case class 一定放在外面

case class Student(stuID:Int,stuName:String,stuAge:Int)

三、把数据保存到数据库

import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.Row

import java.util.Properties

object Demo3 {

def main(args: Array[String]): Unit = {

//使用Spark Session 创建表

val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

//从指定地址创建RDD

val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

//通过StructType声明Schema

val schema = StructType(

List(

StructField("id", IntegerType),

StructField("name", StringType),

StructField("age", IntegerType)))

//把RDD映射到rowRDD

val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))

val personDF = spark.createDataFrame(rowRDD, schema)

//注册表

personDF.createOrReplaceTempView("person")

//执行SQL

val df = spark.sql("select * from person ")

//查看SqL内容

//df.show()

//将结果保存到mysql中

val props = new Properties()

props.setProperty("user", "root")

props.setProperty("password", "123456")

props.setProperty("driver", "com.mysql.jdbc.Driver")

df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)

spark.close()

}

}

以上内容转自:

https://blog.csdn-/weixin_43520450/article/details/106093582

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:一体化政务服务平台考试(统一政务服务)
下一篇:一体化政务服务平台考核(政务一体化服务平台解决方案)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~