A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 不二晨 金牌黑马   /  2018-11-19 09:54  /  1129 人查看  /  4 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

flume代码示例



flume主要组成是agent,agent的组成分为Source(数据进入端口),Channel(数据管道),Sink(数据输出端)

# example.conf: A single-node Flume configuration
#对agent的组件其名称
# Name the components on this agent
//定义agent的名称,对agent中的三个组件进行命名
//sources,sinks,channels 后加S所以可以同时定义多个,来适应不同的业务场景
a1.sources = r1
a1.sinks = k1
a1.channels = c1

//设置source端 source的数据来源是什么? 根据不同的数据来源,设置source的内容
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
//设置sinks端 sink端输出的数据类型,根据不同的业务场景,来设置
# Describe the sink
a1.sinks.k1.type = logger
//设置channels channels存在的类型,大小为1000,每次传输的大小为100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
//规定source、channel对应关系和channnel、sink对应的关系
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
模型简介

不同的模型对应不同的业务逻辑



flume与flume之间连接使用的数据类型是AVRO
为了将数据通过多个代理或跳数据流,前一代理和当前跳转源的接收器需要是 avro 类型,该接收器指向主机名(或 IP 地址)和源的端口。
多个flume连接
代码示例:在两台机器上

//从第一台webService上获取数据,传输送到第二台机器上,写入到第二台机器的磁盘上

//第一台上的配置文件
//对agent组件进行命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
//source端的配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/bigdata/webapp.log
# Describe the sink
//sink短的配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
//channel的配置
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


//第二台的配置文件
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata
a2.sources.r1.port = 8888
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata:9000/logDir
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


多个flume汇聚到一个flume在输出

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers sent to a dozen of agents that write to HDFS cluster.
日志集合中一个非常常见的场景是大量的日志生成客户端将数据发送给附属于存储子系统的一些消费者代理。 例如,从数百个网络服务器收集的日志发送给十几个写给 HDFS 集群的代理

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to an avro source of single agent (Again you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination
通过使用 avro 接收器配置一些一级代理可以在 Flume 中实现,这些代理都指向单个代理的 avro 源(在这种情况下,你可以使用节约源 / 汇 / 客户端)。 第二层代理的这个源将接收到的事件合并到一个通道中,这个通道被一个接收器消耗到它的最终目的地


Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.
A fan-out flow using a (multiplexing) channel selector
The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.

支持将事件流向一个或多个目的地。 这是通过定义一个可以复制或选择性地将事件路由到一个或多个通道的流多路器来实现的。
一种使用(多路复用)通道选择器的扇形流
上面的例子显示了来自"foo"的源代码,将流程分散到三个不同的通道。 这个风扇可以复制或复用。 在复制流程的情况下,每个事件被发送到所有三个通道。 对于多路复用情况,当事件的属性与预配置值匹配时,将事件传递给可用的子集通道。 例如,如果一个被称为"txnType"的事件属性设置为"customer",那么它应该被引导到信道1和信道3,如果它是"供应商",那么它应该被引导到信道2,否则是信道3。 映射可以设置在代理的配置文件。
flume中有多个sink输出到不同的位置

第一台机器收集flume日志信息

第二台机器实时显示第一台的日志信息

第三台机器将日志信息保存到hdfs上

Flume通过文件来读取数据
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#source端的根据数据类型来确定type
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/app/hive-0.13.1-cdh5.3.6/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname =hh4
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1





# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hh4
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hh4
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1




# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hh4
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://hh4:8020/flume3/%Y%m%d/%H

#上传文件的前缀
a3.sinks.k1.hdfs.filePrefix = flume3-
#是否按照时间滚动文件夹
a3.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是 128M
a3.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

---------------------
【转载】
作者:qq_41330277
原文:https://blog.csdn.net/qq_41330277/article/details/84072908


4 个回复

倒序浏览
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马