spark streaming被压分析

网友投稿 508 2022-09-01

spark streaming被压分析

spark streaming被压分析

在我们使用spark-streaming处理实时数据时,通常在Dstream端的rdd操作较为耗时,此刻的实时数据还在receiver端不断的store。由于数据的处理不及时,即Processing Time < blockInterval也就造成了数据的积压。此时就需要一种机制来解决receiver端store数据的“速率”。在spark streaming中就是被压(backpressure);

简单使用

开启被压参数

spark.streaming.backpressure.enabled=true

此参数会开启spark streaming内部的被压机制(1.5以上版本),开始后spark streaming会根据当前处理批次的scheduling delays(batch调度延迟时间)和 processing times(batch处理时间)控制receiver端的接受速率,以达到和数据的处理速度一样快。设置的接收速率受​​spark.streaming.receiver.maxRate​​参数的影响。

设置初始处理速率

spark.streaming.backpressure.initialRate=xxx

此参数会在receiver接收第一批(first batch)数据时初始化的最大速率,此参数只会在被压参数开启时有效。 设置此参数可以在启动spark streaming程序的瞬间就达到我们期望的最大值,而不是靠被压参数慢慢调整。

设置最小处理速率

spark.streaming.backpressure.pid.minRate=x

此参数在spark streaming中默认值为100.如果我们store的数据为一个集合,那么允许的最小速率就是100集合的数据,此时数据量可能也会很大。所以最好设置一个初始值。比如1.

设置最大处理速率

spark.streaming.receiver.maxRate=xxx

每个receiver接收数据的最大速率,每个dstream最大只能消耗这么多的数据。设置为0或者负数将不做限制。

此参数一般不做设置,除非你的机器上还有其它程序。

被压原理

我们就从receiver端的store方法开始

/** * Store a single item of received data to Spark's memory. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store(dataItem: T) { supervisor.pushSingle(dataItem) }

store方法中的supervisor对象类型为ReceiverSupervisorImpl 所以直接进入ReceiverSupervisorImpl实现类中

/** Push a single record of received data into block generator. */ def pushSingle(data: Any) { defaultBlockGenerator.addData(data) }

​​defaultBlockGenerator​​​的​​addData​​方法内容为

/** * Push a single data item into the buffer. */ def addData(data: Any): Unit = { if (state == Active) { //等待push waitToPush() synchronized { if (state == Active) { currentBuffer += data } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } } } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } }

被压机制的实现就在​​waitToPush​​方法中。点进去查看

private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush() { //从令牌桶中取令牌 rateLimiter.acquire() } private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { if (maxRateLimit > 0) { rateLimiter.setRate(newRate.min(maxRateLimit)) } else { rateLimiter.setRate(newRate) } } private def getInitialRateLimit(): Long = { math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) }

仔细查看​​rateLimiter​​​对象,我们会方向这个对象就是使用​​Guava​​​的开源工具包​​RateLimiter​​​实现的,如果想了解​​rateLimiter​​​原理的,可以google搜索,一大堆。 有人可能说​​​rateLimiter​​​和​​semphore​​​很像,其实​​semphore​​​是控制并发,而rateLimiter控制速率,尽管速率和并发很像。(具体参考:​​https://en.wikipedia.org/wiki/Little’s_law)​​

从​​getInitialRateLimit​​​方法我们可以看出​​rateLimiter​​​的初始值为​​spark.streaming.backpressure.initialRate​​​,如果没有设置默认为最大速率​​spark.streaming.receiver.maxRate​​​。​​​GuavaRateLimiter.create(getInitialRateLimit().toDouble)​​​方法会创建一个每秒令牌数为初始设置的令牌桶。​​acquire​​方法就是从桶中取令牌。

细心的你可能发现还有个​​updateRate​​方法,此方法会更新每秒能获得的最大令牌数。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:解析一行gc日志
下一篇:使用 Shell 在多服务器上批量操作(使用时间最悠久的能源形式是什么能源)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~