【郑州校区】大数据离线阶段Day4之Flume简单案例
1. 采集目录到HDFS采集需求:服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去 根据需求,首先定义以下3大要素 l 采集源,即source——监控文件目录 : spooldir l 下沉目标,即sink——HDFS文件系统 : hdfs sink l source和sink之间的传递通道——channel,可用file channel 也可以用内存channel 配置文件编写: # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source ##注意:不能往监控目中重复丢同名文件 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/logs a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
Channel参数解释: capacity:默认该通道中最大的可以存储的event数量 trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量 2. 采集文件到HDFS采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs 根据需求,首先定义以下3大要素 l 采集源,即source——监控文件内容更新 : exec ‘tail -F file’ l 下沉目标,即sink——HDFS文件系统 : hdfs sink l Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel 配置文件编写: # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/logs/test.log a1.sources.r1.channels = c1 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
参数解析: · rollInterval 默认值:30 hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒; 如果设置成0,则表示不根据时间来滚动文件; 注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据; · rollSize 默认值:1024 当临时文件达到该大小(单位:bytes)时,滚动成目标文件; 如果设置成0,则表示不根据临时文件大小来滚动文件; · rollCount 默认值:10 当events数据达到该数量时候,将临时文件滚动成目标文件; 如果设置成0,则表示不根据events数据来滚动文件; · round 默认值:false 是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。 · roundValue 默认值:1 时间上进行“舍弃”的值; · roundUnit 默认值:seconds 时间上进行“舍弃”的单位,包含:second,minute,hour 示例: a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute 当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为: /flume/events/20151016/17:30/00 因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。 传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|