Pulsar C# client
你可以通过 dotnet CLI 或者 Visual Studio 来安装 Pulsar C# 客户端。 本章节描述了如何通过 dotnet CLI 来安装 Pulsar C# 客户端类库。 关于如何通过 Visual Studio 安装 Pulsar C# 客户端,请参阅 这里。
安装 ,它提供了 dotnet 命令行工具。 从 Visual Studio 2017 开始,dotnet CLI 将自动随任何与 .NET Core 相关的工作负载一起安装。
操作步骤
按照以下步骤安装 Pulsar C# 客户端库:
创建一个新项目。
为项目创建一个文件夹。
打开终端窗口并切换到新文件夹。
使用以下命令创建项目:
使用 命令来测试应用程序是否已经创建成功。
添加 DotPulsar Nuget 包。
使用以下命令安装
DotPulsar
包:dotnet add package DotPulsar
在命令完成后,打开
.csproj
文件来查看添加的引用:<ItemGroup>
<PackageReference Include="DotPulsar" Version="0.11.0" />
</ItemGroup>
Client
本节描述了Pulsar C# 客户端的一些配置示例。
创建客户端
本示例展示如何创建连接到本机的 Pulsar C# 客户端。
var client = PulsarClient.Builder().Build();
要使用 Builder 创建一个 Pulsar C# 客户端,你需要指定以下参数:
创建生产者
本节介绍如何创建生产者。
-
var producer = client.NewProducer()
.Topic("persistent://public/default/mytopic")
.Create();
不使用 Builder 创建生产者。
var options = new ProducerOptions("persistent://public/default/mytopic");
var producer = client.CreateProducer(options);
创建消费者
本节介绍如何创消费者。
使用 Builder 创建消费者。
var consumer = client.NewConsumer()
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
不使用 Builder 创建消费者。
var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
var consumer = client.CreateConsumer(options);
本节介绍如何创建 Reader。
使用 Builder 创建 Reader。
不使用 Builder 创建 Reader。
var options = new ReaderOptions(MessageId.Earliest, "persistent://public/default/mytopic");
配置加密策略
Pulsar C# 客户端支持四种加密策略:
EnforceUnencrypted
:总是使用未加密的连接。EnforceEncrypted
:总是使用加密的连接。PreferUnencrypted
:如果可以,优先使用未加密的连接。PreferEncrypted
:如果可以,优先使用加密连接。
此示例显示如何设置 EnforceUnencrypted
加密策略。
var client = PulsarClient.Builder()
.ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
.Build();
配置身份验证
目前,Pulsar C# 客户端支持 TLS (Transport Layer Security) 和 JWT (JSON Web Token) 认证。
如果你使用了 ,你需要准备好当时获得的证书和密钥。 从 Pulsar C# 客户端使用它们,请参考以下步骤:
创建一个未加密且无密码的 pfx 文件。
openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:
使用 admin.pfx 文件创建一个 X509Certificate2 对象,并传递到 Pulsar C# 客户端。
var clientCertificate = new X509Certificate2("admin.pfx");
var client = PulsarClient.Builder()
.AuthenticateUsingClientCertificate(clientCertificate)
.Build();
生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到 Pulsar broker 上。 本节描述了生产者的一些配置示例。
发送数据
此示例显示如何发送数据。
var data = Encoding.UTF8.GetBytes("Hello World");
await producer.Send(data);
发送带有自定义元数据的消息
-
var data = Encoding.UTF8.GetBytes("Hello World");
var messageId = await producer.NewMessage()
.Property("SomeKey", "SomeValue")
.Send(data);
不使用生成器发送带有自定义元数据的消息。
var data = Encoding.UTF8.GetBytes("Hello World");
var metadata = new MessageMetadata();
var messageId = await producer.Send(metadata, data));
消费者是一个通过订阅关联到某一主题,并接收消息的程序。 本节介绍一些消费者配置示例。
接收消息
此示例显示消费者如何从主题接收消息。
消息可以是逐个确认,也可以累积一起确认。 关于消息确认的详情,请参阅 章节。
逐条确认消息。
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
累积确认消息。
await consumer.AcknowledgeCumulative(message);
取消订阅主题
此示例显示消费者如何取消订阅某主题。
await consumer.Unsubscribe();
Note
Reader
Reader 实际上只是一个没有游标的消费者。 这意味着 Pulsar 不会跟踪消息的消费进度,也不会去做消息确认。
此示例展示 Reader 如何接收消息。
await foreach (var message in reader.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
本节介绍如何监测生产者、消费者和 reader 的状态。
监控生产者
下表列出了可监测的生产者状态。
此示例显示如何监测生产者状态。
private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
var state = ProducerState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
state = await producer.StateChangedFrom(state, cancellationToken);
var stateMessage = state switch
{
ProducerState.Connected => $"The producer is connected",
ProducerState.Disconnected => $"The producer is disconnected",
ProducerState.Closed => $"The producer has closed",
ProducerState.Faulted => $"The producer has faulted",
_ => $"The producer has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (producer.IsFinalState(state))
return;
}
}
监测消费者状态
下表列出了可监测的消费者状态。
此示例显示如何监测消费者状态。
private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
var state = ConsumerState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
var stateMessage = state switch
{
ConsumerState.Active => "The consumer is active",
ConsumerState.Inactive => "The consumer is inactive",
ConsumerState.Disconnected => "The consumer is disconnected",
ConsumerState.Closed => "The consumer has closed",
ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
ConsumerState.Faulted => "The consumer has faulted",
_ => $"The consumer has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (consumer.IsFinalState(state))
return;
}
}
监测 Reader 状态
此示例显示如何监测 Reader 状态。
private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
var state = ReaderState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
state = await reader.StateChangedFrom(state, cancellationToken);
var stateMessage = state switch
{
ReaderState.Connected => "The reader is connected",
ReaderState.Disconnected => "The reader is disconnected",
ReaderState.Closed => "The reader has closed",
ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
ReaderState.Faulted => "The reader has faulted",
_ => $"The reader has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (reader.IsFinalState(state))
return;
}