You can use the Pulsar Go client to create Pulsar producers, consumers, and readers in Go.

API docs are also available.

For standard API docs, consult the Godoc.

You can install the library locally using go get:

    go get -u "github.com/apache/pulsar-client-go/pulsar"

Once installed locally, you can import it into your project:

    import "github.com/apache/pulsar-client-go/pulsar"

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here’s an example for localhost:

    pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

    pulsar://pulsar.us-west.example.com:6650

If you’re using TLS authentication, the URL will look something like this:

    pulsar+ssl://pulsar.us-west.example.com:6651
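The URL forms above can be checked before they are handed to a client. The following is a minimal sketch that uses only the Go standard library; parseServiceURL and the serviceURL type are hypothetical helpers, not part of the Pulsar client, and the fallback port of 6651 for pulsar+ssl is an assumption taken from the example URL above.

```go
package main

import (
	"fmt"
	"net/url"
)

// serviceURL holds the parts of a Pulsar protocol URL that matter when connecting.
type serviceURL struct {
	Host string
	Port string
	TLS  bool
}

// parseServiceURL is a hypothetical helper (not part of the Pulsar client).
// It accepts the pulsar and pulsar+ssl schemes and fills in a port when the
// URL omits one.
func parseServiceURL(raw string) (serviceURL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return serviceURL{}, err
	}
	var s serviceURL
	switch u.Scheme {
	case "pulsar":
		s.Port = "6650" // default port stated in the text
	case "pulsar+ssl":
		s.TLS = true
		s.Port = "6651" // assumption: taken from the TLS example URL
	default:
		return serviceURL{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	s.Host = u.Hostname()
	if p := u.Port(); p != "" {
		s.Port = p // an explicit port always wins over the fallback
	}
	return s, nil
}

func main() {
	urls := []string{
		"pulsar://localhost:6650",
		"pulsar+ssl://pulsar.us-west.example.com:6651",
		"http://localhost:8080",
	}
	for _, raw := range urls {
		s, err := parseServiceURL(raw)
		if err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Printf("%s -> host=%s port=%s tls=%t\n", raw, s.Host, s.Port, s.TLS)
	}
}
```

Rejecting unknown schemes early gives a clearer error than a failed connection attempt later on.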
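
To interact with Pulsar, you first need a client. You can create one with the NewClient function, passing in a ClientOptions object. Here’s an example:

    package main

    import (
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // Connect to a local Pulsar broker with explicit timeouts.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650",
            OperationTimeout:  30 * time.Second,
            ConnectionTimeout: 30 * time.Second,
        })
        if err != nil {
            log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }

        // Release the client's resources when main returns.
        defer client.Close()
    }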

The following configurable parameters are available for Pulsar clients:

| Name | Description | Default |
| :--- | :--- | :--- |
| URL | Configure the service URL for the Pulsar service. This parameter is required. | |
| ConnectionTimeout | Timeout for the establishment of a TCP connection. | 30s |
| OperationTimeout | Set the operation timeout. Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed. | 30s |
| Authentication | Configure the authentication provider. Example: Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem") | no authentication |
| TLSTrustCertsFilePath | Set the path to the trusted TLS certificate file. | |
| TLSAllowInsecureConnection | Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker. | false |
| TLSValidateHostname | Configure whether the Pulsar client verifies the validity of the host name from the broker. | false |

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object.

Producer operations

Pulsar Go producers have the following methods available:

Producer example

How to use a message router in a producer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        // serviceURL is your Pulsar service URL, for example "pulsar://localhost:6650".
        URL: serviceURL,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Only subscribe on the specific partition
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-partitioned-topic-partition-2",
        SubscriptionName: "my-sub",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-partitioned-topic",
        // Route every message to partition 2, the partition the consumer subscribes to.
        MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
            fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
            return 2
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

How to use relative delayed delivery in a producer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // newTopicName is a helper that returns a unique topic name.
    topicName := newTopicName()
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: topicName,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topicName,
        SubscriptionName: "subName",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Ask the broker to deliver the message 3 seconds after it is published.
    ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload:      []byte("test"),
        DeliverAfter: 3 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(ID)

    // The message is delayed by 3 seconds, so a 1-second receive is expected to time out.
    ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
    msg, err := consumer.Receive(ctx)
    canc()
    if err == nil {
        log.Fatalf("expected a timeout, but received: %s", msg.Payload())
    }
    fmt.Println("no message yet:", err)

    // A 5-second receive outlasts the delay, so the message arrives.
    ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
    msg, err = consumer.Receive(ctx)
    canc()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(msg.Payload()))

| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | Topic specifies the topic this producer will publish to. This argument is required when constructing the producer. | |
| Name | Name specifies a name for the producer. If not assigned, the system will generate a globally unique name, which can be accessed with Producer.ProducerName(). | |
| Properties | Properties attaches a set of application-defined properties to the producer. These properties will be visible in the topic stats. | |
| MaxPendingMessages | MaxPendingMessages sets the max size of the queue holding the messages pending to receive an acknowledgment from the broker. | |
| HashingScheme | HashingScheme changes the hashing scheme used to choose the partition on which to publish a particular message. | JavaStringHash |
| CompressionType | CompressionType sets the compression type for the producer. | not compressed |
| MessageRouter | MessageRouter sets a custom message routing policy by passing an implementation of MessageRouter. | |
| DisableBatching | DisableBatching controls whether automatic batching of messages is enabled for the producer. | false |
| BatchingMaxPublishDelay | BatchingMaxPublishDelay sets the time period within which the messages sent will be batched. | 10ms |
| BatchingMaxMessages | BatchingMaxMessages sets the maximum number of messages permitted in a batch. | 1000 |

Consumers

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on those topics. You can configure Go consumers using a ConsumerOptions object. Here’s a basic example that uses the Receive method:

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub", // a subscription name is required when subscribing
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Receive and acknowledge ten messages.
    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

        consumer.Ack(msg)
    }

    if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
    }

Consumer operations

Pulsar Go consumers have the following methods available:

Receive example

How to use a regex consumer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // namespace and topicInRegex are assumed to be defined by the caller;
    // topicInRegex is a topic in that namespace whose name matches the pattern below.
    p, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           topicInRegex,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer p.Close()

    // Subscribe to every topic in the namespace whose name starts with "foo".
    topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
    opts := pulsar.ConsumerOptions{
        TopicsPattern:    topicsPattern,
        SubscriptionName: "regex-sub",
    }
    consumer, err := client.Subscribe(opts)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

How to use a multi-topic consumer

    // newTopicName is a helper that generates a unique topic name.
    func newTopicName() string {
        return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
    }

    topic1 := "topic-1"
    topic2 := "topic-2"

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Subscribe to both topics with a single consumer.
    topics := []string{topic1, topic2}
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topics:           topics,
        SubscriptionName: "multi-topic-sub",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

How to use consumer listener

How to use consumer receive timeout

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "test-topic-with-no-messages"
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // create consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "my-sub1",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // The topic has no messages, so Receive returns an error once ctx expires.
    // Check the error before touching msg, which is nil on failure.
    msg, err := consumer.Receive(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(msg.Payload()))

| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | Specify the topic this consumer will subscribe to. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| Topics | Specify a list of topics this consumer will subscribe to. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| TopicsPattern | Specify a regular expression to subscribe to multiple topics under the same namespace. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| AutoDiscoveryPeriod | Specify the interval at which to poll for new partitions or new topics if using a TopicsPattern. | |
| SubscriptionName | Specify the subscription name for this consumer. This argument is required when subscribing. | |
| Name | Set the consumer name. | |
| Properties | Attach a set of application-defined properties to the consumer. These properties will be visible in the topic stats. | |
| Type | Select the subscription type to be used when subscribing to the topic. | Exclusive |
| SubscriptionInitialPosition | Initial position at which the cursor will be set when subscribing. | Latest |
| DLQ | Configuration for the Dead Letter Queue consumer policy. | no DLQ |
| MessageChannel | Set a MessageChannel for the consumer. When a message is received, it will be pushed to the channel for consumption. | |
| ReceiverQueueSize | Set the size of the consumer receive queue. | 1000 |
| NackRedeliveryDelay | The delay after which to redeliver the messages that failed to be processed. | 1min |
| ReadCompacted | If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog of the topic. | false |
| ReplicateSubscriptionState | Mark the subscription as replicated to keep it in sync across clusters. | false |

Readers

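You can configure Go readers using a ReaderOptions object. Here’s an example:

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "topic-1",
        StartMessageID: pulsar.EarliestMessageID(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()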

Reader operations

Pulsar Go readers have the following methods available:

Reader example

How to use reader to read ‘next’ message

Here’s an example usage of a Go reader that uses the Next() method to process incoming messages:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Start reading from the earliest available message on the topic.
        reader, err := client.CreateReader(pulsar.ReaderOptions{
            Topic:          "topic-1",
            StartMessageID: pulsar.EarliestMessageID(),
        })
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()

        // Keep reading while the topic has more messages.
        for reader.HasNext() {
            msg, err := reader.Next(context.Background())
            if err != nil {
                log.Fatal(err)
            }

            fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))
        }
    }

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessageID()). The reader can also begin reading from the latest message (pulsar.LatestMessageID()) or from some other message ID restored from bytes using the DeserializeMessageID function, which takes a byte array and returns a MessageID object along with an error. Here’s an example:

    // lastSavedID holds the last saved message ID, read from an external store as []byte.
    var lastSavedID []byte

    startID, err := pulsar.DeserializeMessageID(lastSavedID)
    if err != nil {
        log.Fatal(err)
    }

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "my-golang-topic",
        StartMessageID: startID,
    })

How to use reader to read specific message

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        // lookupURL is your Pulsar service URL, for example "pulsar://localhost:6650".
        URL: lookupURL,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "topic-1"
    ctx := context.Background()

    // create producer
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           topic,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // send 10 messages
    msgIDs := [10]pulsar.MessageID{}
    for i := 0; i < 10; i++ {
        msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        })
        if err != nil {
            log.Fatal(err)
        }
        msgIDs[i] = msgID
    }

    // create reader on 5th message (not included)
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          topic,
        StartMessageID: msgIDs[4],
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    // receive the remaining 5 messages
    for i := 5; i < 10; i++ {
        msg, err := reader.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(string(msg.Payload()))
    }

    // create reader on 5th message (included)
    readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   topic,
        StartMessageID:          msgIDs[4],
        StartMessageIDInclusive: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer readerInclusive.Close()

| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | Topic specifies the topic this reader will read from. This argument is required when constructing the reader. | |
| Name | Name sets the reader name. | |
| Properties | Attach a set of application-defined properties to the reader. These properties will be visible in the topic stats. | |
| StartMessageID | StartMessageID sets the initial reader position by specifying a message ID. | |
| StartMessageIDInclusive | If true, the reader will start at the StartMessageID, included. By default, the reader will start from the “next” message. | false |
| MessageChannel | MessageChannel sets a MessageChannel for the reader. When a message is received, it will be pushed to the channel for consumption. | |
| ReceiverQueueSize | ReceiverQueueSize sets the size of the reader receive queue. | 1000 |
| SubscriptionRolePrefix | SubscriptionRolePrefix sets the subscription role prefix. | “reader” |
| ReadCompacted | If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog of the topic. ReadCompacted can only be enabled when reading from a persistent topic. | false |

The Pulsar Go client provides a ProducerMessage type that you can use to construct messages to produce on Pulsar topics. Here’s an example message:

The following parameters are available for ProducerMessage objects:

TLS encryption and authentication

To use TLS encryption, you’ll need to configure your client to do so:

  • Use pulsar+ssl URL type
  • Set TLSTrustCertsFilePath to the path to the TLS certs used by your client and the Pulsar broker
  • Configure Authentication option

Here’s an example:

    opts := pulsar.ClientOptions{
        URL:                   "pulsar+ssl://my-cluster.com:6651",
        TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",