微前端架构如何改变企业的开发模式与效率提升
902
2022-10-01
【SparkSQL 项目】出租乘客行程记录分析
文章目录
一、项目分析
1.数据集结构2.业务场景3.处理过程使用的技术点
二、流程分析
1.理解数据集2.理解需求和结果集3.反推每一个步骤
三、步骤分析
1.读取数据集2.数据清洗
2.1 数据转换 DataFrame:DataSet[Row] => DataSet[Trip]
① 定义Trip样例类② 转换数据
⑴ 定义转换对象方法: RDD[Row] => RDD[Trip]⑵ 转换时间类型数据⑶ 转换地点数据⑷ 包装Row处理空值 --- 返回option⑸ 异常处理
★ 简单案例解释Either作用
2.2 剪除异常数据
① 查询异常数据 ---- 统计时间分布直方图
◆ 使用UDF
▶ 剪除异常数据 ---- 绘制时长直方图▶ 根据直方图的显示,查看数据分布后,剪除反常数据,保留时长 0-3h 的数据集
3.行政区信息
3.1 求出borough
① 读取行政区位置信息
▲GeojsON
地理位置的表示行政区范围数据集 .geojson使用步骤
▲GeoJSON 解析工具▲JSON4S介绍
补充:JSON解析配置Maven依赖
▲具体实现Step 1: 创建目标类Step 2: 将 JSON 字符串解析为目标类对象Step 3:解析 GeoJSON ---- 转换 JSON 为 Geometry 对象Step 4: 读取数据集, 转换数据
② 将 Geometry 数据集按照区域大小排序③ 广播 Geometry 信息, 发给每一个 Executor④ 创建 UDF, 通过经纬度获取行政区信息⑤ 统计行政区信息
4.会话统计
4.1 需求分析
▲具体实现
Step 1: 过滤没有经纬度的数据、划分会话Step 2: 求得时间差Step 3: 统计数据
一、项目分析
1.数据集结构
主要列:
主要描述:
数据意义:
每一行数据 -> 一次行程 ->哪个出租车的:上下车时间,上下车位置
返回顶部
2.业务场景
在网约车出现之前,出行很大一部分要靠出租车和公共交通。所以经常会见到一些情况,比如说从东直门打车告诉师傅要去昌平,可能拒载。这种情况所凸显的是一个出租车调度的难题。所以需要先通过数据来看到问题,后解决问题。
所以要统计出租车利用率也就是有乘客乘坐的时间,和无乘客空跑的时间比例。这是一个理解出租车的重要指标,最响利用率的一个因素就是目的地,比如说去昌平,可能出租车师傅不确定自己是否要空放回来;而去国贸,下车几分钟内,一定能有新的顾客上。
而统计利用率的时候需要用到时间数据和空间据来进行计算,对于时计算来说, SparkSQL提供了很多工具和函数可以使用,而空间计算仍然是一个比较专业的场景,需要使用到第三方库。
我们的需求是在上述的数据集中,根据时间算出等待时间;根据地点落地到某个区,算出某个区的平均等待时间,也就是这个下车地点对于出租车利用率的影响。
返回顶部
3.处理过程使用的技术点
1.数据清洗数据清洗在几乎所有类型的项目中都会遇到处理数据的类型,处理空值等问题2.JSON解析JSON解析在大部分业务系统的数据分析中都会用到如何读取JSON数据,如何把JSON数据变为可以使用的对象数据3.地理位置信息处理地理位置信息的处理是一个比较专业的场景在一些粗车网站,或者像滴滴,Uber之类的出行服务上,也经常会处理地理位置信息4.探索性数据分析从拿到一个数据集明确需求以后,如何逐步了解数据集如何从数据集中探索对应的内容等,是一个数据工程师的基本素质5.会话分析会话分析用于识别同一个用户的多个操作之间的关联,是分析系统常见的分析模式在电商和搜索引擎中非常常见
返回顶部
二、流程分析
1.理解数据集
首先要理解数据集,要回答自己一些问题:
这个数据集是否以行作为单位?是否是 DataFrame可以处理的?大部分情况下都是这个数据集每行记录所代表的实体对象是什么?例如:出租车的载客记录表达这个实体对象的最核心字段是什么?例如:上下车地点和时间唯一标识一辆车的 License(id)
2.理解需求和结果集
第一点,理解需求再动手绝对不会浪费时间;第二点,在数据分析的任务中,如何无法理解需求可能根本无从动手我们的需求是:出租车在某个地点的平均等待客人时间简单来说,结果集中应该有的列地点平均等待时间
3.反推每一个步骤
结果集中,应该有的字段有两个,一个是地点,一个是等待时间
地点如何获知?其实就是乘客的下车点,但是是一个坐标如何得到其在哪个区?等待时间如何获知?其实就是上一个乘客下车到下一个乘客上车之间的时间通过这两个时间的差值便可获知。
返回顶部
三、步骤分析
1.读取数据集
数居集很大所以我截取了一小部分,大概百分之一左右如果大家感兴趣的话,可以将完整数据集放在集群中,使用集群来计算大数据
// 1.读取数据集val source = spark.read .option("header", value = true) .csv("G:\\Projects\\IdeaProjects\\Spark\\dataset\\train.csv")source.show() // 展示数据集source.printSchema() // 打印结构信息
在读取了数据集后,可以发现在结构信息中的上下车时间字段数据类型为字符串类型数据,显然需要进行转换。并且读取的数据中有些列属于无关信息,需要剔除。
返回顶部
2.数据清洗
数据集当中的某些列名可能使用起来不方便或者数据集当中某些列的值类型可能不对,或者数据集中有可能存在缺失值,这些都是要清洗的动机。
2.1 数据转换 DataFrame:DataSet[Row] => DataSet[Trip]
通过 DataFramoReader读取出来的数据集是 DataFrame(DataSet[Row]),而 Dataframe中保存的是Row对象.但是后续我们在进行处理的时候可能要使用到一些有类型的转换,也需要每一列数据对应自己的数据类型.所以需要将Row所代表的弱类型对象转为Trip这样的强类型对象,而Trip对象则是一个样例类用于代表一个出租车的行程.
① 定义Trip样例类
// 4.Trip 样例类case class Trip( id: String, // 出租车执照 pickUpTime: Long, // 上车时间 dropOffTime: Long, // 下车时间 pickUpX: Double, // 上车地经度 pickUpY: Double, // 上车点纬度 dropOffX: Double, // 下车点经度 dropOffY: Double // 下车点纬度)
返回顶部
② 转换数据
// DataFrame:DataSet[Row] => DataSet[Trip]source.rdd.map(。。。)
⑴ 定义转换对象方法: RDD[Row] => RDD[Trip]
// 5.1定义转换对象方法: RDD[Row] => RDD[Trip] def prase(row: Row): Trip = { // 创建RichRow对象 val richRow = new RichRow() // orNull 方法 --- 若前面有值则返回前面的值,若没有值则返回Null;即使没有值时也便于后期对空值的处理 val id = richRow.getAs[String]("id").orNull val pickUpTime = parseTime(richRow, "pickup_datetime") val dropOffTime = parseTime(richRow, "dropoff_datetime") val pickUpX = parseLocation(richRow, "pickup_longitude") val pickUpY = parseLocation(richRow, "pickup_latitude") val dropOffX = parseLocation(richRow, "dropoff_longitude") val dropOffY = parseLocation(richRow, "dropoff_latitude") // 最终转化返回的Trip对象 Trip(id, pickUpTime, dropOffTime, pickUpX, pickUpY, dropOffX, dropOffY) }
返回顶部
⑵ 转换时间类型数据
返回顶部
⑶ 转换地点数据
// 5.1.3 转换地点数据 def parseLocation(row: RichRow, field: String): Double = { // 1.获取对应的数据 val location = row.getAs[String](field) // 2.转换数据 val locationOption = location.map( loc => loc.toDouble) locationOption.getOrElse(0.0D) }
返回顶部
⑷ 包装Row处理空值 — 返回option
// 5.1.1 包装Row处理空值 --- 返回optionclass RichRow(row: Row) { // 重写getAs方法 // 指定泛型 --- 得到指定类型的数据 // 指定参数 --- 传入的形参 /** * 为了返回option提醒外部处理空值,提供处理方式 * 一个方法返回了Option代表这个方法结果有可能为空,使得方法调用处必须处理null的情况 * Option本身提供了一些处理null的方法 * @param field * @tparam T * @return */ def getAs[T](field: String): Option[T] = { // 1.判断row.getAs的内容是否为空 if (row.isNullAt(row.fieldIndex(field))) { // 2.null -> 返回None None } else { // 3.noe null -> 返回 Some Some(row.getAs[T](field)) } }}
返回顶部
⑸ 异常处理
对于返回结果的并列性,Scala中提供了一个类似于其它语言中多返回值的Either。 Either分为两种情况,一个是left,一个是right,左右两个结果代表的意思可以由用户来决定,也就是自定义。
★ 简单案例解释Either作用
/** * 封装 parse 方法,捕获异常 * .map(safe(prase)) map接收的是一个函数,所以safe封装了prase方法后返回的还应该是一个函数 * @param function 参数为需要被封装的方法,返回值类型为一个Either对象 * @tparam P 泛型P代表参数类型,表示sourceRdd中的每一条数据类型 * @tparam R 泛型R代表返回值类型,表示处理后的数据类型 * @return */ def safe[P,R](function:P => R):P => Either[R,(P,Exception)] = { new Function[P,Either[R,(P,Exception)]] with Serializable{ override def apply(param: P): Either[R, (P, Exception)] = { // 包裹逻辑要在apply方法中重写 try{ Left(function(param)) } catch { case e: Exception => Right((param,e)) } } } }}
优化后调用数据转换方法:
通过异常处理后得到的是一个Either类型的对象,包含正常以及异常值
// 对结果进行判断// 通过以下方式可以过滤出所有异常的row// val result = transform.filter( e => e.isRight)// .map( e => e.right.get._1)// 5.2 将结果对象转为leftval taxi_clean: Dataset[Trip] = transform.map(either => either.left.get).toDS()
利用map算子将结果对象转换为正常结果 — 获取left部分数据集
返回顶部
2.2 剪除异常数据
① 查询异常数据 ---- 统计时间分布直方图
◆ 使用UDF
▶ 剪除异常数据 ---- 绘制时长直方图
上车时长 = (dropOffTime: Long下车时间 - pickUpTime: Long上车时间)单位转换 ms 显然不合适,转为 hour
// 6.1 编写程序 --- UDF, 将毫秒转换为小时单位val hours = (dropOffTime: Long, pickUpTime:Long) => { // 计算时长 val duration = dropOffTime - pickUpTime // 转换时长 --- hour,并返回结果 TimeUnit.HOURS.convert(duration,TimeUnit.MILLISECONDS)}// 生成UDF函数val hoursUDF = udf(hours)// 6.2 进行时长统计 至少需要两列:时长、计数 ---> 分组聚合taxi_clean.groupBy(hoursUDF('dropOffTime,'pickUpTime) as "duration") .count() .sort('duration desc ) .show(100)
▶ 根据直方图的显示,查看数据分布后,剪除反常数据,保留时长 0-3h 的数据集
// 6.3.1 注册UDF函数spark.udf.register("hours",hours)// 6.3.2 利用注册的UDF函数剪除数据val taxi_clean_drop = taxi_clean.where("hours(dropOffTime,pickUpTime) BETWEEN 0 AND 3")taxi_clean_drop.show()
返回顶部
3.行政区信息
由于最终要统计的结果是按照区域作为单位,而不是一个具体的目的地点所以要在数据集中增加列中放置区域信息1.既然是放置行政区名字,应该现有行政区以及其边界的信息2.通过上下车的坐标点,可以判断是否存在于某个行政区中
这些判断坐标点是否属于某个区域,这些信息就是专业的领域了
○ 源数据集
返回顶部
3.1 求出borough
① 读取行政区位置信息
▲GeoJSON
地理位置的表示
行政区范围数据集 .geojson
使用步骤
返回顶部
▲GeoJSON 解析工具
返回顶部
▲JSON4S介绍
一般在 Java 中, 常使用如下三个工具解析 JSON
GsonGoogle 开源的 JSON 解析工具, 比较人性化, 易于使用, 但是性能不如 Jackson, 也不如 Jackson 有积淀JacksonJackson 是功能最完整的 JSON 解析工具, 也是最老牌的 JSON 解析工具, 性能也足够好, 但是 API 在一开始支持的比较少, 用起来稍微有点繁琐FastJson阿里巴巴的 JSON 开源解析工具, 以快著称, 但是某些方面用起来稍微有点反直觉
补充:JSON解析
【SparkSQL】扩展 ---- JSON解析
配置Maven依赖
返回顶部
▲具体实现
对照 JSON 中的格式, 创建解析的目标类解析 JSON 数据转为目标类的对象读取数据集, 执行解析
Step 1: 创建目标类
case class FeatureCollection(features:List[Feature])case class Feature(properties:Map[String,String],geometry: JObject)
返回顶部
Step 2: 将 JSON 字符串解析为目标类对象
object FeatureExtraction{ // 完成具体的解析工作 def parseJson(json:String):FeatureCollection = { // 1.导入一个format 隐式转换 implicit val formats = Serialization.formats(NoTypeHints) // 2.Json ->object val featureCollection = read[FeatureCollection](json) featureCollection }}
返回顶部
Step 3:解析 GeoJSON ---- 转换 JSON 为 Geometry 对象
case class Feature(properties:Map[String,String],geometry: JObject){def getGeometry:Geometry = { // Geometry 对象 实现 import org.json4s.jackson.JsonMethods._ val mapGeo = GeometryEngine.geoJsonToGeometry(compact(render(geometry)),0,Geometry.Type.Unknown) mapGeo.getGeometry}}
返回顶部
Step 4: 读取数据集, 转换数据
// 7.1 读取数据集val geoJson = Source.fromFile("dataset/nyc-borough-boundaries-polygon.geojson").mkStringval featureCollection = FeatureExtraction.parseJson(geoJson)
返回顶部
② 将 Geometry 数据集按照区域大小排序
后续需要得到每一个出租车在哪个行政区域,拿到经纬度,遍历feature.行政区范围大的命中率高,在后续筛选的时候减少遍历次数。
// 7.2 排序val areaSortedFeatures = featureCollection.features.sortBy( feature => { (feature.properties("boroughCode"), -feature.getGeometry.calculateArea2D()) })
返回顶部
③ 广播 Geometry 信息, 发给每一个 Executor
/ 7.3 广播 --- 减少数据集的拷贝量val featureBC = spark.sparkContext.broadcast(areaSortedFeatures)
返回顶部
④ 创建 UDF, 通过经纬度获取行政区信息
// 7.4 UDF创建,完成功能val boroughLookUp = (x: Double, y: Double) => { // 7.4.1 搜索经纬度所在的行政区 val featureHit = featureBC.value.find(feature => { // 判断是否包含 GeometryEngine.contains(feature.getGeometry, new Point(x, y), SpatialReference.create(4326)) }) // 7.4.2 转为行政区信息 val borough = featureHit.map(feature => feature.properties("borough")).getOrElse("NA") borough}
返回顶部
⑤ 统计行政区信息
// 7.5 统计信息val boroughUDF = udf(boroughLookUp)taxi_clean_drop.groupBy(boroughUDF('dropOffX,'dropOffY)) .count() .show()
返回顶部
4.会话统计
数据集中存在很多出租车师傅的数据,所以如何将某个师傅的记录发往一个分区,在这个分区上完成会话分析呢?这也是一个需要理解的点
4.1 需求分析
目标:统计每个行政区的平均等客时间
需求可以拆分为如下几个步骤:
按照行政区分组在每一个行政区中, 找到同一个出租车司机的先后两次订单, 本质就是再次针对司机的证件号再次分组求出这两次订单的下车时间和上车时间之差, 便是等待客人的时间针对一个行政区, 求得这个时间的平均数
问题: 分组效率太低
分组的效率相对较低分组是 Shuffle两次分组, 包括后续的计算, 相对比较复杂
解决方案: 分区后在分区中排序
无论是刚才的多次分组, 还是后续的分区, 都是要找到每个司机的会话, 通过会话来完成功能, 也叫做会话分析。
返回顶部
▲具体实现
Step 1: 过滤没有经纬度的数据、划分会话
// 8.1 过滤没有经纬度的数据// 8.2 按照分区的id、上车时间进行排序val sessions =taxi_clean_drop.where("dropOffX!=0 and dropOffY!=0 and pickUpX!=0 and pickUpY!=0") .repartition('id) .sortWithinPartitions('id,'pickUpTime)
返回顶部
Step 2: 求得时间差
// 8.3 获取时间差def boroughDuration(t1:Trip,t2:Trip):(String,Long) = { val borough = boroughLookUp(t1.dropOffX,t1.dropOffY) val duration = (t2.dropOffTime - t1.pickUpTime)/1000 (borough,duration)}// 得到含有时间差、地区信息的数据集val BoroughDuration = sessions.mapPartitions(trips => { // 找到长度为2的窗口,每次移动一个单位 val viter = trips.sliding(2) .filter(_.size == 2) // 过滤调数据不为奇数的 .filter( p => p.head.id == p.last.id) // 判定分区中的出租车id为同一个 viter.map( p => boroughDuration(p.head,p.last))}).toDF("borough","seconds")
返回顶部
Step 3: 统计数据
// 分组聚合 BoroughDuration.where("seconds > 0") .groupBy("borough") .agg(avg('seconds),stddev('seconds)) .show()
返回顶部
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~