HENNG

Pulsar 简介

因为工作关系,会接触到不同类型的MQ。前段时间,调研了 Yahoo 开源的分布式消息中间件 - Pulsar。根据 Yahoo 的说法,Pulsar 诞生在 Kafka 之前,这就是他们为什么不用 Kafka 的原因。下面就介绍下 Pulsar 的各个方面。

主要特性

  • 可水平扩展的集群(得益于 BookKeeper 的特性)
  • 强顺序和一致性保证
  • 低延迟
  • 负载均衡
  • 多租户/授权验证/配额
  • 跨地理位置的复制

快速入门

  • Topic

    Pulsar Topic 有点特别,格式如下:

    1
    persistent://my-property/us-west/my-namespace/my-topic

    persistent 持久化的,目前唯一支持的存储类型,官方说未来可能支持 non-persistent 类型;

    my-property Property 定义一个租户,可以理解为一个用户;

    us-west Cluster,topic 所在的机房,地理位置;

    my-namespace Namespace 定义多个同组的 topics,可以理解为一个应用;

    my-topic Topic,最终的 Topic 名称,在代码里面也可能被称为 Destination。

    总结起来可以这么看:

    persistent://某个用户/地理机房/该用户的应用或业务/topic 名称

  • 订阅模式

    Pulsar 支持三种订阅模式:

    • Exclusive:无论有多少实例,只会有一个实例能消费,可以保证消息顺序;
    • Shared:多个实例通过轮询方式各获得部分消息,不能保证消息顺序;
    • Failover:类似于 Exclusive 方式,始终只会有一个实例消费,但其他实例在该实例挂掉的时候可以补上去,也可以保证消息顺序;

    实际上,Pulsar 也支持 Listener 模式,不过二者互斥,使用 Subscribe 就不能使用 Listener 了。

  • Demo

    直接看官方给的例子,非常简单,Pulsar 的 API 的确非常容易理解。目前官方提供 Java 版本,但其他语言的客户端也可以使用,Pulsar 有个 Websocket 模块负责和其他语言交互。在生产消费之前,你可以先起一个单例 server:

    1
    bin/pulsar standalone

    Producer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    PulsarClient client = PulsarClient.create("http://localhost:8080");

    Producer producer = client.createProducer(
    "persistent://sample/standalone/ns1/my-topic");

    // Publish 10 messages to the topic
    for (int i = 0; i < 10; i++) {
    producer.send("my-message".getBytes());
    }

    client.close();

    Consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    PulsarClient client = PulsarClient.create("http://localhost:8080");

    Consumer consumer = client.subscribe(
    "persistent://sample/standalone/ns1/my-topic",
    "my-subscribtion-name");

    while (true) {
    // Wait for a message
    Message msg = consumer.receive();

    System.out.println("Received message: " + msg.getData());

    // Acknowledge the message so that it can be deleted by broker
    consumer.acknowledge(msg);
    }

    client.close();

架构设计

Architecture

从上往下看,

  • Producer/Consumer

    Java 客户端

  • Service discovery

    一个 HTTP 服务,提供 REST 接口,客户端先发送 HTTP 请求获取可用的服务器地址;

  • Dispatcher

    一个异步的 TCP 服务,派发调度消息;

  • Load Balancer

    负载均衡,通过CPU、Load等物理指标,以及 Pulsar Server 的服务指标,判断 Server 实例的负载情况,可用于自动将负载低的 Server 分配给某些 Topic;

  • Managed Ledger/BK Client/Cache

    Pulsar 使用了 Apache BookKeeper 作为它的存储服务,因此在 BK 之上开发了 Managed Ledger 这样一层 lib,封装了很多对 BK Client 的操作。例如 Managed Ledger 里面使用了多个 Ledgers,因为一个 Ledger 只能由单线程写,且在崩溃后为只读状态,必须使用多个 Ledgers 来保障正常写入,同时也是并发性能考虑。Pulsar 会先从 Cache 里面拿数据,当积压超过 Cache 上限,再通过 BK Client 从 BK 里读取数据;

  • Global replicators

    全局复制,这里主要指的是跨地理位置/跨机房的复制;主要原理就是从本地机房再转发一份数据到其他机房(跟正常的发送消息流程一样);

  • Bookie

    上面提到,Pulsar 使用 BookKeeper 作为存储服务,Bookie 就是 BookKeeper 的存储服务实例;

  • ZK/Global ZK

    Pulsar 使用了多个 ZK,存储不同的数据:Global ZK 主要存储的例如 properties, namespaces 以及一些需要全局持久化的 policies;而 local ZK 主要存储本地集群相关数据,例如 topic 所有者元信息,server 的负载均衡结果,以及 BK Ledgers 的元信息数据等;

客户端

  • Produce

    发送支持 “同步” 和 “异步” 方式,但看代码其实底层实现都是 “异步” 接口。“同步” 会等待服务器确认消息,异步则直接返回,只是把消息放入队列(a blocking queue)异步发送。这里顺便提一下 Pulsar 使用的是 Java8,运用了很多 Java8 的新特性。

    消息支持 “LZ4”、“ZLIB” 两种压缩方式,以减少数据量节省带宽。支持批量发送。

  • Consume

    消费同样支持 “同步” 和 “异步” 方式,底层同样都是 “异步” 接口,只是回调的区别而已。“同步” 会一直阻塞直到有消息可用,异步则直接返回,消息到达时会调用其回调方法。

    消费的确认方式,可以一条消息一条消息的单独确认,也可以多个消息累计确认。使用累积确认时,只需对最后一条消息确认即可。需要注意的是,累积确认方式在 Shared 订阅模式下不可用。

    前面也提到过,消费支持 Listener,消息达到时,会传给自定义的监听器。

  • Partitioned Topic

    通常情况下,一个 topic 只能占据一个 server,这对于某些场景下来说,可能达不到要求的吞吐量性能。为了解决吞吐量,可以使用 Partitioned Topic,这点其实和 Kafka 很类似。对于 Pulsar 来说,Partitioned Topic 在内部其实等同于 N(N = Num of Partitions) 个普通 topic,订阅模式通用。

    同 Kafka 类似,可以选择不同的策略,来决定消息发往哪个 Partition,例如 Key Hash、Round Robin 等策略。

以上就是 Pulsar 的一点简介。至于Kafka,由于我接触的时候还是 0.8.2 版本,且正式环境使用的也是这个版本,企业图稳定不图更新,线上运行这么长的时间非常稳定,也没有贸然升级的计划。而目前 Kafka 已经迭代到 0.10.1 版本,更新了大量内容,重新设计了客户端,加入权限控制、Connector、Stream 等等,已经从传统意义的消息 MQ,升级为流处理平台。 因此我觉得 Kafka 系列可能就先到此为止,毕竟写出来也是有点过时了,但关于消息的那部分,应该还是万变不离其宗。如果你想沟通交流,可以和我联系。