To emit events from the server you simply return a Reactive Streams that emits objects of type Event.
The itself could publish events from a background task, via an event system or whatever.
Imagine for an example a event stream of news headlines, you may define a data class as follows:
Headline
String title;
String description;
Headline() {}
Headline(String title, String description) {
this.title = title;
this.description = description;
}
}
Headline
To emit news headline events you can write a controller that returns a Publisher of instances using which ever Reactive library you prefer. The example below uses RxJava 2’s Flowable via the generate
method:
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.*;
import io.micronaut.http.sse.Event;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
@Controller("/headlines")
public class HeadlineController {
@ExecuteOn(TaskExecutors.IO)
@Get(produces = MediaType.TEXT_EVENT_STREAM)
String[] versions = new String[]{"1.0", "2.0"}; (2)
return Flowable.generate(() -> 0, (i, emitter) -> { (3)
if (i < versions.length) {
emitter.onNext( (4)
Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
);
} else {
emitter.onComplete(); (5)
}
return ++i;
});
}
}
Publishing Server Sent Events from a Controller
Publishing Server Sent Events from a Controller
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.sse.Event
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.reactivestreams.Publisher
@Controller("/headlines")
@ExecuteOn(TaskExecutors.IO)
@Get(produces = [MediaType.TEXT_EVENT_STREAM])
fun index(): Publisher<Event<Headline>> { (1)
val versions = arrayOf("1.0", "2.0") (2)
return Flowable.generate<Event<Headline>, Int>(Callable<Int>{ 0 }, BiFunction { (3)
i: Int, emitter: Emitter<Event<Headline>> ->
var nextInt: Int = i
if (i < versions.size) {
emitter.onNext( (4)
Event.of<Headline>(Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
)
} else {
emitter.onComplete() (5)
}
++nextInt
})
}
}
You typically want to schedule SSE event streams on a separate executor. The previous example uses to execute the stream on the I/O executor. |
The above example will send back a response of type text/event-stream
and for each Event emitted the type previously will be converted to JSON resulting in responses such as:
You can use the methods of the interface to customize the Server Sent Event data sent back including associating event ids, comments, retry timeouts etc.