What you don’t want to do is block the underlying Netty event loop within your filter, instead you want the filter to proceed with execution once any I/O is complete.
As an example, consider the following example that uses RxJava to compose an I/O operation:
A TraceService Example using RxJava
A TraceService Example using RxJava
import io.micronaut.http.HttpRequest
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.inject.Singleton
@Singleton
class TraceService {
private static final Logger LOG = LoggerFactory.getLogger(TraceService.class)
Flowable<Boolean> trace(HttpRequest<?> request) {
Flowable.fromCallable({ -> (1)
if (LOG.isDebugEnabled()) {
LOG.debug("Tracing request: " + request.getUri())
}
// trace logic here, potentially performing I/O (2)
return true
}).subscribeOn(Schedulers.io()) (3)
}
}
import io.micronaut.http.HttpRequest
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import org.slf4j.LoggerFactory
import javax.inject.Singleton
@Singleton
private val LOG = LoggerFactory.getLogger(TraceService::class.java)
internal fun trace(request: HttpRequest<*>): Flowable<Boolean> {
return Flowable.fromCallable {
(1)
if (LOG.isDebugEnabled) {
LOG.debug("Tracing request: " + request.uri)
}
// trace logic here, potentially performing I/O (2)
true
}.subscribeOn(Schedulers.io()) (3)
}
}
You can then inject this implementation into your filter definition:
An Example HttpServerFilter
An Example HttpServerFilter
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Filter
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import org.reactivestreams.Publisher
@Filter("/hello/**") (1)
class TraceFilter implements HttpServerFilter { (2)
private final TraceService traceService
TraceFilter(TraceService traceService) { (3)
this.traceService = traceService
}
}
An Example HttpServerFilter
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import org.reactivestreams.Publisher
@Filter("/hello/**") (1)
class TraceFilter((2)
private val traceService: TraceService)(3)
: HttpServerFilter {
}
The doFilter implementation
The doFilter implementation
@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
traceService.trace(request) (1)
.switchMap({ aBoolean -> chain.proceed(request) }) (2)
.doOnNext({ res -> (3)
res.getHeaders().add("X-Trace-Enabled", "true")
})
}
The doFilter implementation
override fun doFilter(request: HttpRequest<*>, chain: ServerFilterChain): Publisher<MutableHttpResponse<*>> {
return traceService.trace(request) (1)
.switchMap { aBoolean -> chain.proceed(request) } (2)
.doOnNext { res ->
(3)
res.headers.add("X-Trace-Enabled", "true")
}
The previous example demonstrates some key concepts such as executing logic in a non-blocking matter before proceeding with the request and modifying the outgoing response.