To stream JSON, declare a controller method on the server that returns a stream of JSON objects with the application/x-json-stream content type. For example:
Streaming JSON on the Server
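import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.reactivex.Flowable
import java.time.ZonedDateTime
import java.util.concurrent.TimeUnit

// The class name is illustrative; the "/streaming" prefix matches the URI used by the client.
@Controller("/streaming")
class HeadlineController {

    @Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1) produces application/x-json-stream
    Flowable<Headline> streamHeadlines() {
        Flowable.fromCallable({ // (2) create a Flowable from a callable
            Headline headline = new Headline()
            headline.setText("Latest Headline at " + ZonedDateTime.now())
            return headline
        }).repeat(100) // (3) repeat the callable 100 times
          .delay(1, TimeUnit.SECONDS) // (4) delay each emission by one second
    }
}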
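The Headline type used by the controller is not shown here. A minimal sketch, assuming it is a plain bean with a single text property (the only property the examples rely on), might look like the following. Everything beyond getText and setText is an assumption, and a real class may carry more fields or annotations.

```java
/**
 * Hypothetical minimal Headline bean. Only the "text" property is
 * implied by the surrounding examples; the rest is an assumption.
 */
class Headline {

    // May be null until setText is called.
    private String text;

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }
}
```

The getter and setter pair is what lets a JSON mapper read and write the text property, so each emitted object would carry a single text field.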
Then, on the client, subscribe to the stream returned by client.jsonStream. Each time the server emits a JSON object, the client decodes and consumes it:
Streaming JSON on the Client
Flowable<Headline> headlineStream = client.jsonStream(GET("/streaming/headlines"), Headline.class); // (1) request the JSON stream
CompletableFuture<Headline> future = new CompletableFuture<>(); // (2) holds the first headline received
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(1); // (3) request a single item
    }

    @Override
    public void onNext(Headline headline) {
        System.out.println("Received Headline = " + headline.getText());
        future.complete(headline); // (4) complete the future with the received headline
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t); // (5) propagate the error to the future
    }

    @Override
    public void onComplete() {
        // no-op (6)
    }
});
Streaming JSON on the Client
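val headlineStream = client.jsonStream(GET<Any>("/streaming/headlines"), Headline::class.java) // (1) request the JSON stream
val future = CompletableFuture<Headline>() // (2) holds the first headline received
headlineStream.subscribe(object : Subscriber<Headline> {
    override fun onSubscribe(s: Subscription) {
        s.request(1) // (3) request a single item
    }

    override fun onNext(headline: Headline) {
        println("Received Headline = " + headline.text!!)
        future.complete(headline) // (4) complete the future with the received headline
    }

    override fun onError(t: Throwable) {
        future.completeExceptionally(t) // (5) propagate the error to the future
    }

    override fun onComplete() {
        // no-op (6)
    }
})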
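The subscriber pattern above (request one item, complete a future with it, forward errors) can be tried in isolation. The sketch below is not the Micronaut client: it substitutes the JDK's java.util.concurrent.Flow API and SubmissionPublisher for the HTTP stream so that it runs without a server. The names FirstItemDemo and firstItem are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FirstItemDemo {

    /** Subscribes to the publisher and returns a future completed by its first item. */
    static <T> CompletableFuture<T> firstItem(Flow.Publisher<T> publisher) {
        CompletableFuture<T> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(1); // ask for exactly one item
            }

            @Override
            public void onNext(T item) {
                future.complete(item); // hand the item to whoever waits on the future
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t); // surface stream failures to the waiter
            }

            @Override
            public void onComplete() {
                // no-op, mirroring the example above
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        // SubmissionPublisher stands in for the JSON stream (an assumption for this demo).
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> future = firstItem(publisher);
            publisher.submit("Latest Headline");
            publisher.submit("Second Headline"); // never delivered: only one item was requested
            // Wait with a timeout so a stalled stream cannot block forever.
            System.out.println("Received Headline = " + future.get(3, TimeUnit.SECONDS));
        }
    }
}
```

Calling future.get with a timeout is one way for the caller to wait for the first headline. Because onComplete is a no-op, a stream that finishes without emitting anything leaves the future incomplete, and the timeout is what ends the wait in that case.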