For example, to write a client that streams data from the streaming controller defined elsewhere in the documentation, define a client that returns an unbound Publisher such as an RxJava Flowable or a Reactor Flux:

    HeadlineClient.java

    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Get
    import io.micronaut.http.client.annotation.Client
    import io.reactivex.Flowable

    @Client("/streaming")
    interface HeadlineClient {

        // The method processes application/x-json-stream and returns an
        // unbound Flowable that emits each Headline as it arrives
        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM)
        Flowable<Headline> streamHeadlines()
    }

    HeadlineClient.java

    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Get
    import io.micronaut.http.client.annotation.Client
    import io.reactivex.Flowable

    @Client("/streaming")
    interface HeadlineClient {

        // The method processes application/x-json-stream and returns an
        // unbound Flowable that emits each Headline as it arrives
        @Get(value = "/headlines", processes = [MediaType.APPLICATION_JSON_STREAM])
        fun streamHeadlines(): Flowable<Headline>
    }

    The following example shows how the previously defined HeadlineClient can be invoked from a JUnit test:

    Streaming HeadlineClient

    @Test
    public void testClientAnnotationStreaming() throws Exception {
        try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class)) {
            // Look up the client from the application context
            HeadlineClient headlineClient = embeddedServer
                    .getApplicationContext()
                    .getBean(HeadlineClient.class);
            // Take only the first item emitted by the stream
            Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement();
            // Block until that item arrives (acceptable in a test)
            Headline headline = firstHeadline.blockingGet();
            assertNotNull(headline);
            assertTrue(headline.getText().startsWith("Latest Headline"));
        }
    }

    Streaming HeadlineClient

    void "test client annotation streaming"() throws Exception {
        when:
        // Look up the client from the application context
        HeadlineClient headlineClient = embeddedServer.getApplicationContext()
                .getBean(HeadlineClient.class)
        // Take only the first item emitted by the stream
        Maybe<Headline> firstHeadline = headlineClient.streamHeadlines().firstElement()
        // Block until that item arrives (acceptable in a test)
        Headline headline = firstHeadline.blockingGet()

        then:
        null != headline
        headline.getText().startsWith("Latest Headline")
    }
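
To make the firstElement() and blockingGet() steps in the tests above more concrete, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow API rather than RxJava. The class FirstElementSketch and its firstElement helper are hypothetical names introduced for illustration and are not part of Micronaut or RxJava; the sketch only shows the idea of requesting a single item from an unbound publisher and cancelling afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FirstElementSketch {

    /** Subscribes, requests exactly one item, then cancels the subscription. */
    static <T> CompletableFuture<T> firstElement(Flow.Publisher<T> publisher) {
        CompletableFuture<T> result = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // demand a single item only
            }

            @Override
            public void onNext(T item) {
                subscription.cancel(); // stop the otherwise endless stream
                result.complete(item);
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(null); // the stream ended without any item
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        // SubmissionPublisher stands in for the server-side stream (assumption)
        try (SubmissionPublisher<String> headlines = new SubmissionPublisher<>()) {
            CompletableFuture<String> first = firstElement(headlines);
            headlines.submit("Latest Headline at 09:00");
            headlines.submit("Latest Headline at 09:01");
            // Blocking here mirrors blockingGet() in the tests above
            System.out.println(first.get(5, TimeUnit.SECONDS));
        }
    }
}
```

Blocking on the future is reasonable in a test; in application code you would normally keep the pipeline non-blocking.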

    The example defined in the previous section expects the server to respond with a stream of JSON objects and the content type to be application/x-json-stream. For example:

    {"title":"The Stand"}
    {"title":"The Shining"}

    The reason is simple: a sequence of JSON objects is not, in fact, valid JSON, so the response content type cannot be application/json. For the JSON to be valid, the server would have to return an array:

    A JSON Array

    [
      {"title":"The Stand"},
      {"title":"The Shining"}
    ]

    Micronaut's client does, however, support streaming both individual JSON objects via application/x-json-stream and JSON arrays via application/json.

    If the server returns application/json and a non-single Publisher is returned (such as an RxJava Flowable or a Reactor Flux), the client will stream the array elements as they become available.
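
To illustrate how array elements or stream entries can be emitted "as they become available", the following sketch frames complete JSON objects out of a body that arrives in arbitrary chunks. It is not Micronaut's actual implementation; JsonObjectFramer is a hypothetical class, and it assumes every top-level element is a JSON object (it ignores scalars and performs no validation).

```java
import java.util.ArrayList;
import java.util.List;

final class JsonObjectFramer {
    private final StringBuilder current = new StringBuilder();
    private int depth = 0;            // nesting depth of '{' ... '}'
    private boolean inString = false; // inside a JSON string literal
    private boolean escaped = false;  // previous character was a backslash

    /** Feeds one chunk and returns every object completed by it. */
    List<String> feed(String chunk) {
        List<String> complete = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (depth == 0 && c != '{') {
                continue; // skip '[', ',', ']' and whitespace between objects
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}' && --depth == 0) {
                complete.add(current.toString()); // one whole object is ready
                current.setLength(0);
            }
        }
        return complete;
    }

    public static void main(String[] args) {
        JsonObjectFramer framer = new JsonObjectFramer();
        // An application/json array split across two network chunks
        System.out.println(framer.feed("[{\"title\":\"The Stand\"},{\"tit"));
        System.out.println(framer.feed("le\":\"The Shining\"}]"));
    }
}
```

The same framer accepts an application/x-json-stream body, since the newlines between objects are skipped in the same way as the array brackets and commas.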

    When streaming responses from servers, the underlying HTTP client will not apply the default readTimeout setting (which defaults to 10 seconds) of the HttpClientConfiguration since the delay between reads for streaming responses may differ from normal reads.

    Instead, the read-idle-timeout setting (which defaults to 60 seconds) dictates when a connection is closed after becoming idle.

    If you are streaming data from a server that defines a longer delay than 60 seconds between items being sent to the client you should adjust the readIdleTimeout. The following configuration in application.yml demonstrates how:

    Adjusting the readIdleTimeout

    micronaut:
      http:
        client:
          read-idle-timeout: 5m

    The above example sets the readIdleTimeout to 5 minutes.
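
The difference between a per-read timeout and an idle timeout can be sketched as a timer that is reset by every read. This is only an illustration of the semantics described above, not the HTTP client's internal code; ReadIdleTimer is a hypothetical class, and the 90-second gap between items is an assumed example value.

```java
import java.time.Duration;
import java.time.Instant;

final class ReadIdleTimer {
    private final Duration idleTimeout;
    private Instant lastRead;

    ReadIdleTimer(Duration idleTimeout, Instant start) {
        this.idleTimeout = idleTimeout;
        this.lastRead = start;
    }

    /** Every received item resets the idle clock. */
    void onRead(Instant now) {
        lastRead = now;
    }

    /** True once nothing has been read for longer than the idle timeout. */
    boolean isIdle(Instant now) {
        return Duration.between(lastRead, now).compareTo(idleTimeout) > 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        Instant nextItem = start.plusSeconds(90); // server sends an item every 90s

        ReadIdleTimer defaults = new ReadIdleTimer(Duration.ofSeconds(60), start);
        ReadIdleTimer adjusted = new ReadIdleTimer(Duration.ofMinutes(5), start);

        System.out.println(defaults.isIdle(nextItem)); // true: would be closed
        System.out.println(adjusted.isIdle(nextItem)); // false: stays open
    }
}
```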

    You can use this client to stream SSE events from any server that emits them.

    The @Client annotation also supports consuming SSE streams. For example, consider the following controller method that produces a stream of SSE events:

    SSE Controller

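    // Produces a text/event-stream response
    @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
    Flowable<Event<Headline>> streamHeadlines() {
        return Flowable.<Event<Headline>>create((emitter) -> {
            Headline headline = new Headline();
            headline.setText("Latest Headline at " + ZonedDateTime.now());
            emitter.onNext(Event.of(headline)); // wrap the payload in an SSE event
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER)
                .repeat(100)                 // resubscribe so that 100 events are emitted
                .delay(1, TimeUnit.SECONDS); // shift each emission by one second
    }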

    SSE Controller

    // Produces a text/event-stream response
    @Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM])
    internal fun streamHeadlines(): Flowable<Event<Headline>> {
        return Flowable.create<Event<Headline>>({ emitter ->
            val headline = Headline()
            headline.text = "Latest Headline at " + ZonedDateTime.now()
            emitter.onNext(Event.of(headline)) // wrap the payload in an SSE event
            emitter.onComplete()
        }, BackpressureStrategy.BUFFER)
            .repeat(100)                // resubscribe so that 100 events are emitted
            .delay(1, TimeUnit.SECONDS) // shift each emission by one second
    }

    Notice that the controller's return type wraps each item in Event and that the Event.of method is used to create the events streamed to the client.

    To define a client that consumes the events you simply have to define a method that processes MediaType.TEXT_EVENT_STREAM:

    SSE Client

    @Client("/streaming/sse")
    public interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
        Flowable<Event<Headline>> streamHeadlines();
    }

    SSE Client

    @Client("/streaming/sse")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = MediaType.TEXT_EVENT_STREAM)
        Flowable<Event<Headline>> streamHeadlines()
    }

    SSE Client

    @Client("/streaming/sse")
    interface HeadlineClient {

        @Get(value = "/headlines", processes = [MediaType.TEXT_EVENT_STREAM])
        fun streamHeadlines(): Flowable<Event<Headline>>
    }

    The generic type of the Flux or Flowable can be either an Event, in which case you will receive the full event object, or a POJO, in which case you will receive only the data contained within the event, converted from JSON.
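
To show what "the full event object" contains compared with "only the data", here is a simplified sketch of parsing a text/event-stream body with plain Java. SseSketch and SseEvent are hypothetical stand-ins and not Micronaut types. The parser is deliberately reduced: it handles only the id, event, and data fields and resets the id after each event, whereas the SSE specification keeps the last event id across events.

```java
import java.util.ArrayList;
import java.util.List;

final class SseSketch {

    /** Hypothetical event wrapper: metadata plus the raw data payload. */
    static final class SseEvent {
        final String id;
        final String name;
        final String data;

        SseEvent(String id, String name, String data) {
            this.id = id;
            this.name = name;
            this.data = data;
        }
    }

    static List<SseEvent> parse(String body) {
        List<SseEvent> events = new ArrayList<>();
        String id = null;
        String name = null;
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : body.split("\n", -1)) {
            if (line.isEmpty()) { // a blank line terminates the current event
                if (hasData) {
                    events.add(new SseEvent(id, name, data.toString()));
                }
                id = null;
                name = null;
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, often used as a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // one leading space is not part of the value
            }
            if (field.equals("id")) {
                id = value;
            } else if (field.equals("event")) {
                name = value;
            } else if (field.equals("data")) {
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined by newlines
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String body = "id: 1\nevent: headline\ndata: {\"text\":\"Latest Headline\"}\n\n";
        SseEvent event = parse(body).get(0);
        // Full event: metadata and payload are both available
        System.out.println(event.id + " " + event.name + " " + event.data);
        // POJO style: only the JSON payload would be passed on for conversion
        System.out.println(event.data);
    }
}
```

A client method typed with the event wrapper would correspond to the first print statement, while one typed with a POJO would see only the JSON in the data field after conversion.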