概述
在大数据中,使用了大量的数据.关于数据,我们有两个主要挑战.第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据.为了克服这些挑战,您必须需要一个消息系统.
Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。
什么是消息系统
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它,分布式消息传递基于可靠消息队列的概念,消息再客户端应用程序和消息传递系统之间异步排队,有两种类型的消息模式可用,一种是点对点,另外一种是发布-订阅消息系统,大多数消息系统遵循pub-sub
点对点消息系统:
在点对点系统中,消息被保留在队列中,一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费,一旦消费者读取队列中的消息,它就从该队列中消失,概系统的典型示例就是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器可以同时工作,如下图所示:
发布-订阅消息系统
在发布-订阅系统中,消息被保留在主题中,与点对点系统不同,消费组可以订阅一个或多个主题并使用该主题中的所有消息,在发布-订阅系统中,消息生产者称为发布者,消息使用者称为订阅者,一个现实生活的例子就是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获的他们订阅的频道时可用。
什么是Kafka?
Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
好处
- 可靠性 : Kafak是分布式,分区,复制和容错的
- 可扩展性: Kafka消息系统轻松缩放,无需停机。
- 耐用性: Kafka使用分布式提交日志,这意味着消息会尽可能地保留在磁盘上,因此它是持久的。
- 性能: Kafka对于发布和订阅消息都具有高吞吐量,即使存储了许多TB的消息,它也能保持稳定的性能。
使用案例
- 指标: Kafka通常用于操作监控数据,这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
- 日志聚合解决方案: Kafka可跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
- 流处理: 流行的框架(如Storm、SparkStreaming、Flink)从主题中读取数据,对其进行处理,并将处理后的数据写入新的主题,供用户和应用程序使用,Kafka的强耐久性在流处理的上下文中也非常有用
Kafka集群架构
组件 | Broker(代理) | ZooKeeper | Producers(生产者) | Consumers(消费者) |
---|---|---|---|---|
说明 | Kafka集群通常由多个代理组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。Kafka经纪人领导选举可以由ZooKeeper完成。 | ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后产品和消费者采取决定并开始与某些其他代理协调他们的任务。 | 生产者将数据推送给经纪人。 当新代理启动时,所有生产者搜索它并自动向该新代理发送消息。 Kafka生产者不等待来自代理的确认,并且发送消息的速度与代理可以处理的一样快。 | 因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。 |
Kafka工作流程
Kafka只是分为一个或多个分区的主题的集合。 Kafka分区是消息的线性有序序列,其中每个消息由它们的索引(称为偏移)来标识。 Kafka集群中的所有数据都是不相连的分区联合。 传入消息写在分区的末尾,消息由消费者顺序读取。 通过将消息复制到不同的代理提供持久性。
Kafka以快速,可靠,持久,容错和零停机的方式提供基于pub-sub和队列的消息系统。 在这两种情况下,生产者只需将消息发送到主题,消费者可以根据自己的需要选择任何一种类型的消息传递系统。 让我们按照下一节中的步骤来了解消费者如何选择他们选择的消息系统。
发布 - 订阅消息的工作流程
以下是Pub-Sub消息的逐步工作流程 -
- 生产者定期向主题发送消息。
- Kafka代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。
- 消费者订阅特定主题。
- 一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper系综中。
- 消费者将定期请求Kafka(如100 Ms)新消息。
- 一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。
- 消费者将收到消息并进行处理。
- 一旦消息被处理,消费者将向Kafka代理发送确认。
- 一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
- 以上流程将重复,直到消费者停止请求。
- 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息
队列消息/用户组的工作流
在队列消息传递系统而不是单个消费者中,具有相同组ID 的一组消费者将订阅主题。 简单来说,订阅具有相同 Group ID 的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。
- 生产者以固定间隔向某个主题发送消息。
- Kafka存储在为该特定主题配置的分区中的所有消息,类似于前面的方案。
- 单个消费者订阅特定主题,假设 Topic-01 为 Group ID 为 Group-1 。
- Kafka以与发布 - 订阅消息相同的方式与消费者交互,直到新消费者以相同的组ID 订阅相同主题 Topic-01 1 。
- 一旦新消费者到达,Kafka将其操作切换到共享模式,并在两个消费者之间共享数据。 此共享将继续,直到用户数达到为该特定主题配置的分区数。
- 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为Kafka中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。
- 此功能也称为使用者组。 同样,Kafka将以非常简单和高效的方式提供两个系统中最好的。
ZooKeeper的作用
Apache Kafka的一个关键依赖是Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。
由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。
实战
KafkaProducer API
生产者示例代码
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducer {
public static void main(String[] args) throws Exception{
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", “localhost:9092");
props.put("acks", “all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully");
producer.close();
}
}
参数说明
S.No | 配置设置和说明 |
---|---|
client.id | 标识生产者应用程序 |
producer.type | 同步或异步 |
acks | acks配置控制生产者请求下的标准是完全的。 |
重试 | 如果生产者请求失败,则使用特定值自动重试。 |
bootstrapping | 代理列表。 |
linger.ms | 如果你想减少请求的数量,你可以将linger.ms设置为大于某个值的东西。 |
key.serializer | 序列化器接口的键。 |
value.serializer | 值。 |
batch.siz | 缓冲区大小。 |
buffer.memory | 控制生产者可用于缓冲的存储器的总量。 |
Kafka 消费者组示例
消费者群体
- 消费者可以使用相同的 group.id 加入群组
- 一个组的最大并行度是组中的消费者数量←不是分区。
- Kafka将主题的分区分配给组中的使用者,以便每个分区仅由组中的一个使用者使用。
- Kafka保证消息只能被组中的一个消费者读取。
- 消费者可以按照消息存储在日志中的顺序查看消息。
重新平衡消费者
添加更多进程/线程将导致Kafka重新平衡。 如果任何消费者或代理无法向ZooKeeper发送心跳,则可以通过Kafka集群重新配置。 在此重新平衡期间,Kafka将分配可用分区到可用线程,可能将分区移动到另一个进程。
消费者示例代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class KafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", args[0]);
props.setProperty("group.id", UUID.randomUUID().toString());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.setProperty("auto.offset.reset", "earliest");
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
List<String> topics = Collections.singletonList(args[1]);
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("offset = %s, key = %s, value = %s", record.offset(), record.key(), record.value()));
}
}
}
}
Kafka 与SparkStreaming集成
Spark streaming接收Kafka数据
用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:
- 利用Receiver接收数据
- 直接从kafka读取数据。
基于Receiver的方式
这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:
注意的点:
- 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
- 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
- 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
遇到的坑,kafka数据量一天150亿左右,生产者每秒生产250万条数据,采用高阶api消费,消费速率提不上去,遇到了消费瓶颈。采用低阶api提供并行拉取速度,解决了消费瓶颈问题。
接读取方式
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:
这种方法相较于Receiver方式的优势在于:
- 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
- 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
- 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。
不同于Receiver的方式,是从Zookeeper中读取offset值,那么自然zookeeper就保存了当前消费的offset值,那么如果重新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,我们是直接从kafka来读数据,那么offset需要自己记录,可以利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里我们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类
SparkStreaming 低阶API消费Kafka
public void createInputDStream(JavaStreamingContext jsc, String topics) throws SparkException {
HashSet<String> topicSet = new HashSet<>();
topicSet.add(topics);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", args[0]);
kafkaParams.put("group.id", args[1];
kafkaParams.put("zookeeper.connection.timeout.ms", "10000");
kafkaParams.put("auto.offset.reset", "largest");
JavaKafkaManager javaKafkaManager = new JavaKafkaManager(kafkaParams);
JavaInputDStream<String> records = javaKafkaManager.createDirectStream(jsc,kafkaParams,topicSet);
records.transform((Function<JavaRDD<String>, JavaRDD<String>>) v1 -> v1).foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
if (!rdd.isEmpty()){
rdd.foreach((VoidFunction<String>) r -> {
});
javaKafkaManager.updateZKOffsets(rdd);
}
});
}
使用高阶API消费Kafka代码示例
public void createInputDStream(JavaStreamingContext jsc, String topics) throws SparkException{
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put(topics, ConfigUtil.getInt("kafka.numThreads"));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", args[0]);
kafkaParams.put("group.id", args[1]);
kafkaParams.put("zookeeper.connection.timeout.ms", "10000");
kafkaParams.put("auto.offset.reset", "largest");
//消费kafka消息
JavaPairReceiverInputDStream<String, String> records = KafkaUtils.createStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
}
Kafka 与Flink集成
public class StreamDemo {
private static final Logger logger = LoggerFactory.getLogger(StreamDemo.class);
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final Properties properties = new Properties();
properties.setProperty("bootstrap.servers", PropertiesUtil.get("kafka.bootstrap.servers"));
properties.setProperty("group.id", PropertiesUtil.get("kafka.group.id"));
properties.setProperty("auto.offset.reset", PropertiesUtil.get("kafka.auto.offset.reset"));
// 序列化方式
DeserializationSchema<SkynetLogVO> deserializer = new LogDeserializationSchema();
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(Constant.KAFKA_TOPIC, deserializer, properties);
escountConsumer.setStartFromGroupOffsets();
DataStream<String> esCountStream = env.addSource(escountConsumer);
}
####