Elasticsearch聚合查询案例分享

Elasticsearch聚合查询案例分享

1.案例介绍

本文包含三个案例:

案例1:统计特定时间范围内每个应用的总访问量、访问成功数、访问失败数,每个应用请求响应时间分段统计(1秒内,1-3秒,3-5秒,5秒以上 )

案例3:简单的cardinality统计

2.准备工作

参考文档《》中的第1章节和第2章节,在自己的工程中导入bboss es依赖包和配置es参数

3.案例

在源码目录下新建文件esmapper/estrace/ESTracesMapper.xml,内容如下

3.1.2 编写统计dao及统计方法

  1. public List<ApplicationStatic> getApplicationSumStatic(TraceExtraCriteria traceExtraCriteria){
  2. ClientInterface clientUtil= ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
  3. //返回json统计报文,调试用,一遍根据json报文组装统计结果列表
  4. // String response = clientUtil.executeRequest("trace-*/_search",
  5. // "applicationSumStatic",traceExtraCriteria);
  6. //根据条件进行统计,在对象traceExtraCriteria中指定开始时间和结束时间
  7. MapRestResponse restResponse = clientUtil.search("trace-*/_search",
  8. "applicationSumStatic",traceExtraCriteria);
  9. //组装统计结果
  10. //获取应用统计列表,包含每个应用的名称、总访问量以及成功数和失败数
  11. List<Map<String,Object>> appstatics = (List<Map<String,Object>>)restResponse.getAggBuckets("applicationsums");
  12. if(appstatics != null && appstatics.size() > 0) {
  13. List<ApplicationStatic> applicationStatics = new ArrayList<ApplicationStatic>(appstatics.size());
  14. ApplicationStatic applicationStatic = null;
  15. for (int i = 0; i < appstatics.size(); i++) {
  16. applicationStatic = new ApplicationStatic();
  17. Map<String, Object> map = appstatics.get(i);
  18. //应用名称
  19. String appName = (String) map.get("key");
  20. applicationStatic.setApplicationName(appName);
  21. //应用总访问量
  22. Long totalsize = ResultUtil.longValue( map.get("doc_count"),0l);
  23. applicationStatic.setTotalSize(totalsize);
  24. //获取成功数和失败数
  25. List<Map<String, Object>> appstatic = (List<Map<String, Object>>)ResultUtil.getAggBuckets(map, "successsums");
  26. /**
  27. "buckets": [
  28. {
  29. "key": 0,
  30. "doc_count": 30
  31. }
  32. ]
  33. */
  34. //key 0
  35. Long success = 0l;//成功数
  36. Long failed = 0l;//失败数
  37. for (int j = 0; j < appstatic.size(); j++) {
  38. Map<String, Object> stats = appstatic.get(j);
  39. Integer key = (Integer) stats.get("key");//成功和错误标识
  40. if (key == 0)//成功
  41. success = ResultUtil.longValue( stats.get("doc_count"),0l);
  42. else if (key == 1)//失败
  43. failed = ResultUtil.longValue( stats.get("doc_count"),0l);
  44. }
  45. applicationStatic.setSuccessCount(success);
  46. applicationStatic.setFailCount(failed);
  47. List<ApplicationPeriodStatic> applicationPeriodStatics = new ArrayList<ApplicationPeriodStatic>(4);
  48. //获取响应时间分段统计信息
  49. Map<String, Map<String, Object>> appPeriodstatic = (Map<String, Map<String, Object>>)ResultUtil.getAggBuckets(map, "elapsed_ranges");
  50. //1秒
  51. Map<String, Object> period = appPeriodstatic.get("1秒");
  52. applicationPeriodStatic = new ApplicationPeriodStatic();
  53. applicationPeriodStatic.setPeriod("1秒");
  54. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  55. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),1000));
  56. applicationPeriodStatics.add(applicationPeriodStatic);
  57. period = appPeriodstatic.get("3秒");
  58. applicationPeriodStatic = new ApplicationPeriodStatic();
  59. applicationPeriodStatic.setPeriod("3秒");
  60. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  61. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),1000));
  62. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),3000));
  63. applicationPeriodStatics.add(applicationPeriodStatic);
  64. //5秒
  65. period = appPeriodstatic.get("5秒");
  66. applicationPeriodStatic = new ApplicationPeriodStatic();
  67. applicationPeriodStatic.setPeriod("5秒");
  68. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  69. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),3000));
  70. applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),5000));
  71. applicationPeriodStatics.add(applicationPeriodStatic);
  72. //5秒以上
  73. period = appPeriodstatic.get("5秒以上");
  74. applicationPeriodStatic = new ApplicationPeriodStatic();
  75. applicationPeriodStatic.setPeriod("5秒以上");
  76. applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
  77. applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),5000));
  78. applicationPeriodStatics.add(applicationPeriodStatic);
  79. applicationStatic.setApplicationPeriodStatics(applicationPeriodStatics);
  80. applicationStatics.add(applicationStatic);
  81. }
  82. //返回统计结果
  83. return applicationStatics;
  84. }
  85. return null;
  86. }
  87. }

Java代码

3.1.4 获取元数据信息的测试方法

java代码

  1. @Test
  2. public void testAppStatic(){
  3. TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
  4. traceExtraCriteria.setStartTime(1516304868072l);
  5. traceExtraCriteria.setEndTime(1516349516377l);
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
  7. //通过下面的方法先得到查询的json报文,然后再通过MapRestResponse查询遍历结果,调试的时候打开String response的注释
  8. //String response = clientUtil.executeRequest("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  9. //System.out.println(response);
  10. MapRestResponse restResponse = clientUtil.search("trace-*/_search","applicationSumStatic",traceExtraCriteria);
  11. List<Map<String,Object>> appstatics = restResponse.getAggBuckets("applicationsums",new ESTypeReference<List<Map<String,Object>>>(){});
  12. int doc_count_error_upper_bound = restResponse.getAggAttribute("applicationsums","doc_count_error_upper_bound",int.class);
  13. int sum_other_doc_count = restResponse.getAggAttribute("applicationsums","sum_other_doc_count",int.class);
  14. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  15. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  16. for(int i = 0; i < appstatics.size(); i ++){
  17. Map<String,Object> map = appstatics.get(i);
  18. //应用名称
  19. //应用总访问量
  20. int totalsize = (int)map.get("doc_count");
  21. //获取成功数和失败数
  22. List<Map<String,Object>> appstatic = ResultUtil.getAggBuckets(map ,"successsums",new ESTypeReference<List<Map<String,Object>>>(){});
  23. doc_count_error_upper_bound = ResultUtil.getAggAttribute(map ,"successsums","doc_count_error_upper_bound",int.class);
  24. System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
  25. System.out.println("sum_other_doc_count:"+sum_other_doc_count);
  26. /**
  27. "buckets": [
  28. {
  29. "key": 0,
  30. "doc_count": 30
  31. }
  32. ]
  33. */
  34. //key 0
  35. int success = 0;//成功数
  36. int failed = 0;//失败数
  37. for(int j = 0; j < appstatic.size(); i ++){
  38. Map<String,Object> stats = appstatic.get(i);
  39. int key = (int) stats.get("key");//成功和错误标识
  40. if(key == 0)
  41. success = (int)stats.get("doc_count");
  42. else if(key == 1)
  43. failed = (int)stats.get("doc_count");
  44. }
  45. }
  46. }

建立dsl配置文件esmapper/testagg.xml,定义termAgg:

3.2.2 执行dsl

  1. @Test
  2. public void termAgg(){
  3. ClientInterface clientInterface = ElasticSearchHelper.getConfigRestClientUtil("esmapper/testagg.xml");
  4. //ESDatas<Map> traces = clientInterface.searchAll("trace-*",1000,Map.class);//获取总记录集合
  5. Map params = new HashMap();//聚合统计条件参数
  6. params.put("application","testweb");
  7. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");
  8. try {
  9. params.put("startTime",format.parse("1999-01-01 00:00:00").getTime());
  10. params.put("endTime",new Date().getTime());
  11. params.put("rpc","/testweb/jsp/logoutredirect.jsp");
  12. } catch (ParseException e) {
  13. e.printStackTrace();
  14. }
  15. //一行代码,执行每个服务的访问量总数统计
  16. ESAggDatas<LongAggHit> response = clientInterface.searchAgg("trace-*/_search",//从trace-开头的索引表中检索数据
  17. "termAgg", //配置在esmapper/testagg.xml中的dsl语句
  18. params, //dsl语句termAgg中需要的查询参数
  19. LongAggHit.class, //封装聚合统计中每个服务地址及服务访问量的地址
  20. "traces"); //term统计桶的名称,参见dsl语句
  21. List<LongAggHit> aggHitList = response.getAggDatas();//每个服务的访问量
  22. long totalSize = response.getTotalSize();//总访问量
  23. }

3.3.2 执行dsl

  1. @Test
  2. public void candicateAgg(){
  3. ClientInterface clientInterface = ElasticSearchHelper.getConfigRestClientUtil("esmapper/testagg.xml");
  4. Map params = null;//单值聚合统计条件参数
  5. //一行代码,执行服务基数统计
  6. ESAggDatas<SingleLongAggHit> response = clientInterface.searchAgg("trace-*/_search","candicateAgg",params,SingleLongAggHit.class,"traces");
  7. SingleLongAggHit aggHitList = response.getSingleAggData();
  8. long value = aggHitList.getValue();
  9. long totalSize = response.getTotalSize();//总访问量

4.相关资料

bboss elasticsearch交流:166471282