Pulsar C# client

    你可以通过 dotnet CLI 或者 Visual Studio 来安装 Pulsar C# 客户端。 本章节描述了如何通过 dotnet CLI 来安装 Pulsar C# 客户端类库。 关于如何通过 Visual Studio 安装 Pulsar C# 客户端,请参阅 这里

    安装 ,它提供了 dotnet 命令行工具。 从 Visual Studio 2017 开始,dotnet CLI 将自动随任何与 .NET Core 相关的工作负载一起安装。

    操作步骤

    按照以下步骤安装 Pulsar C# 客户端库:

    1. 创建一个新项目。

      1. 为项目创建一个文件夹。

      2. 打开终端窗口并切换到新文件夹。

      3. 使用以下命令创建项目:

      4. 使用 命令来测试应用程序是否已经创建成功。

    2. 添加 DotPulsar Nuget 包。

      1. 使用以下命令安装 DotPulsar 包:

        1. dotnet add package DotPulsar
      2. 在命令完成后,打开 .csproj 文件来查看添加的引用:

        1. <ItemGroup>
        2. <PackageReference Include="DotPulsar" Version="0.11.0" />
        3. </ItemGroup>

    Client

    本节描述了Pulsar C# 客户端的一些配置示例。

    创建客户端

    本示例展示如何创建连接到本机的 Pulsar C# 客户端。

    1. var client = PulsarClient.Builder().Build();

    要使用 Builder 创建一个 Pulsar C# 客户端,你需要指定以下参数:

    创建生产者

    本节介绍如何创建生产者。

      1. var producer = client.NewProducer()
      2. .Topic("persistent://public/default/mytopic")
      3. .Create();
    • 不使用 Builder 创建生产者。

      1. var options = new ProducerOptions("persistent://public/default/mytopic");
      2. var producer = client.CreateProducer(options);

    创建消费者

    本节介绍如何创消费者。

    • 使用 Builder 创建消费者。

      1. var consumer = client.NewConsumer()
      2. .SubscriptionName("MySubscription")
      3. .Topic("persistent://public/default/mytopic")
      4. .Create();
    • 不使用 Builder 创建消费者。

      1. var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
      2. var consumer = client.CreateConsumer(options);

    本节介绍如何创建 Reader。

    • 使用 Builder 创建 Reader。

    • 不使用 Builder 创建 Reader。

      1. var options = new ReaderOptions(MessageId.Earliest, "persistent://public/default/mytopic");

    配置加密策略

    Pulsar C# 客户端支持四种加密策略:

    • EnforceUnencrypted:总是使用未加密的连接。
    • EnforceEncrypted:总是使用加密的连接。
    • PreferUnencrypted:如果可以,优先使用未加密的连接。
    • PreferEncrypted:如果可以,优先使用加密连接。

    此示例显示如何设置 EnforceUnencrypted 加密策略。

    1. var client = PulsarClient.Builder()
    2. .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
    3. .Build();

    配置身份验证

    目前,Pulsar C# 客户端支持 TLS (Transport Layer Security) 和 JWT (JSON Web Token) 认证。

    如果你使用了 ,你需要准备好当时获得的证书和密钥。 从 Pulsar C# 客户端使用它们,请参考以下步骤:

    1. 创建一个未加密且无密码的 pfx 文件。

      1. openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:
    2. 使用 admin.pfx 文件创建一个 X509Certificate2 对象,并传递到 Pulsar C# 客户端。

      1. var clientCertificate = new X509Certificate2("admin.pfx");
      2. var client = PulsarClient.Builder()
      3. .AuthenticateUsingClientCertificate(clientCertificate)
      4. .Build();

    生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到 Pulsar broker 上。 本节描述了生产者的一些配置示例。

    发送数据

    此示例显示如何发送数据。

    1. var data = Encoding.UTF8.GetBytes("Hello World");
    2. await producer.Send(data);

    发送带有自定义元数据的消息

      1. var data = Encoding.UTF8.GetBytes("Hello World");
      2. var messageId = await producer.NewMessage()
      3. .Property("SomeKey", "SomeValue")
      4. .Send(data);
    • 不使用生成器发送带有自定义元数据的消息。

      1. var data = Encoding.UTF8.GetBytes("Hello World");
      2. var metadata = new MessageMetadata();
      3. var messageId = await producer.Send(metadata, data));

    消费者是一个通过订阅关联到某一主题,并接收消息的程序。 本节介绍一些消费者配置示例。

    接收消息

    此示例显示消费者如何从主题接收消息。

    消息可以是逐个确认,也可以累积一起确认。 关于消息确认的详情,请参阅 章节。

    • 逐条确认消息。

      1. await foreach (var message in consumer.Messages())
      2. {
      3. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
      4. }
    • 累积确认消息。

      1. await consumer.AcknowledgeCumulative(message);

    取消订阅主题

    此示例显示消费者如何取消订阅某主题。

    1. await consumer.Unsubscribe();

    Note

    Reader

    Reader 实际上只是一个没有游标的消费者。 这意味着 Pulsar 不会跟踪消息的消费进度,也不会去做消息确认。

    此示例展示 Reader 如何接收消息。

    1. await foreach (var message in reader.Messages())
    2. {
    3. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    4. }

    本节介绍如何监测生产者、消费者和 reader 的状态。

    监控生产者

    下表列出了可监测的生产者状态。

    此示例显示如何监测生产者状态。

    1. private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
    2. {
    3. var state = ProducerState.Disconnected;
    4. while (!cancellationToken.IsCancellationRequested)
    5. {
    6. state = await producer.StateChangedFrom(state, cancellationToken);
    7. var stateMessage = state switch
    8. {
    9. ProducerState.Connected => $"The producer is connected",
    10. ProducerState.Disconnected => $"The producer is disconnected",
    11. ProducerState.Closed => $"The producer has closed",
    12. ProducerState.Faulted => $"The producer has faulted",
    13. _ => $"The producer has an unknown state '{state}'"
    14. };
    15. Console.WriteLine(stateMessage);
    16. if (producer.IsFinalState(state))
    17. return;
    18. }
    19. }

    监测消费者状态

    下表列出了可监测的消费者状态。

    此示例显示如何监测消费者状态。

    1. private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
    2. {
    3. var state = ConsumerState.Disconnected;
    4. while (!cancellationToken.IsCancellationRequested)
    5. {
    6. var stateMessage = state switch
    7. {
    8. ConsumerState.Active => "The consumer is active",
    9. ConsumerState.Inactive => "The consumer is inactive",
    10. ConsumerState.Disconnected => "The consumer is disconnected",
    11. ConsumerState.Closed => "The consumer has closed",
    12. ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
    13. ConsumerState.Faulted => "The consumer has faulted",
    14. _ => $"The consumer has an unknown state '{state}'"
    15. };
    16. Console.WriteLine(stateMessage);
    17. if (consumer.IsFinalState(state))
    18. return;
    19. }
    20. }

    监测 Reader 状态

    此示例显示如何监测 Reader 状态。

    1. private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
    2. {
    3. var state = ReaderState.Disconnected;
    4. while (!cancellationToken.IsCancellationRequested)
    5. {
    6. state = await reader.StateChangedFrom(state, cancellationToken);
    7. var stateMessage = state switch
    8. {
    9. ReaderState.Connected => "The reader is connected",
    10. ReaderState.Disconnected => "The reader is disconnected",
    11. ReaderState.Closed => "The reader has closed",
    12. ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
    13. ReaderState.Faulted => "The reader has faulted",
    14. _ => $"The reader has an unknown state '{state}'"
    15. };
    16. Console.WriteLine(stateMessage);
    17. if (reader.IsFinalState(state))
    18. return;
    19. }