Spark 脚手架工程,标准化 spark 开发、部署、测试流程

网友投稿 1153 2022-10-22

Spark 脚手架工程,标准化 spark 开发、部署、测试流程

Spark 脚手架工程,标准化 spark 开发、部署、测试流程

Fire-Spark

让Spark开发变得更简单

Demo工程

BUILD和使用方法

1.构建方法

$git clone https://github.com/GuoNingNing/fire-spark.git $cd fire-spark $mvn clean install maven构建时已默认在pom中跳过了所有的test代码的编译和测试 构建完成之后会将jar安装到m2的对应路径下,使用时在自己的项目的pom.xml文件里添加 org.fire.spark.streaming fire-spark 2.2.0_kafka-0.10

2.配置说明

可使用script 中的create_default_conf.sh创建默认配置文件,使用方法:$bash create_default_conf.sh >my.properties生成的标准配置内容此配置文件可以自定义添加任何以spark开头的配置参数,spark程序本身的依赖参数也在此配置文件配置####################################################### ## spark process run.sh ## user config ## ########################################################必须设置,执行class的全包名称spark.run.main=z.cloud.t3.Demon#必须设置,包含main class的jar包#jar必须包含在lib.path当中spark.run.main.jar=t3-1.0-SNAPSHOT.jar#提供给执行class的命令行参数,多个参数之间用逗号隔开,参数中不能包含空格等空白符#Ex:param1,param2,..spark.run.self.params=--checkpointPath,/tmp/checkpoint#可以是绝对路径,也可以是相对此配置文件的相对路径#相对路径会自动补全spark.lib.path=lib####################################################### ## spark self config ## ########################################################执行集群设置,不用设置,一般使用YARNspark.master=yarn#YARN部署模式#default=clusterspark.submit.deployMode=cluster#spark-streaming每个批次间隔时间#default=300spark.batch.duration=30#spark网络序列化方式,默认是JavaSerializer,可针对所有类型但速度较慢#这里使用推荐的Kryo方式#kafka-0.10必须使用此方式spark.serializer=org.apache.spark.serializer.KryoSerializer#++++++++++++++++++++++Driver节点相关配置+++++++++++++++++++++++++++#Driver节点使用内存大小设置#default=1Gspark.driver.memory=512M#Driver节点使用的cpu个数设置#default=1spark.driver.cores=1#Driver节点构建时spark-jar和user-jar冲突时优先使用用户提供的,这是一个实验性质的参数只对cluster模式有效#default=falsespark.driver.userClassPathFirst=false#++++++++++++++++++++++Executor节点相关配置+++++++++++++++++++++++++#Executor个数设置#default=1spark.executor.instances=1#Executor使用cpu个数设置#default=1spark.executor.cores=1#Executor使用内存大小设置#default=1Gspark.executor.memory=512M#同driver节点配置作用相同,但是是针对executor的#default=falsespark.executor.userClassPathFirst=true#++++++++++++++++++++++++Executor动态分配相关配置++++++++++++++++++++#Executor动态分配的前置服务#default=falsespark.shuffle.service.enabled=true#服务对应的端口,此端口服务是配置在yarn-site中的,由NodeManager服务加载启动#default=7337spark.shuffle.service.port=7337#配置是否启用资源动态分配,此动态分配是针对executor的,需要yarn集群配置支持动态分配#default=falsespark.dynamicAllocation.enabled=true#释放空闲的executor的时间#default=60sspark.dynamicAllocation.executorIdleTimeout=60s#有缓存的executor空闲释放时间#default=infinity(默认不释放)#spark.dynamicAllocation.cachedExecutorIdleTimeout=#初始化executor的个数,如果设置executor-number,谁小用谁#default=minExecutors(不设置使用此项配置值)spark.dynamicAllocation.initialExecutors=1#executor动态分配可分配最大数量#default=infinityspark.dynamicAllocation.maxExecutors=60#executor动态收缩的最小数量#default=0spark.dynamicAllocation.minExecutors=1#批次调度延迟多长时间开始增加executor#default=1sspark.dynamicAllocation.schedulerBacklogTimeout=1s#同上,但是是针对之后的请求#default=SchedulerBacklogTimeout(不设置使用此项配置值)spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s####################################################### ## spark process ## Source config ## #########################################################spark.source.kafka.consume 后的配置是kafka标准配置,指定之后,将会自动传递给kafkaConsumer##spark.source.kafka.consume-ics=homework_sync_canal#spark.source.kafka.consume.group.id=z.cloud.kafka.consumer.0018#spark.source.kafka.consume.bootstrap.servers=kafka1:9092,kafka2:9092spark.source.kafka.consume.auto.offset.reset=earliest#offset.store.type是指定如何管理offset的.不指定则匿名消费,kafka表示由kafka自己管理,另有redis和hbase的管理方式spark.source.kafka.offset.store.type=kafka#使用redis管理offset时的配置依赖项#spark.source.kafka.offset.store.type=redis#spark.source.kafka.offset.store.redis.hosts=localhost#spark.source.kafka.offset.store.redis.port=6379#spark.source.kafka.consume.key.deserializer=org.apache.kafka.common.serialization.StringDeserializerspark.source.kafka.consume.value.deserializer=org.apache.kafka.common.serialization.StringDeserializerspark.source.kafka.consume.max.partition.fetch.bytes=10485760spark.source.kafka.consume.fetch.max.wait.ms=3000######################################################### ## spark process ## Sink config ## ########################################################目前暂不支持自动的sink配置,此配置需用户在自己的代码里通过sparkConf.get("spark.sink.hdfs.path")的方式获取使用spark.sink.hdfs.path=/tmp/testspark.sink.redis.host=192.168.1.1spark.sink.redis.port=6379spark.sink.redis.db=0spark.sink.redis.timeout=30这只是简单的示例配置,更详细的配置参考后文中介绍的产生配置的脚本

3.示例代码

package z.cloud.t3import org.apache.spark.streaming.StreamingContextimport org.fire.spark.streaming.core.FireStreamingimport org.fire.spark.streaming.core.plugins.kafka.KafkaDirectSourceobject Demon extends FireStreaming { override def handle(ssc : StreamingContext): Unit = { val source = new KafkaDirectSource[String,String](ssc) //val conf = ssc.sparkContext.getConf source.getDStream[(String,String)](m => (m-ic,m.value)).foreachRDD((rdd,time) => { rdd.take(10).foreach(println) source.updateOffsets(time.milliseconds) }) }}

4.运行示例代码

使用script中的create_default_conf.sh创建标准配置文件 $bash create_default_conf.sh >my.properties 创建配置文件之后按配置文件按提示设置必须的参数 spark.run.main=z.cloud.t3.Demon spark.run.main.jar=t3-1.0.jar 设置完成后使用script中的run.sh来提交和停止任务 启动 $bash run.sh my.properties 停止(任意给第二个参数即可kill掉spark任务) $bash run.sh my.properties stop

5.API 说明

5.1.框架说明

org ----apache.spark ----streaming[自定义的spark streaming一些Listenter] ----CongestionMonitorListener 拥堵监控 ----[会对streaming任务是否产生拥堵进行告警和停止任务] ----JobInfoReportListener 任务信息记录 ----[会对streaming对任务详细信息进行记录输出到对应的kafka中] ----自定义及使用方法见后文的 5.2 Streaming Listener内容 ----RpcDemo里是spark RPC服务和client实现的演示代码 ----对应的启动代码在resources里 ----spark-rpcdemo client启动代码放到spark/bin下即可使用 ----start-rpcdemo-server.sh 放到spark/sbin下既可以使用 ----demo[示例代码] ----fire.spark.streaming.core[FireSpark的核心架构代码] ----channels[通道组建] ----[目前只有接口没有具体实现] ----format[输入输出格式化组建] ----kit[工具类] ----plugins[插件类] ----hbase[hbase连接池相关组建] ----kafka[kafka生产消费相关组建] ----manager[offset管理相关] ----writer[写入kafka相关] ----redis[redis连接池相关组建] ----sinks[输出组建] ----[目前有influx,kafka,mysql,redis,show等] ----sources[spark streaming数据源组建] ----[目前只有接口没有具体实现] ----FireStreaming[用户需继承实现的特质] ----init ----[初始化SparkConf的函数,用户可做一些自定义初始化动作] ----[函数会在handle之前执行] ----[参数是SparkConf] ----[返回值是Unit] ----handle ----[用户可将计算逻辑实现在这个函数里] ----[函数会在main函数的最后执行] ----[参数是StreamingContext] ----[返回值是Unit] ----SQLContextSingleton[创建sparkSQL的实例对象] script[部署相关的脚本] ----create_default_conf.sh ----[产生FireSpark以及Spark自身需要的一些参数] ----[包括run.sh需要的参数以及相关一些组建需要的参数] ----[更详细的内容可以看脚本自身,其中包含了说明] ----run.sh ----[提交spark任务的脚本,会做一些基础参数检查和生成] ----[需要的参数是properties类型的文件,可由上面的脚本产生] ----[将run脚本加入到crontab中即可简单实现失败重启] ----[脚本自带防重复启动的功能] ----create_template_project.sh ----[创建模版项目,需要一个路径,将会在这个路径下创建模版项目] ----[模版项目包含一个父级pom文件以及一个子模块和模块需要的pom和assembly文件及相关目录结构]

5.2.Spark相关内容

Spark RPCSpark ListenerStreaming Listener

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:动态代理实现原理
下一篇:Mybatis拦截器实现数据权限的示例代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~