一、走起Socket服务端


    Step1: 实现一个Socket监听

    实例化 Socket 监听有三个类可以采取实例化动作.

    • : 采用 JDK 的 AIO 模型的异步通信,JDK > 1.7。
    • NioServerSocket: 采用 JDK 的 NIO 模型的异步通信,JDK > 1.4。
    • UdpServerSocket: 采用 JDK 的 UDP 模型的异步通信,JDK > 1.4。下面我们来看看这两个类的构造方法:

    我们可以看到这两个类的构造方法都具有三个或者四个参数:

    • host: 服务发布地址。
    • port: 服务发布端口。
    • readTimeout: 读取超时时间
    • idleInterval: 空闲事件触发时间, 单位: 秒可以看到有两类构造函数,在构造中没有idleInterval参数的不会触发 onIdle 事件,反之则会触发。下面我们来实例化一个Socket监听对象:
    1. AioServerSocket serverSocket = new AioServerSocket("127.0.0.1",2031,20000);

    实际使用中如果你想构造一个 Nio 模型的 Socket 监听,请将 AioServerSocket 替换成 NioServerSocket即可。


    消息分割器是用来处理消息粘包的一个补充类,相对于 Netty 和 Mina 是一个特殊的地方. 注意:消息分割器是工作在过滤器之前的. 消息分割器是在 Socket 监听器接受到消息后对消息的内容进行判断是否是一个完成消息报文,如果是一个完成消息报文则返回给过滤器来处理,如果不是则等待消息报文被完整接受,如果一直接受的消息报文都不完整则一直等待,这个时候我们可以通过超时来控制尝试间接收不到消息的情况,具体参考TimeOutMesssageSplitter分割器的实现,也可以直接实例化这个分割器在你自定义的分割器中通过业务代码来判断是否需要使用超时。下面我们给第一步实例化的 Socket 监听对象增加一个分割器:

    1. serverSocket.messageSplitter(new LineMessageSplitter());

    自定义一个消息过滤器需要实现MessageSplitte接口,接口的源码:

    1. package org.voovan.network;
    2. public interface MessageSplitter {
    3. public int canSplite(IoSession session,byte[] buffer);
    4. }

    通过源码我们可以发现,如果想实现一个消息分割器我们需要实现一个canSplite方法:

    • canSplite 方法: 判断消息是否可分割。这两个方法有两个相同的参数:
      • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。
      • buffer参数: Socket 接收到的所有字节。
      • 返回对象: 截取消息的长度,MessageLoader会根据这个长度来截取消息,buffer参数中这个长度的字节将会交给过滤器来处理。下面我们给出框架实现的TimeOutMesssageSplitter的源码以供参考:
    1. public class TimeOutMesssageSplitter implements MessageSplitter {
    2. private long initTime;
    3. public TimeOutMesssageSplitter(){
    4. }
    5. @Override
    6. public int canSplite(IoSession session, byte[] buffer) {
    7. int timeOut = session.sockContext().getReadTimeout();
    8. long currentTime = System.currentTimeMillis();
    9. if(initTime==-1){
    10. initTime = currentTime;
    11. }
    12. if(currentTime-initTime >= timeOut){
    13. return byteBuffer.limit();
    14. }else{
    15. return -1;
    16. }
    17. }

    Step3: 实现一个过滤器

    我们可以定义多个过滤器形成一个过滤器链,这样可以提高部分过滤器的复用性. 在第一步实例化好的监听对象中调用增加过滤器方法可以向 Socket 监听对象增加过滤器。 增加的过滤器在过滤器链中是有先后顺序的,例如:在使用 add 方法加入的过滤器则在过滤器的最后一个.在解码的过程中过滤器的方法 decode 时是按照加入的从第一个到最后一个的顺序调用的.在编码的过程中过滤器方法 encode 是按照最后一个到第一个的顺序调用的。

    下面我们给第一步实例化的 Socket 监听对象增加一个过滤器:

    其中我们通过serverSocket.filterChain()获取过滤器链,然后通过过滤器链的 add 方法增加一个名为StringFilter的过滤器。

    Voovan 框架已经包括了一个过滤器的实现: StringFilter过滤器,用于将字节流转换成字符串。

    如果我们要根据自己的需求定义一个自定义过滤器,那么我们的过滤器实现一个 IoFilter 接口. 下面我们给出 IoFilter 接口的源码:

    1. package org.voovan.network;
    2. import org.voovan.network.exception.IoFilterException;
    3. public interface IoFilter {
    4. public Object decode(IoSession session,Object object) throws IoFilterException;
    5. public Object encode(IoSession session,Object object)throws IoFilterException;
    6. }

    通过源码我们可以发现,如果想实现一个过滤器我们需要实现两个过滤器方法:

    • decode 方法: 过滤器解码函数,接收事件(onRecive)前调用
    • encode 方法: 过滤器编码函数,发送事件(onSend)前调用这两个方法有两个相同的参数:
      • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等.
      • object参数: 上一个过滤器的处理结果,如果只有一个过滤器则是业务代码中发送数据对象.`返回对象 过滤器处理过的返回结果,被下一个过滤器调用,如果是最后一个过滤器那么这个结果则会传入Socket业务处理句柄的onRecive` 方法。下面我们给出框架实现的StringFilter的源码以供参考:
    1. public class StringFilter implements IoFilter {
    2. @Override
    3. public Object encode(IoSession session,Object object) {
    4. if(object instanceof String){
    5. String sourceString = TObject.cast(object);
    6. return ByteBuffer.wrap(sourceString.getBytes());
    7. return object;
    8. }
    9. @Override
    10. public Object decode(IoSession session,Object object) {
    11. if(object instanceof ByteBuffer){
    12. return TByteBuffer.toString((ByteBuffer)object);
    13. }
    14. return object;
    15. }

    定义Socket 业务处理句柄需要实现IoHandler接口.下面我们给出 IoFilter 接口的源码:

    1. package org.voovan.network;
    2. public interface IoHandler {
    3. public Object onConnect(IoSession session);
    4. public void onDisconnect(IoSession session);
    5. public Object onReceive(IoSession session,Object obj);
    6. public void onSent(IoSession session,Object obj);
    7. public void onException(IoSession session,Exception e);
    8. }

    下面我们对5个方法做逐个说明:

    1. public Object onConnect(IoSession session);

    当Socket 连接断开后会回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。

    1. public Object onReceive(IoSession session,Object obj);

    当Socket 接受到数据,并且经过消息分割器分割后再经过过滤器的decode方法处理后的数据。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 obj参数: 接受的数据,这个数据是经过消息分割器和过滤器处理后的数据。 返回值:返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。

    1. public void onSent(IoSession session,Object obj);

    当Socket 发送成功后会回调这个方法. IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. obj参数: 发送的数据,这个数据是经过过滤器处理后的数据。

    1. public void onException(IoSession session,Exception e);

    当Socket 处理过程中发生异常则回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. e参数: Exception 对象描述这个异常。

    下面我们给第一步实例化的 Socket 连接对象增加一个业务处理句柄:

    1. serverSocket.handler(new ClientHandlerTest());

    下面我们给出一个实现的样例:


    Step5: 启动serverSocket

    完整的服务实例:

    1. public class AioServerSocketTest {
    2. public static void main(String[] args) throws IOException {
    3. AioServerSocket serverSocket = new AioServerSocket("127.0.0.1",2031,20000);
    4. serverSocket.handler(new ServerHandlerTest());
    5. serverSocket.filterChain().add(new StringFilter());
    6. serverSocket.messageSplitter(new LineMessageSplitter());
    7. serverSocket.start();
    8. }
    9. }