Vert.x Rx

    • observable sequences:可观察序列
    • Rxified:Rx化
    • operator:操作符
    • lift:变换
    • flow:流
    • read stream:可读流
    • write stream:可写流
    • observer:观察者
    • subscriber:订阅者
    • item:对象
    • handler:(事件)处理器
    • timer:定时器
    • subscription(n.):订阅
    • unmarshall:重组

    Vert.x 与 RxJava 集成起来很自然:它使得无论什么时候,只要我们能使用流和异步结果,就能使用 Observable。

    要使用 Vert.x 的 RxJava API,有两种方式:

    • 通过原始的 Vert.x API 辅以类,这个辅助类提供了用于 Vert.x Core API 和 RxJava API 之间互相转化的静态方法。
    • 通过基于 Vert.x Core API 增强的Rx化的 Vert.x API。

    RxJava 中 Observable 的概念和 Vert.x 中 ReadStream 类是一对完美的匹配:都提供了一个对象流。

    静态方法 RxHelper.toObservable 用于将 Vert.x 可读流转换为 rx.Observable

    而 Rx化的 Vert.x API 在 类上提供了 toObservable 方法:

    1. FileSystem fs = vertx.fileSystem();
    2. fs.open("/data.txt", new OpenOptions(), result -> {
    3. AsyncFile file = result.result();
    4. Observable<Buffer> observable = file.toObservable();
    5. observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
    6. });

    这样的 Observable 是所谓 hot observable,即不管是否有订阅,它们都会产生通知。
    ReadStream 是否能自发地发射数据,这取决于它的具体实现:
    当订阅动作发生时,适配器会调用 来设置它的 handler 。
    某些 ReadStream 实现会在这个调用之后开始发射事件,而其他的则无关于 handler 是否设置:

    • AsyncFile 在 handler 设置后开始产生 buffer 事件
    • HttpServerRequest 则不依赖于此(即 如果 handler 未设置,buffer 可能会丢失)

    在上述所有情形中,订阅 Observable 都是安全的。原因在于不管 event loop 还是 worker verticle 都不会被并发执行,所以订阅一定是在 handler 开始发射数据之前发生。

    当你想延迟订阅时,需要先暂停 ReadStream ,并在之后恢复它。

    1. server.requestHandler(request -> {
    2. if (request.method() == HttpMethod.POST) {
    3. // 暂停接收 buffer
    4. request.pause();
    5. checkAuth(res -> {
    6. // 现在可以重新接收 buffer
    7. request.resume();
    8. if (res.succeeded()) {
    9. Observable<Buffer> observable = request.toObservable();
    10. observable.subscribe(buff -> {
    11. // 获得 buffer
    12. });
    13. }
    14. });
    15. }
    16. });

    同样的,将一个 Observable 转变为 Vert.x ReadStream 也是可以的。

    静态方法 RxHelper.toReadStream 用于将 rx.Observable 转换为 Vert.x 可读流:

    1. Observable<Buffer> observable = getObservable();
    2. ReadStream<Buffer> readStream = RxHelper.toReadStream(observable);
    3. Pump pump = Pump.pump(readStream, response);
    4. pump.start();

    处理器支持

    RxHelper 类可以创建 对象。这是一个 Observable 对象,它的 toHandler 方法会返回 Handler<T> 接口的实现:

    1. ObservableHandler<Long> observable = RxHelper.observableHandler();
    2. observable.subscribe(id -> {
    3. // Fired
    4. });
    5. vertx.setTimer(1000, observable.toHandler());

    Rx化的 Vert.x API 未提供针对 Handler 的 API。

    异步结果支持

    以一个现有的 Vert.x Handler<AsyncResult<T>> 对象为基础,你可以创建一个 RxJava Subscriber,然后将其注册在 ObservableSingle 上:

    1. observable.subscribe(RxHelper.toSubscriber(handler1));
    2. // Subscribe to a Single
    3. single.subscribe(RxHelper.toSubscriber(handler2));

    在构造(construct)发生时,作为异步方法的最后一个参数的 Vert.x Handler<AsyncResult<T>> 可以被映射为单个元素的 Observable

    • 当回调成功时,观察者的 onNext 方法将被调用,参数就是这个对象;且其后 onComplete 方法会立即被调用。
    • 当回调失败时,观察者的 onError 方法将被调用。

    RxHelper.observableFuture 方法可以创建一个 对象。这这是一个 Observable 对象,它的 toHandler 方法会返回 Handler<AsyncResult<T>> 接口的实现:

    1. ObservableFuture<HttpServer> observable = RxHelper.observableFuture();
    2. observable.subscribe(
    3. server -> {
    4. // Server is listening
    5. },
    6. failure -> {
    7. // Server could not start
    8. }
    9. );
    10. vertx.createHttpServer(new HttpServerOptions().
    11. setPort(1234).
    12. setHost("localhost")
    13. ).listen(observable.toHandler());

    我们可以从 ObservableFuture<Server> 中获取单个 HttpServer 对象。如果端口监听失败,订阅者将会得到通知。

    方法为观察者(Observer)和事件处理器(Handler)做了适配:

    1. Observer<HttpServer> observer = new Observer<HttpServer>() {
    2. @Override
    3. public void onNext(HttpServer o) {
    4. }
    5. @Override
    6. public void onError(Throwable e) {
    7. }
    8. @Override
    9. }
    10. };
    11. Handler<AsyncResult<HttpServer>> handler = RxHelper.toFuture(observer);
    1. Action1<HttpServer> onNext = httpServer -> {};
    2. Action1<Throwable> onError = httpServer -> {};
    3. Action0 onComplete = () -> {};
    4. Handler<AsyncResult<HttpServer>> handler1 = RxHelper.toFuture(onNext);
    5. Handler<AsyncResult<HttpServer>> handler2 = RxHelper.toFuture(onNext, onError);
    6. Handler<AsyncResult<HttpServer>> handler3 = RxHelper.toFuture(onNext, onError, onComplete);

    Rx化的 Vert.x API 复制了类似的每一个方法,并冠以 rx 的前缀,它们都返回 RxJava 的 对象:

    1. Single<HttpServer> single = vertx
    2. .createHttpServer()
    3. .rxListen(1234, "localhost");
    4. // 订阅绑定端口的事件
    5. single.
    6. subscribe(
    7. server -> {
    8. // Server is listening
    9. },
    10. failure -> {
    11. // Server could not start
    12. }
    13. );

    这样的 Single 是 “冷的”(cold),对应的 API 方法将在注册时被调用。

    有时候 Reactive 扩展库需要执行一些可调度的操作,例如 Observable#timer 方法将创建一个能周期性发射事件的定时器并返回之。缺省情况下,这些可调度的操作由 RxJava 管理,这意味着定时器线程并非 Vert.x 线程,因此(这些操作)并不是在 Vert.x Event Loop 线程上执行的。

    在 RxJava 中,有些操作通常会有接受一个 rx.Scheduler 参数的重载方法用于设定 SchedulerRxHelper 类提供了一个 RxHelper.scheduler 方法,其返回的调度器可供 RxJava 的这些方法使用。比如:

    1. Scheduler scheduler = RxHelper.scheduler(vertx);
    2. Observable<Long> timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS, scheduler);

    对于阻塞型的可调度操作(blocking scheduled actions),我们可以通过 方法获得适用的调度器:

    1. Scheduler scheduler = RxHelper.blockingScheduler(vertx);
    2. Observable<Integer> obs = blockingObservable.observeOn(scheduler);

    RxJava 也能被配置成使用 Vert.x 的调度器,这得益于 RxHelper.schedulerHook 方法创建的调度器钩子对象。对于 IO 操作这里使用了阻塞型的调度器:

    1. RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx);
    2. RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
    3. RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
    4. RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());

    Rx化的 Vert.x API 在 类中也提供了相似的方法:

    1. RxJavaSchedulersHook hook = io.vertx.rxjava.core.RxHelper.schedulerHook(vertx);
    2. RxJavaHooks.setOnIOScheduler(f -> hook.getIOScheduler());
    3. RxJavaHooks.setOnNewThreadScheduler(f -> hook.getNewThreadScheduler());
    4. RxJavaHooks.setOnComputationScheduler(f -> hook.getComputationScheduler());

    基于一个命名的工作线程池(named worker pool)创建调度器也是可以的,如果你想为了调度阻塞操作复用特定的线程池,这将会很有帮助:

    1. Scheduler scheduler = io.vertx.rxjava.core.RxHelper.scheduler(workerExecutor);
    2. Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);

    JSON解码

    方法创建了一个 rx.Observable.Operator 对象,这个操作符的作用是将 Observable<Buffer> 变换为对象的 Observable:

    1. fileSystem.open("/data.txt", new OpenOptions(), result -> {
    2. AsyncFile file = result.result();
    3. Observable<Buffer> observable = RxHelper.toObservable(file);
    4. observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(
    5. mypojo -> {
    6. // 处理对象
    7. }
    8. );
    9. });

    Rx化的辅助类也能做同样的事:

    1. fileSystem.open("/data.txt", new OpenOptions(), result -> {
    2. AsyncFile file = result.result();
    3. Observable<Buffer> observable = file.toObservable();
    4. observable.lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).subscribe(
    5. mypojo -> {
    6. // 处理对象
    7. }
    8. );
    9. });

    部署Verticle

    Rx化的 API 不能部署一个已经存在的 Verticle 实例。 方法为此提供了一个解决方案。

    所有工作都在 RxHelper.deployVerticle 方法里自动完成,它会部署一个 Verticle 并返回包含部署 ID 的 Observable<String>

    1. Observable<String> deployment = RxHelper.deployVerticle(vertx, verticle);
    2. deployment.subscribe(id -> {
    3. // 部署成功
    4. }, err -> {
    5. // 部署失败
    6. });

    对于通过订阅执行一个 HTTP GET 请求这样的需求,利用 方法是很适合的:

    1. Observable<HttpClientResponse> get = RxHelper.get(client, "http://the-server");
    2. // 执行请求
    3. get.subscribe(resp -> {
    4. // 获得响应
    5. }, err -> {
    6. // 出错了
    7. });

    Rx化的 API 是 Vert.x API 的一个代码自动生成版本,就像 Vert.x 的 JavaScriptGroovy 版本一样。这个 API 以 io.vertx.rxjava 为包名前缀,例如 io.vertx.core.Vertx 类被转化为 ·io.vertx.rxjava.core.Vertx·类。

    Embedding Rxfified Vert.x

    只需使用 Vertx.vertx 方法:

    1. Vertx vertx = io.vertx.rxjava.core.Vertx.vertx();

    作为Verticle

    通过继承 AbstractVerticle 类,它会做一些包装(你将获得一个 RxJava Verticle):

    1. class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle {
    2. public void start() {
    3. // Use Rxified Vertx here
    4. }
    5. }

    让我们通过研究一些样例来了解相关 API 吧。

    Event Bus 消息流

    很自然地,MessageConsumer 类提供了相关的 Observable<Message<T>>

    1. EventBus eb = vertx.eventBus();
    2. MessageConsumer<String> consumer = eb.<String>consumer("the-address");
    3. Observable<Message<String>> observable = consumer.toObservable();
    4. Subscription sub = observable.subscribe(msg -> {
    5. // 获得消息
    6. });
    7. // 10秒后注销
    8. vertx.setTimer(10000, id -> {
    9. sub.unsubscribe();
    10. });

    类提供了 Message 的流,如果需要,还可以通过 方法获得消息体组成的新流:

    1. EventBus eb = vertx.eventBus();
    2. MessageConsumer<String> consumer = eb.<String>consumer("the-address");
    3. Observable<String> observable = consumer.bodyStream().toObservable();

    RxJava 的 map/reduce 组合风格在这里是相当有用的:

    1. Observable<Double> observable = vertx.eventBus().
    2. <Double>consumer("heat-sensor").
    3. bodyStream().
    4. toObservable();
    5. observable.
    6. buffer(1, TimeUnit.SECONDS).
    7. map(samples -> samples.
    8. stream().
    9. collect(Collectors.averagingDouble(d -> d))).
    10. subscribe(heat -> {
    11. });

    定时器

    定时器任务可以通过 方法来创建:

    1. toObservable().
    2. subscribe(
    3. id -> {
    4. System.out.println("Callback after 1 second");
    5. }
    6. );

    周期性的任务可以通过 periodicStream 方法来创建:

    通过注销操作可以取消对 Observable 的订阅:

    1. vertx.periodicStream(1000).
    2. toObservable().
    3. subscribe(new Subscriber<Long>() {
    4. public void onNext(Long aLong) {
    5. // 回调
    6. unsubscribe();
    7. }
    8. public void onError(Throwable e) {}
    9. public void onCompleted() {}
    10. });

    HTTP客户端请求

    toObservable 方法提供了带有 对象的一次性回调,请求错误同样会在 Observable 中反映处理。

    1. HttpClient client = vertx.createHttpClient(new HttpClientOptions());
    2. HttpClientRequest request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri");
    3. request.toObservable().subscribe(
    4. response -> {
    5. // 处理响应
    6. },
    7. error -> {
    8. // 无法连接
    9. }
    10. );
    11. request.end();

    通过 toObservable 方法可以将响应当成 Observable<Buffer> 来处理:

    1. request.toObservable().
    2. subscribe(
    3. response -> {
    4. Observable<Buffer> observable = response.toObservable();
    5. observable.forEach(
    6. buffer -> {
    7. // 处理 buffer
    8. }
    9. );
    10. }
    11. );

    flatMap 操作也能获得同样的流:

    1. request.toObservable().
    2. flatMap(HttpClientResponse::toObservable).
    3. forEach(
    4. buffer -> {
    5. // Process buffer
    6. }
    7. );

    通过静态方法 ,我们也能将 Observable<Buffer> 重组为对象。这个方法创建了一个 Rx.Observable.Operator(Rx 操作符)供重组操作使用:

    1. request.toObservable().
    2. flatMap(HttpClientResponse::toObservable).
    3. lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)).
    4. forEach(
    5. pojo -> {
    6. // Process pojo
    7. }
    8. );

    HTTP服务端请求

    方法对到达的每个请求都提供了回调:

    1. Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
    2. requestObservable.subscribe(request -> {
    3. // 处理请求
    4. });

    HttpServerRequest 可以被适配为 Observable<Buffer>

    1. Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
    2. requestObservable.subscribe(request -> {
    3. Observable<Buffer> observable = request.toObservable();
    4. });

    方法可以用来解析 JSON 格式的请求并将其映射为对象:

    1. Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
    2. requestObservable.subscribe(request -> {
    3. Observable<MyPojo> observable = request.
    4. toObservable().
    5. lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class));
    6. });

    WebSocket客户端

    当 WebSocket 连接上或失败时, 方法对此提供了一次性的回调:

    1. HttpClient client = vertx.createHttpClient(new HttpClientOptions());
    2. client.websocketStream(8080, "localhost", "/the_uri").toObservable().subscribe(
    3. ws -> {
    4. // Use the websocket
    5. },
    6. error -> {
    7. // Could not connect
    8. }
    9. );

    WebSocket 对象可以轻松地转换为 Observable<Buffer>

    1. socketObservable.subscribe(
    2. socket -> {
    3. Observable<Buffer> dataObs = socket.toObservable();
    4. dataObs.subscribe(buffer -> {
    5. System.out.println("Got message " + buffer.toString("UTF-8"));
    6. });
    7. }
    8. );

    WebSocket服务端

    每当有新连接到达时,websocketStream 方法都会提供一次回调:

    1. Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable();
    2. socketObservable.subscribe(
    3. socket -> System.out.println("Web socket connect"),
    4. failure -> System.out.println("Should never be called"),
    5. () -> {
    6. System.out.println("Subscription ended or server closed");
    7. }
    8. );

    对象可以轻松地转换为 Observable<Buffer>

    1. socketObservable.subscribe(
    2. socket -> {
    3. Observable<Buffer> dataObs = socket.toObservable();
    4. dataObs.subscribe(buffer -> {
    5. System.out.println("Got message " + buffer.toString("UTF-8"));
    6. });
    7. }