Spark调优多线程并行处理任务实现方式

所属分类: 软件编程 / java 阅读数: 55
收藏 0 赞 0 分享

方式1:

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job

所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行

而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:

有延时发生了,batch无法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源

Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。

方式2:

场景1:

程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。

一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。

场景2:

  • 程序需要处理的相似job数随着业务的增长越来越多
  • 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
  • spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务

DStream.foreachRDD{
   rdd =>
    //创建线程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //将规则放入线程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //执行规则
       runRule(ru,spark)
      }
     })
    }
    //每次创建的线程池执行完所有规则后shutdown
    executors.shutdown()
  }

注意点

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
  • 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
  • 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。

2.可不可以将创建线程池放到foreachRDD外面?

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。

3.线程池executor崩溃了就会导致数据丢失

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

更多精彩内容其他人还在看

Java输入输出流复制文件所用时间对比

这篇文章主要介绍了Java输入输出流复制文件所用时间对比的相关资料,非常不错,具有参考解决价值,需要的朋友可以参考下
收藏 0 赞 0 分享

Java线程中start和run方法全面解析

这篇文章主要介绍了Java线程中start和run方法的相关资料,非常不错,具有参考借鉴价值,需要的朋友可以参考下
收藏 0 赞 0 分享

Java的JSON处理器fastjson使用方法详解

下面小编就为大家带来一篇Java的JSON处理器fastjson使用方法详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
收藏 0 赞 0 分享

Java 二维码,QR码,J4L-QRCode 的资料整理

本文主要介绍Java 中二维码,QR码,J4L-QRCode,这里整理了详细的资料供大家学习参考关于二维码的知识,有需要的小伙伴可以参考下
收藏 0 赞 0 分享

java哈夫曼树实例代码

这篇文章主要为大家介绍了java哈夫曼树实例代码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
收藏 0 赞 0 分享

Android读取本地或网络图片并转换为Bitmap

这篇文章主要为大家详细介绍了Android读取本地或网络图片,并转换为Bitmap,感兴趣的小伙伴们可以参考一下
收藏 0 赞 0 分享

Java日期时间操作的方法

这篇文章主要为大家详细介绍了Java日期时间操作的一些方法,获得Calendar,定义日期/时间的格式等,感兴趣的小伙伴们可以参考一下
收藏 0 赞 0 分享

java 获取路径的各种方法(总结)

下面小编就为大家带来一篇java 获取路径的各种方法(总结)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
收藏 0 赞 0 分享

java数据结构与算法之奇偶排序算法完整示例

这篇文章主要介绍了java数据结构与算法之奇偶排序算法,较为详细的分析了奇偶算法的原理并结合完整示例形式给出了实现技巧,需要的朋友可以参考下
收藏 0 赞 0 分享

java数据结构与算法之双向循环队列的数组实现方法

这篇文章主要介绍了java数据结构与算法之双向循环队列的数组实现方法,结合实例形式分析了双向循环队列的原理与数组实现技巧,并附带说明了该算法的用途,需要的朋友可以参考下
收藏 0 赞 0 分享
查看更多