尚硅谷大數(shù)據(jù)技術(shù)之Kafka第6章 kafka Streams

6.1 概述

6.1.1 Kafka Streams

Kafka Streams。Apache Kafka開(kāi)源項(xiàng)目的一個(gè)組成部分。是一個(gè)功能強(qiáng)大,易于使用的庫(kù)。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯(cuò)的應(yīng)用程序。

6.1.2 Kafka Streams特點(diǎn)

1)功能強(qiáng)大?

高擴(kuò)展性,彈性,容錯(cuò)?

2)輕量級(jí)?

無(wú)需專(zhuān)門(mén)的集群?

一個(gè)庫(kù),而不是框架

3)完全集成?

100%的Kafka 0.10.0版本兼容

易于集成到現(xiàn)有的應(yīng)用程序?

4)實(shí)時(shí)性

毫秒級(jí)延遲?

并非微批處理?

窗口允許亂序數(shù)據(jù)?

允許遲到數(shù)據(jù)

6.1.3 為什么要有Kafka Stream

當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開(kāi)源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級(jí)別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計(jì)算,SQL處理等集成,功能強(qiáng)大,對(duì)于熟悉其它Spark應(yīng)用開(kāi)發(fā)的用戶而言使用門(mén)檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢(shì),那為何還需要Kafka Stream呢?主要有如下原因。

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個(gè)基于Kafka的流式處理類(lèi)庫(kù)??蚣芤箝_(kāi)發(fā)者按照特定的方式去開(kāi)發(fā)邏輯部分,供框架調(diào)用。開(kāi)發(fā)者很難了解框架的具體運(yùn)行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類(lèi)庫(kù),直接提供具體的類(lèi)給開(kāi)發(fā)者調(diào)用,整個(gè)應(yīng)用的運(yùn)行方式主要由開(kāi)發(fā)者控制,方便使用和調(diào)試。

第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對(duì)復(fù)雜。而Kafka Stream作為類(lèi)庫(kù),可以非常方便的嵌入應(yīng)用程序中,它對(duì)應(yīng)用的打包和部署基本沒(méi)有任何要求。

第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專(zhuān)門(mén)的kafka-spout,而Spark也提供專(zhuān)門(mén)的spark-streaming-kafka模塊。事實(shí)上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時(shí)使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時(shí),需要為框架本身的進(jìn)程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對(duì)于應(yīng)用實(shí)例而言,框架本身也會(huì)占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。但是Kafka作為類(lèi)庫(kù)不占用系統(tǒng)資源。

第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動(dòng)部署和滾動(dòng)升級(jí)以及重新計(jì)算的能力。

第六,由于Kafka Consumer Rebalance機(jī)制,Kafka Stream可以在線動(dòng)態(tài)調(diào)整并行度。

6.2 Kafka Stream數(shù)據(jù)清洗案例

0)需求:

實(shí)時(shí)處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”atguigu>>>ximenqing”,最終處理成“ximenqing”

1)需求分析:

2)案例實(shí)操

(1)創(chuàng)建一個(gè)工程,并添加jar包

(2)創(chuàng)建主類(lèi)

package com.atguigu.kafka.stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import org.apache.kafka.streams.processor.TopologyBuilder;

 

public class Application {

 

public static void main(String[] args) {

 

// 定義輸入的topic

????????String from = "first";

????????// 定義輸出的topic

????????String to = "second";

 

????????// 設(shè)置參數(shù)

????????Properties settings = new Properties();

????????settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");

????????settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

 

????????StreamsConfig config = new StreamsConfig(settings);

 

????????// 構(gòu)建拓?fù)?/p>

????????TopologyBuilder builder = new TopologyBuilder();

 

????????builder.addSource("SOURCE", from)

???????????????.addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

 

@Override

public Processor<byte[], byte[]> get() {

// 具體分析處理

return new LogProcessor();

}

}, "SOURCE")

????????????????.addSink("SINK", to, "PROCESS");

 

????????// 創(chuàng)建kafka stream

????????KafkaStreams streams = new KafkaStreams(builder, config);

????????streams.start();

}

}

(3)具體業(yè)務(wù)處理

package com.atguigu.kafka.stream;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

 

public class LogProcessor implements Processor<byte[], byte[]> {

private ProcessorContext context;

@Override

public void init(ProcessorContext context) {

this.context = context;

}

 

@Override

public void process(byte[] key, byte[] value) {

String input = new String(value);

// 如果包含“>>>”則只保留該標(biāo)記后面的內(nèi)容

if (input.contains(">>>")) {

input = input.split(">>>")[1].trim();

// 輸出到下一個(gè)topic

context.forward("logProcessor".getBytes(), input.getBytes());

}else{

context.forward("logProcessor".getBytes(), input.getBytes());

}

}

 

@Override

public void punctuate(long timestamp) {

}

 

@Override

public void close() {

}

}

(4)運(yùn)行程序

(5)在hadoop104上啟動(dòng)生產(chǎn)者

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first

>hello>>>world

>h>>>atguigu

>hahaha

(6)在hadoop103上啟動(dòng)消費(fèi)者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic second

world

atguigu

hahaha