博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
4、spark streaming+kafka
阅读量:5220 次
发布时间:2019-06-14

本文共 12482 字,大约阅读时间需要 41 分钟。

一、Receiver模式

1、 receiver模式原理图

在SparkStreaming程序运行起来后,Executor中会有receiver tasks接收kafka推送过来的数据。数据会被持久化,默认级别为MEMORY_AND_DISK_SER_2,这个级别也可以修改。receiver task对接收过来的数据进行存储和备份,这个过程会有节点之间的数据传输。备份完成后去zookeeper中更新消费偏移量,然后向Driver中的receiver tracker汇报数据的位置。最后Driver根据数据本地化将task分发到不同节点上执行。

2、receiver模式中存在的问题及解决

当Driver进程挂掉后,Driver下的Executor都会被杀掉,当更新完zookeeper消费偏移量的时候,Driver如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。如何解决这个问题?开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到HDFS上一份(我们需要将接收来的数据的持久化级别降级到MEMORY_AND_DISK),这样就能保证数据的安全性。不过,因为写HDFS比较消耗性能,要在备份完数据之后才能进行更新zookeeper以及汇报位置等,这样会增加job的执行时间,这样对于任务的执行提高了延迟度。

3、receiver模式描述

1.kafka有两种消费者api:    1.High Level Consumer APl消费者不能做到自己去维护消费者offset,使用高级api时,不关心数据丢失。    kafka+SparkStreaming Receiver模式就是High Level Consumer API实现的。    2.Simple Consumer APl消费者可以自己管理offset.    2.过程:    kafka+SparkStreaming receiver 模式接受数据,当向zookeeper中更新完offset后,Driver如果挂掉,Driver 下的Executor 会被kill,会造成丢失数据。    怎么解决?    开启WAL(Write Ahead Log)预写日志机利,将数据备份到HDFS中一份,再去更新zookeeper offset,如果开启了WAL机利,接收数据的存储级别要降级,    去掉"2”开启WAL机利会加大application处理的时间。    3.receiver模式依赖zookeeper管理offset.4.receiver模式的并行度?由spark.streaming.blockInterval=200ms决定。    receiver 模式接受数据时,每隔spark.streaming.blockInterval将数据落地一个block,假设batchlnterval=5s,一个batch内生成25个block。    batch-block,batch封装到RDD中,RDD-partition,这里的block对应的就是RDD中的partition。    如何提高receiver模式的并行度?    在batchlnterval一定情况下,减少spark.streaming.blocklnterval 参数值,增大生成的DStream中RDD的partition个数,    但是建议spark.streaming.blocklnterval最低不能低于50ms.

3、Receive模式Wordcount案例

package cn.spark.study.streaming;import java.util.Arrays;import java.util.HashMap;import java.util.Map;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.KafkaUtils;import scala.Tuple2;/** * 基于Kafka receiver方式的实时wordcount程序 * @author Administrator * */public class KafkaReceiverWordCount {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local[2]")                .setAppName("KafkaWordCount");          JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));                // 使用KafkaUtils.createStream()方法,创建针对Kafka的输入数据流        Map
topicThreadMap = new HashMap
(); // 使用多少个线程去拉取topic的数据 topicThreadMap.put("WordCount", 1); // 这里接收的四个参数;第一个:streamingContext // 第二个:ZK quorum; 第三个:consumer group id 可以自己写; // 第四个:per-topic number of Kafka partitions to consume JavaPairReceiverInputDStream
lines = KafkaUtils.createStream( jssc, "192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181", "DefaultConsumerGroup", topicThreadMap); // wordcount逻辑 JavaDStream
words = lines.flatMap( new FlatMapFunction
, String>() { private static final long serialVersionUID = 1L; @Override public Iterable
call(Tuple2
tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream
pairs = words.mapToPair( new PairFunction
() { private static final long serialVersionUID = 1L; @Override public Tuple2
call(String word) throws Exception { return new Tuple2
(word, 1); } }); JavaPairDStream
wordCounts = pairs.reduceByKey( new Function2
() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }##eclipse中运行程序##新建一个topic[root@spark1 kafka]# bin/kafka-topics.sh --zookeeper 192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181 --topic WordCount --replication-factor 1 --partitions 1 --create##启动生产者,然后可以输入一些数据,观察程序端的输出统计[root@spark1 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092 --topic WordCount

二、Driect模式

1、driect模式原理图

2、Direct模式理解

Direct 模式采用的是kafka的Simple Consumer APl。Driect模式就是将kafka看成存数据的一方,不是被动接收数据,而是主动去取数据。消费者偏移量也不是用zookeeper来管理,而是SparkStreaming内部对消费者偏移量自动来维护,默认消费偏移量是在内存中,当然如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。当然也可以实现用zookeeper来管理。Direct模式生成的DStream中的RDD的并行度是与读取的topic中的partition个数一致。
Direct模式最好指定checkpoint

3、Direct模式Wordcount案例

package cn.spark.study.streaming;import java.util.Arrays;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import kafka.serializer.StringDecoder;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaPairInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.KafkaUtils;import scala.Tuple2;/** * 基于Kafka Direct方式的实时wordcount程序 * @author Administrator * */public class KafkaDirectWordCount {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                .setMaster("local[2]")                .setAppName("KafkaDirectWordCount");          JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));                // 首先,要创建一份kafka参数map        Map
kafkaParams = new HashMap
(); kafkaParams.put("metadata.broker.list", "192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092"); // 然后,要创建一个set,里面放入,你要读取的topic // 这个,就是我们所说的,它自己给你做的很好,可以并行读取多个topic Set
topics = new HashSet
(); topics.add("WordCount"); // 创建输入DStream JavaPairInputDStream
lines = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); // 执行wordcount操作 JavaDStream
words = lines.flatMap( new FlatMapFunction
, String>() { private static final long serialVersionUID = 1L; @Override public Iterable
call(Tuple2
tuple) throws Exception { return Arrays.asList(tuple._2.split(" ")); } }); JavaPairDStream
pairs = words.mapToPair( new PairFunction
() { private static final long serialVersionUID = 1L; @Override public Tuple2
call(String word) throws Exception { return new Tuple2
(word, 1); } }); JavaPairDStream
wordCounts = pairs.reduceByKey( new Function2
() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }##检查运行,和receive模式类似

三、手动管理offset

1、手动管理offset

在zookeeper中自己管理offset;使用mysql管理;使用HBase管理;

2、代码

package com.manage;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.atomic.AtomicReference;import com.google.common.collect.ImmutableMap;import com.manage.getOffset.GetTopicOffsetFromKafkaBroker;import com.manage.getOffset.GetTopicOffsetFromZookeeper;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.RetryUntilElapsed;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;import org.apache.spark.streaming.kafka.HasOffsetRanges;import org.apache.spark.streaming.kafka.KafkaUtils;import org.apache.spark.streaming.kafka.OffsetRange;import kafka.cluster.Broker;import com.fasterxml.jackson.databind.ObjectMapper;import kafka.api.PartitionOffsetRequestInfo;import kafka.common.TopicAndPartition;import kafka.javaapi.OffsetRequest;import kafka.javaapi.OffsetResponse;import kafka.javaapi.PartitionMetadata;import kafka.javaapi.TopicMetadata;import kafka.javaapi.TopicMetadataRequest;import kafka.javaapi.TopicMetadataResponse;import kafka.javaapi.consumer.SimpleConsumer;import kafka.message.MessageAndMetadata;import kafka.serializer.StringDecoder;import scala.Tuple2;public class UseZookeeperManageOffset {    /**     * 使用log4j打印日志,“UseZookeeper.class” 设置日志的产生类     */    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);            public static void main(String[] args) {        /**         * 加载log4j的配置文件,方便打印日志         */        ProjectUtil.LoadLogConfig();        logger.info("project is starting...");                /**         * 从kafka集群中得到topic每个分区中生产消息的最大偏移量位置         */        Map
topicOffsets = GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic"); /** * 从zookeeper中获取当前topic每个分区 consumer 消费的offset位置 */ Map
consumerOffsets = GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181","zhy","mytopic"); /** * 合并以上得到的两个offset , * 思路是: * 如果zookeeper中读取到consumer的消费者偏移量,那么就zookeeper中当前的offset为准。 * 否则,如果在zookeeper中读取不到当前消费者组消费当前topic的offset,就是当前消费者组第一次消费当前的topic, * offset设置为topic中消息的最大位置。 */ if(null!=consumerOffsets && consumerOffsets.size()>0){ topicOffsets.putAll(consumerOffsets); } /** * 如果将下面的代码解开,是将topicOffset 中当前topic对应的每个partition中消费的消息设置为0,就是从头开始。 */// for(Map.Entry
item:topicOffsets.entrySet()){// item.setValue(0l);// } /** * 构建SparkStreaming程序,从当前的offset消费消息 */ JavaStreamingContext jsc = SparkStreamingDirect.getStreamingContext(topicOffsets,"zhy"); jsc.start(); jsc.awaitTermination(); jsc.close(); }}

package com.manage;import java.io.IOException;import java.io.InputStream;import java.util.Properties;import org.apache.log4j.Logger;import org.apache.log4j.PropertyConfigurator;public class ProjectUtil {    /**     * 使用log4j配置打印日志     */    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);    /**     * 加载配置的log4j.properties,默认读取的路径在src下,如果将log4j.properties放在别的路径中要手动加载     */    public static void LoadLogConfig() {        PropertyConfigurator.configure("d:/eclipse4.7WS/SparkStreaming_Kafka_Manage/resource/log4j.properties");     }        /**     * 加载配置文件     * 需要将放config.properties的目录设置成资源目录     * @return     */    public static Properties loadProperties() {        Properties props = new Properties();        InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.properties");        if(null != inputStream) {            try {                props.load(inputStream);            } catch (IOException e) {                logger.error(String.format("Config.properties file not found in the classpath"));            }        }        return props;    }        public static void main(String[] args) {        Properties props = loadProperties();        String value = props.getProperty("hello");        System.out.println(value);    }}

转载于:https://www.cnblogs.com/weiyiming007/p/11401591.html

你可能感兴趣的文章
Python正则表达式操作指南(转)
查看>>
Android --时间控件的使用
查看>>
html-列表-3
查看>>
Servlet(5)—ServletRequest接口和ServletResponse接口
查看>>
读心球游戏
查看>>
[iOS]C语言技术视频-16-指针变量高级用法(堆栈内存)
查看>>
EXCEPTION-TOMCAT
查看>>
基于Vue框架开发的仿饿了么前端小应用
查看>>
Python中整数和浮点数
查看>>
吉首大学校赛 K 白山茶与红玫瑰 (线段树区间操作)
查看>>
eclipse cdt运行c程序报错“launch failed,binary not found”
查看>>
Unity3d fbx纹理不显示 原因
查看>>
修改系统默认的NSButton的按下变灰
查看>>
rsync
查看>>
c# 压缩文件
查看>>
H5WebSocket消息推送
查看>>
Linux 实现校园网认证
查看>>
mysql配置
查看>>
T-SQL之触发器
查看>>
动态规划 -- “最”系列题目
查看>>