简介
es dao 提供简便的 elastic search 数据库操作 ORM, 支持数据 CRUD 以及 sql 查询, 对外可直接提供 jdbc 只读接口.
通过 es dao,即使没有任何 elastic search 基础的开发人员可以其直接当作 sql 库来简单使用, 降低入门门槛。
本文主要介绍 elastic 的 sql 查询能力。本工具提供了基于 xpack 和 dsl 两个版本的实现类, 默认 sqlFind 开头的方法使用 xpack sql api, commonFind 开头的方法使用 dsl api.
包括 EsDaoImpl 在内的所有 SqlDdlDao 都支持注入到 SqlDdlDaoMultipleImpl 做跨存储引擎的数据查询.
使用说明
- es-dao 项目中的 EsDaoImpl 依赖 rest-high-level-client 7.0.1 兼容 7.X 系列的 elastic search (最高实测了7.8.0版), 不支持 6.x 或更低版本.
- es-dao-v6 项目中的 EsDaoImpl 依赖 rest-high-level-client 6.2.2 最低测试过 elastic search 5.5.3 .
- EsDaoImpl 写入时, id 储存为 _id.
- elastic search 建表后允许增加字段, 但不允许修改字段类型.
- elastic search 中 update 性能比较低, 需要尽量避免单字段修改.
- elastic search 中不允许深度分页, 默认 skip + limit 不得大于 10000. 建议使用 scroll api.
本工具类所有返回Stream的接口都使用了scroll api
scroll api 可以获取无限多的数据但是有以下限制: 1.不支持skip 2.只能按顺序向后翻页
- 默认使用的 xpack-sql api 查询 sql, 该 api 不支持查询数组字段. 可以使用 DSL 接口代替.
本工具类中 commonFind 开头的函数使用了 DSL 接口.
text 型字段在 group by, order by 等场景下需要使用 fieldName.keyword 来防止分词, 否则会报错.
maven 依赖
<dependency>
<groupId>cn.linpengfei.sybnutil</groupId>
<artifactId>sybn-jdbc-driver</artifactId>
<version>0.3.15-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.linpengfei.sybnutil</groupId>
<artifactId>es-dao</artifactId>
<version>0.3.15-SNAPSHOT</version>
</dependency>
注意: 在 spring boot 中使用时需要手动指定 rest-high-level-client 版本.
此版本号一般向后兼容, 必须小于等于实际使用的 elasticsearch 服务端版本号.
比如 elasticsearch 7.x 建议使用 rest-high-level-client 7.0.1
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.0.1</version>
</dependency>
基于 dao 查询 demo
// 使用指定的连接访问
EsDao dao = new EsDaoImpl("junit", "http://127.0.0.1:9200");
// 从配置文件读取连接
//EsDao dao = new EsDaoConfImpl("test", "junit_test@junit_test_init.properties");
// 传统sql占位符
dao.sqlFindListMap("select * from a in (?,?,?)", 1,2,3);
// 一个?占位一个集合
List<Integer> list = ListUtil.toList(1,2,3);
dao.sqlFindListMap("select * from a in (?)", list);
// myBatis 风格
Map<String, Object> map = new HashMap();
map.put("list", list)
dao.sqlFindListMap("select * from a in (#{xxx})", map);
注意: 前面的例子依赖了 xpack sql 来执行 select 但是此方案不支持返回数组类型的数据
某些场景下需要返回数组类型时, 可以使用 dsl 版.
dsl 版 dao, 不使用 xpack sql 的版本
dsl 版 api 与 xpack 版完全一致, 只有 实现类的名称有所区别, 多了 Dsl 三个字母
// 使用指定的连接访问
EsDao dao = new EsDslDaoImpl("junit", "http://127.0.0.1:9200");
// 从配置文件读取连接
//EsDao dao = new EsDslDaoConfImpl("test", "junit_test@junit_test_init.properties");
// 传统sql占位符
dao.sqlFindListMap("select * from a in (?,?,?)", 1,2,3);
// 一个?占位一个集合
List<Integer> list = ListUtil.toList(1,2,3);
dao.sqlFindListMap("select * from a in (?)", list);
// myBatis 风格
Map<String, Object> map = new HashMap();
map.put("list", list)
dao.sqlFindListMap("select * from a in (#{xxx})", map);
注意: dsl 的 group by 实现是基于客户端的, 因此不要使用 dsl dao 做大规模 group by, 否则性能会比较差.
jdbc demo
- jdbc 驱动直接查询 elastic search 7.4
// 创建 jdbc 连接
String url = "jdbc:es://127.0.0.1:9200/";
Properties properties = new SybnProperties(n);
Connection connect = new SybnDaoDriver().connect(url, properties);
// 被执行的 sql
String selectSql = "select * from sybn_junit_crud_test_entry where type = ? limit 1";
// 使用 jdbc 执行此 sql
PreparedStatement selectStatement = connect.prepareStatement(selectSql);
selectStatement.setInt(1, 0); // type = 0
ResultSet selectResultSet = selectStatement.executeQuery();
List<Map<String, Object>> select = HandlerUtil.MAP_LIST_HANDLER.handle(selectResultSet);
selectResultSet.close();
// 打印结果
LogUtil.info("select", select.size(), select);
在 mybatis 中使用
在 spring boot + mybatis 中使用本工具类的 JDBC 能力查询 elastic search 无需特殊设置, 正常注册 jdbc 即可:
@Slf4j
@Configuration
public class ElasticDataSourceConfig {
@Value("${spring.profiles.active}")
private String activeProfile;
// sybn.datasource.elastic.url=jdbc:es://127.0.0.1:9200/
@Value("${sybn.datasource.elastic.url}")
private String url;
public static DruidDataSource esDataSource(){
testJdbcProperties = new JdbcProperties();
// SybnDaoDriver 会根据jdbc后面的字符串决定连接哪种数据库, 支持 mysql / mongo / solr / es / hbase
// 需要引用对应的 maven 实现, 比如 dbutil-dao / mongo-dao / solr-dao / es-dao / hadoop-dao
// url 中多个地址逗号分割, 比如: jdbc:es://server1:9200,server3:9200,server3:9200/
testJdbcProperties.setUrl(url);
testJdbcProperties.setDriverClassName("cn.sybn.util.io.driver.SybnDaoDriver");
ElasticDataSourceConfig config = new ElasticDataSourceConfig();
config.activeProfile = "local";
return config.dataSource(testJdbcProperties);
}
}
直接查询 dsl
如果需要自定义 dsl 语句查询数据, 以下两种写法等效
// 利用 lowLevelClient 6.2 直接查数, 其他版本的 performRequest 语法略有不同
HttpEntity entity = new NStringEntity("{\"query\":{\"bool\":{\"filter\":[{\"range\":{\"id\":{\"gt\":\"123\"}}}]}},\"sort\":[{\"id\":{\"order\":\"asc\"}}],\"size\":1000}", ContentType.APPLICATION_JSON);
Response resp = lowLevelClient.performRequest("POST", "/maoyan_movie_schedule/_search", Collections.emptyMap(), entity);
TotalArrayList<Map<String, Object>> maps = EsDocumentUtil.searchResponseToMaps(resp);
task.infos("dsl", maps.getTotal(), maps.size());
// 利用 sql 查询数据
List<Map<String, Object>> maps1 = dao().sqlFindListMap("select * from maoyan_movie_schedule where id > ? order by id limit 1000", "123");
TotalArrayList<Map<String, Object>> maps3 = (TotalArrayList<Map<String, Object>>) maps1;
task.infos("sql", maps3.getTotal(), maps3.size());
在线测试 (TODO)
可以用以下链接尝测试执行 sql, 其中的 sql 及 json数据 可以随意替换. 也可以使用测试表: sql_demo_table,mongo_demo_table,cinema_info
测试服务器配置较低暂时不提供 elastic search 在线测试, 以上连接可以测试 sql 和 mongo 联合查询.
如需测试 es jdbc 请使用离线版 demo,