Spark调优多线程并行处理任务实现方式

所属分类: 软件编程 / java 阅读数: 70
收藏 0 赞 0 分享

方式1:

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job

所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行

而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:

有延时发生了,batch无法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源

Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。

方式2:

场景1:

程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。

一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。

场景2:

  • 程序需要处理的相似job数随着业务的增长越来越多
  • 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。
  • spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务

DStream.foreachRDD{
   rdd =>
    //创建线程池
    val executors=Executors.newFixedThreadPool(rules.length)
    //将规则放入线程池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //执行规则
       runRule(ru,spark)
      }
     })
    }
    //每次创建的线程池执行完所有规则后shutdown
    executors.shutdown()
  }

注意点

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。
  • 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。
  • 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。

2.可不可以将创建线程池放到foreachRDD外面?

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。

3.线程池executor崩溃了就会导致数据丢失

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

更多精彩内容其他人还在看

利用MultipartFile实现文件上传功能

这篇文章主要为大家详细介绍了利用MultipartFile实现文件上传功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
收藏 0 赞 0 分享

Java编程实现NBA赛事接口调用实例代码

这篇文章主要介绍了Java编程实现NBA赛事接口调用实例代码,具有一定参考价值,需要的朋友可以了解下。
收藏 0 赞 0 分享

Java编程之双重循环打印图形

这篇文章主要介绍了Java编程之双重循环打印图形,属于Java编程基础练习部分,具有一定参考价值,需要的朋友可以了解下。
收藏 0 赞 0 分享

java基础学习JVM中GC的算法

这篇文章主要介绍了java基础学习JVM中GC的算法,通过图文加深对GC算法思路的理解。
收藏 0 赞 0 分享

Java编程Post数据请求和接收代码详解

这篇文章主要介绍了Java编程Post数据请求和接收代码详解,涉及enctype的三种编码,post与get等相关内容,具有一定参考价值,需要的朋友可以了解下。
收藏 0 赞 0 分享

Retrofit+Rxjava实现文件上传和下载功能

这篇文章主要介绍了Retrofit+Rxjava实现文件上传和下载功能,文中提到了单文件上传和多文件上传及相关参数的请求,需要的朋友参考下吧
收藏 0 赞 0 分享

Retrofit+Rxjava下载文件进度的实现

这篇文章主要介绍了Retrofit+Rxjava下载文件进度的实现,非常不错,具有参考借鉴价值,需要的朋友可以参考下
收藏 0 赞 0 分享

java检查服务器的连通两种方法代码分享

这篇文章主要介绍了java检查服务器的连通两种方法代码分享,涉及ping的介绍以及检查服务器连通的两种方法代码示例,具有一定参考价值,需要的朋友可以了解下。
收藏 0 赞 0 分享

Java/Android 获取网络重定向文件的真实URL的示例代码

本篇文章主要介绍了Java/Android 获取网络重定向文件的真实URL的示例代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
收藏 0 赞 0 分享

java并发编程之同步器代码示例

这篇文章主要介绍了java并发编程之同步器代码示例,分享了相关代码,具有一定参考价值,需要的朋友可以了解下。
收藏 0 赞 0 分享
查看更多