两个接口分工很明确,Protocol 负责解析数据形成消息实体,之后 smart-socket 会把该消息实体传递至 MessageProcessor 进行业务处理。

    当然,你也可以在 Protocol 中一次性完成解析、业务处理,又或者将 Protocol 当个摆设,所有事情集中在 MessageProcessor 完成。smart-socket 不限制你实现功能的自由性,只是提供一个更规范、更合理的建议,最终决定权还是在用户的手中。

    Protocol 是一个泛型接口,<T>指的是业务消息实体类,smart-socket 中不少地方都运用了泛型设计,其含义都代表数据解码后封装的消息类型。Protocol 中只定义了一个方法decode

    decode(消息解码),AIO 的数据传输是以 ByteBuffer 为媒介的。所有读取到的字节数据都会填充在 ByteBuffer 中并以事件回调的形式触发 Protocol#decode() 方法。所以我们实现的 decode 算法就是 ByteBuffer 对象转化为业务消息<T>的过程。

    • 根据协议解析每一个字段前,都要先确保剩余数据满足解析所需,不满足则终止该字段解析并返回 null。
    • 当已读的数据已满足一个完整业务消息所需时,立即构造该业务对象并返回,无需关心 ByteBuffer 中是否有剩余的数据。考虑到有些读者对上述两点还不甚理解,我们通过两个示例来模拟通信过程中的半包、粘包场景。通信协议依旧是如1.1节中的定义的类型:

    2.1.2_1

    • 半包
    1. public class StringProtocol implements Protocol<String> {
    2. public static void main(String[] args) {
    3. StringProtocol protocol = new StringProtocol();
    4. byte[] msgBody = "smart-socket".getBytes();
    5. byte msgHead = (byte) msgBody.length;
    6. System.out.println("完整消息长度:" + (msgBody.length + 1));
    7. ByteBuffer buffer = ByteBuffer.allocate(msgBody.length);
    8. buffer.put(msgHead);
    9. buffer.put(msgBody, 0, buffer.remaining());
    10. buffer.flip();
    11. System.out.println(protocol.decode(buffer, null));
    12. }
    13. public String decode(ByteBuffer buffer, AioSession<String> session) {
    14. buffer.mark();
    15. byte length = buffer.get();
    16. if (buffer.remaining() < length) {
    17. System.out.println("半包:期望长度:" + length + " ,实际剩余长度:" + buffer.remaining());
    18. buffer.reset();
    19. return null;
    20. }
    21. byte[] body = new byte[length];
    22. buffer.get(body);
    23. buffer.mark();
    24. return new String(body);
    25. }
    26. }

    根据协议规定,完整的消息长度是字符串“smart-socket”字节数加一个字节的消息头,即13位。但因接收数据的 ByteBuffer 空间不足导致无法容纳整个消息,此时执行解码算法decode便等同于通信中的半包,运行后控制台打印如下:

    • 粘包
    1. public class StringProtocol implements Protocol<String> {
    2. public static void main(String[] args) {
    3. StringProtocol protocol = new StringProtocol();
    4. byte[] msgBody = "smart-socket".getBytes();
    5. byte msgHead = (byte) msgBody.length;
    6. //第一个消息
    7. buffer.put(msgHead);
    8. buffer.put(msgBody);
    9. //第二个消息
    10. buffer.put(msgHead);
    11. buffer.put(msgBody);
    12. buffer.flip();
    13. String str = null;
    14. while ((str = protocol.decode(buffer, null)) != null) {
    15. System.out.println("消息解码成功:"+str);
    16. }
    17. }
    18. public String decode(ByteBuffer buffer, AioSession<String> session) {
    19. if (!buffer.hasRemaining()) {
    20. return null;
    21. }
    22. buffer.mark();
    23. byte length = buffer.get();
    24. if (buffer.remaining() < length) {
    25. System.out.println("半包:期望长度:" + length + " ,实际剩余长度:" + buffer.remaining());
    26. buffer.reset();
    27. return null;
    28. }
    29. byte[] body = new byte[length];
    30. buffer.get(body);
    31. buffer.mark();
    32. return new String(body);
    33. }
    34. }

    粘包出现于已读数据的部分超过了一个完整的消息长度。如 demo 所示,我们在 ByteBuffer 中放入了符合协议贵的两个完整消息,按照解码算法解析出第一个消息里立即返回new String(body),待该消息处理完成后再进行下一次解码。故上述例子的控制台打印如下:

    至此我们已经为大家介绍了 Protocol 的特性以及对于半包粘包的处理方式,当然真实场景下我们会面临更复杂的协议,对于半包粘包的处理方式也是多种多样,在通信协议章节在详细说明。

    1. public interface MessageProcessor<T> {
    2. /**
    3. * 处理接收到的消息
    4. *
    5. * @param session 通信会话
    6. * @param msg 待处理的业务消息
    7. /**
    8. * 状态机事件,当枚举事件发生时由框架触发该方法
    9. *
    10. *
    11. * @param session 本次触发状态机的AioSession对象
    12. * @param stateMachineEnum 状态枚举
    13. * @param throwable 异常对象,如果存在的话
    14. * @see StateMachineEnum
    15. */
    16. void stateEvent(AioSession<T> session, StateMachineEnum stateMachineEnum, Throwable throwable);
    17. }

    Protocol 侧重于通信层的数据解析,而 MessageProcessor 则负责应用层的消息业务处理。定义了消息处理器接口,smart-socket 在通过 Protocol 完成消息解码后,会将消息对象交由 MessageProcessor 实现类进行业务处理。

    • process消息处理器,smart-socket 每接收到一个完整的业务消息,都会交由该处理器执行。
    • stateEvent执行状态机,smart-socket 内置了状态枚举StateMachineEnumMessageProcessor实现类可在此方法中处理其关注的事件。

    状态机伴贯穿了通信服务的整个生命周期,在这个过程中不同事件的发生会触发不同的状态机。通信事件与状态机的关系如下图所示。

    图2.2.2

    状态机相对于整个通信环境的各个节点只是一个旁观者,它见证了各个事件的发生,却无力扭转事件的发展方向。状态机本质其实跟大家所认知的过滤器、拦截器有点类似,那为什么smart-socket要如此设计呢?想想一下如果我们按照过滤器的设计思路,其形态会如下所示:

    这样的设计存在以下缺陷:

    • 对实现类不友好;也许我只想处理 newSession,却不得不保留其余方法的空实现;
    • 无法平滑升级;加入新版本中加入新的事件类型,老版本代码需要全部更改;而采用状态机模式,不仅解决了上述问题,还为通信服务的多元化扩展带了便利。例如 IM 场景下,我们在 NEW_SESSION 状态机中收集 Session 集合,在消息处理时很容易就能实现消息群发;当某个用户断线后,我们及时在状态机 SESSION_CLOSED 中感知到并更新 Session 集合中的会话状态,甚至可以群发消息给所有用户“某某人掉线了”。这些通信状态和业务相结合的场景, 用状态机能很好的得以解决。最后奉上一段粗糙的伪代码,读者自行领悟。
    1. public class IMProcessor implements MessageProcessor<Message> {
    2. private LinkedBlockingQueue sessions = new LinkedBlockingQueue();
    3. public void process(AioSession<String> session, Message msg) {
    4. for(AioSession otherSession:sessions){
    5. if(otherSession==session){
    6. continue;
    7. }
    8. sendMessage(otherSession,session+"群发送消息:"+msg);
    9. }
    10. }
    11. public void stateEvent(AioSession<Message> session, StateMachineEnum state, Throwable throwable) {
    12. switch (state) {
    13. case NEW_SESSION:
    14. sessions.add(session);
    15. break;
    16. case SESSION_CLOSED:
    17. sessions.remove(session);
    18. break;
    19. }
    20. }