Counter example explained

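    A distributed counter is stored in a raft group made up of multiple nodes (machines). The counter can be incremented and read, stays consistent across all nodes, and keeps serving its two external operations as long as only a minority of nodes fail: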

    • incrementAndGet(delta): adds delta to the value and returns the updated value.
    • get(): returns the latest value.
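    The "minority of nodes" guarantee comes from Raft's majority rule: an entry is committed once a majority of the group has stored it, so an n-node group keeps serving as long as at least n/2 + 1 nodes (integer division) are alive. The arithmetic as a small illustrative helper (QuorumMath is not a jraft class):

```java
public final class QuorumMath {
    private QuorumMath() {}

    /** Smallest number of nodes that forms a majority of an n-node group. */
    public static int majority(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("group size must be positive: " + n);
        }
        return n / 2 + 1;
    }

    /** Largest number of failed nodes the group tolerates while a majority remains. */
    public static int toleratedFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " nodes: majority=" + majority(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

    For the three-node group started later in this example, the counter survives one failed node but not two.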

    Under the hood jraft uses bolt as its communication framework. Two requests are defined:

    • IncrementAndGetRequest, for incrementing
    • GetValueRequest, for reading the latest value:
    import java.io.Serializable;

    public class GetValueRequest implements Serializable {
        private static final long serialVersionUID = 9218253805003988802L;

        public GetValueRequest() {
            super();
        }
    }
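    The text does not show IncrementAndGetRequest. Judging from the client code, which calls request.setDelta(delta), it carries a single long delta. A minimal sketch under that assumption (the field name and accessors are inferred, and the serialVersionUID is arbitrary); like GetValueRequest, it implements Serializable so it can be serialized for transport and for the raft log:

```java
import java.io.Serializable;

public class IncrementAndGetRequest implements Serializable {
    // Arbitrary value for this sketch; the real class defines its own.
    private static final long serialVersionUID = 1L;

    // Amount to add to the counter (inferred from the client's setDelta call).
    private long delta;

    public long getDelta() {
        return this.delta;
    }

    public void setDelta(long delta) {
        this.delta = delta;
    }
}
```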

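    The response, ValueResponse, contains:

    • success: whether the request succeeded
    • value: the latest value, on success
    • errorMsg: the error message, on failure
    • redirect: the new leader the client should redirect to when the contacted node is not the leader (for example after a re-election).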
    import java.io.Serializable;

    public class ValueResponse implements Serializable {

        private static final long serialVersionUID = -4220017686727146773L;

        private long value;
        private boolean success;
        /**
         * redirect peer id
         */
        private String redirect;

        private String errorMsg;

        public String getErrorMsg() {
            return this.errorMsg;
        }

        public void setErrorMsg(String errorMsg) {
            this.errorMsg = errorMsg;
        }
        // ......
    }

    IncrementAndAddClosure is the callback the leader uses after it receives an IncrementAndGetRequest:

    public class IncrementAndAddClosure implements Closure {
        private CounterServer counterServer;
        private IncrementAndGetRequest request;
        private ValueResponse response;
        private Closure done; // callback that sends the network response

        public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response,
                                      Closure done) {
            super();
            this.counterServer = counterServer;
            this.request = request;
            this.response = response;
            this.done = done;
        }

        @Override
        public void run(Status status) {
            // Send the response back to the client
            if (this.done != null) {
                done.run(status);
            }
        }

        public IncrementAndGetRequest getRequest() {
            return this.request;
        }

        public void setRequest(IncrementAndGetRequest request) {
            this.request = request;
        }

        public ValueResponse getResponse() {
            return this.response;
        }

    }

    The state machine, CounterStateMachine, first holds an initial value:

    import java.util.concurrent.atomic.AtomicLong;

    public class CounterStateMachine extends StateMachineAdapter {
        /**
         * counter value
         */
        private AtomicLong value = new AtomicLong(0);
        // ......
    }
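    The excerpt stops at the counter field. What the state machine has to do with it is apply each committed entry in log order: every replica adds the entry's delta, and typically only the node that proposed the entry (the leader) also holds a callback to complete. That flow can be modelled without jraft as below. CommittedEntry and the LongConsumer callback are hypothetical stand-ins for jraft's log iterator and closures, and the delta is stored as a raw 8-byte long rather than a serialized request. IncrementAndAddClosure's getRequest() and getResponse() accessors suggest the real leader path reads the request and fills the response through the closure; this sketch simplifies that.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class CounterApplySketch {

    /** Hypothetical stand-in for one committed log entry (not a jraft type). */
    public static final class CommittedEntry {
        final ByteBuffer data;   // encoded delta, identical on every replica
        final LongConsumer done; // non-null only on the node that proposed the entry

        public CommittedEntry(long delta, LongConsumer done) {
            this.data = ByteBuffer.allocate(Long.BYTES).putLong(0, delta);
            this.done = done;
        }
    }

    private final AtomicLong value = new AtomicLong(0);

    /** Applies committed entries strictly in log order. */
    public void onApply(List<CommittedEntry> entries) {
        for (CommittedEntry entry : entries) {
            long delta = entry.data.getLong(0);          // every replica decodes the same bytes
            long current = this.value.addAndGet(delta);  // deterministic state change
            if (entry.done != null) {
                entry.done.accept(current);              // proposer only: complete the waiting request
            }
        }
    }

    public long getValue() {
        return this.value.get();
    }
}
```

    Because followers see the same entries in the same order but without callbacks, they reach the same value without ever answering a client.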

    CounterServer

    CounterServer starts a raft node that provides the distributed counter service, built on the RaftGroupService framework provided by jraft:

    public class CounterServer {
        // jraft server-side service framework
        private RaftGroupService raftGroupService;
        // raft node
        private Node node;
        // business state machine
        private CounterStateMachine fsm;

        public CounterServer(String dataPath, String groupId, PeerId serverId, NodeOptions nodeOptions) throws IOException {
            // Create the data directory
            FileUtils.forceMkdir(new File(dataPath));
            // Initialize the global timer
            TimerManager.init(50);

            // Raft RPC and business RPC share one RPC server here; they can also be kept separate.
            RpcServer rpcServer = new RpcServer(serverId.getPort());
            RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
            // Register the business processors
            rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
            rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
            // Create the state machine
            this.fsm = new CounterStateMachine();
            // Put the state machine into the startup options
            nodeOptions.setFsm(this.fsm);
            // Log storage path
            nodeOptions.setLogUri(dataPath + File.separator + "log");
            // Raft metadata path, required
            nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
            // Snapshot path, optional but generally recommended
            nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
            // Create the raft group service framework
            this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
            // Start it
            this.node = this.raftGroupService.start();
        }

        public CounterStateMachine getFsm() {
            return this.fsm;
        }

        public Node getNode() {
            return this.node;
        }

        public RaftGroupService getRaftGroupService() {
            return this.raftGroupService;
        }

        /**
         * Build a redirect response pointing at the current leader, if one is known.
         */
        public ValueResponse redirect() {
            ValueResponse response = new ValueResponse();
            response.setSuccess(false);
            if (node != null) {
                PeerId leader = node.getLeaderId();
                if (leader != null) {
                    response.setRedirect(leader.toString());
                }
            }

            return response;
        }

        public static void main(String[] args) throws IOException {
            if (args.length != 4) {
                System.out
                    .println("Usage : java com.alipay.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
                System.out
                    .println("Example: java com.alipay.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
                System.exit(1);
            }
            String dataPath = args[0];
            String groupId = args[1];
            String serverIdStr = args[2];
            String initConfStr = args[3];

            NodeOptions nodeOptions = new NodeOptions();
            // Shorter election timeout and snapshot interval, for testing
            nodeOptions.setElectionTimeoutMs(5000);
            nodeOptions.setDisableCli(false);
            nodeOptions.setSnapshotIntervalSecs(30);
            // Parse the arguments
            PeerId serverId = new PeerId();
            if (!serverId.parse(serverIdStr)) {
                throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
            }
            Configuration initConf = new Configuration();
            if (!initConf.parse(initConfStr)) {
                throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
            }
            // Set the initial cluster configuration
            nodeOptions.setInitialConf(initConf);

            // Start the server
            CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
            System.out.println("Started counter server at port:"
                               + counterServer.getNode().getNodeId().getPeerId().getPort());
        }
    }

    The startup arguments for the three nodes are similar:

    Windows users: adjust the data directory in the first argument accordingly.

    /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
    /tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
    /tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

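    The data directories are server1, server2 and server3, the raft group is named counter, and the node addresses are 127.0.0.1:8081, 127.0.0.1:8082 and 127.0.0.1:8083 respectively.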
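    Each node receives its own address (serverId) and the full initial membership (initConf) as "ip:port" strings. The helper below only illustrates that argument format; ClusterArgs is hypothetical and is not jraft's PeerId.parse or Configuration.parse. Its main method also checks that the node's own address appears in the initial configuration, which is an assumption about what a sensible setup looks like rather than something the text states.

```java
import java.util.ArrayList;
import java.util.List;

public final class ClusterArgs {
    private ClusterArgs() {}

    /** Splits "ip:port" into {ip, port}; rejects malformed input. */
    public static String[] parseEndpoint(String s) {
        int idx = s.lastIndexOf(':');
        if (idx <= 0 || idx == s.length() - 1) {
            throw new IllegalArgumentException("Fail to parse endpoint: " + s);
        }
        int port = Integer.parseInt(s.substring(idx + 1)); // NumberFormatException is an IllegalArgumentException
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + s);
        }
        return new String[] { s.substring(0, idx), String.valueOf(port) };
    }

    /** Parses a comma-separated list such as "127.0.0.1:8081,127.0.0.1:8082". */
    public static List<String> parseConf(String s) {
        List<String> peers = new ArrayList<>();
        for (String part : s.split(",")) {
            String peer = part.trim();
            parseEndpoint(peer); // validate each entry
            peers.add(peer);
        }
        return peers;
    }

    public static void main(String[] args) {
        String serverId = "127.0.0.1:8081";
        List<String> conf = parseConf("127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
        if (!conf.contains(serverId)) {
            throw new IllegalArgumentException(serverId + " is not in the initial configuration");
        }
        System.out.println("peers=" + conf.size());
    }
}
```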

    Of the registered network request processors, let's look at the IncrementAndGetRequestProcessor implementation, an ordinary bolt processor:

    public class IncrementAndGetRequestProcessor extends AsyncUserProcessor<IncrementAndGetRequest> {
        private static final Logger LOG = LoggerFactory.getLogger(IncrementAndGetRequestProcessor.class);

        private CounterServer counterServer;

        public IncrementAndGetRequestProcessor(CounterServer counterServer) {
            super();
            this.counterServer = counterServer;
        }

        @Override
        public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, IncrementAndGetRequest request) {

            // Not the leader: reply with a redirect response
            if (!counterServer.getFsm().isLeader()) {
                asyncCtx.sendResponse(counterServer.redirect());
                return;
            }

            // Build the response and its completion callback
            final ValueResponse response = new ValueResponse();
            IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response, new Closure() {

                @Override
                public void run(Status status) {
                    // Runs once the task has been applied, or once it has failed
                    if (!status.isOk()) {
                        // Commit failed: return the error message
                        response.setErrorMsg(status.getErrorMsg());
                        response.setSuccess(false);
                    }
                    // Send the ValueResponse back to the client in both cases
                    asyncCtx.sendResponse(response);
                }
            });

            try {
                // Build the task to submit
                Task task = new Task();
                task.setDone(closure); // set the callback
                // Fill in the data: serialize the request with hessian2 into the data field
                task.setData(ByteBuffer.wrap(Codecs.getSerializer(Codecs.Hessian2).encode(request)));

                // Submit to the raft group
                counterServer.getNode().apply(task);
            } catch (CodecException e) {
                // Handle the serialization failure
                LOG.error("Fail to encode IncrementAndGetRequest", e);
                response.setSuccess(false);
                response.setErrorMsg(e.getMessage());
                asyncCtx.sendResponse(response);
            }
        }

        @Override
        public String interest() {
            return IncrementAndGetRequest.class.getName();
        }

    }
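    The processor turns the request into bytes for Task.setData, and every replica must turn those bytes back into the same request when the entry is applied. The round trip can be sketched with plain JDK serialization standing in for Hessian2; this is a deliberate swap so the example needs no extra dependency, and its byte format differs from what jraft's Codecs produce. DeltaRequest is a hypothetical stand-in for IncrementAndGetRequest.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

public final class TaskDataCodec {
    private TaskDataCodec() {}

    /** Hypothetical request type standing in for IncrementAndGetRequest. */
    public static final class DeltaRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long delta;

        public DeltaRequest(long delta) {
            this.delta = delta;
        }

        public long getDelta() {
            return this.delta;
        }
    }

    /** Leader side: serialize the request into the buffer placed on the task. */
    public static ByteBuffer encode(Serializable request) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(request);
        }
        return ByteBuffer.wrap(bos.toByteArray());
    }

    /** Apply side: read the committed bytes back into a request object. */
    public static Object decode(ByteBuffer data) throws IOException, ClassNotFoundException {
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes); // copy without moving the caller's position
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
```

    Whatever codec is used, it has to be deterministic across versions of the code running in the group, since followers decode bytes that the leader encoded.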

    Next, initialize the RPC client and refresh the route table:

    BoltCliClientService cliClientService = new BoltCliClientService();
    cliClientService.init(new CliOptions());

    if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
        throw new IllegalStateException("Refresh leader failed");
    }

    Once the leader is known, send the requests:

    PeerId leader = RouteTable.getInstance().selectLeader(groupId);
    System.out.println("Leader is " + leader);
    int n = 1000;
    CountDownLatch latch = new CountDownLatch(n);
    long start = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
        incrementAndGet(cliClientService, leader, i, latch);
    }
    latch.await();
    System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
    System.exit(0);
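    The loop fires n asynchronous calls and then blocks on a CountDownLatch until every callback, success or failure, has counted down once. The same pattern with a thread pool standing in for the bolt client (no jraft or bolt classes involved; fakeCall is hypothetical) looks like this. Unlike the original, it uses a timed await, a choice made here so a lost callback cannot hang the caller forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class LatchBenchSketch {
    private LatchBenchSketch() {}

    /** Issues n async calls, waits until every callback has fired, returns the success count. */
    public static int run(int n) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger ok = new AtomicInteger();
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            final long delta = i;
            pool.execute(() -> {
                try {
                    fakeCall(delta);        // hypothetical remote call
                    ok.incrementAndGet();   // plays the role of onResponse
                } catch (RuntimeException e) {
                    e.printStackTrace();    // plays the role of onException
                } finally {
                    latch.countDown();      // count down on both paths, or await() never returns
                }
            });
        }
        boolean completed = latch.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start)
                + " ms, completed=" + completed);
        return ok.get();
    }

    private static long fakeCall(long delta) {
        return delta; // stands in for the network round trip
    }
}
```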

    The incrementAndGet method itself is straightforward:

    private static void incrementAndGet(BoltCliClientService cliClientService, PeerId leader, long delta,
                                        CountDownLatch latch) throws RemotingException, InterruptedException {
        // Build an IncrementAndGetRequest and send it to the leader
        IncrementAndGetRequest request = new IncrementAndGetRequest();
        request.setDelta(delta);
        cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
            new InvokeCallback() {

                @Override
                public void onResponse(Object result) {
                    latch.countDown();
                    System.out.println("incrementAndGet result:" + result);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    latch.countDown();
                }

                @Override
                public Executor getExecutor() {
                    return null;
                }
            }, 5000); // timeout in milliseconds
    }
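    The client above always sends to the leader it looked up once. If leadership moves, a follower's processor answers with success=false and redirect set to the new leader, as CounterServer.redirect() does. A client could follow that hint with a bounded retry loop. In this sketch, Response mirrors the relevant ValueResponse fields and Transport is a hypothetical stand-in for the bolt RPC call.

```java
public final class RedirectingClient {

    /** Mirrors the fields of ValueResponse that matter here. */
    public static final class Response {
        final boolean success;
        final long value;
        final String redirect; // new leader address, may be null
        final String errorMsg;

        public Response(boolean success, long value, String redirect, String errorMsg) {
            this.success = success;
            this.value = value;
            this.redirect = redirect;
            this.errorMsg = errorMsg;
        }
    }

    /** Hypothetical synchronous RPC: send delta to address, get a Response back. */
    public interface Transport {
        Response send(String address, long delta);
    }

    private final Transport transport;
    private String leader;

    public RedirectingClient(Transport transport, String initialLeader) {
        this.transport = transport;
        this.leader = initialLeader;
    }

    /** Sends the increment, following at most maxRedirects leader hints. */
    public long incrementAndGet(long delta, int maxRedirects) {
        for (int attempt = 0; attempt <= maxRedirects; attempt++) {
            Response r = transport.send(leader, delta);
            if (r.success) {
                return r.value;
            }
            if (r.redirect == null) {
                // No leader known (e.g. election in progress) or a genuine failure
                throw new IllegalStateException("request failed: " + r.errorMsg);
            }
            leader = r.redirect; // remember the new leader for later calls
        }
        throw new IllegalStateException("too many redirects");
    }

    public String currentLeader() {
        return leader;
    }
}
```

    Retrying only on an explicit redirect is deliberate: a node that answers with a redirect has not submitted anything to the raft group, whereas blindly resending after a timeout could apply the same increment twice.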

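    To avoid re-applying the entire log every time a node restarts, and to avoid keeping every log entry forever, use the snapshot mechanism: take a checkpoint of the state machine that records its state at that moment, then delete all log entries before it. The core is implementing two StateMachine methods:

    • a load method, which loads the snapshot at startup or after a snapshot has been installed
    • onSnapshotSave, which saves a snapshot periodically

    We first implement a snapshot data file for Counter: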
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.commons.io.FileUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CounterSnapshotFile {
        private static final Logger LOG = LoggerFactory.getLogger(CounterSnapshotFile.class);
        private String path;

        public CounterSnapshotFile(String path) {
            super();
            this.path = path;
        }

        public String getPath() {
            return this.path;
        }

        /**
         * Save value to snapshot file.
         * @param value the counter value to persist
         * @return true if the file was written, false on an I/O error
         */
        public boolean save(long value) {
            try {
                FileUtils.writeStringToFile(new File(path), String.valueOf(value), StandardCharsets.UTF_8);
                return true;
            } catch (IOException e) {
                LOG.error("Fail to save snapshot", e);
                return false;
            }
        }

        public long load() throws IOException {
            String s = FileUtils.readFileToString(new File(path), StandardCharsets.UTF_8);
            if (!StringUtils.isBlank(s)) {
                return Long.parseLong(s.trim());
            }
            throw new IOException("Fail to load snapshot from " + path + ",content: " + s);
        }
    }
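    The same save/load round trip can be sketched with java.nio.file alone, so it runs without commons-io. As an extra precaution not present in CounterSnapshotFile, it writes to a temporary file and then renames it, so a crash mid-write does not leave a truncated snapshot behind. The class name and the ".tmp" suffix are illustrative.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class CounterSnapshotSketch {
    private CounterSnapshotSketch() {}

    /** Saves the counter value as text via write-to-temp-then-rename. */
    public static void save(Path file, long value) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, String.valueOf(value).getBytes(StandardCharsets.UTF_8));
        // Atomic rename; throws AtomicMoveNotSupportedException if the file system cannot do it
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Loads the value written by save(); fails loudly on an empty or corrupt file. */
    public static long load(Path file) throws IOException {
        String s = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        if (s.isEmpty()) {
            throw new IOException("Fail to load snapshot from " + file + ", content is blank");
        }
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            throw new IOException("Corrupt snapshot " + file + ": " + s, e);
        }
    }
}
```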

    The value is saved to the specified path.

    The snapshot interval is controlled by snapshotIntervalSecs in NodeOptions and defaults to one hour; CounterServer sets it to 30 seconds for testing.