Diagnosing a Spark Structured Streaming Performance Issue

Recently a teammate reported that a real-time computation job I had submitted to the cluster was using a lot of resources, even though it processes only a small volume of data, so he suspected a performance problem.

Opening the Spark application monitoring UI showed the following:

Our real-time jobs are all built on Structured Streaming, which is essentially a sequence of micro batches. Picking one of the batch jobs, I saw that it was split into 2 stages, which means a shuffle occurred. Stage 1 had 5 tasks, matching the partition count of the source Kafka topic, while Stage 2 had a whopping 300 tasks. Problem found:

The shuffle partition count was too large, so Stage 2 was assigned far too many tasks, which drove up the resource usage.
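For context, here is a minimal sketch (not the actual job; the broker, topic, and column names are placeholders) of the kind of query that produces this two-stage shape. The Kafka source side runs one task per topic partition, while the aggregation introduces a shuffle whose reduce side runs with spark.sql.shuffle.partitions tasks:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("hypothetical-example")
  .getOrCreate()
import spark.implicits._

// Read from Kafka: Stage 1 gets one task per topic partition (5 in our case).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder broker
  .option("subscribe", "some-topic")                  // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// The groupBy introduces the shuffle: the reduce side (Stage 2) runs with
// spark.sql.shuffle.partitions tasks.
val counts = df.groupBy($"value").count()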

With the cause identified, the next step is a configuration change. The shuffle partition count is controlled by spark.sql.shuffle.partitions, which can be set in several ways:

  • When creating the SparkSession in the driver program
val spark = SparkSession.builder
  .appName("App Name xxxx")
  .config("spark.sql.shuffle.partitions", 2)
  .getOrCreate()
  • When submitting the job with spark-submit
${SPARK_HOME}/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=2 \
  --queue queueName \
  --packages packageName \
  --class ClassName /path/to/xxxx.jar

After the change I resubmitted the job, and found a new problem:

The Environment tab in the monitoring UI showed the config value was now 2, yet Stage 2 still ran with 300 tasks, meaning the setting was not taking effect.

A round of googling turned up nothing relevant, so I dug into the Structured Streaming source code and found a few interesting things.

MicroBatchExecution.scala extends the abstract class StreamExecution.scala and manages execution in micro-batch mode. Its counterpart for continuous mode is ContinuousExecution.scala, which I won't describe in detail here.
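As a quick illustration (a minimal sketch, assuming df is the map-only Kafka DataFrame from the earlier sketch, without the aggregation, since continuous mode does not support aggregations; the sink and intervals are placeholders), which of the two execution paths is used depends on the trigger set on the sink:

import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode, handled by MicroBatchExecution (this is also the default
// family of triggers when none is specified):
val microBatchQuery = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous mode, handled by ContinuousExecution (checkpoints every second):
val continuousQuery = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()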

Reading MicroBatchExecution.scala -> runActivatedStream, there is a piece of initialization logic as follows:

/**
 * Repeatedly attempts to run batches as data arrives.
 */
protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
  triggerExecutor.execute(() => {
    startTrigger()

    if (isActive) {
      reportTimeTaken("triggerExecution") {
        if (currentBatchId < 0) {
          // We'll do this initialization only once
          populateStartOffsets(sparkSessionForStream)
          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
          logDebug(s"Stream running from $committedOffsets to $availableOffsets")
        } else {
          constructNextBatch()
        }
        if (dataAvailable) {
          currentStatus = currentStatus.copy(isDataAvailable = true)
          updateStatusMessage("Processing new data")
          runBatch(sparkSessionForStream)
        }
      }
      // Report trigger as finished and construct progress object.
      finishTrigger(dataAvailable)
      if (dataAvailable) {
        // Update committed offsets.
        commitLog.add(currentBatchId)
        committedOffsets ++= availableOffsets
        logDebug(s"batch ${currentBatchId} committed")
        // We'll increase currentBatchId after we complete processing current batch's data
        currentBatchId += 1
        sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
      } else {
        currentStatus = currentStatus.copy(isDataAvailable = false)
        updateStatusMessage("Waiting for data to arrive")
        Thread.sleep(pollingDelayMs)
      }
    }
    updateStatusMessage("Waiting for next trigger")
    isActive
  })
}

currentBatchId defaults to -1 and is incremented by 1 after each batch is processed. Continuing into the populateStartOffsets method:

/**
 * Populate the start offsets to start the execution at the current offsets stored in the sink
 * (i.e. avoid reprocessing data that we have already processed). This function must be called
 * before any processing occurs and will populate the following fields:
 *  - currentBatchId
 *  - committedOffsets
 *  - availableOffsets
 *  The basic structure of this method is as follows:
 *
 *  Identify (from the offset log) the offsets used to run the last batch
 *  IF last batch exists THEN
 *    Set the next batch to be executed as the last recovered batch
 *    Check the commit log to see which batch was committed last
 *    IF the last batch was committed THEN
 *      Call getBatch using the last batch start and end offsets
 *      // ^^^^ above line is needed since some sources assume last batch always re-executes
 *      Setup for a new batch i.e., start = last batch end, and identify new end
 *    DONE
 *  ELSE
 *    Identify a brand new batch
 *  DONE
 */
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
  offsetLog.getLatest() match {
    case Some((latestBatchId, nextOffsets)) =>
      /* First assume that we are re-executing the latest known batch
       * in the offset log */
      currentBatchId = latestBatchId
      availableOffsets = nextOffsets.toStreamProgress(sources)
      /* Initialize committed offsets to a committed batch, which at this
       * is the second latest batch id in the offset log. */
      if (latestBatchId != 0) {
        val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
          throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
        }
        committedOffsets = secondLatestBatchId.toStreamProgress(sources)
      }

      // update offset metadata
      nextOffsets.metadata.foreach { metadata =>
        OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
        offsetSeqMetadata = OffsetSeqMetadata(
          metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
      }

      // ...
  }
}

populateStartOffsets uses the previous batch's metadata to update the SparkSession config for the next batch; the actual population happens in OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf):

/** Set the SparkSession configuration with the values in the metadata */
def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
  OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>

    metadata.conf.get(confKey) match {

      case Some(valueInMetadata) =>
        // Config value exists in the metadata, update the session config with this value
        val optionalValueInSession = sessionConf.getOption(confKey)
        if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
          logWarning(s"Updating the value of conf '$confKey' in current session from " +
            s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
        }
        sessionConf.set(confKey, valueInMetadata)

      case None =>
        // For backward compatibility, if a config was not recorded in the offset log,
        // then log it, and let the existing conf value in SparkSession prevail.
        logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
    }
  }
}

At this point, the reason the configuration did not take effect is clear:

On the very first run the config had not been changed, so the system default of 300 was recorded; every subsequent batch carries over the previous batch's metadata, so the value remains 300 no matter what the session is configured with later.

Inspecting the metadata on HDFS also confirmed this conclusion.
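For reference, the offset log lives under the query's checkpoint directory, one file per batch, and its second line is the serialized OffsetSeqMetadata, which carries the recorded shuffle partition count. Roughly (the path, timestamps, offsets, and topic name below are made up, and other conf entries are elided):

$ hdfs dfs -cat /path/to/checkpoint/offsets/0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1539151200000,"conf":{"spark.sql.shuffle.partitions":"300"}}
{"some-topic":{"0":1234,"1":1234,"2":1234,"3":1234,"4":1234}}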

The fix

Point the query at a new checkpoint path and resubmit. The setting then takes effect, the task count drops from 305 (5 + 300) to 7 (5 + 2), and the job runs silky smooth :)
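Concretely, that just means giving the sink a fresh checkpointLocation (a minimal sketch, assuming df is the streaming DataFrame; the sink and path are placeholders). Note that a new checkpoint directory also means the query restarts from the source's configured starting offsets and loses any accumulated state, so this is only an option when that is acceptable, as it was here:

val query = df.writeStream
  .format("console")                                            // placeholder sink
  .option("checkpointLocation", "/path/to/new-checkpoint-dir")  // placeholder path
  .start()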

If you like my articles, you are welcome to support them with a voluntary contribution.