使用 Flume 消费 Kafka 数据到 HDFS

最近使用 Flume 消费 Kafka 数据到 HDFS 上,本文记录下使用过程一些注意点,处理数据格式如下:

1
2
3
4
5
6
7
8
9
10
11
{
"ver": 1,
"type": 101,
"env": 1,
"partnerId": "6763ca7f-bc55-4021-8817-d3f529754b20",
"ctime": "2018-09-09 12:23:10",
"fpath": "xxxx",
"isAnalysis": 0,
"eventId": "121194142179082755",
"data": ""
}

使用数据 Event Time 替换 Process Time

数据写入 HDFS 的目录结构是 /path/to/dt=2018-11-23/at=eventType/...,Flume 默认使用 Process Time,考虑到异步消费 Kafka 数据延时问题,这里需要使用 Event Time 代替,可以使用拦截器来实现

1
2
3
4
5
6
7
eventAgent.sources.s1.interceptors = i1
eventAgent.sources.s1.interceptors.i1.type = regex_extractor
eventAgent.sources.s1.interceptors.i1.regex = \"ctime\":(.*?),
eventAgent.sources.s1.interceptors.i1.serializers = s1
eventAgent.sources.s1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
eventAgent.sources.s1.interceptors.i1.serializers.s1.name = timestamp
eventAgent.sources.s1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

使用 regex_extractor 正则拦截器,匹配数据中的 ctime 字段,然后按照 yyyy-MM-dd HH:mm:ss 格式序列化为毫秒时间戳,替换 Flume 数据包的 timestamp 字段值

写入 HDFS 单个文件尽量写满一个块

考虑到不浪费 HDFS Name Node 内存,避免过多小文件,配置 HDFS Sink 根据文件大小 roll,单文件写满 126M 才会关闭,另外 Flume 在未写满时会产生 .tmp 临时文件,导致其他计算引擎无法读取该块信息,
所以再加一道保险,当该块一段时间不活跃即主动关闭,我这边指定 30分钟

1
2
3
4
5
6
7
8
9
10
11
12
eventAgent.sinks.s1.type = hdfs
eventAgent.sinks.s1.channel = c1
eventAgent.sinks.s1.hdfs.path = /path/to/dt=%Y-%m-%d/at=eventType
eventAgent.sinks.s1.hdfs.filePrefix = events-
eventAgent.sinks.s1.hdfs.fileType = DataStream
eventAgent.sinks.s1.hdfs.writeFormat = Text
eventAgent.sinks.s1.hdfs.rollCount = 0
eventAgent.sinks.s1.hdfs.rollInterval = 0
## 126M
eventAgent.sinks.s1.hdfs.rollSize = 132120576
## 30分钟不活跃就关闭该文件
eventAgent.sinks.s1.hdfs.idleTimeout = 1800

完整的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
eventAgent.sources = s1
eventAgent.sinks = s1
eventAgent.channels = c1

eventAgent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
eventAgent.sources.s1.channels = c1
eventAgent.sources.s1.batchSize = 50
eventAgent.sources.s1.batchDurationMillis = 2000
eventAgent.sources.s1.kafka.bootstrap.servers = localhost:9092
eventAgent.sources.s1.kafka.topics = topicBuryPoint2-zhx
eventAgent.sources.s1.kafka.consumer.group.id = topicBuryPoint2-zhx-9009

eventAgent.sources.s1.interceptors = i1
eventAgent.sources.s1.interceptors.i1.type = regex_extractor
eventAgent.sources.s1.interceptors.i1.regex = \"ctime\":\"(.*?)\",
eventAgent.sources.s1.interceptors.i1.serializers = s1
eventAgent.sources.s1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
eventAgent.sources.s1.interceptors.i1.serializers.s1.name = timestamp
eventAgent.sources.s1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

eventAgent.sinks.s1.type = hdfs
eventAgent.sinks.s1.channel = c1
eventAgent.sinks.s1.hdfs.path = /path/to/dt=%Y-%m-%d/at=appType
eventAgent.sinks.s1.hdfs.filePrefix = events-
eventAgent.sinks.s1.hdfs.fileType = DataStream
eventAgent.sinks.s1.hdfs.writeFormat = Text
eventAgent.sinks.s1.hdfs.rollCount = 0
eventAgent.sinks.s1.hdfs.rollInterval = 0
## 126M
eventAgent.sinks.s1.hdfs.rollSize = 132120576
## 30分钟不活跃就关闭该文件
eventAgent.sinks.s1.hdfs.idleTimeout = 1800

## Channel definition
eventAgent.channels.c1.type = file
eventAgent.channels.c1.checkpointDir = /path/to/recovery/eventAgent/checkpoint
eventAgent.channels.c1.dataDirs = /path/to/recovery/eventAgent/data

## Connect
eventAgent.sources.s1.channels = c1
eventAgent.sinks.s1.channel = c1
喜欢我的文章的人,可以自愿付费。