[toc]
(1) Understand Flume's workflow: sources, channels, sinks, and interceptors, as well as how to write custom sources, custom sinks, and custom interceptors
(2) Understand Kafka's main components, how Kafka works, how Kafka compares with other message queues, and how Kafka supports the three delivery semantics (at-most-once, at-least-once, exactly-once)
(3) For a concrete Kafka deployment, consider possible network bottlenecks and ZooKeeper GC behavior
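A real custom interceptor implements `org.apache.flume.interceptor.Interceptor` (with `initialize()`, two `intercept(...)` overloads, `close()`, and a `Builder` inner class) and needs flume-ng-core on the classpath. The self-contained sketch below only mimics the idea: an event is a body plus a header map, and the interceptor stamps each event with a `timestamp` header, similar to what Flume's built-in timestamp interceptor does. `ToyEvent` and `TimestampInterceptor` here are illustrative stand-ins, not Flume's actual API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a Flume event: a byte[] body plus string headers.
class ToyEvent {
    final byte[] body;
    final Map<String, String> headers = new HashMap<>();
    ToyEvent(byte[] body) { this.body = body; }
}

// Mimics org.apache.flume.interceptor.Interceptor; a real implementation
// would also provide initialize(), close(), and a Builder inner class.
class TimestampInterceptor {
    // Stamp a single event with the current time.
    ToyEvent intercept(ToyEvent event) {
        event.headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        return event;
    }
    // Batch form, mirroring Interceptor's List<Event> intercept(List<Event>).
    List<ToyEvent> intercept(List<ToyEvent> events) {
        List<ToyEvent> out = new ArrayList<>();
        for (ToyEvent e : events) out.add(intercept(e));
        return out;
    }
}

public class InterceptorDemo {
    public static void main(String[] args) {
        ToyEvent e = new ToyEvent("log line".getBytes());
        new TimestampInterceptor().intercept(e);
        System.out.println(e.headers.containsKey("timestamp")); // prints true
    }
}
```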
Flume overview
What is Flume
Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It supports pluggable data senders in a logging pipeline, collects data from them, can apply simple processing, and writes the data out to a variety of receivers. A Flume data flow is carried end to end by events (Event), the basic unit of data in Flume. Events are generated by sources external to the Agent; when a Source captures an event, it applies a specific format and then pushes the event into a Channel. The Channel holds the event until a Sink has finished processing it; the Sink is responsible for persisting the event or forwarding it to another Source.
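The Source → Channel → Sink flow described above can be sketched with a bounded blocking queue standing in for a memory channel. This is an analogy for the hand-off semantics, not Flume's actual API: the source thread produces and enqueues events, and the sink thread dequeues and "persists" them.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FlowDemo {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue plays the role of a memory channel with a capacity.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(100);

        // Source thread: formats incoming records and pushes them into the channel.
        Thread source = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try { channel.put("event-" + i); } catch (InterruptedException ignored) {}
            }
        });

        // Sink thread: takes events off the channel and writes them out.
        Thread sink = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try { System.out.println("sink wrote " + channel.take()); }
                catch (InterruptedException ignored) {}
            }
        });

        source.start(); sink.start();
        source.join(); sink.join();
    }
}
```

Because the queue is FIFO, events leave the sink in the order the source produced them, just as a Flume channel decouples the source's write rate from the sink's drain rate.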
Configuration files
#flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.8.0_191
export HADOOP_HOME=/opt/modules/hadoop-2.6.0
export HBASE_HOME=/opt/modules/hbase-1.0.0-cdh5.4.0
#flume-conf.properties
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata-pro01.bigDAta.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5
# flume-hbase
agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20
agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
#flume-kafka
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20
agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata-pro01.bigDAta.com:9092,bigdata-pro02.bigDAta.com:9092,bigdata-pro03.bigDAta.com:9092
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.bigDAta.com:2181,bigdata-pro02.bigDAta.com:2181,bigdata-pro03.bigDAta.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
Shell script to start the Flume agent
#!/bin/bash
echo "flume-1 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console
Start all services needed by the Flume collection pipeline
HDFS service
ZooKeeper service
HBase service
Kafka service
Version compatibility between Flume and Kafka: Flume 1.7.0 requires Kafka 0.9.x or later
Flume service