尚硅谷大數(shù)據(jù)技術(shù)之電信客服

3.2?數(shù)據(jù)采集/消費(fèi)(存儲(chǔ))

歡迎來(lái)到數(shù)據(jù)采集模塊(消費(fèi)),在企業(yè)中你要清楚流式數(shù)據(jù)采集框架flume和kafka的定位是什么。我們?cè)诖诵枰獙?shí)時(shí)數(shù)據(jù)通過(guò)flume采集到kafka然后供給給hbase消費(fèi)。

flume:cloudera公司研發(fā)

適合下游數(shù)據(jù)消費(fèi)者不多的情況;

適合數(shù)據(jù)安全性要求不高的操作;

適合與Hadoop生態(tài)圈對(duì)接的操作。

kafka:linkedin公司研發(fā)

適合數(shù)據(jù)下游消費(fèi)眾多的情況;

適合數(shù)據(jù)安全性要求較高的操作(支持replication);

因此我們常用的一種模型是:

線上數(shù)據(jù) --> flume?--> kafka?--> flume(根據(jù)情景增刪該流程) -->?HDFS

消費(fèi)存儲(chǔ)模塊流程如圖2所示:

圖2 消費(fèi)存儲(chǔ)模塊流程圖

3.2.1?數(shù)據(jù)采集

思路:

  1. a) 配置kafka,啟動(dòng)zookeeper和kafka集群;
  2. b) 創(chuàng)建kafka主題;
  3. c) 啟動(dòng)kafka控制臺(tái)消費(fèi)者(此消費(fèi)者只用于測(cè)試使用);
  4. d)配置flume,監(jiān)控日志文件;
  5. e) 啟動(dòng)flume監(jiān)控任務(wù);
  6. f)運(yùn)行日志生產(chǎn)腳本;
  7. g)觀察測(cè)試。

1)啟動(dòng)zookeeper,kafka集群

$/opt/module/kafka/bin/kafka-server-start.sh?/opt/module/kafka/config/server.properties

2)創(chuàng)建kafka主題

$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic calllog --create --replication-factor 1 --partitions 3

檢查一下是否創(chuàng)建主題成功:

$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

3)啟動(dòng)kafka控制臺(tái)消費(fèi)者,等待flume信息的輸入

$ /opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 -topic?calllog --from-beginning

4)配置flume(flume-kafka.conf)

# define

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F -c +0 /home/atguigu/call/calllog.csv

a1.sources.r1.shell = /bin/bash -c

 

# sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sinks.k1.kafka.topic = calllog

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

 

# channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

5)啟動(dòng)flume

$ /opt/module/flume/bin/flume-ng agent --conf /opt/module/flume/conf/ --name a1 --conf-file?/home/atguigu/calllog/flume2kafka.conf

6)運(yùn)行生產(chǎn)日志的任務(wù)腳本,觀察kafka控制臺(tái)消費(fèi)者是否成功顯示產(chǎn)生的數(shù)據(jù)

$ sh /home/atguigu/calllog/productlog.sh