Implementing Monitoring and Alerting for Spark Structured Streaming

Implement the StreamingQueryListener abstract class

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.streaming.StreamingQueryListener

class KafkaMetrics(servers: String) extends StreamingQueryListener {

  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](kafkaProperties)

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  // Forward each micro-batch's progress report (as JSON) to the streamingMetrics topic
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    producer.send(new ProducerRecord[String, String]("streamingMetrics", event.progress.json))
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

Register the listener

spark.streams.addListener(new KafkaMetrics("kafkaAddress"))

A Kafka consumer program (our shared dataPipeline) then reads the streamingMetrics topic and writes the monitoring data to HDFS. Each record is a StreamingQueryProgress report with the following structure (a landing sketch follows the sample):

{
  "id":"ee6a297a-07c6-4d59-97b2-e8b5f3a3a240",
  "runId":"58cae410-0208-4644-99f2-ba2cd9c3b5c5",
  "name":"query_zhx_pv_action",
  "timestamp":"2018-11-22T01:38:14.041Z",
  "batchId":493,
  "numInputRows":1986,
  "processedRowsPerSecond":253.5103395455706,
  "durationMs":{
    "addBatch":6783,
    "getBatch":60,
    "getOffset":228,
    "queryPlanning":371,
    "triggerExecution":7834,
    "walCommit":116
  },
  "eventTime":{
    "avg":"2018-11-21T05:45:08.262Z",
    "max":"2018-11-22T01:31:58.202Z",
    "min":"2018-11-20T09:25:45.249Z",
    "watermark":"2018-11-18T09:18:23.404Z"
  },
  "stateOperators":[
    {
      "numRowsTotal":56,
      "numRowsUpdated":47,
      "memoryUsedBytes":37583
    }
  ],
  "sources":[
    {
      "description":"KafkaSource[Subscribe[topicBuryPoint-3]]",
      "startOffset":{
        "topicBuryPoint-3":{
          "0":30880
        }
      },
      "endOffset":{
        "topicBuryPoint-3":{
          "0":32866
        }
      },
      "numInputRows":1986,
      "processedRowsPerSecond":253.5103395455706
    }
  ],
  "sink":{
    "description":"org.apache.spark.sql.kafka010.KafkaSourceProvider@6837dfea"
  }
}
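
The in-house dataPipeline consumer is not shown here. As a minimal sketch of the landing step, assuming the metrics are simply read back from the streamingMetrics topic with Spark itself and written to a hypothetical HDFS path /data/ods/streaming_metrics:

import org.apache.spark.sql.SparkSession

object MetricsToHdfs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-metrics-to-hdfs").getOrCreate()

    // Read the progress JSON that KafkaMetrics publishes to the streamingMetrics topic
    val metrics = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafkaAddress") // same address passed to the listener
      .option("subscribe", "streamingMetrics")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Land the raw JSON on HDFS; the path and checkpoint location are illustrative only
    metrics.writeStream
      .format("text")
      .option("path", "/data/ods/streaming_metrics")
      .option("checkpointLocation", "/data/ods/streaming_metrics_ckp")
      .start()
      .awaitTermination()
  }
}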

Using Spark, the landed data is partitioned and exposed as a view (a sketch follows), which Redash then queries with SQL to build charts.
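
The view definition itself is not included in the original. As a rough sketch, assuming the JSON was landed under the hypothetical /data/ods/streaming_metrics path from the previous step, the fields used by the chart query could be flattened and registered like this (in production the view is registered in the ods Hive database and partitioned, which is not shown here):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("streaming-metrics-view").getOrCreate()

// Flatten the nested durationMs struct into the flat columns the Redash query expects.
// inputRowsPerSecond is assumed to be present in the landed records.
spark.read.json("/data/ods/streaming_metrics")
  .selectExpr(
    "name",
    "`timestamp`",
    "inputRowsPerSecond",
    "processedRowsPerSecond",
    "durationMs.getOffset AS durationMs_getOffset",
    "durationMs.triggerExecution AS durationMs_triggerExecution")
  .createOrReplaceTempView("ods_fm_bizdata_streaming_metrics_view")

With the view in place, the Redash query below aggregates the current day's batches by hour: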

select
  date_format(timestamp, 'HH') as hour,
  avg(inputRowsPerSecond) as inputRowsPerSecondAvg,
  avg(processedRowsPerSecond) as processedRowsPerSecondAvg,
  count(1) as batchNum,
  avg(durationMs_getOffset) as durationMs_getOffsetAvg,
  avg(durationMs_triggerExecution) as durationMs_triggerExecutionAvg
from
  ods.ods_fm_bizdata_streaming_metrics_view
where
  date_format(current_timestamp(), 'yyyy-MM-dd') = date_format(timestamp, 'yyyy-MM-dd')
  and name like 'query_order%'
group by hour
order by hour desc

[image: Redash chart of the hourly metrics]

Create alerts with Redash

Configure a scheduled query that runs every minute and checks the previous minute's monitoring data; if the values fall below a threshold, the service is considered abnormal and the Redash alert fires. An example query is sketched below.
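
The exact alert query is not given in the original; one plausible shape, reusing the same view, checks how many batches completed in the last minute and how fast they processed (the one-minute window and the chosen columns are assumptions):

select
  count(1) as batchNum,
  avg(processedRowsPerSecond) as processedRowsPerSecondAvg
from
  ods.ods_fm_bizdata_streaming_metrics_view
where
  timestamp >= current_timestamp() - interval 1 minute
  and name like 'query_order%'

In Redash the alert is then configured to trigger when batchNum (or the processing rate) drops below the threshold.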

Testing the alert result :-)

[image: Redash alert test result]