To emit events from the server you simply return a Reactive Streams that emits objects of type Event.

    The itself could publish events from a background task, via an event system or whatever.

    Imagine for an example a event stream of news headlines, you may define a data class as follows:

    Headline

    1. String title;
    2. String description;
    3. Headline() {}
    4. Headline(String title, String description) {
    5. this.title = title;
    6. this.description = description;
    7. }
    8. }

    Headline

    To emit news headline events you can write a controller that returns a Publisher of instances using which ever Reactive library you prefer. The example below uses RxJava 2’s Flowable via the generate method:

    1. import io.micronaut.http.MediaType;
    2. import io.micronaut.http.annotation.*;
    3. import io.micronaut.http.sse.Event;
    4. import io.micronaut.scheduling.TaskExecutors;
    5. import io.micronaut.scheduling.annotation.ExecuteOn;
    6. import io.reactivex.Flowable;
    7. import org.reactivestreams.Publisher;
    8. @Controller("/headlines")
    9. public class HeadlineController {
    10. @ExecuteOn(TaskExecutors.IO)
    11. @Get(produces = MediaType.TEXT_EVENT_STREAM)
    12. String[] versions = new String[]{"1.0", "2.0"}; (2)
    13. return Flowable.generate(() -> 0, (i, emitter) -> { (3)
    14. if (i < versions.length) {
    15. emitter.onNext( (4)
    16. Event.of(new Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
    17. );
    18. } else {
    19. emitter.onComplete(); (5)
    20. }
    21. return ++i;
    22. });
    23. }
    24. }

    Publishing Server Sent Events from a Controller

    Publishing Server Sent Events from a Controller

    1. import io.micronaut.http.MediaType
    2. import io.micronaut.http.annotation.Controller
    3. import io.micronaut.http.annotation.Get
    4. import io.micronaut.http.sse.Event
    5. import io.micronaut.scheduling.TaskExecutors
    6. import io.micronaut.scheduling.annotation.ExecuteOn
    7. import io.reactivex.Emitter
    8. import io.reactivex.Flowable
    9. import io.reactivex.functions.BiFunction
    10. import org.reactivestreams.Publisher
    11. @Controller("/headlines")
    12. @ExecuteOn(TaskExecutors.IO)
    13. @Get(produces = [MediaType.TEXT_EVENT_STREAM])
    14. fun index(): Publisher<Event<Headline>> { (1)
    15. val versions = arrayOf("1.0", "2.0") (2)
    16. return Flowable.generate<Event<Headline>, Int>(Callable<Int>{ 0 }, BiFunction { (3)
    17. i: Int, emitter: Emitter<Event<Headline>> ->
    18. var nextInt: Int = i
    19. if (i < versions.size) {
    20. emitter.onNext( (4)
    21. Event.of<Headline>(Headline("Micronaut " + versions[i] + " Released", "Come and get it"))
    22. )
    23. } else {
    24. emitter.onComplete() (5)
    25. }
    26. ++nextInt
    27. })
    28. }
    29. }
    You typically want to schedule SSE event streams on a separate executor. The previous example uses to execute the stream on the I/O executor.

    The above example will send back a response of type text/event-stream and for each Event emitted the type previously will be converted to JSON resulting in responses such as:

    You can use the methods of the interface to customize the Server Sent Event data sent back including associating event ids, comments, retry timeouts etc.