SparkSQl简介及运行原理

网友投稿 768 2022-12-20

SparkSQl简介及运行原理

SparkSQl简介及运行原理

目录一:什么是SparkSQL?(一)SparkSQL简介(二)SparkSQL运行原理(三)SparkSQL特点二:DataFrame(一)什么是DataFrame?补充:Spark中的RDD、DataFrame和DataSet讲解(一)Spark中的模块(二)RDD和DataFrame的区别三:SparkSession(一)SparkSession简介(二)SparkSession实质(三)SparkSession特点四:通过RDD创建DataFrame(一)通过样本类创建(反射)(二)通过SparkSession创建DataFrame(三)通过 json 文件创建DataFrames五:临时视图(一)什么是视图(二)类型(三)创建视图(四)视图查询(五)会话周期六:DataFrame的read和save和savemode(一)数据读取(二)数据保存(三)数据保存模式七:数据集DataSet(一)创建和使用DataSet---使用序列(二)创建和使用DataSet---通过case class作为编码器,将DataFrame转换成DataSet(三)创建和使用DataSet---读取HDFS数据文件

一:什么是SparkSQL?

(一)SparkSQL简介

Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。

Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。

(二)SparkSQL运行原理

将Spark SQL转化为RDD,然后提交到集群执行。

(三)SparkSQL特点

(1)容易整合,Spark SQL已经集成在Spark中

(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquhttp://et等都是使用统一的方式进行访问

(3)兼容 Hive

(4)标准的数据连接:JDBC、ODBC

二:DataFrame

(一)什么是DataFrame?

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame是组织成命名列的数据集。

它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。

关系型数据库中的表由表结构和数据组成,而DataFrame也类似,由schema(结构)和数据组成,其数据集是RDD。

DataFrame可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

补充:Spark中的RDD、DataFrame和DataSet讲解

(一)Spark中的模块

上图展示了Spark的模块及各模块之间的关系:

底层是Spark-core核心模块,Spark每个模块都有一个核心抽象,Spark-core的核心抽象是RDD,

Spark SQL等都基于RDD封装了自己的抽象,在Spark SQL中是DataFrame/DataSet。

相对来说RDD是更偏底层的抽象,DataFrame/DataSet是在其上做了一层封装,做了优化,使用起来更加方便。

从功能上来说,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。

(二)RDD和DataFrame的区别

DataFrame与RDD的主要区别在于:

DataFrame

DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。

RDD

RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame和RDD联系:

DataFrame底层是以RDD为基础的分布式数据集,和RDD的主要区别的是:RDD中没有schema信息,而DataFrame中数据每一行都包含schema

DataFrame = RDD[Row] + shcema

三:SparkSession

(一)SparkSession简介

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。

对于每个其他的API,我们需要使用不同的context。

例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。

但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。

SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

(二)SparkSession实质

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

(三)SparkSession特点

----为用户提供一个统一的切入点使用Spark各项功能

----允许用户通过它调用DataFrame和Dataset相关 API来编写程序

----减少了用户需要了解的一些概念,可以很容易的与Spark进行交互

----与Spark交互之时不需要显示的创建SparkConf, SparkContext以及 SQlContext,这些对象已经封闭在SparkSession中

四:通过RDD创建DataFrame

(一)通过样本类创建(反射)

case class People(val name:String,val age:Int) //可以声明数据类型

object WordCount {

def main(args:Array[String]):Unit={

val conf = new SparkConf()

//设置运行模式为本地运行,不然默认是集群模式

//conf.setMaster("local") //默认是集群模式

//设置任务名

conf.setAppName("WordCount").setMaster("local")

conf.set("spark.default.parallelism","5")

//设置SparkContext,是SparkCore的程序入口

val sc = new SparkContext(conf)

val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext

val array = Array("mark,14","kitty,23","dasi,45")

val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD

People(line.split(",")(0),line.split(",")(1).trim().toInt)

})

import Sqlsc.implicits._ //引入全部方法

//将RDD转换成DataFrame

val df = peopleRDD.toDF()

//将DataFrame转换成一个临时的视图

df.createOrReplaceTempView("people")

//使用SQL语句进行查询

Sqlsc.sql("select * from people").show()

}

}

(二)通过SparkSession创建DataFrame

object WordCount {

def main(args:Array[String]):Unit={

val conf = new SparkConf()

//设置运行模式为本地运行,不然默认是集群模式

//conf.setMaster("local") //默认是集群模式

//设置任务名

conf.setAppName("WordCount").setMaster("local")

conf.set("spark.default.parallelism","5")

//设置SparkContext,是SparkCore的程序入口

val sc = new SparkContext(conf)

val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext

val array = Array("mark,14","kitty,23","dasi,45")

//1.需要将RDD数据映射成Row,需要引入import org.apache.spark.sql.Row

val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD

val fields = line.split(",")

Row(fields(0),fields(1).trim().toInt)

})

//2.创建StructType定义结构

val st:StructType = StructType(

//字段名,字段类型,是否可以为空

List( //传参是列表类型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil来构建列表

StructField("name",StringType,true),

StructField("age",IntegerType,true)

)

)

//3.使用SparkSession建立DataFrame

val df = Sqlsc.createDataFrame(peopleRDD,st)

//将DataFrame转换成一个临时的视图

df.createOrReplaceTempView("people")

//使用SQL语句进行查询

Sqlsc.sql("select * from people").show()

}

}

(三)通过 json 文件创建DataFrames

[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]

def main(args:Array[String]):Unit={

val conf = new SparkConf()

//设置运行模式为本地运行,不然默认是集群模式

//conf.setMaster("local") //默认是集群模式

//设置任务名

conf.setAppName("WordCount").setMaster("local")

//设置SparkContext,是SparkCore的程序入口

val sc = new SparkContext(conf)

val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext

//通过json数据直接创建DataFrame

val df = Sqlsc.read.json("E:\\1.json")

//将DataFrame转换成一个临时的视图

df.createOrReplaceTempView("people1")

//使用SQL语句进行查询

Sqlsc.sql("select * from people1").show()

}

五:临时视图

(一)什么是视图

视图是一个虚表,跟mysql里的概念是一样的,视图基于实际的表而存在,其实质是一系列的查询语句

(二)类型

局部视图(Temoporary View):只在当前会话中有效,如果创建它的会话终止,则视图也会消失。

全局视图(Global Temporary View): 在全局范围内有效,不同的Session中都可以访问,生命周期是Spark的Application运行周期,全局视图会绑定到系统保留的数据库global_temp中,因此使用它的时候必须加上相应前缀。

(三)创建视图

创建局部视图:df.createOrReplaceTempView("emp")

创建全局视图:df.createOrReplaceGlobalTempView("empG")

(四)视图查询

spark.sql("select * from emp").show

spark.sql("select * from global_temp.empG").show  //查询全局视图,需要添加前缀

(五)会话周期

spark.newSession.sql("select * from emp").show -----> 报错,Table or View Not Found

spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查询

六:DataFrame的read和save和savemode

(一)数据读取

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

//方式一

val df1 = sqlContext.read.json("E:\\666\\people.json")

val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")

//方式二

val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")

val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")

//方式三,默认是parquet格式

val df5 = sqlContext.load("E:\\666\\users.parquet")

  //方式四,使用MySQL进行数据源读取

val url = "jdbc:mysql://192.168.123.102:3306/hivedb"

val table = "dbs"

val properties = new Properties()

properties.setProperty("user","root")

properties.setProperty("password","root")

//需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)

val df = sqlContext.read.jdbc(url,table,properties)

df.createOrReplaceTempView("dbs")

sqlContext.sql("select * from dbs").show()

使用Hive作为数据源:需要在pom.xml文件中添加依赖

org.apache.spark

spark-hive_2.11

2.3.0

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

javax.jdo.option.ConnectionURL

jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true

JDBC connect string for a JDBC metastore

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:一体化政务服务平台推进会(政务信息一体化应用平台)
下一篇:一体化政务服务平台通知(什么是一体化政务服务平台)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~