Apache Pulsar Connector

    You can use this connector with Pulsar 2.8.1 or higher. Because the Pulsar connector supports Pulsar transactions, Pulsar 2.9.2 or higher is recommended. Details on Pulsar compatibility can be found in PIP-72.


    Flink’s streaming connectors are not part of the binary distribution. See how to link with them for cluster execution here.

    Pulsar Source

    The Pulsar source provides a builder class for constructing a PulsarSource instance. The code snippet below builds a PulsarSource instance. It consumes messages from the earliest cursor of the topic “persistent://public/default/my-topic” in the Exclusive subscription type and deserializes the raw payload of the messages as strings.

        PulsarSource<String> source = PulsarSource.builder()
            .setServiceUrl(serviceUrl)
            .setAdminUrl(adminUrl)
            .setStartCursor(StartCursor.earliest())
            .setTopics("my-topic")
            .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
            .setSubscriptionName("my-subscription")
            .setSubscriptionType(SubscriptionType.Exclusive)
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

    The following properties are required for building a PulsarSource:

    • Pulsar service URL, configured by setServiceUrl(String)
    • Pulsar service HTTP URL (also known as admin URL), configured by setAdminUrl(String)
    • Pulsar subscription name, configured by setSubscriptionName(String)
    • Topics / partitions to subscribe to, see the topic-partition subscription section below for more details.
    • Deserializer to parse Pulsar messages, see the deserializer section below for more details.

    It is recommended to set the consumer name in the Pulsar source by setConsumerName(String). This sets a unique name for the Flink connector in the Pulsar statistics dashboard. You can use it to monitor the performance of your Flink connector and applications, for example:
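
    A minimal sketch; the consumer name value is illustrative:

        PulsarSource.builder()
            .setConsumerName("my-flink-job");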

    Topic-partition Subscription

    The Pulsar source provides two ways of topic-partition subscription:

    • Topic list, subscribing messages from all partitions in a list of topics. For example:

      1. PulsarSource.builder().setTopics("some-topic1", "some-topic2");
      2. // Partition 0 and 2 of topic "topic-a"
      3. PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
    • Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:

        PulsarSource.builder().setTopicPattern("topic-*");

    Flexible Topic Naming

    Since Pulsar 2.0, all topic names are internally in the form {persistent|non-persistent}://tenant/namespace/topic. Now, for partitioned topics, you can use short names in many cases (for the sake of simplicity). The flexible naming system stems from the fact that there is a default topic type, tenant, and namespace in a Pulsar cluster.

    This table lists a mapping relationship between your input topic name and the translated topic name:

    Input topic name | Translated topic name
    my-topic | persistent://public/default/my-topic
    my-tenant/my-namespace/my-topic | persistent://my-tenant/my-namespace/my-topic

    For non-persistent topics, you need to specify the entire topic name, as the default-based rules do not apply to non-persistent topics. Thus, you cannot use a short name like non-persistent://my-topic and need to use non-persistent://public/default/my-topic instead.

    Subscribing Pulsar Topic Partition

    Internally, Pulsar divides a partitioned topic into a set of non-partitioned topics according to its partition count.

    For example, if a simple-string topic with 3 partitions is created under the sample tenant in the flink namespace, the topics on Pulsar are:

    Topic name | Partitioned
    persistent://sample/flink/simple-string | Y
    persistent://sample/flink/simple-string-partition-0 | N
    persistent://sample/flink/simple-string-partition-1 | N
    persistent://sample/flink/simple-string-partition-2 | N

    You can directly consume messages from the topic partitions by using the non-partitioned topic names above. For example, using PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", "sample/flink/simple-string-partition-2") consumes partitions 1 and 2 of the sample/flink/simple-string topic.

    Setting Topic Patterns

    The Pulsar source extracts the topic type (persistent or non-persistent) from the provided topic pattern. For example, you can use PulsarSource.builder().setTopicPattern("non-persistent://my-topic*") to specify non-persistent topics. By default, the pattern matches only persistent topics if you do not specify the topic type in the regular expression.

    You can use setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics) to consume both persistent and non-persistent topics based on the topic pattern. The Pulsar source filters the available topics by the RegexSubscriptionMode.

    Deserializer

    A deserializer (PulsarDeserializationSchema) is for decoding Pulsar messages from bytes. You can configure the deserializer using setDeserializationSchema(PulsarDeserializationSchema). The PulsarDeserializationSchema defines how to deserialize a Pulsar Message<byte[]>.

    If only the raw payload of a message (message data in bytes) is needed, you can use a predefined PulsarDeserializationSchema. The Pulsar connector provides three ways to create one:

    • Decode the message by using Pulsar’s Schema.

          // Primitive types
          PulsarDeserializationSchema.pulsarSchema(Schema);

          // Struct types (JSON, Protobuf, Avro, etc.)
          PulsarDeserializationSchema.pulsarSchema(Schema, Class);

          // KeyValue type
          PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
    • Decode the message by using Flink’s DeserializationSchema

          PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
    • Decode the message by using Flink’s TypeInformation

          PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);

    A Pulsar Message<byte[]> contains some extra properties, such as the message key, message publish time, event time, and application-defined key/value pairs. These properties are defined in the Message<byte[]> interface.

    If you want to deserialize the Pulsar message based on these properties, you need to implement PulsarDeserializationSchema. Ensure that the TypeInformation returned from PulsarDeserializationSchema.getProducedType() is correct. Flink uses this TypeInformation to pass the messages to downstream operators.
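
    A minimal sketch of such a custom implementation, assuming the deserialize(Message<byte[]>, Collector<T>) callback of the connector’s PulsarDeserializationSchema interface; it emits each payload string together with the message publish time:

        import java.nio.charset.StandardCharsets;

        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.api.common.typeinfo.Types;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
        import org.apache.flink.util.Collector;
        import org.apache.pulsar.client.api.Message;

        public class PayloadWithPublishTimeDeserializer
                implements PulsarDeserializationSchema<Tuple2<String, Long>> {

            @Override
            public void deserialize(Message<byte[]> message, Collector<Tuple2<String, Long>> out) {
                // Decode the raw payload and attach the broker-side publish time.
                String payload = new String(message.getData(), StandardCharsets.UTF_8);
                out.collect(Tuple2.of(payload, message.getPublishTime()));
            }

            @Override
            public TypeInformation<Tuple2<String, Long>> getProducedType() {
                // Flink uses this type information to ship records to downstream operators.
                return Types.TUPLE(Types.STRING, Types.LONG);
            }
        }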

    Pulsar Subscriptions

    A Pulsar subscription is a named configuration rule that determines how messages are delivered to Flink readers. The subscription name is required for consuming messages. The Pulsar connector supports four subscription types:

    • Exclusive
    • Shared
    • Failover
    • Key_Shared

    There is no difference between Exclusive and Failover in the Pulsar connector. When a Flink reader crashes, all (non-acknowledged and subsequent) messages are redelivered to the available Flink readers.

    By default, if no subscription type is defined, Pulsar source uses the Shared subscription type.

        // Shared subscription with name "my-shared"
        PulsarSource.builder().setSubscriptionName("my-shared");

        // Exclusive subscription with name "my-exclusive"
        PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);

    Ensure that you provide a RangeGenerator implementation if you want to use the Key_Shared subscription type in the Pulsar connector. The RangeGenerator generates a set of key hash ranges so that each reader subtask only receives the messages whose key hash falls within its assigned range.

    If no RangeGenerator is provided for the Key_Shared subscription type, the Pulsar connector uses a UniformRangeGenerator that divides the key hash range evenly by the Flink source parallelism.

    Starting Position

    The Pulsar source is able to consume messages starting from different positions by setting the setStartCursor(StartCursor) option. Built-in start cursors include:

    • Start from the earliest available message in the topic.

          StartCursor.earliest();
    • Start from the latest available message in the topic.

          StartCursor.latest();

    • Start from a specified message between the earliest and the latest. The Pulsar connector consumes from the latest available message if the given message ID does not exist.

      The start message is included in the consuming result.

          StartCursor.fromMessageId(MessageId);
    • Start from a specified message between the earliest and the latest. The Pulsar connector consumes from the latest available message if the message ID doesn’t exist.

      Include or exclude the start message by using the second boolean parameter.

          StartCursor.fromMessageId(MessageId, boolean);
    • Start from the specified message publish time, as given by Message<byte[]>.getPublishTime().

          StartCursor.fromMessageTime(long);

    Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID (MessageId) of the message is ordered within that sequence. The MessageId contains some extra information about how the message is stored (the ledger, entry, and partition). You can create a MessageId by using DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex), for example:
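
    A sketch; the ledger, entry, and partition values are illustrative:

        MessageId id = DefaultImplementation.newMessageId(42L, 7L, 0);

        // Resume after the given message; the boolean excludes it from the result.
        PulsarSource.builder()
            .setStartCursor(StartCursor.fromMessageId(id, false));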

    Boundedness

    The Pulsar source supports streaming and batch execution modes. By default, the PulsarSource is configured for unbounded data.

    For unbounded data, the Pulsar source never stops until the Flink job is stopped or fails. You can use setUnboundedStopCursor(StopCursor) to make the Pulsar source stop at a specific position.

    You can use setBoundedStopCursor(StopCursor) to specify a stop position for bounded data.

    • The Pulsar source never stops consuming messages.

          StopCursor.never();
    • Stop at the latest available message when the Pulsar source starts consuming messages.

          StopCursor.latest();
    • Stop when the connector meets the given message; the given message is excluded from the consuming result.

          StopCursor.atMessageId(MessageId);
    • Stop at the given message, including it in the consuming result.

          StopCursor.afterMessageId(MessageId);
    • Stop at the specified message publish time, as given by Message<byte[]>.getPublishTime().

          StopCursor.atPublishTime(long);

      StopCursor.atEventTime(long) is now deprecated.
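
    A sketch of a bounded (batch-style) source built from the cursors above; the topic and subscription names are illustrative:

        PulsarSource<String> boundedSource = PulsarSource.builder()
            .setServiceUrl(serviceUrl)
            .setAdminUrl(adminUrl)
            .setTopics("my-topic")
            .setSubscriptionName("my-subscription")
            .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
            .setStartCursor(StartCursor.earliest())
            // Stop at whatever is the latest message when the source starts.
            .setBoundedStopCursor(StopCursor.latest())
            .build();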

    Source Configurable Options

    In addition to the configuration options described above, you can set arbitrary options for the PulsarClient, PulsarAdmin, Pulsar Consumer, and PulsarSource by using setConfig(ConfigOption<T>, T), setConfig(Configuration), and setConfig(Properties), for example:
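
    A sketch; the option key comes from the consumer options table below and the values are illustrative:

        Properties props = new Properties();
        props.setProperty("pulsar.consumer.receiverQueueSize", "2000");

        PulsarSource.builder()
            .setConfig(props)
            .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 30000L);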

    PulsarClient Options

    The Pulsar connector uses the client API to create the Consumer instance. The Pulsar connector extracts most parts of Pulsar’s ClientConfigurationData, which is required for creating a PulsarClient, as Flink configuration options in PulsarOptions.

    PulsarAdmin Options

    The admin API is used for querying topic metadata and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription. It shares most of its configuration options with the client API. The configuration options listed here are only used in the admin API. They are also defined in PulsarOptions.

    • pulsar.admin.adminUrl (String, default: none): The Pulsar service HTTP URL for the admin endpoint. For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.
    • pulsar.admin.autoCertRefreshTime (Integer, default: 300000): The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.
    • pulsar.admin.connectTimeout (Integer, default: 60000): The connection timeout (in ms) for the PulsarAdmin client.
    • pulsar.admin.readTimeout (Integer, default: 60000): The server response read timeout (in ms) for the PulsarAdmin client for any request.
    • pulsar.admin.requestTimeout (Integer, default: 300000): The server request timeout (in ms) for the PulsarAdmin client for any request.

    Pulsar Consumer Options

    In general, Pulsar provides the Reader API and Consumer API for consuming messages in different scenarios. The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar’s ConsumerConfigurationData as Flink configuration options in PulsarSourceOptions.

    • pulsar.consumer.ackReceiptEnabled (Boolean, default: false): Acknowledgement returns a receipt, but this does not mean that the message will not be resent after getting the receipt.
    • pulsar.consumer.ackTimeoutMillis (Long, default: 0): The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.
      By default, the acknowledge timeout is disabled, which means that messages delivered to a consumer are not re-delivered unless the consumer crashes.
      When the acknowledgement timeout is enabled, a message that is not acknowledged within the specified timeout is re-delivered to the consumer (possibly to a different consumer, in the case of a shared subscription).
    • pulsar.consumer.acknowledgementsGroupTimeMicros (Long, default: 100000): Group consumer acknowledgments for the specified time (in μs). By default, a consumer uses a 100μs grouping time to send out acknowledgments to the broker. If the group time is set to 0, acknowledgments are sent out immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.
    • pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull (Boolean, default: false): Buffering a large number of outstanding uncompleted chunked messages can create memory pressure; this can be guarded by the pulsar.consumer.maxPendingChunkedMessage threshold. Once a consumer reaches this threshold, it drops the outstanding chunked messages by silently acknowledging them if this option is true. Otherwise, it marks them for redelivery.
    • pulsar.consumer.autoUpdatePartitionsIntervalSeconds (Integer, default: 60): The interval (in seconds) for updating partitions. This only works if autoUpdatePartitions is enabled.
    • pulsar.consumer.consumerName (String, default: none): The consumer name is informative and can be used to identify a particular consumer instance from the topic stats.
    • pulsar.consumer.cryptoFailureAction (Enum, default: FAIL): The action the consumer takes when it receives a message that cannot be decrypted.
      - FAIL: fail messages until crypto succeeds (the default).
      - DISCARD: silently acknowledge but do not deliver messages to the application.
      - CONSUME: deliver encrypted messages to the application. It is the application’s responsibility to decrypt the message. Decompression of the message fails. If messages contain batch messages, the client is not able to retrieve individual messages from the batch. The delivered encrypted message contains an EncryptionContext with the encryption and compression information, which the application can use to decrypt the consumed message payload.
      Possible values: “FAIL”, “DISCARD”, “CONSUME”
    • pulsar.consumer.deadLetterPolicy.deadLetterTopic (String, default: none): Name of the dead letter topic where failed messages are sent.
    • pulsar.consumer.deadLetterPolicy.maxRedeliverCount (Integer, default: 0): The maximum number of times that a message is redelivered before being sent to the dead letter queue.
    • pulsar.consumer.deadLetterPolicy.retryLetterTopic (String, default: none): Name of the retry topic where failed messages are sent.
    • pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis (Long, default: 60000): If a producer fails to publish all the chunks of a message, the consumer expires incomplete chunks that it cannot fully receive within this expiry time (in ms).
    • pulsar.consumer.maxPendingChunkedMessage (Integer, default: 10): The consumer buffers chunked messages in memory until it receives all the chunks of the original message. While consuming chunked messages, chunks from the same message might not be contiguous in the stream, and they might be mixed with other messages’ chunks. So the consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers publish messages on the topic concurrently, or when a publisher fails to publish all chunks of a message.
      For example, given the messages M1-C1, M2-C1, M1-C2, M2-C2, chunks M1-C1 and M1-C2 belong to the original message M1, while M2-C1 and M2-C2 belong to the message M2.
      Buffering a large number of outstanding uncompleted chunked messages can create memory pressure; this can be guarded by this threshold. Once a consumer reaches the threshold, it drops the outstanding chunked messages, either by silently acknowledging them or by asking the broker to redeliver them later (marking them unacknowledged). This behavior is controlled by the pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull option.
    • pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions (Integer, default: 50000): The maximum total receiver queue size across partitions. This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.
    • pulsar.consumer.negativeAckRedeliveryDelayMicros (Long, default: 60000000): Delay (in μs) to wait before redelivering messages that failed to be processed. When an application uses Consumer.negativeAcknowledge(Message), failed messages are redelivered after this fixed timeout.
    • pulsar.consumer.poolMessages (Boolean, default: false): Enable pooling of messages and the underlying data buffers.
    • pulsar.consumer.priorityLevel (Integer, default: 0): Priority level for a consumer, to which the broker gives more priority while dispatching messages in the shared subscription type. The broker follows descending priorities: for example, 0=max-priority, then 1, 2, and so on.
      In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits. Otherwise, the broker considers consumers on the next priority level.
      Example 1: if a subscription has consumer A with priorityLevel 0 and consumer B with priorityLevel 1, the broker only dispatches messages to consumer A until it runs out of permits, and then starts dispatching messages to consumer B.
      Example 2:
      Consumer | Priority Level | Permits
      C1 | 0 | 2
      C2 | 0 | 1
      C3 | 0 | 1
      C4 | 1 | 2
      C5 | 1 | 1
      The order in which the broker dispatches messages to the consumers is: C1, C2, C3, C1, C4, C5, C4.
    • pulsar.consumer.properties (Map, default: empty): Name/value properties of this consumer. properties is application-defined metadata attached to a consumer. When getting topic stats, this metadata is associated with the consumer stats for easier identification.
    • pulsar.consumer.readCompacted (Boolean, default: false): If readCompacted is enabled, a consumer reads messages from a compacted topic rather than reading the full message backlog of the topic. The consumer only sees the latest value for each key in the compacted topic, up until the point in the backlog that has been compacted. Beyond that point, messages are sent as normal.
      readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (for example, failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions causes the subscribe call to throw a PulsarClientException.
    • pulsar.consumer.receiverQueueSize (Integer, default: 1000): Size of a consumer’s receiver queue, that is, the number of messages that can be accumulated by a consumer before the application calls receive(). A value higher than the default increases consumer throughput at the expense of more memory utilization.
    • pulsar.consumer.replicateSubscriptionState (Boolean, default: false): If replicateSubscriptionState is enabled, the subscription state is replicated to geo-replicated clusters.
    • pulsar.consumer.retryEnable (Boolean, default: false): If enabled, the consumer automatically retries messages.
    • pulsar.consumer.subscriptionInitialPosition (Enum, default: Latest): Initial position at which to set the cursor when subscribing to a topic for the first time. Possible values: “Latest”, “Earliest”.
    • pulsar.consumer.subscriptionMode (Enum, default: Durable): The subscription mode used when subscribing to the topic.
      - Durable: the subscription is backed by a durable cursor that retains messages and persists the current position.
      - NonDurable: a lightweight subscription mode without an associated durable cursor.
      Possible values: “Durable”, “NonDurable”
    • pulsar.consumer.subscriptionName (String, default: none): The subscription name for this consumer. This argument is required when constructing the consumer.
    • pulsar.consumer.subscriptionType (Enum, default: Shared): The subscription type. Four subscription types are available: Exclusive, Failover, Shared, and Key_Shared. Possible values: “Exclusive”, “Shared”, “Failover”, “Key_Shared”.
    • pulsar.consumer.tickDurationMillis (Long, default: 1000): Granularity (in ms) of the ack-timeout redelivery. A greater tickDurationMillis (for example, 1 hour) reduces the memory overhead of tracking messages.

    PulsarSource Options

    The configuration options below are mainly used for customizing the performance and message acknowledgement behavior. You can ignore them if you do not have any performance issues.

    To handle scenarios like topic scaling-out or topic creation without restarting the Flink job, the Pulsar source periodically discovers new partitions under the provided topic-partition subscription pattern. To enable partition discovery, set a non-negative value for the PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS option, for example:
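
    A sketch; the interval value is illustrative:

        // Check for new partitions every 10 seconds.
        PulsarSource.builder()
            .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000L);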

    Event Time and Watermarks

    By default, the connector uses the timestamp embedded in the Pulsar Message<byte[]> as the event time. You can define your own WatermarkStrategy to extract the event time from the message and emit the watermark downstream:

        env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");

    The watermark documentation describes in detail how to define a WatermarkStrategy.
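
    A sketch of one such strategy; the out-of-orderness bound is illustrative:

        // Use the timestamp the connector attaches to each record (the Pulsar
        // message timestamp by default) and tolerate 5 seconds of out-of-orderness.
        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((message, recordTimestamp) -> recordTimestamp);

        env.fromSource(pulsarSource, strategy, "Pulsar Source With Custom Watermark Strategy");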

    Message Acknowledgement

    When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when the connector acknowledges that all these messages are processed successfully. The Pulsar connector supports four subscription types, which makes the acknowledgement behaviors vary among different subscriptions.

    Acknowledgement on Exclusive and Failover Subscription Types

    The Exclusive and Failover subscription types support cumulative acknowledgment. In these subscription types, Flink only needs to acknowledge the latest successfully consumed message. All messages before the given message are marked as consumed.

    The Pulsar source acknowledges the current consuming position when checkpoints are completed, to ensure consistency between Flink’s checkpoint state and the committed position on the Pulsar brokers.

    If checkpointing is disabled, the Pulsar source periodically acknowledges messages. You can use the PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL option to set the acknowledgement period, for example:
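
    A sketch; the option takes milliseconds and the value is illustrative:

        // Acknowledge the consuming position every 5 seconds.
        PulsarSource.builder()
            .setConfig(PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL, 5000L);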

    The Pulsar source does NOT rely on committed positions for fault tolerance. Acknowledging messages is only used to expose the progress of consumers for monitoring on these two subscription types.

    Acknowledgement on Shared and Key_Shared Subscription Types

    In Shared and Key_Shared subscription types, messages are acknowledged one by one. You can acknowledge a message in a transaction and commit it to Pulsar.

    You should enable transactions in the Pulsar broker.conf file when using these two subscription types in the connector:

        transactionCoordinatorEnabled=true

    The default timeout for Pulsar transactions is 3 hours. Make sure that the timeout is greater than the checkpoint interval plus the maximum recovery time. A shorter checkpoint interval indicates better consuming performance. You can use the PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS option to change the transaction timeout.

    If checkpointing is disabled or you cannot enable transactions on the Pulsar broker, you should set PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE to true. Messages are then immediately acknowledged after consumption. No consistency guarantees can be made in this scenario. Both options are sketched below with illustrative values.
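
        // Shorten the transaction timeout to 30 minutes (must stay above the
        // checkpoint interval plus the maximum recovery time).
        PulsarSource.builder()
            .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 30 * 60 * 1000L);

        // Fall back to immediate acknowledgement when transactions cannot be used.
        PulsarSource.builder()
            .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);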

    All acknowledgements in a transaction are recorded on the Pulsar broker side.

    Pulsar Sink

    The Pulsar sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.

    This part describes the Pulsar sink based on the new API.

    If you still want to use the legacy SinkFunction, or run on Flink 1.14 or earlier releases, use StreamNative’s pulsar-flink connector.

    Usage

    The Pulsar sink uses a builder class to construct the PulsarSink instance. The example below writes String records to a Pulsar topic with an at-least-once delivery guarantee.

        DataStream<String> stream = ...

        PulsarSink<String> sink = PulsarSink.builder()
            .setServiceUrl(serviceUrl)
            .setAdminUrl(adminUrl)
            .setTopics("topic1")
            .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

        stream.sinkTo(sink);

    The following properties are required for building PulsarSink:

    • Pulsar service URL, configured by setServiceUrl(String)
    • Pulsar service HTTP URL (also known as admin URL), configured by setAdminUrl(String)
    • Topics / partitions to write to, see producing to topics for more details.
    • Serializer to generate Pulsar messages, see the serializer section below for more details.

    It is recommended to set the producer name in the Pulsar sink by setProducerName(String). This sets a unique name for the Flink connector in the Pulsar statistics dashboard. You can use it to monitor the performance of your Flink connector and applications.

    Producing to topics

    Defining the topics for producing is similar to the topic-partition subscription in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics or partitions, or both:

    1. // Topic "some-topic1" and "some-topic2"
    2. // Partition 0 and 2 of topic "topic-a"
    3. PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
    4. // Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
    5. PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")

    The topics you provide support auto partition discovery. The connector queries the topic metadata from Pulsar at a fixed interval. You can use the PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL option to change the discovery interval.

    Configuring writing targets can be replaced by using a custom TopicRouter for message routing. How partitions are named on the Pulsar connector is explained in the flexible topic naming section.

    If you build the Pulsar sink with both a topic and some of its partitions, the Pulsar sink merges them and uses only the topic.

    For example, when using PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0") to build the Pulsar sink, this is simplified to PulsarSink.builder().setTopics("some-topic1").

    Serializer

    A serializer (PulsarSerializationSchema) is required for serializing the record instance into bytes. Similar to PulsarSource, Pulsar sink supports both Flink’s SerializationSchema and Pulsar’s Schema. Pulsar’s Schema.AUTO_PRODUCE_BYTES() is not supported in the Pulsar sink.

    • Encode the message by using Pulsar’s Schema.

          // Primitive types
          PulsarSerializationSchema.pulsarSchema(Schema)

          // Struct types (JSON, Protobuf, Avro, etc.)
          PulsarSerializationSchema.pulsarSchema(Schema, Class)

          // KeyValue type
          PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
    • Encode the message by using Flink’s SerializationSchema

          PulsarSerializationSchema.flinkSchema(SerializationSchema)

    Schema evolution can be enabled by using PulsarSerializationSchema.pulsarSchema() together with PulsarSinkBuilder.enableSchemaEvolution(). This means that broker-side schema validation is in place.

        Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
        PulsarSerializationSchema<SomePojo> pulsarSchema = PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);

        PulsarSink<SomePojo> sink = PulsarSink.builder()
            ...
            .setSerializationSchema(pulsarSchema)
            .enableSchemaEvolution()
            .build();

    Message Routing

    Routing in the Pulsar sink operates on the partition level. For a list of partitioned topics, the routing algorithm first collects all partitions from the different topics, and then calculates routing over all those partitions. By default, the Pulsar sink supports two router implementations:

    • KeyHashTopicRouter: use the hashcode of the message’s key to decide the topic partition that messages are sent to.

      The message key is provided by PulsarSerializationSchema.key(IN, PulsarSinkContext). Implement this interface and extract the message key when you want messages with the same key to be sent to the same topic partition.

      If you do not provide a message key, a topic partition is randomly chosen from the topic list.

      The message key can be hashed in two ways: MessageKeyHash.JAVA_HASH and MessageKeyHash.MURMUR3_32_HASH. You can use the PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH option to choose the hash method.

    • RoundRobinRouter: round-robin among all the partitions.

      All messages are sent to the first partition, then switch to the next partition after a fixed number of messages have been sent. The batch size can be customized by the PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES option.

    Let’s assume there are ten messages and two topics. Topic A has two partitions and topic B has three. The batch size is set to five messages. In this case, each of topic A’s partitions receives 5 messages, while topic B does not receive any messages.

    You can configure a custom router by implementing the TopicRouter interface. If you implement a TopicRouter, ensure that it is serializable. You can also return partitions that are not in the pre-discovered partition list.

    Thus, you do not need to specify topics using the PulsarSinkBuilder.setTopics option when you implement a custom topic router. A sketch of such a router follows the interface below.

        @PublicEvolving
        public interface TopicRouter<IN> extends Serializable {

            String route(IN in, List<String> partitions, PulsarSinkContext context);

            default void open(SinkConfiguration sinkConfiguration) {
                // Nothing to do by default.
            }
        }
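
    A minimal sketch of a custom router; the Event type and its getRegion() accessor are hypothetical:

        public class RegionTopicRouter implements TopicRouter<Event> {

            @Override
            public String route(Event event, List<String> partitions, PulsarSinkContext context) {
                // Route all events of the same region to the same partition;
                // floorMod keeps the index non-negative for any hash code.
                int index = Math.floorMod(event.getRegion().hashCode(), partitions.size());
                return partitions.get(index);
            }
        }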

    Internally, a Pulsar partition is implemented as a topic. The Pulsar client provides APIs that hide this implementation detail and handle routing under the hood automatically. The Pulsar sink uses a lower-level client API to implement its own routing layer in order to support routing across multiple topics.

    For details, see partitioned topics.

    Delivery Guarantee

    The PulsarSink supports three delivery guarantee semantics:

    • NONE: Data loss can happen even when the pipeline is running. Records are sent to Pulsar topics with a fire-and-forget strategy in this mode, so it has the highest throughput.
    • AT_LEAST_ONCE: No data loss happens, but data duplication can happen after a restart from a checkpoint.
    • EXACTLY_ONCE: No data loss happens. Each record is sent to the Pulsar broker only once. The Pulsar sink uses Pulsar transactions and two-phase commit (2PC) to ensure records are sent only once even after pipeline restarts.

    Delayed message delivery

    Delayed message delivery enables you to postpone when a message can be consumed. With delayed message delivery enabled, the Pulsar sink sends a message to the Pulsar topic immediately, but the message is delivered to a consumer only once the specified delay is over.

    Delayed message delivery only works in the Shared subscription type. In Exclusive and Failover subscription types, the delayed message is dispatched immediately.

    You can configure the MessageDelayer to define when to send the message to the consumer. The default option is to never delay message dispatching. You can use the MessageDelayer.fixed(Duration) option to delay all messages by a fixed duration. You can also implement the MessageDelayer interface to dispatch messages at different times.

    The dispatch time should be calculated using PulsarSinkContext.processTime(), as in the sketch below.
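
    A minimal sketch of a custom delayer, assuming a single-method MessageDelayer contract with a (message, sinkContext) signature:

        // Deliver each message one minute after the current processing time
        // (the duration is illustrative).
        MessageDelayer<String> delayer = (message, sinkContext) ->
                sinkContext.processTime() + Duration.ofMinutes(1).toMillis();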

    Sink Configurable Options

    You can set options for PulsarClient, PulsarAdmin, Pulsar Producer and PulsarSink by using setConfig(ConfigOption<T>, T), setConfig(Configuration) and setConfig(Properties).

    PulsarClient and PulsarAdmin Options

    For details, refer to PulsarAdmin options.

    Pulsar Producer Options

    The Pulsar connector uses the Producer API to send messages. It extracts most parts of Pulsar’s ProducerConfigurationData as Flink configuration options in PulsarSinkOptions.

    • pulsar.producer.batchingEnabled (Boolean, default: true): Whether to enable batched sending of messages. Enabled by default.
    • pulsar.producer.batchingMaxBytes (Integer, default: 131072): The maximum size of messages permitted in a batch. Keep the maximum consistent with previous versions.
    • pulsar.producer.batchingMaxMessages (Integer, default: 1000): The maximum number of messages permitted in a batch.
    • pulsar.producer.batchingMaxPublishDelayMicros (Long, default: 1000): The batching time period for sending messages.
    • pulsar.producer.batchingPartitionSwitchFrequencyByPublishDelay (Integer, default: 10): The maximum wait time for switching topic partitions.
    • pulsar.producer.chunkingEnabled (Boolean, default: false)
    • pulsar.producer.compressionType (Enum, default: NONE): Message data compression type used by the producer. Possible values: “NONE”, “LZ4”, “ZLIB”, “ZSTD”, “SNAPPY”.
    • pulsar.producer.initialSequenceId (Long, default: none): The initial sequence id used for avoiding duplication; it is used when Pulsar does not have transaction support.
    • pulsar.producer.maxPendingMessages (Integer, default: 1000): The maximum size of the queue holding pending messages, that is, messages waiting to receive an acknowledgment from a broker (https://pulsar.apache.org/docs/en/reference-terminology#broker). By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true.
    • pulsar.producer.maxPendingMessagesAcrossPartitions (Integer, default: 50000): The maximum number of pending messages across partitions. Use this setting to lower the max pending messages for each partition (setMaxPendingMessages) if the total number exceeds the configured value.
    • pulsar.producer.producerName (String, default: none): A producer name that is displayed in Pulsar’s dashboard. If no producer name is provided, a Pulsar-generated name is used instead.
    • pulsar.producer.properties (Map, default: empty): Name/value properties of this producer. properties is application-defined metadata attached to a producer. When getting topic stats, this metadata is associated with the producer stats for easier identification.
    • pulsar.producer.sendTimeoutMs (Long, default: 30000): Message send timeout in ms. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.

    PulsarSink Options

    The configuration options below are mainly used for customizing the performance and message sending behavior. You can leave them as-is if you do not have any performance issues.

    • pulsar.sink.deliveryGuarantee (Enum, default: none): Optional delivery guarantee when committing. Possible values:
      - “exactly-once”: records are delivered exactly once, also under failover scenarios. To build a complete exactly-once pipeline, it is required that the source and sink support exactly-once and are properly configured.
      - “at-least-once”: records are ensured to be delivered, but the same record may be delivered multiple times. Usually, this guarantee is faster than exactly-once delivery.
      - “none”: records are delivered on a best-effort basis. This is often the fastest way to process records, but records may be lost or duplicated.
    • pulsar.sink.enableSchemaEvolution (Boolean, default: false): If you enable this option and use PulsarSerializationSchema.pulsarSchema(), messages are serialized using Pulsar’s Schema, with broker-side schema validation in place.
    • pulsar.sink.maxRecommitTimes (Integer, default: 5): The number of times a transaction is allowed to be recommitted when a retryable exception occurs. This is used with Pulsar transactions.
    • pulsar.sink.messageKeyHash (Enum, default: murmur-3-32-hash): The hash policy for routing messages by the hash code of the message key. Possible values:
      - “java-hash”: uses String.hashCode() to calculate the message key string’s hash code.
      - “murmur-3-32-hash”: calculates the message key’s hash code using the Murmur3 algorithm.
    • pulsar.sink.topicMetadataRefreshInterval (Long, default: 1800000): Automatically refresh the topic metadata at a fixed interval (in ms). The default value is 30 minutes.
    • pulsar.sink.transactionTimeoutMillis (Long, default: 10800000): This option is used when DeliveryGuarantee.EXACTLY_ONCE is required. A transaction is used to make sure each message is written only once.

    Sink Metrics

    The following metrics are supported. The first 6 metrics are standard Pulsar sink metrics, as described in FLIP-33: Standardize Connector Metrics.

    • numBytesOut, numRecordsOut, numRecordsOutErrors are retrieved from Pulsar client metrics.

    • currentSendTime tracks the time from when the producer calls sendAsync() to the time when the message is acknowledged by the broker. This metric is not available with the NONE delivery guarantee.

    The Pulsar producer refreshes its stats every 60 seconds by default. The PulsarSink retrieves the Pulsar producer stats every 500ms. That means that numRecordsOut, numBytesOut, numAcksReceived, and numRecordsOutErrors are updated every 60 seconds. To increase the metrics refresh frequency, you can change the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.
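
    A sketch; the option name PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS (the client-level stats interval) is an assumption:

        // Refresh the Pulsar producer stats every second (the minimum).
        PulsarSink.builder()
            .setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);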

    numBytesOutRate and numRecordsOutRate are calculated based on the numBytesOut and numRecordsOut counters, respectively. Flink internally uses a fixed 60-second window to calculate the rates.

    Brief Design Rationale

    The Pulsar sink follows the Sink API defined in FLIP-191.

    Stateless SinkWriter

    In EXACTLY_ONCE mode, the Pulsar sink does not store transaction information in checkpoints. That means that new transactions are created after a restart. Therefore, any message in a previous pending transaction is either aborted or timed out (it is never visible to downstream Pulsar consumers). The Pulsar team is working on optimizing the resources needed by unfinished pending transactions.

    Pulsar Schema Evolution

    Pulsar Schema Evolution allows you to reuse the same Flink job after certain “allowed” data model changes, such as adding or deleting a field in an Avro-based POJO class. Note that you can specify Pulsar schema validation rules and define an auto schema update. For details, refer to Pulsar Schema Evolution.

    Known Issues

    This section describes some known issues about the Pulsar connectors.

    Unstable on Java 11

    The Pulsar connector has some known issues on Java 11. It is recommended to run the Pulsar connector on Java 8.

    No TransactionCoordinatorNotFound, but automatic reconnect

    Pulsar transactions are still under active development and are not stable. Pulsar 2.9.2 introduced a breaking change in transactions. If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a TransactionCoordinatorNotFound exception.

    You can use the latest release to resolve this issue.

    Upgrading to the Latest Connector Version

    The generic upgrade steps are outlined in the upgrading jobs and Flink versions guide. The Pulsar connector does not store any state on the Flink side; it pushes and stores all state on the Pulsar side. For Pulsar, you additionally need to be aware of these limitations:

    • Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
    • Always use a newer Pulsar client with Pulsar connector to consume messages from Pulsar.

    Troubleshooting