flume


[toc]

(1) Be familiar with Flume's workflow — sources, channels, sinks, and interceptors — as well as custom sources, custom sinks, and custom interceptors.

(2) Be familiar with Kafka's main components, how Kafka works, how Kafka compares with other message queues, and how Kafka guarantees the three delivery semantics (at most once, at least once, exactly once).

(3) In that context, understand the network bottlenecks Kafka may run into and ZooKeeper's GC behavior.
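The custom-interceptor topic above can be illustrated with a toy model. A Flume interceptor receives each Event and may inspect or modify its headers before the Event reaches the channel. The Python sketch below only mimics the behavior of Flume's built-in TimestampInterceptor; real interceptors are written in Java against the `org.apache.flume.interceptor.Interceptor` interface.

```python
import time

def timestamp_interceptor(event):
    """Toy version of Flume's TimestampInterceptor: stamp the event
    with the current epoch millis under the 'timestamp' header,
    leaving an existing timestamp untouched."""
    event.setdefault("headers", {})
    event["headers"].setdefault("timestamp", str(int(time.time() * 1000)))
    return event

# An event here is just a dict of headers plus a byte-string body.
event = timestamp_interceptor({"headers": {}, "body": b"raw log line"})
print(sorted(event["headers"]))  # ['timestamp']
```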

Flume overview

What is Flume?

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It supports pluggable data senders in a logging system for gathering data, and it can also apply simple processing to the data and write it out to a variety of receivers. Flume's data flow is carried end to end by events (Events), the basic unit of data in Flume. Events originate from data outside the Agent; when a Source captures an event, it applies a specific format and pushes the event into a Channel. The Channel holds the event until a Sink has finished processing it; the Sink is responsible for persisting the event or forwarding it to another Source.
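The Source → Channel → Sink flow described above can be sketched as a toy pipeline, with a bounded queue standing in for a memory channel. This is purely illustrative — a real Flume agent is configured with a properties file rather than coded:

```python
from queue import Queue

# A bounded queue plays the role of a memory channel
# (maxsize mirrors a channel's capacity setting).
channel = Queue(maxsize=100000)

def source(raw_lines):
    """Source: format raw data into Events and push them into the Channel."""
    for line in raw_lines:
        event = {"headers": {}, "body": line.encode("utf-8")}
        channel.put(event)

def sink():
    """Sink: drain Events off the Channel and 'deliver' them downstream."""
    delivered = []
    while not channel.empty():
        event = channel.get()
        delivered.append(event["body"].decode("utf-8"))
    return delivered

source(["log line 1", "log line 2"])
out = sink()
print(out)  # ['log line 1', 'log line 2']
```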

Configuration files

#flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_191
export HADOOP_HOME=/opt/modules/hadoop-2.6.0
export HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0
#flume-conf.properties
agent1.sources = r1
agent1.channels = kafkaC hbaseC 
agent1.sinks =  kafkaSink hbaseSink

agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata-pro01.bigDAta.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
# flume-hbase
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
#flume-kafka
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata-pro01.bigDAta.com:9092,bigdata-pro02.bigDAta.com:9092,bigdata-pro03.bigDAta.com:9092
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.bigDAta.com:2181,bigdata-pro02.bigDAta.com:2181,bigdata-pro03.bigDAta.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
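Note that the config above attaches source r1 to both channels (hbaseC and kafkaC). Since no channel selector is configured, Flume uses its default replicating selector, so every Event is copied into each channel and both HBase and Kafka receive the full stream. A toy sketch of that fan-out, with queues standing in for the two memory channels:

```python
from queue import Queue

# Two queues stand in for the hbaseC and kafkaC memory channels.
hbase_c, kafka_c = Queue(), Queue()
channels = [hbase_c, kafka_c]

def replicate(event):
    """Default (replicating) channel selector: copy the event
    into every channel attached to the source."""
    for ch in channels:
        ch.put(event)

replicate({"headers": {}, "body": b"weblog record"})
print(hbase_c.qsize(), kafka_c.qsize())  # 1 1
```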

Write a shell script to start the Flume agent

#!/bin/bash
echo "flume-1 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console

Start all the services the Flume pipeline depends on

  • HDFS service

  • ZooKeeper service

  • HBase service

  • Kafka service

    Version compatibility between Flume and Kafka: Flume 1.7.0 requires Kafka 0.9.x or later.

  • Flume service

Problems encountered:

