伸缩自如的ElasticSearch——通过bboss操作和访问elasticsearch模式


ClientUtil

bboss操作和访问elasticsearch提供两种模式,分别对应两个组件:

  • RestClientUtil:通用组件,提供所有不依赖dsl的功能,也可以直接接收dsl。

  • ConfigRestClientUtil:加载配置文件中的dsl来实现对es的操作

这两个组件分别通过org.frameworkset.elasticsearch.ElasticSearchHelper中提供的静态工厂方法获取其单实例对象,这些单实例对象是多线程并发安全的,分别说明如下:

加载配置文件中的dsl来实现对es的操作模式

1
2
3
public static ClientInterface getConfigRestClientUtil(String configFile)

public static ClientInterface getConfigRestClientUtil(String elasticSearch,String configFile) //elasticsearch参数指定了bboss中多集群配

所有不依赖dsl的功能,或直接接收dsl模式
1
2
3
public static ClientInterface getRestClientUtil()

public static ClientInterface getRestClientUtil(String elasticSearch) //elasticsearch参数指定了bboss中多集群配

通过这两个方法获取到的ClientInterface实例是多线程安全的、单实例对象。
直接操作dsl使用实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void testDirectDslQuery(){
String queryAll = "{\"query\": {\"match_all\": {}}}";
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
ESDatas<Demo> esDatas =clientUtil.searchList("demo/_search",//demo为索引表,_search为检索操作action
queryAll,//queryAll变量对应的dsl语句
Demo.class);
//获取结果对象列表
List<Demo> demos = esDatas.getDatas();

//获取总记录数
long totalSize = esDatas.getTotalSize();
System.out.println(totalSize);
}

基本功能

配置es查询dsl

在resources下创建配置文件estrace/ESTracesqlMapper.xml,配置一个query dsl脚本,名称为queryServiceByCondition,我们将在后面的ClientInterface 组件中通过queryServiceByCondition引用这个脚本,脚本内容定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<properties>
<property name="queryServiceByCondition">
<![CDATA[
{
"sort": [ ##排序
{
"startTime": {
"order": "desc"
}
}
],
#if($lastStartTime > 0)//search_after分页查询
"search_after": [#[lastStartTime]],
#end
"size": 100, ##每次返回100条记录
"query": {
"bool":{
"filter": [
{"term": { ##精确查找
"applicationName": #[application]
}}
#if($queryStatus.equals("success"))
,
{"term": { ##精确查找

"err": 0
}}
#elseif($queryStatus.equals("error"))
,
{"term": { ##精确查找

"err": 1
}}
#end
,
{"range": { ##指定时间范围
"startTime": {
"gte": #[startTime],
"lt": #[endTime]
}
}}
]

#if($queryCondition && !$queryCondition.equals(""))
,
"must" : {
"multi_match" : { ##分词检索,指定坐在多个field执行检索
"query" : #[queryCondition],

"fields" : [ "agentId", "applicationName" ,"endPoint","params","remoteAddr","rpc","exceptionInfo"]
}
}
#end
}
},
"aggs": {
"applicationsums": {
"terms": {
"field": "applicationName.keyword",##按应用名称进行统计计数
"size":10000
},
"aggs":{
"successsums" : {
"terms" : {
"field" : "err" ##按err标识统计每个应用的成功数和失败数,0标识成功,1标识失败
}
},
"elapsed_ranges" : {
"range" : {
"field" : "elapsed", ##按响应时间分段统计
"keyed" : true,
"ranges" : [
{ "key" : "1秒", "to" : 1000 },
{ "key" : "3秒", "from" : 1000, "to" : 3000 },
{ "key" : "5秒", "from" : 3000, "to" : 5000 },
{ "key" : "5秒以上", "from" : 5000 }
]
}
}
}
}
}
}]]>
</property>

</properties>

查询的java代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.frameworkset.elasticsearch.ElasticSearchHelper;
import org.frameworkset.elasticsearch.client.ClientInterface;
//加载配置文件,创建es客户端工具包
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("estrace/ESTracesqlMapper.xml");

//构建查询条件对象
TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
traceExtraCriteria.setApplication("testweb88");
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
traceExtraCriteria.setStartTime(dateFormat.parse("2017-09-02 00:00:00").getTime());
traceExtraCriteria.setEndTime(dateFormat.parse("2017-09-13 00:00:00").getTime());

// 检索条件
String queryCondition = (request.getParameter("queryCondition"));
// 设置检索条件
traceExtraCriteria.setQueryCondition(queryCondition);
// 查询状态:all 全部 success 处理成功 fail 处理失败
String queryStatus = request.getParameter("queryStatus");
traceExtraCriteria.setQueryStatus(queryStatus);
//设置分页数据起点,以时间为起点
String lastStartTimeStr = request.getParameter("lastStartTime");
if(lastStartTimeStr != null && !lastStartTimeStr.equals("")) {
Long lastStartTime = Long.parseLong(lastStartTimeStr);
traceExtraCriteria.setLastStartTime(lastStartTime);
}

//执行查询操作,查询可以是简单的检索查询,也可以结合聚合查询
ESDatas<Traces> data //ESDatas为查询结果集对象,封装了返回的当前查询的List<Traces>结果集、符合条件的总记录数totalSize、以及聚合查询的结果
= clientUtil.searchList("trace-*/_search",//查询操作,查询indices trace-*中符合条件的数据
"queryServiceByCondition",//通过名称引用配置文件中的query dsl语句
traceExtraCriteria,//查询条件封装对象
Traces.class);//指定返回的po对象类型,po对象中的属性与indices表中的文档filed名称保持一致
List<Traces> traceList = data.getDatas();//获取查询到的记过集
long totalSize = data.getTotalSize();//获取总记录数
List<Map<String, Object>> applicationsums= data.getAggregationBuckets("applicationsums");//同时可以做聚合查询,获取聚合查询结果集
for (int i = 0; i < applicationsums .size(); i++) {

Map<String, Object> map = applicationsums.get(i);

//获取子聚合查询结果:成功数和失败数
List<Map<String, Object>> appstatic = (List<Map<String, Object>>)ResultUtil.getAggBuckets(map, "successsums");

//获取响应时间分段统计信息
Map<String, Map<String, Object>> appPeriodstatic = (Map<String, Map<String, Object>>)ResultUtil.getAggBuckets(map, "elapsed_ranges");
}

封装条件也可以用Map替代
1
2
3
4
5
6
7
8
9
Map<String,Object> params = new HashMap<String,Object>();
//设置applicationName1和applicationName2两个变量的值
params.put("applicationName1","blackca\"tdemo2");
params.put("applicationName2","blackcat\"demo3");
ESDatas<Demo> esDatas = //ESDatas包含当前检索的记录集合,最多1000条记录,由dsl中的size属性指定
clientUtil.searchList("demo/_search",//demo为索引表,_search为检索操作action
"searchWithCustomEscape",//esmapper/demo.xml中定义的dsl语句
params,//变量参数
Demo.class);//返回的文档封装对象类型

文档批量创建或修改

按照日期分表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
一个完整的批量添加和修改索引文档的案例  
//分表的时间
SimpleDateFormat format = new SimpleDateFormat("yyyy.MM.dd");
String date = format.format(new Date());

ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//数据1
List<Demo> demos = new ArrayList<>();
Demo demo = new Demo();
demo.setDemoId(2l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo2");
demo.setContentbody("this is content body2");
demos.add(demo);
//数据2
demo = new Demo();
demo.setDemoId(3l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo3");
demo.setContentbody("this is content body3");
demos.add(demo);

//批量创建文档
String response = clientUtil.addDateDocuments("demo",//索引表
"demo",//索引类型
demos);

System.out.println("addDateDocument-------------------------");
System.out.println(response);

//批量更新文档
demo.setContentbody("updated");
response = clientUtil.updateDocuments("demo-"+date,"demo",demos);
System.out.println("updateDateDocument-------------------------");

System.out.println(response);
//获取索引文档,json格式
response = clientUtil.getDocument("demo-"+date,//索引表
"demo",//索引类型
"2");//文档id
System.out.println("getDocument-------------------------");
System.out.println(response);
//获取索引文档,返回Demo对象类型
demo = clientUtil.getDocument("demo-"+date,//索引表
"demo",//索引类型
"3",//文档id
Demo.class);

不按日期分表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//一个完整的批量添加和修改索引文档的案例  

ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
List<Demo> demos = new ArrayList<>();
Demo demo = new Demo();
demo.setDemoId(2l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo2");
demo.setContentbody("this is content body2");
demos.add(demo);

demo = new Demo();
demo.setDemoId(3l);
demo.setAgentStarttime(new Date());
demo.setApplicationName("blackcatdemo3");
demo.setContentbody("this is content body3");
demos.add(demo);

//批量创建文档
String response = clientUtil.addDocuments("demo",//索引表
"demo",//索引类型
demos);

System.out.println("addDocuments-------------------------");
System.out.println(response);

//批量更新文档
demo.setContentbody("updated");
response = clientUtil.updateDocuments("demo","demo",demos);
System.out.println("updateDateDocument-------------------------");

System.out.println(response);
//获取索引文档,json格式
response = clientUtil.getDocument("demo",//索引表
"demo",//索引类型
"2");//文档id
System.out.println("getDocument-------------------------");
System.out.println(response);
//获取索引文档,返回Demo对象类型
demo = clientUtil.getDocument("demo",//索引表
"demo",//索引类型
"3",//文档id
Demo.class);

除了es能够自动生成文档_id属性,bboss提供了三种指定文档_id和parentid的方法:

  • 注解@ESId和@ESParentId
  • ClientInterface接口方法参数
  • ClientOptions(新增/修改)/UpdateOptions(修改控制)

添加索引文档时,es会自动设置文档_id属性,如果需要人工指定_id值,只需要在对象属性上设置注解@ESId即可,例如:

1
2
@ESId //ip属性作为文档唯一标识,根据ip值对应的索引文档存在与否来决定添加或者修改操作
private String ip;

http api

通过ClientInterface 接口提供的以下通用executeHttp api,我们可以非常方便地实现es中所有带请求报文的功能

1
2
3
4
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//验证环境,获取es状态
String response = clientUtil.executeHttp("/kibana_sample_data_logs/_search", ClientInterface.HTTP_GET);
System.out.println(response);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void testTempate() throws ParseException{

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTemplate.xml");
//创建模板
String response = clientUtil.createTempate("demotemplate_1",//模板名称
"demoTemplate");//模板对应的脚本名称,在esmapper/estrace/ESTemplate.xml中配置
System.out.println("createTempate-------------------------");
System.out.println(response);
//获取模板
/**
* 指定模板
* /_template/demoTemplate_1
* /_template/demoTemplate*
* 所有模板 /_template
*
*/
String template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_GET);
System.out.println("HTTP_GET-------------------------");
System.out.println(template);
//删除模板
template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_DELETE);
System.out.println("HTTP_DELETE-------------------------");
System.out.println(template);

template = clientUtil.executeHttp("/_template/demotemplate_1",ClientUtil.HTTP_GET);
System.out.println("HTTP_GET after delete-------------------------");
System.out.println(template);
}

查询dsl动态脚本语法规范

bboss elasticsearch采用xml文件管理elasticsearch的dsl脚本,在dsl脚本中可以使用变量、foreach循环、逻辑判断、注释;配置文件支持在线修改、自动热加载,开发和调试非常方便。

脚本中变量定义语法有两种:#[xxx],$xxx,尽可能地在脚本中使用#[xxx]方式的变量,在#[]类型变量中还可以指定属性,后面举例说明。对于#[xxx]类型变量值中包含的可能破坏dsl json语法结构的特殊字符(例如回车换行符等),框架会自动进行转义处理;$xxx类型变量直接输出原始值(特殊字符不做转移处理),$xxx类型变量可以用于if/else和foreach循环控制变量,而#[xxx]不可以。

1
2
3
4
5
6
7
8
9
判断List集合datas不为空并且datas的size大于0

#if($datas && $datas.size()> 0)

#foreach($bb in $datas)

#end

#end
  • foreach循环语法:

#foreach-#end
foreach循环内置循环变量:$velocaitycount,不需要从外部传入

  • 逻辑判断语法:

    #if-#else-#end,#if-#elseif-#else-#end
    变量值逻辑判断
    #if($xxxx)
    #end
    变量值不为null判断(类似java语法 if(xxxx != null))
    #if(!$xxxx)
    #end
    变量值为null判断(类似java语法 if(xxxx == null))
    #if($xxxx && !$xxxx.equals(“”))
    #end
    变量值不为null判断且不等于”“判断(类似java语法 if(xxxx != null && !xxx.equals(””)))
    #if($xxxx > 0)
    #end
    变量值大于某个值判断,其他类似(类似java语法 if(xxxx > 0))
    #if($datas && $datas.size() > 0)
    #end
    判断List集合不为null并且size大于0,逻辑判断还可以包含各种组合 && ||操作。

  • 在dsl中定义和修改$模式变量:

    • 定义变量

      #set($needComma = true)

    • 修改$变量值

      #set($needComma = false)

  • 在dsl中使用注释:
    dsl注释是用多个#号来标识的,大段注释用##包起来
    单行注释:##注释内容

  • 变量使用:

    #[application]
    变量格式#[aaa]所有格式:

#[aaa]
简单的变量属性引用

#[aaa->bb]
如果aaa是一个bean对象,这个变量格式表示了对aaa对象的bb属性的引用,如果aaa是一个map对象,这个变量格式表示了对aaa对象的key为bb的元素值引用

#[aaa[key]]
引用map对象aaa中key所对应的value数据,引用map元素的等价方法#[aaa->key]

配置springboot

application.properties如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
elasticsearch.rest.hostNames=localhost:9200


#动态索引表名称日期格式配置
elasticsearch.dateFormat=yyyy.MM.dd

elasticsearch.timeZone=Asia/Shanghai
elasticsearch.ttl=2d

#在控制台输出脚本调试开关showTemplate,false关闭,true打开,同时log4j至少是info级别
elasticsearch.showTemplate=false

#客户端动态发现es集群节点控制开关
elasticsearch.discoverHost=false

#http链接池配置
http.timeoutConnection = 400000
http.timeoutSocket = 400000
http.connectionRequestTimeout=400000
http.retryTime = 1
http.maxLineLength = -1
http.maxHeaderCount = 200
http.maxTotal = 400
http.defaultMaxPerRoute = 200

# dsl配置文件热加载扫描时间间隔,毫秒为单位,默认5秒扫描一次,<= 0时关闭扫描机制
dslfile.refreshInterval = -1

依赖如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.3</version>
</dependency>

<!-- bboss客户端开始 -->
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-spring-boot-starter</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>com.bbossgroups.plugins</groupId>
<artifactId>bboss-elasticsearch-rest-jdbc</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>jdbc</artifactId>
<version>6.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.23.1</version>
<scope>test</scope>
</dependency>
<!-- bboss客户端结束 -->

引用文章:
https://blog.csdn.net/No_Game_No_Life_/article/details/90407228