Flume中的HDFS Sink配置 和示例

Flume中的HDFS Sink配置参数说明

type:hdfs

path:hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/

filePrefix:默认值:FlumeData,写入hdfs的文件名前缀

fileSuffix:写入 hdfs 的文件名后缀,比如:.lzo .log等。

inUsePrefix:临时文件的文件名前缀,hdfs sink 会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;

inUseSuffix:默认值:.tmp,临时文件的文件名后缀。

rollInterval:默认值:30:hdfs sink 间隔多长将临时文件滚动成最终目标文件,单位:秒;

                       如果设置成0,则表示不根据时间来滚动文件;

注:滚动(roll)指的是,hdfs sink 将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;

rollSize:默认值:1024:当临时文件达到多少(单位:bytes)时,滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;

rollCount:默认值:10:当 events 数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件;

idleTimeout:默认值:0:当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;

batchSize:默认值:100:每个批次刷新到 HDFS 上的 events 数量;

codeC:文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy

fileType:默认值:SequenceFile,文件格式,包括:SequenceFile, DataStream,CompressedStream

                                   当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;

                                   当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;

maxOpenFiles:默认值:5000:最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;

minBlockReplicas:默认值:HDFS副本数,写入 HDFS 文件块的最小副本数。

                                     该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。

writeFormat:写 sequence 文件的格式。包含:Text, Writable(默认)

callTimeout:默认值:10000,执行HDFS操作的超时时间(单位:毫秒);

threadsPoolSize:默认值:10,hdfs sink 启动的操作HDFS的线程数。

rollTimerPoolSize:默认值:1,hdfs sink 启动的根据时间滚动文件的线程数。

kerberosPrincipal:HDFS安全认证kerberos配置;

kerberosKeytab:HDFS安全认证kerberos配置;

proxyUser:代理用户

round:默认值:false,是否启用时间上的”舍弃”;

roundValue:默认值:1,时间上进行“舍弃”的值;

roundUnit:默认值:seconds,时间上进行”舍弃”的单位,包含:second,minute,hour

示例:

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:/flume/events/20151016/17:30/00

因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。

timeZone:默认值:Local Time,时区。

useLocalTimeStamp:默认值:flase,是否使用当地时间。

closeTries:默认值:0,hdfs sink 关闭文件的尝试次数;

                      如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。

                      设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。

retryInterval:默认值:180(秒),hdfs sink 尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.

serializer:默认值:TEXT,序列化类型。

示例:

agent.sources.DECISION-HBASE-DATA.type=spooldir 
agent.sources.DECISION-HBASE-DATA.spoolDir=/var/local/flume
agent.sources.DECISION-HBASE-DATA.channels=DECISION-HBASE-DATA

agent.channels.DECISION-HBASE-DATA.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.DECISION-HBASE-DATA.kafka.bootstrap.servers=192.168.90.31:9094,192.168.90.32:9094,192.168.90.33:9094
agent.channels.DECISION-HBASE-DATA.kafka.topic=DECISION-HBASE-DATA
agent.channels.DECISION-HBASE-DATA.kafka.consumer.group.id=hdfs-20180906-20
agent.channels.DECISION-HBASE-DATA.parseAsFlumeEvent=false
agent.channels.DECISION-HBASE-DATA.migrateZookeeperOffsets=false
agent.channels.DECISION-HBASE-DATA.pollTimeout=2000
agent.channels.DECISION-HBASE-DATA.kafka.consumer.auto.offset.reset=earliest
agent.channels.DECISION-HBASE-DATA.kafka.producer.security.protocol=PLAINTEXT
agent.channels.DECISION-HBASE-DATA.kafka.consumer.security.protocol=PLAINTEXT

agent.sinks.DECISION-HBASE-DATA.type = hdfs
agent.sinks.DECISION-HBASE-DATA.hdfs.path=hdfs://ns1/flume/topic/DECISION-HBASE-DATA/pk_day=%Y%m%d/pk_hour=%H
agent.sinks.DECISION-HBASE-DATA.hdfs.filePrefix=%H
agent.sinks.DECISION-HBASE-DATA.hdfs.fileSuffix=.log.gz
agent.sinks.DECISION-HBASE-DATA.hdfs.rollInterval=3600
agent.sinks.DECISION-HBASE-DATA.hdfs.rollSize=128000000
agent.sinks.DECISION-HBASE-DATA.hdfs.hdfs.minBlockReplicas=1
agent.sinks.DECISION-HBASE-DATA.hdfs.rollCount=0
agent.sinks.DECISION-HBASE-DATA.hdfs.idleTimeout=0
agent.sinks.DECISION-HBASE-DATA.hdfs.batchSize=100
agent.sinks.DECISION-HBASE-DATA.hdfs.codeC=gzip
agent.sinks.DECISION-HBASE-DATA.hdfs.fileType=CompressedStream
agent.sinks.DECISION-HBASE-DATA.hdfs.callTimeout=200000
agent.sinks.DECISION-HBASE-DATA.hdfs.round=true
agent.sinks.DECISION-HBASE-DATA.hdfs.roundValue=1
agent.sinks.DECISION-HBASE-DATA.hdfs.useLocalTimeStamp=true

agent.sinks.DECISION-HBASE-DATA.hdfs.kerberosPrincipal=yarn
agent.sinks.DECISION-HBASE-DATA.hdfs.kerberosKeytab=/opt/yarn.keytab
agent.sinks.DECISION-HBASE-DATA.channel=DECISION-HBASE-DATA

flume 频繁产生小文件原因分析及解决办法:

agent.sinks.DECISION-HBASE-DATA.hdfs.hdfs.minBlockReplicas=1

您可以选择一种方式赞助本站