Pulsar 简介
因为工作关系,会接触到不同类型的MQ。前段时间,调研了 Yahoo 开源的分布式消息中间件 - Pulsar。根据 Yahoo 的说法,Pulsar 诞生在 Kafka 之前,这就是他们为什么不用 Kafka 的原因。下面就介绍下 Pulsar 的各个方面。
主要特性
- 可水平扩展的集群(得益于 BookKeeper 的特性)
- 强顺序和一致性保证
- 低延迟
- 负载均衡
- 多租户/授权验证/配额
- 跨地理位置的复制
快速入门
Topic
Pulsar Topic 有点特别,格式如下:
1
persistent://my-property/us-west/my-namespace/my-topic
persistent
持久化的,目前唯一支持的存储类型,官方说未来可能支持 non-persistent 类型;my-property
Property 定义一个租户,可以理解为一个用户;us-west
Cluster,topic 所在的机房,地理位置;my-namespace
Namespace 定义多个同组的 topics,可以理解为一个应用;my-topic
Topic,最终的 Topic 名称,在代码里面也可能被称为 Destination。总结起来可以这么看:
persistent://某个用户/地理机房/该用户的应用或业务/topic 名称
订阅模式
Pulsar 支持三种订阅模式:
- Exclusive:无论有多少实例,只会有一个实例能消费,可以保证消息顺序;
- Shared:多个实例通过轮询方式各获得部分消息,不能保证消息顺序;
- Failover:类似于 Exclusive 方式,始终只会有一个实例消费,但其他实例在该实例挂掉的时候可以补上去,也可以保证消息顺序;
实际上,Pulsar 也支持 Listener 模式,不过二者互斥,使用 Subscribe 就不能使用 Listener 了。
Demo
直接看官方给的例子,非常简单,Pulsar 的 API 的确非常容易理解。目前官方提供 Java 版本,但其他语言的客户端也可以使用,Pulsar 有个 Websocket 模块负责和其他语言交互。在生产消费之前,你可以先起一个单例 server:
1
bin/pulsar standalone
Producer
1
2
3
4
5
6
7
8
9
10
11PulsarClient client = PulsarClient.create("http://localhost:8080");
Producer producer = client.createProducer(
"persistent://sample/standalone/ns1/my-topic");
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
producer.send("my-message".getBytes());
}
client.close();Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17PulsarClient client = PulsarClient.create("http://localhost:8080");
Consumer consumer = client.subscribe(
"persistent://sample/standalone/ns1/my-topic",
"my-subscribtion-name");
while (true) {
// Wait for a message
Message msg = consumer.receive();
System.out.println("Received message: " + msg.getData());
// Acknowledge the message so that it can be deleted by broker
consumer.acknowledge(msg);
}
client.close();
架构设计
从上往下看,
Producer/Consumer
Java 客户端
Service discovery
一个 HTTP 服务,提供 REST 接口,客户端先发送 HTTP 请求获取可用的服务器地址;
Dispatcher
一个异步的 TCP 服务,派发调度消息;
Load Balancer
负载均衡,通过CPU、Load等物理指标,以及 Pulsar Server 的服务指标,判断 Server 实例的负载情况,可用于自动将负载低的 Server 分配给某些 Topic;
Managed Ledger/BK Client/Cache
Pulsar 使用了 Apache BookKeeper 作为它的存储服务,因此在 BK 之上开发了 Managed Ledger 这样一层 lib,封装了很多对 BK Client 的操作。例如 Managed Ledger 里面使用了多个 Ledgers,因为一个 Ledger 只能由单线程写,且在崩溃后为只读状态,必须使用多个 Ledgers 来保障正常写入,同时也是并发性能考虑。Pulsar 会先从 Cache 里面拿数据,当积压超过 Cache 上限,再通过 BK Client 从 BK 里读取数据;
Global replicators
全局复制,这里主要指的是跨地理位置/跨机房的复制;主要原理就是从本地机房再转发一份数据到其他机房(跟正常的发送消息流程一样);
Bookie
上面提到,Pulsar 使用 BookKeeper 作为存储服务,Bookie 就是 BookKeeper 的存储服务实例;
ZK/Global ZK
Pulsar 使用了多个 ZK,存储不同的数据:Global ZK 主要存储的例如 properties, namespaces 以及一些需要全局持久化的 policies;而 local ZK 主要存储本地集群相关数据,例如 topic 所有者元信息,server 的负载均衡结果,以及 BK Ledgers 的元信息数据等;
客户端
Produce
发送支持 “同步” 和 “异步” 方式,但看代码其实底层实现都是 “异步” 接口。“同步” 会等待服务器确认消息,异步则直接返回,只是把消息放入队列(a blocking queue)异步发送。这里顺便提一下 Pulsar 使用的是 Java8,运用了很多 Java8 的新特性。
消息支持 “LZ4”、“ZLIB” 两种压缩方式,以减少数据量节省带宽。支持批量发送。
Consume
消费同样支持 “同步” 和 “异步” 方式,底层同样都是 “异步” 接口,只是回调的区别而已。“同步” 会一直阻塞直到有消息可用,异步则直接返回,消息到达时会调用其回调方法。
消费的确认方式,可以一条消息一条消息的单独确认,也可以多个消息累计确认。使用累积确认时,只需对最后一条消息确认即可。需要注意的是,累积确认方式在 Shared 订阅模式下不可用。
前面也提到过,消费支持 Listener,消息达到时,会传给自定义的监听器。
Partitioned Topic
通常情况下,一个 topic 只能占据一个 server,这对于某些场景下来说,可能达不到要求的吞吐量性能。为了解决吞吐量,可以使用 Partitioned Topic,这点其实和 Kafka 很类似。对于 Pulsar 来说,Partitioned Topic 在内部其实等同于 N(N = Num of Partitions) 个普通 topic,订阅模式通用。
同 Kafka 类似,可以选择不同的策略,来决定消息发往哪个 Partition,例如 Key Hash、Round Robin 等策略。
以上就是 Pulsar 的一点简介。至于Kafka,由于我接触的时候还是 0.8.2 版本,且正式环境使用的也是这个版本,企业图稳定不图更新,线上运行这么长的时间非常稳定,也没有贸然升级的计划。而目前 Kafka 已经迭代到 0.10.1 版本,更新了大量内容,重新设计了客户端,加入权限控制、Connector、Stream 等等,已经从传统意义的消息 MQ,升级为流处理平台。 因此我觉得 Kafka 系列可能就先到此为止,毕竟写出来也是有点过时了,但关于消息的那部分,应该还是万变不离其宗。如果你想沟通交流,可以和我联系。