设计推送系统
和普通Web应用不同的是,基于Servlet的线程池模型不能高效地支持成百上千的WebSocket长连接。Java提供了NIO能充分利用Linux系统的epoll机制高效支持大量的长连接,但是直接使用NIO的接口非常繁琐,通常我们会选择基于NIO的服务器。直接使用Netty其实仍然比较繁琐,基于Netty开发我们可以选择:
- Spring WebFlux:封装了Netty并实现Reactive接口;
- Vert.x:封装了Netty并提供简单的API接口。
这里我们选择Vert.x,因为它的API更简单。
Vert.x本身包含若干模块,根据需要,我们引入3个组件:
我们先编写推送服务的入口:
@SpringBootApplication
// 禁用数据库自动配置 (无DataSource, JdbcTemplate...)
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
public class PushApplication {
public static void main(String[] args) {
System.setProperty("vertx.disableFileCPResolving", "true");
System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
SpringApplication app = new SpringApplication(PushApplication.class);
// 禁用Spring的Web:
app.setWebApplicationType(WebApplicationType.NONE);
app.run(args);
}
}
上述代码仍然是一个标准的Spring Boot应用,因为我们希望利用Spring Cloud Config读取配置。由于我们不使用Spring自身的Web功能,因此需要禁用Spring的Web功能。推送服务本身并不需要访问数据库,因此禁用数据库自动配置。最后,我们把PushApplication
放在com.itranswarp.exchange.push
包下面,以避免自动扫描到com.itranswarp.exchange
包下的组件(如RedisService)。
下一步是编写PushService
,注意它是一个Spring组件,由Spring初始化:
由Spring初始化该组件的目的是注入各种配置。在初始化方法中,我们就可以启动Vert.x:
@PostConstruct
public void startVertx() {
// 启动Vert.x:
this.vertx = Vertx.vertx();
// 创建一个Vert.x Verticle组件:
var push = new PushVerticle(this.hmacKey, this.serverPort);
vertx.deployVerticle(push);
// 连接到Redis:
String url = "redis://" + (this.redisPassword.isEmpty() ? "" : ":" + this.redisPassword + "@") + this.redisHost
+ ":" + this.redisPort + "/" + this.redisDatabase;
Redis redis = Redis.createClient(vertx, url);
redis.connect().onSuccess(conn -> {
// 事件处理:
conn.handler(response -> {
// 收到Redis的PUSH:
if (response.type() == ResponseType.PUSH) {
int size = response.size();
if (size == 3) {
Response type = response.get(2);
// 收到PUBLISH通知:
String msg = type.toString();
// 由push verticle组件处理该通知:
push.broadcast(msg);
}
}
});
// 订阅Redis的Topic:
conn.send(Request.cmd(Command.SUBSCRIBE).arg(RedisCache.Topic.NOTIFICATION)).onSuccess(resp -> {
logger.info("subscribe ok.");
}).onFailure(err -> {
logger.error("subscribe failed.", err);
System.exit(1);
});
}).onFailure(err -> {
logger.error("connect to redis failed.", err);
System.exit(1);
});
}
Vert.x用Verticle
表示一个组件,我们编写PushVerticle
来处理WebSocket连接:
在PushVerticle
中,start()
方法由Vert.x回调。我们在start()
方法中主要干这么几件事:
- 创建基于Vert.x的HTTP服务器(内部使用Netty);
- 创建路由;
- 绑定一个路径为
/notification
的GET请求,将其升级为WebSocket连接; - 绑定其他路径的GET请求;
- 开始监听指定端口号。
在处理/notification
时,我们尝试从URL的token参数解析出用户ID,这样我们就无需访问数据库而获得了当前连接的用户。升级到WebSocket连接后,再调用initWebSocket()
继续处理WebSocket连接:
public class PushVerticle extends AbstractVerticle {
// 所有Handler:
Map<String, Boolean> handlersSet = new ConcurrentHashMap<>(1000);
// 用户ID -> Handlers
Map<Long, Set<String>> userToHandlersMap = new ConcurrentHashMap<>(1000);
// Handler -> 用户ID
Map<String, Long> handlerToUserMap = new ConcurrentHashMap<>(1000);
void initWebSocket(ServerWebSocket websocket, Long userId) {
// 获取一个WebSocket关联的Handler ID:
String handlerId = websocket.textHandlerID();
// 处理输入消息:
websocket.textMessageHandler(str -> {
logger.info("text message: " + str);
});
logger.error("websocket error: " + t.getMessage(), t);
});
// 关闭连接时:
websocket.closeHandler(e -> {
unsubscribeClient(handlerId);
unsubscribeUser(handlerId, userId);
subscribeClient(handlerId);
subscribeUser(handlerId, userId);
}
void subscribeClient(String handlerId) {
this.handlersSet.put(handlerId, Boolean.TRUE);
}
void unsubscribeClient(String handlerId) {
this.handlersSet.remove(handlerId);
}
void subscribeUser(String handlerId, Long userId) {
if (userId == null) {
return;
}
handlerToUserMap.put(handlerId, userId);
Set<String> set = userToHandlersMap.get(userId);
if (set == null) {
set = new HashSet<>();
userToHandlersMap.put(userId, set);
}
set.add(handlerId);
}
void unsubscribeUser(String handlerId, Long userId) {
if (userId == null) {
return;
}
handlerToUserMap.remove(handlerId);
Set<String> set = userToHandlersMap.get(userId);
if (set != null) {
set.remove(handlerId);
}
}
在Vert.x中,每个WebSocket连接都有一个唯一的Handler标识,以String
表示。我们用几个Map
保存Handler和用户ID的映射关系,当关闭连接时,将对应的映射关系删除。
最后一个关键方法broadcast()
由PushService
中订阅的Redis推送时触发,该方法用于向用户主动推送通知:
当Redis收到PUBLISH
调用后,它自动将String
表示的JSON数据推送给所有订阅端。我们在PushService
中订阅了notification
这个Topic,然后通过broadcast()
推送给WebSocket客户端。对于一个NotificationMessage
,如果设置了,则推送给指定用户,适用于订单成交等针对用户ID的通知;如果没有设置userId
,则推送给所有用户,适用于公开市场信息的推送。
整个推送服务仅包括3个Java文件,我们就实现了基于Redis和WebSocket的高性能推送。
可以从GitHub或下载源码。
GitHub ▸ ▸ warpexchange ▸
)
)
)
)
)
)
)
)
)
)
)
▸ db)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
▸ java/com/itranswarp/exchange/config)
)
)
)
)
)
)
)
)
)
)
▸ java/com/itranswarp/exchange/push)
)
)
)
)
▸ java/com/itranswarp/exchange)
)
)
)
)
)
▸ java/com/itranswarp/exchange)
)
)
▤ TradingEngineApiProxyService.java)
)
)
▤ TradingInternalApiController.java)
)
)
)
)
)
)
)
)
)
)
)
)
)
)
▤ TradingEngineApplication.java)
)
)
▸ test/java/com/itranswarp/exchange)
)
)
)
)
)
)
)
)
)
)
)
)
)
要高效处理大量WebSocket连接,我们选择基于Netty的Vert.x框架,可以通过少量代码配合Redis实现推送。