You can use the Pulsar Go client to create Pulsar producers, consumers, and readers.
API docs are also available.
For the standard API docs, see the package’s Godoc.
You can install the library locally using go get:
go get github.com/apache/pulsar-client-go/pulsar
Once installed locally, you can import it into your project:
import "github.com/apache/pulsar-client-go/pulsar"
Connection URLs
To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme, and have a default port of 6650. Here’s an example for localhost:
pulsar://localhost:6650
A URL for a production Pulsar cluster may look something like this:
pulsar://pulsar.us-west.example.com:6650
If you’re using TLS authentication, the URL will look something like this:
pulsar+ssl://pulsar.us-west.example.com:6651
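As a rough illustration of the URL rules above, the following sketch uses only Go’s standard net/url package to check the scheme and fill in a missing port. The helper name parseServiceURL is hypothetical and not part of the Pulsar client. The 6651 fallback for pulsar+ssl is an assumption taken from the TLS example above, since the text only states 6650 as the default port.

```go
package main

import (
	"fmt"
	"net/url"
)

// parseServiceURL is a hypothetical helper (not part of the Pulsar client).
// It returns the host, the port, and whether the URL requests TLS.
func parseServiceURL(raw string) (host, port string, tls bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", false, err
	}
	switch u.Scheme {
	case "pulsar":
		tls = false
	case "pulsar+ssl":
		tls = true
	default:
		return "", "", false, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	host = u.Hostname()
	if host == "" {
		return "", "", false, fmt.Errorf("missing host in %q", raw)
	}
	port = u.Port()
	if port == "" {
		port = "6650" // default port stated in the text
		if tls {
			port = "6651" // assumption: taken from the TLS example URL
		}
	}
	return host, port, tls, nil
}

func main() {
	urls := []string{
		"pulsar://localhost:6650",
		"pulsar+ssl://pulsar.us-west.example.com:6651",
		"http://localhost:6650",
	}
	for _, raw := range urls {
		host, port, tls, err := parseServiceURL(raw)
		if err != nil {
			fmt.Printf("%s: %v\n", raw, err)
			continue
		}
		fmt.Printf("%s: host=%s port=%s tls=%t\n", raw, host, port, tls)
	}
}
```

A check like this can catch a mistyped scheme before the URL is handed to the client.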
Create a client
import (
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "pulsar://localhost:6650",
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()
}
The following configurable parameters are available for Pulsar clients:
| Name | Description | Default |
| :--- | :--- | :--- |
| URL | The service URL for the Pulsar service. This parameter is required. | |
| ConnectionTimeout | Timeout for establishing a TCP connection. | 30s |
| OperationTimeout | The operation timeout. Producer-create, subscribe, and unsubscribe operations are retried until this interval elapses, after which the operation is marked as failed. | 30s |
| Authentication | The authentication provider. Example: Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem") | no authentication |
| TLSTrustCertsFilePath | The path to the trusted TLS certificate file. | |
| TLSAllowInsecureConnection | Whether the Pulsar client accepts an untrusted TLS certificate from the broker. | false |
| TLSValidateHostname | Whether the Pulsar client verifies the validity of the host name from the broker. | false |
Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here’s an example:
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-topic",
})
if err != nil {
    log.Fatal(err)
}
// Close the producer only after it has been created successfully.
defer producer.Close()

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("hello"),
})
if err != nil {
    fmt.Println("Failed to publish message", err)
} else {
    fmt.Println("Published message")
}
Producer operations
Pulsar Go producers have the following methods available:
Producer example
How to use message router in producer
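At its core, a custom routing policy is a function that maps a message to a partition index. The sketch below shows only that mapping, using the standard library’s FNV hash. The helper partitionForKey is a hypothetical name. Wiring it into the MessageRouter option of ProducerOptions, using the message key and the topic’s partition count, is assumed here rather than shown.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey is a hypothetical helper (not part of the Pulsar client).
// It maps a message key to a partition index in [0, numPartitions), so that
// messages with the same key always land on the same partition.
func partitionForKey(key string, numPartitions uint32) int {
	if numPartitions == 0 {
		return 0 // treat a topic with no partition count as a single partition
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % numPartitions)
}

func main() {
	for _, key := range []string{"order-1", "order-2", "order-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionForKey(key, 4))
	}
}
```

Hashing the key keeps per-key ordering on one partition. A router that ignores the key (round-robin, for example) would spread load more evenly but give up that property.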
How to use relative delivery delay in a producer
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// newTopicName is a helper that returns a unique topic name; it is defined
// in the multi-topic consumer example in this document.
topicName := newTopicName()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: topicName,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topicName,
    SubscriptionName: "subName",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

// Ask the broker to deliver the message 3 seconds after it is published.
ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload:      []byte("test"),
    DeliverAfter: 3 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
fmt.Println(ID)

// The message is not deliverable yet, so this receive is expected to time out.
ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
canc()
if err == nil {
    log.Fatalf("expected a timeout, but received: %s", msg.Payload())
}
fmt.Println("no message within 1s:", err)

// The 3-second delay elapses within this window, so the message arrives.
ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
msg, err = consumer.Receive(ctx)
canc()
if err != nil {
    log.Fatal(err)
}
fmt.Println(string(msg.Payload()))
| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | The topic this producer publishes to. This argument is required when constructing the producer. | |
| Name | A name for the producer. If not assigned, the system generates a globally unique name, which can be accessed with Producer.ProducerName(). | |
| Properties | A set of application-defined properties attached to the producer. These properties are visible in the topic stats. | |
| MaxPendingMessages | The maximum size of the queue holding messages that are pending an acknowledgment from the broker. | |
| HashingScheme | The hashing scheme used to choose the partition on which to publish a particular message. | JavaStringHash |
| CompressionType | The compression type for the producer. | not compressed |
| MessageRouter | A custom message routing policy, set by passing an implementation of MessageRouter. | |
| DisableBatching | Controls whether automatic batching of messages is disabled for the producer. | false |
| BatchingMaxPublishDelay | The time period within which sent messages are batched. | 10ms |
| BatchingMaxMessages | The maximum number of messages permitted in a batch. | 1000 |
Consumers
Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on those topics. You can configure Go consumers using a ConsumerOptions object. Here’s a basic example that uses the Receive method:
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "topic-1",
    SubscriptionName: "my-sub",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

for i := 0; i < 10; i++ {
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
        msg.ID(), string(msg.Payload()))

    consumer.Ack(msg)
}

if err := consumer.Unsubscribe(); err != nil {
    log.Fatal(err)
}
Consumer operations
Pulsar Go consumers have the following methods available:
Receive example
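How to use a regex consumer
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Example values (assumptions); adjust them to your tenant, namespace, and topic.
namespace := "public/default"
topicInRegex := fmt.Sprintf("persistent://%s/foo-topic", namespace)

p, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topicInRegex,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer p.Close()

// Subscribe to every topic in the namespace whose name matches foo.*
topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace)
opts := pulsar.ConsumerOptions{
    TopicsPattern:    topicsPattern,
    SubscriptionName: "regex-sub",
}
consumer, err := client.Subscribe(opts)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()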
How to use a multi-topic consumer
// newTopicName returns a topic name that is unlikely to collide with others.
func newTopicName() string {
    return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
}

topic1 := "topic-1"
topic2 := "topic-2"

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// A single consumer subscribes to both topics.
topics := []string{topic1, topic2}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topics:           topics,
    SubscriptionName: "multi-topic-sub",
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()
How to use consumer listener
import (
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    channel := make(chan pulsar.ConsumerMessage, 100)

    options := pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-subscription",
        Type:             pulsar.Shared,
    }

    options.MessageChannel = channel

    consumer, err := client.Subscribe(options)
    if err != nil {
        log.Fatal(err)
    }

    defer consumer.Close()

    // Receive messages from the channel. The channel returns a struct that contains the message and the consumer
    // from which the message was received. That is not necessary here since there is a single consumer, but the
    // channel could be shared across multiple consumers as well.
    for cm := range channel {
        msg := cm.Message
        fmt.Printf("Received message msgId: %v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

        consumer.Ack(msg)
    }
}
How to use consumer receive timeout
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "test-topic-with-no-messages"

// create consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            topic,
    SubscriptionName: "my-sub1",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

// Start the timeout right before receiving, so subscribing does not consume it.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

msg, err := consumer.Receive(ctx)
if err != nil {
    // With no messages on the topic, Receive fails once the deadline expires.
    log.Fatal(err)
}
// Only touch msg after the error check; it is not valid when err is non-nil.
fmt.Println(string(msg.Payload()))
| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | The topic this consumer subscribes to. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| Topics | A list of topics this consumer subscribes to. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| TopicsPattern | A regular expression used to subscribe to multiple topics under the same namespace. Either a topic, a list of topics, or a topics pattern is required when subscribing. | |
| AutoDiscoveryPeriod | The interval at which to poll for new partitions or new topics when using a TopicsPattern. | |
| SubscriptionName | The subscription name for this consumer. This argument is required when subscribing. | |
| Name | The consumer name. | |
| Properties | A set of application-defined properties attached to the consumer. These properties are visible in the topic stats. | |
| Type | The subscription type to use when subscribing to the topic. | Exclusive |
| SubscriptionInitialPosition | The initial position at which the cursor is set when subscribing. | Latest |
| DLQ | Configuration for the Dead Letter Queue consumer policy. | no DLQ |
| MessageChannel | A MessageChannel for the consumer. When a message is received, it is pushed to the channel for consumption. | |
| ReceiverQueueSize | The size of the consumer receive queue. | 1000 |
| NackRedeliveryDelay | The delay after which messages that failed to be processed are redelivered. | 1min |
| ReadCompacted | If enabled, the consumer reads messages from the compacted topic rather than the full message backlog of the topic. | false |
| ReplicateSubscriptionState | Marks the subscription as replicated to keep it in sync across clusters. | false |
Readers
Reader operations
Pulsar Go readers have the following methods available:
Reader example
How to use a reader to read the ‘next’ message
Here’s an example usage of a Go reader that uses the Next() method to process incoming messages:
import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }

    defer client.Close()

    // Start reading from the earliest message available on the topic.
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "topic-1",
        StartMessageID: pulsar.EarliestMessageID(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    for reader.HasNext() {
        msg, err := reader.Next(context.Background())
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))
    }
}
In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessageID()). The reader can also begin reading from the latest message (pulsar.LatestMessageID()) or from some other message ID specified by bytes using the DeserializeMessageID function, which takes a byte slice and returns a MessageID object and an error. Here’s an example:
var lastSavedID []byte // Read the last saved message ID from an external store.
startID, err := pulsar.DeserializeMessageID(lastSavedID)
if err != nil {
    log.Fatal(err)
}
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          "my-golang-topic",
    StartMessageID: startID,
})
How to use reader to read specific message
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topic,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// send 10 messages and remember their IDs
msgIDs := [10]pulsar.MessageID{}
for i := 0; i < 10; i++ {
    msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
    })
    if err != nil {
        log.Fatal(err)
    }
    msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          topic,
    StartMessageID: msgIDs[4],
})
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
    msg, err := reader.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(msg.Payload()))
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:                   topic,
    StartMessageID:          msgIDs[4],
    StartMessageIDInclusive: true,
})
if err != nil {
    log.Fatal(err)
}
defer readerInclusive.Close()
| Name | Description | Default |
| :--- | :--- | :--- |
| Topic | The topic this reader reads from. This argument is required when constructing the reader. | |
| Name | The reader name. | |
| Properties | A set of application-defined properties attached to the reader. These properties are visible in the topic stats. | |
| StartMessageID | The initial reader position, specified as a message ID. | |
| StartMessageIDInclusive | If true, the reader starts at the StartMessageID, included. If false, the reader starts from the “next” message. | false |
| MessageChannel | A MessageChannel for the reader. When a message is received, it is pushed to the channel for consumption. | |
| ReceiverQueueSize | The size of the receive queue. | 1000 |
| SubscriptionRolePrefix | The subscription role prefix. | “reader” |
| ReadCompacted | If enabled, the reader reads messages from the compacted topic rather than the full message backlog of the topic. ReadCompacted can only be enabled when reading from a persistent topic. | false |
The Pulsar Go client provides a ProducerMessage struct that you can use to construct messages to produce on Pulsar topics. Here’s an example message:
msg := pulsar.ProducerMessage{
    Payload: []byte("Here is some message data"),
    Key:     "message-key",
    Properties: map[string]string{
        "foo": "bar",
    },
    EventTime:           time.Now(),
    ReplicationClusters: []string{"cluster1", "cluster3"},
}

// Send takes a context and a pointer to the message.
if _, err := producer.Send(context.Background(), &msg); err != nil {
    log.Fatalf("Could not publish message due to: %v", err)
}
The following parameters are available for ProducerMessage objects:
TLS encryption and authentication
In order to use TLS encryption, you’ll need to configure your client to do so:
- Use the pulsar+ssl URL type
- Set TLSTrustCertsFilePath to the path to the TLS certs used by your client and the Pulsar broker
- Configure the Authentication option
Here’s an example:
opts := pulsar.ClientOptions{
    URL:                   "pulsar+ssl://my-cluster.com:6651",
    TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
    Authentication:        pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
}
OAuth2 authentication
oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
    "type":       "client_credentials",
    "issuerUrl":  "https://dev-kt-aa9ne.us.auth0.com/oauth/token",
    "audience":   "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
    "privateKey": "/path/to/privateKey",
    "clientId":   "0Xx...Yyxeny",
})
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:            "pulsar://my-cluster:6650",
    Authentication: oauth, // pass the OAuth2 provider to the client
})