Streaming RPC

There are scenarios in which the client or the server needs to send a large amount of data, which may grow over time or is too large to fit into an RPC attachment, for example, a replica or a snapshot transmitted between nodes in a distributed system. Although we could split the data into segments and send them across multiple RPCs, this introduces the following problems:

  • If these RPCs are parallel, there is no guarantee on the order in which the data arrives at the receiving side, which leads to complicated reassembly code.
  • If these RPCs are serial, every RPC pays the latency of a network round trip plus the processing time, which is especially unpredictable.

To let large amounts of data flow between client and server like a stream, we provide a new communication model: Streaming RPC. Streaming RPC lets users establish a Stream, which is a user-space connection between client and server. Multiple Streams can share the same TCP connection at the same time. The basic transmission unit on a Stream is the message. As a result, the sender can continuously write messages into a Stream, while the receiver reads them out in the order they were sent.

Streaming RPC guarantees or provides:

  • The message order at the receiver is exactly the same as that of the sender
  • Message boundaries
  • Flow control
  • Notification on timeout

Large messages are not segmented automatically, so multiple Streams sharing a single TCP connection may suffer from the Head-of-line blocking problem. Please avoid putting huge data into a single message until automatic segmentation is supported.

Create a Stream

Currently a Stream can only be established by the client. A new Stream object is created at the client and used to issue an RPC (through the baidu_std protocol) to the specified service. The service accepts the Stream by responding to the request without error, and the Stream is established once the client receives the response successfully. Any error during this process fails the RPC and thus fails the Stream creation. As an analogy on Linux: the client first creates a socket (creates a Stream), then tries to connect to the remote side (establishes the Stream through the RPC), and the connection is up once the remote side accepts the request (the Stream is established).

In the code we use StreamId to represent a Stream, which is the ID to pass when reading from, writing into, or closing the Stream.
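The creation API is sketched below, assuming the StreamCreate declaration from the same header that declares StreamAccept; check stream.h for the exact signature in your version.

    // [Called at the client side]
    // Create a Stream at the client side along with |cntl|. The Stream is
    // connected once the response carrying a Stream arrives from the server;
    // if the RPC fails, the Stream is closed as well.
    // Return 0 on success, -1 otherwise.
    int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);

A client-side usage sketch following that pattern; EchoService_Stub, channel, request and response are placeholders for your own protobuf service:

    // Assumed headers: <brpc/channel.h>, <brpc/stream.h>
    brpc::Controller cntl;
    brpc::StreamId stream;
    if (brpc::StreamCreate(&stream, cntl, NULL) != 0) {
        LOG(ERROR) << "Fail to create stream";
        return -1;
    }
    example::EchoService_Stub stub(&channel);
    stub.Echo(&cntl, &request, &response, NULL);  // the RPC that carries the Stream
    if (cntl.Failed()) {
        LOG(ERROR) << "Fail to establish stream, " << cntl.ErrorText();
        return -1;
    }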

Accept a Stream

If a Stream is attached to the request of an RPC, the service can accept it by calling StreamAccept. On success this function fills the created Stream into response_stream, which can then be used to send messages to the client.

    // [Called at the server side]
    // Accept the Stream. If client didn't create a Stream with the request
    // (cntl.has_remote_stream() returns false), this method would fail.
    // Return 0 on success, -1 otherwise.
    int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);
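A server-side usage sketch; the service and member names are placeholders, and StreamReceiver is the input handler sketched in "Read from a Stream" below:

    class StreamingEchoService : public example::EchoService {
    public:
        void Echo(google::protobuf::RpcController* controller,
                  const example::EchoRequest* /*request*/,
                  example::EchoResponse* response,
                  google::protobuf::Closure* done) override {
            brpc::ClosureGuard done_guard(done);  // calls done->Run() on scope exit
            brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
            brpc::StreamOptions options;
            options.handler = &_receiver;  // receives messages, see below
            if (brpc::StreamAccept(&_sd, *cntl, &options) != 0) {
                cntl->SetFailed("Fail to accept stream");
                return;
            }
            response->set_message("Accepted stream");
        }
    private:
        StreamReceiver _receiver;                      // a StreamInputHandler
        brpc::StreamId _sd = brpc::INVALID_STREAM_ID;  // filled by StreamAccept
    };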

Read from a Stream
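Messages are read through a handler set in StreamOptions when the Stream is created or accepted. A minimal sketch, assuming the StreamInputHandler interface declared in the same header; method names may differ slightly across versions:

    class StreamReceiver : public brpc::StreamInputHandler {
    public:
        // Called with a batch of messages, in the order they were written.
        int on_received_messages(brpc::StreamId id,
                                 butil::IOBuf* const messages[],
                                 size_t size) override {
            for (size_t i = 0; i < size; ++i) {
                LOG(INFO) << "Received from stream=" << id << ": " << *messages[i];
            }
            return 0;
        }
        // Called when no message arrives within idle_timeout_ms of StreamOptions.
        void on_idle_timeout(brpc::StreamId id) override {
            LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
        }
        // Called after the Stream is closed by either side.
        void on_closed(brpc::StreamId id) override {
            LOG(INFO) << "Stream=" << id << " is closed";
        }
    };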

Write to a Stream

    // Write |message| into |stream_id|. The remote-side handler will receive
    // the messages in the written order.
    // Returns 0 on success, errno otherwise
    // - EAGAIN: |stream_id| was created with a positive |max_buf_size| and the
    //           data not yet consumed by the remote side has reached that limit
    // - EINVAL: |stream_id| is invalid or has been closed
    int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
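A minimal write sketch; stream_id is assumed to be a Stream created or accepted as above:

    butil::IOBuf msg;
    msg.append("a chunk of the payload");
    const int rc = brpc::StreamWrite(stream_id, msg);
    if (rc == EAGAIN) {
        // Too much unconsumed data at the receiver; see "Flow Control" below.
    } else if (rc != 0) {
        LOG(ERROR) << "Fail to write to stream=" << stream_id << ": " << berror(rc);
    }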

Flow Control

When the amount of unacknowledged data reaches the limit, Write at the sender fails immediately with EAGAIN. At this point, you should wait, synchronously or asynchronously, until the receiver has consumed some of the data before writing again.
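A sketch of waiting for writability, assuming the StreamWait overloads declared in the same header (the synchronous form blocks until the Stream is writable again or |due_time| expires; the asynchronous form invokes |on_writable| instead):

    // Synchronous wait. Returns 0 when writable, errno otherwise
    // (e.g. ETIMEDOUT when |due_time| expires, EINVAL for an invalid Stream).
    int StreamWait(StreamId stream_id, const timespec* due_time);

    // Asynchronous wait: |on_writable| is called when the Stream becomes
    // writable, an error occurs, or |due_time| expires.
    void StreamWait(StreamId stream_id, const timespec* due_time,
                    void (*on_writable)(StreamId stream_id, void* arg, int error_code),
                    void* arg);

For example, a write that failed with EAGAIN can be retried after a synchronous wait (stream_id and msg are the placeholders from the write sketch above; butil::seconds_from_now is assumed to be available):

    timespec due_time = butil::seconds_from_now(1);
    int rc = brpc::StreamWrite(stream_id, msg);
    if (rc == EAGAIN && brpc::StreamWait(stream_id, &due_time) == 0) {
        rc = brpc::StreamWrite(stream_id, msg);  // retry after the receiver catches up
    }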

Close a Stream

    // Close |stream_id|. After this function is called:
    // - All the following |StreamWrite| would fail
    // - |StreamWait| wakes up immediately
    // - Both sides' |on_closed| would be notified after all the pending buffers
    //   have been received
    // This function could be called multiple times without side-effects
    int StreamClose(StreamId stream_id);
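A brief usage sketch; either side may close, and the on_closed callback sketched in "Read from a Stream" fires on both sides after pending data has been delivered:

    if (brpc::StreamClose(stream_id) != 0) {
        LOG(WARNING) << "Fail to close stream=" << stream_id;
    }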