Flume高可用集群安装
Flume NG(Flume Next Generation)即 Flume 1.x 版本。多个 agent 连接到同一个 agent,后者相当于 collector;支持负载均衡。
集群架构
角色 | 主机名 | ip |
---|---|---|
collector1 | hadoop100 | 8.8.8.100 |
collector2 | hadoop101 | 8.8.8.101 |
agent | hadoop102 | 8.8.8.102 |
agent | hadoop103 | 8.8.8.103 |
# 1. agent配置
两台agent上所有配置相同
# agent1 components: one source, one channel, two sinks
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1 sink2
# sink group that contains both sinks
agent1.sinkgroups = sinkgroup1
agent1.sinkgroups.sinkgroup1.sinks = sink1 sink2
# failover processor: sink1 (priority 10) is preferred over sink2 (priority 1)
agent1.sinkgroups.sinkgroup1.processor.type = failover
agent1.sinkgroups.sinkgroup1.processor.priority.sink1 = 10
agent1.sinkgroups.sinkgroup1.processor.priority.sink2 = 1
# maxpenalty: see the failover sink processor section of the Flume User Guide
agent1.sinkgroups.sinkgroup1.processor.maxpenalty = 10000
# channel1: memory channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# source1: exec source that runs tail -F on the test file and writes to channel1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /soft/module/flume/logstestfile/test.txt
agent1.sources.source1.channels = channel1
# interceptors: i1 is a static interceptor (key Type, value LOGIN), i2 is a timestamp interceptor
agent1.sources.source1.interceptors = i1 i2
agent1.sources.source1.interceptors.i1.type = static
agent1.sources.source1.interceptors.i1.key = Type
agent1.sources.source1.interceptors.i1.value = LOGIN
agent1.sources.source1.interceptors.i2.type = timestamp
# sink1: avro sink reading channel1, sending to hadoop100:52020
agent1.sinks.sink1.type = avro
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hostname = hadoop100
agent1.sinks.sink1.port = 52020
# sink2: avro sink reading channel1, sending to hadoop101:52020
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = channel1
agent1.sinks.sink2.hostname = hadoop101
agent1.sinks.sink2.port = 52020
# 2. collector配置
在两台collector上操作:除了把 source 的 bind 和拦截器 i1 的 value 改为本机的 hostname(hadoop100 / hadoop101)之外,其他配置项相同
# agent2 (collector) components: one source, one channel, one sink
agent2.sources = source1
agent2.channels = channel1
agent2.sinks = sink1
# channel1: memory channel
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 1000
agent2.channels.channel1.transactionCapacity = 100
# source1: avro source listening on port 52020.
# NOTE: keep comments on their own lines. In a properties file a '#' that
# follows a value is NOT a comment; it becomes part of the value.
# CHANGE PER HOST: bind must be this collector's own hostname
# (hadoop100 on collector1, hadoop101 on collector2).
agent2.sources.source1.type = avro
agent2.sources.source1.bind = hadoop100
agent2.sources.source1.port = 52020
# Static interceptor: adds a header to every event, similar to
# "headers":{"key":"value"} in JSON notation.
agent2.sources.source1.interceptors = i1
agent2.sources.source1.interceptors.i1.type = static
agent2.sources.source1.interceptors.i1.key = Collector
# CHANGE PER HOST: set the value to this collector's own hostname.
agent2.sources.source1.interceptors.i1.value = hadoop100
agent2.sources.source1.channels = channel1
# sink1: writes to HDFS (the commented-out logger type is an alternative sink type)
#agent2.sinks.sink1.type=logger
agent2.sinks.sink1.type=hdfs
agent2.sinks.sink1.hdfs.path=hdfs://mycluster/flume
agent2.sinks.sink1.hdfs.fileType=DataStream
agent2.sinks.sink1.hdfs.writeFormat=TEXT
# File rolling: when a new file is started
# (units and the meaning of 0 are defined in the Flume User Guide, HDFS sink section).
agent2.sinks.sink1.hdfs.rollInterval=5
agent2.sinks.sink1.hdfs.rollSize=1000
agent2.sinks.sink1.hdfs.rollCount=0
#agent2.sinks.sink1.hdfs.rollCount=1
# File name prefix built from date escapes; the commented-out line is a finer-grained alternative.
agent2.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
#agent2.sinks.sink1.hdfs.filePrefix=%Y-%m-%d/%H%M/%S
agent2.sinks.sink1.hdfs.fileSuffix=.txt
agent2.sinks.sink1.channel=channel1
# 3. 启动:先启动所有server(即collector),再启动所有client(即agent),否则会报错。
# Step 1 - collectors (servers) first. The value of --name must match the
# property prefix used in the config file; the collector config defines "agent2.*".
# Foreground example, run from the conf directory:
[root@slave3] /usr/local/flume/conf$ ../bin/flume-ng agent --conf ../conf --conf-file flume-server.conf --name agent2 -Dflume.root.logger=INFO,console
# Background variant, output redirected to a log file:
flume-ng agent --conf conf --conf-file /soft/module/flume/conf/flumeHA_server.conf --name agent2 -Dflume.root.logger=INFO,console > /soft/module/flume/logs/flumeHA_server.log 2>&1 &
# Step 2 - agents (clients) afterwards. The agent config defines "agent1.*".
# Foreground example, run from the conf directory:
[root@slave1] /usr/local/flume/conf$ ../bin/flume-ng agent -n agent1 -c ../conf -f flume-client.conf -Dflume.root.logger=DEBUG,console
# Background variant, output redirected to a log file:
flume-ng agent --conf conf --conf-file /soft/module/flume/conf/flumeHA_client.conf --name agent1 -Dflume.root.logger=DEBUG,console > /soft/module/flume/logs/flumeHA_client.log 2>&1 &
# 4. 测试高可用功能
- agent1上创建源消息
# Append to the exact file that source1 tails (see agent1.sources.source1.command)
echo "hello failover" >> /soft/module/flume/logstestfile/test.txt
- 由于指向collector1的sink1的priority(10)高于指向collector2的sink2(1),所以collector1会收到消息,而collector2不会;查看控制台信息确认
- 停止collector1,在agent1上创建源消息
# Append to the exact file that source1 tails (see agent1.sources.source1.command)
echo "hello failover1" >> /soft/module/flume/logstestfile/test.txt
- 查看collector2控制台
# 错误: 配置文件没写对
error1: 因为网上的配置文件不全,sink部分是从之前的配置文件拷贝过来的,忘记修改其中agent的名字,因此引起了异常;把agent名字改正并重启了几次服务之后,恢复正常。