To stream JSON, declare a controller method on the server that returns a stream of JSON objects with the application/x-json-stream media type. For example:

    Streaming JSON on the Server

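    import io.micronaut.http.MediaType
    import io.micronaut.http.annotation.Controller
    import io.micronaut.http.annotation.Get
    import io.reactivex.Flowable

    import java.time.ZonedDateTime
    import java.util.concurrent.TimeUnit

    // The "/streaming" base path is inferred from the request URI in the client listing;
    // the class name is illustrative.
    @Controller("/streaming")
    class HeadlineController {

        // Map GET /headlines and declare the application/x-json-stream media type
        @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM)
        Flowable<Headline> streamHeadlines() {
            Flowable.fromCallable({ // build each Headline lazily from a callable
                Headline headline = new Headline()
                headline.setText("Latest Headline at " + ZonedDateTime.now())
                return headline
            }).repeat(100) // run the callable 100 times
              .delay(1, TimeUnit.SECONDS) // delay emissions by one second
        }
    }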
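The listings in this section use a Headline type that is not shown. A minimal sketch follows, assuming it is a plain bean with a single text property; that shape is inferred only from the setText and getText calls in the listings, and the real class may carry more fields.

```java
// Hypothetical Headline bean: only the "text" property is confirmed by the
// listings in this section. In a real project it would normally be a public
// class in its own file.
class Headline {

    private String text;

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }
}
```

With a bean of this shape, each object in the stream would presumably carry a single text field.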

Then, on the client, subscribe to the stream using jsonStream. Every time the server emits a JSON object, the client decodes and consumes it:

    Streaming JSON on the Client

    // Request the stream; each JSON object is decoded into a Headline
    Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class);
    // Future used to hand the first received Headline back to the caller
    CompletableFuture<Headline> future = new CompletableFuture<>();
    headlineStream.subscribe(new Subscriber<Headline>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(1); // ask for a single item
        }

        @Override
        public void onNext(Headline headline) {
            System.out.println("Received Headline = " + headline.getText());
            future.complete(headline); // complete the future with the received item
        }

        @Override
        public void onError(Throwable t) {
            future.completeExceptionally(t); // propagate the failure through the future
        }

        @Override
        public void onComplete() {
            // no-op
        }
    });

    Streaming JSON on the Client

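    // Assumed to mirror the Java listing: request the stream and decode each object into a Headline
    val headlineStream = client.jsonStream(GET<Any>("/streaming/headlines"), Headline::class.java)
    // Future used to hand the first received Headline back to the caller
    val future = CompletableFuture<Headline>()
    headlineStream.subscribe(object : Subscriber<Headline> {
        override fun onSubscribe(s: Subscription) {
            s.request(1) // ask for a single item
        }

        override fun onNext(headline: Headline) {
            println("Received Headline = " + headline.text!!)
            future.complete(headline) // complete the future with the received item
        }

        override fun onError(t: Throwable) {
            future.completeExceptionally(t) // propagate the failure through the future
        }

        override fun onComplete() {
            // no-op
        }
    })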
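The subscriber pattern in the client listings (request one item, then complete a future with it) can be tried without a running server. The sketch below is an illustration only: it substitutes the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher for the Reactive Streams and RxJava types used in the listings, and FirstItemDemo, firstItem, and firstOf are hypothetical names that are not part of Micronaut.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Stand-in demo: JDK Flow types replace the Reactive Streams / RxJava types
// used by the client listings.
class FirstItemDemo {

    // Subscribes, requests exactly one item, and exposes that item as a future.
    static <T> CompletableFuture<T> firstItem(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(1); // demand one item; later items are not delivered
            }

            @Override
            public void onNext(T item) {
                future.complete(item);
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                // no-op, as in the listings
            }
        });
        return future;
    }

    // Publishes the items and returns the one the subscriber received.
    // Assumes a non-empty list; with no items the future would never complete.
    static String firstOf(List<String> items) {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> future = firstItem(publisher);
            items.forEach(publisher::submit);
            return future.join();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstOf(List.of("first", "second", "third")));
    }
}
```

Because the subscriber only ever requests one item, the remaining items stay buffered in the publisher and are never passed to onNext. A subscriber that wants the whole stream would call request again from onNext.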