Contact Information

7154 N University Dr #95,
Tamarac, FL 33321, USA

We Are Available 24/ 7. Email Us.


In my previous blog that introduces Apache BookKeeper, it mentions that Apache Pulsar maintains a cursor ledger for each subscription in Apache BookKeeper. After a consumer has processed a message with an acknowledgment sent to the broker and the broker has received it, the broker updates the cursor ledger accordingly. In this blog, let’s take a closer look at the cursor, how it works, and some concepts related to it.

What Are Cursors

Ensuring messages can be consumed successfully represents a key part of any distributed message system. In exceptions, such as machine failures on either the client side or server side, a message system featuring strong fault tolerance makes sure that no messages are lost or reconsumed. To provide such capability, message systems need a precise tracking mechanism for message consumption and acknowledgment. Apache Pulsar uses cursors for that purpose.

As we can tell from the name, the word “cursor” means the blinking vertical line that moves on a computer screen as you type. It shows you the position on the screen where new text will be added. Similarly, a cursor in Pulsar tells the broker the progress of message consumption.

To gain a comprehensive understanding of cursors, we need to first know the meaning of different subscription types in Pulsar. A subscription defines how a group of consumers (one or multiple) consume messages on a topic. A single topic supports multiple subscriptions of varied types working at the same time. The reason behind this design philosophy is that we can easily configure different subscription patterns as needed for various applications. Currently, Pulsar supports the following four subscription types.

  • Exclusive. The default subscription type in Pulsar. An exclusive subscription only allows a single consumer to read messages for that subscription. If other consumers want to connect to the subscription, the request will be rejected by the broker. Exclusive subscriptions provide message ordering guarantees.
  • Failover. Supports multiple consumers associated with the same subscription while only one instance can process messages. You can consider a failover subscription as the “advanced” version of an exclusive subscription. If a consumer is unable to receive messages for some reason, Pulsar automatically chooses another consumer from the instance pool to take over, keeping message consumption ongoing.
  • Shared. Allows multiple consumers to consume messages from a topic in a round-robin way. Therefore, shared subscriptions cannot guarantee orders in message consumption. Although multiple consumers receive messages for a shared subscription, no messages are consumed repeatedly under normal circumstances. If a consumer fails or is disconnected suddenly, all messages sent to the consumer that are not acknowledged will be delivered to other consumers for acknowledgment.
  • Key_Shared. Similar to a shared subscription, a key_shared subscription also allows multiple consumers to be attached to it. This subscription type ensures that messages are delivered to different consumers based on key values; that is, messages with the same key are sent to the same consumer.

In Pulsar, each subscription in a topic has a cursor bound to it. Depending on the subscription type adopted, the cursor tracks the consumption and acknowledgment information of one or multiple consumers. For example, consumers share the cursor for a shared or key_shared subscription. A Pulsar subscription is similar to a Kafka consumer group, while a cursor is like an offset in Kafka (Kafka uses a numerical offset to track the position of a consumer). That said, the cursor works in a much more sophisticated way than the offset.

Note that as it is possible that each topic has more than one subscription, you can create multiple subscriptions of different types for that topic. This also means multiple consumer groups can read messages on the same topic. In this case, the cursor of each subscription records the message consumption position of consumers in the respective group, not impacting one another.

How Cursors Work

After we learn these subscription types, let’s take a look at how the cursor works. See the figure below for a simple example.

  1. The broker sends a message to the consumer.
  2. Upon receipt of this message, the consumer acknowledges it and sends the acknowledgment back to the broker.
  3. The broker receives the acknowledgment and moves the cursor, updating the cursor ledger stored in BookKeeper.

As brokers are stateless and cursors are stateful, Pulsar stores the consumption position information in BookKeeper. When a broker receives an acknowledgment from a consumer, it updates the cursor ledger for the subscription that the consumer is bound to. In the case of consumer failures, the message consumption and acknowledgment information is still safe and no messages will be reconsumed when consumption restarts. This is because the cursor ledger data are safely stored on bookies based on the replica strategy configured (i.e. the values of Ensemble Size, Write Quorum, and Ack Quorum).

Note: A small portion of cursor metadata is stored in ZooKeeper instead of BookKeeper, such as cursorLedger index information. The reason is that as the number of topics and consumers grows, the cursor metadata size becomes extremely large, leaving ZooKeeper overwhelmed and affecting the performance of the entire Pulsar cluster.

So, after consumers acknowledge one or multiple messages in a topic, does the cursor move for sure? And how many spots (messages) does the broker move the cursor? To answer these two questions, we must first know how messages are acknowledged in Pulsar.

How Messages Are Acknowledged 

By default, each message in Pulsar needs to be acknowledged (ACK for short). Currently, Pulsar supports two ways of acknowledgments — individual and cumulative.

Individual acknowledgment, also known as selective acknowledgment, allows consumers to selectively acknowledge a single message in a stream. Messages before or after this message can remain unacknowledged. For example, for a shared subscription, multiple consumers may consume and acknowledge messages at a different pace, leaving unacknowledged messages between those that have been acknowledged (this results in “acknowledgment holes”, which will be covered later).

Cumulative acknowledgment allows consumers to only acknowledge the last message it has processed. In this way, the broker marks that message and all the messages delivered before it as acknowledged. As such, using cumulative acknowledgment is conducive to improving acknowledgment efficiency.

Not all subscription types support both ways of acknowledgments. Individual acknowledgment can be configured for all four subscription types, while shared and key_shared subscriptions do not support cumulative acknowledgment.

Now that we know how messages are acknowledged in Pulsar, let’s analyze the two questions. The cursor does not necessarily move after consumers acknowledge messages. This is because it is possible that some messages are still unacknowledged before the latest acknowledged message. How many spots the broker moves the cursor forward depends on the acknowledgment method adopted for the subscription. For example, if five messages are acknowledged through cumulative acknowledgment, the cursor moves forward for five spots on the premise that all preceding messages are already acknowledged.

To know the exact position of a cursor in a topic, we can check the markDeletePosition attribute of the cursor, which marks the position of the acknowledged message that is before the oldest unacknowledged message. As that message and all the messages before it are acknowledged, they are ready for deletion.

Note: You can check the details of a topic by using the pulsar-admin topics stats command, the output of which contains cursor-related information, such as markDeletePosition, cursorLedger, and individuallyDeletedMessages. For more information, see the Pulsar documentation.

To summarize, whether the broker moves the cursor is related to the markDeletePosition attribute. The message marked by markDeletePosition and all messages before it are already consumed and acknowledged successfully. Note that as the consumption position marked by a cursor is only applicable to the subscription that the cursor is associated with, it is possible that multiple subscription cursors are in different positions on a topic. See the figure below as an example.

Acknowledgment Holes

Different from message systems like Kafka and RocketMQ, Pulsar supports both individual acknowledgment and cumulative acknowledgment. When individual acknowledgment is used, unacknowledged messages may exist between acknowledged ones as multiple consumer instances receive messages and acknowledge them on their own (the subscription type is shared). These unacknowledged messages are also known as “acknowledgment holes”.

The cursor in Pulsar uses an abstraction called individuallyDeletedMessages to record the information of acknowledgment holes. Specifically, this abstraction contains multiple ranges with open and closed intervals (acknowledgment information is stored in the form of ranges). An open interval means the message is not acknowledged while a closed interval represents an acknowledged message. In the example below, messages M4 and M7 are unacknowledged so the intervals are open.

As shown in the figure, an acknowledgment hole refers to the unacknowledged message between two consecutive ranges. The number of consumers for a subscription and their consumption rate could all impact the number of acknowledgment holes. As mentioned above, when acknowledgment holes exist, the cursor (more precisely, markDeletePosition) stays before the oldest unacknowledged message. After all messages within a certain broader range are acknowledged, smaller ranges are merged. As a result, the cursor moves accordingly.

Note: How to reduce the number of acknowledgment holes in production is a very complicated topic. After all, there may be a variety of factors that could lead to acknowledgment holes, such as exceptions on brokers or clients. Generally, based on the value of individuallyDeletedMessages, we can try to reduce the number of consumers and adjust the frequency of message acknowledgment for performance optimization.

How TTL Impacts Cursors

By default, Pulsar stores all unacknowledged messages forever. You might be wondering whether the cursor remains static if the messages stay unacknowledged for a long time for some reason. In reality, we should avoid the case where plenty of unacknowledged messages exist since this could mean great pressure on disk space. In this case, whether the cursor moves depends on Time to live (TTL) in Pulsar.

By setting a TTL policy, we can define the retention period for unacknowledged messages. After the configured timeframe, Pulsar automatically acknowledges these messages, forcing the cursor to be moved. This also means these messages are ready to be deleted.

Note: This blog does not explain TTL in detail, as it relates to message retention and deletion policies in Pulsar, which is another complicated topic that requires a separate blog for detailed discussions. Here, you only need to know we still have another way to acknowledge messages to move the cursor if messages cannot be acknowledged by the consumer. For more information about TTL, see Message retention and expiry.

Summary

This blog introduces the concept of the cursor and how it works. To get a clear picture of the cursor, we also talk about different subscription types in Pulsar and two ways of message acknowledgments. The cursor mechanism not only ensures the accuracy of message consumption and acknowledgment for the distributed message system, but also provides the ability of fault tolerance. Even if the client goes wrong, the cursor ensures that messages are not consumed repeatedly and that no messages are missed after consumption restarts.



Source link

Share:

administrator

Leave a Reply

Your email address will not be published. Required fields are marked *