当前位置:网站首页>恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
2020-11-06 01:29:00 【尹吉欢】
背景
ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。
大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。
但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。
正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。
单条更新
ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。
首先来回顾下单条数据的更新是怎么做的,代码如下:
UpdateRequest updateRequest = new UpdateRequest(index, type, id);
updateRequest.doc(documentJson, XContentType.JSON);
restHighLevelClient.update(updateRequest, options);
构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。
条件更新
首先我们准备几条测试数据,如下:
{
id: 1,
title: "Java怎么学",
type: 1,
userId: 1,
tags: [
"java"
],
textContent: "我要学Java",
status: 1,
heat: 100
}
{
id: 2,
title: "Java怎么学",
type: 1,
userId: 1,
tags: [
"java"
],
textContent: "我要学Java",
status: 1,
heat: 100
}
假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。
下面看看按条件更新是如何使用的,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"ctx._source['status']=0;"
},
"query": {
"term": {
"userId": 1
}
}
}
按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。
详细使用文档:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html
在 Java 代码中如何实现条件更新呢?
UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");
request.setQuery(new TermQueryBuilder("userId", 1));
request.setScript(new Script("ctx._source['status']=0;"));
restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。
条件更新数组
比如我们的需求是要移除 tags 中的 java,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"ctx._source['tags'].removeIf(item -> item == 'java');"
},
"query": {
"term": {
"userId": 1
}
}
}
新增的话只需要将 removeIf 改成 add 就可以了。
ctx._source['tags'].add('java');
如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}"
},
"query": {
"term": {
"userId": 1
}
}
}
封装通用的条件更新
大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。
下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。
public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
updateByQueryRequest.setQuery(query);
StringBuilder script = new StringBuilder();
Set<String> keys = document.keySet();
for (String key : keys) {
String appendValue = "";
Object value = document.get(key);
if (value instanceof Number) {
appendValue = value.toString();
} else if (value instanceof String) {
appendValue = "'" + value.toString() + "'";
} else if (value instanceof List){
appendValue = JsonUtils.toJson(value);
} else {
appendValue = value.toString();
}
script.append("ctx._source.").append(key).append("=").append(appendValue).append(";");
}
updateByQueryRequest.setScript(new Script(script.toString()));
return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
}
public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {
Map<String, Object> catData = new HashMap<>(1);
catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());
return CatTransactionManager.newTransaction(() -> {
try {
return restHighLevelClient.updateByQuery(updateByQueryRequest, options);
}catch (IOException e) {
throw new RuntimeException(e);
}
}, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);
}
如果有了这么一个方法,那么使用方式如下:
@Test
public void testUpdate5() {
Map<String, Object> document = new HashMap<>();
document.put("title", "Java");
document.put("status", 0);
document.put("tags", Lists.newArrayList("JS", "CSS"));
kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);
}
关于作者 :尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。
我整理了一份很全的学习资料,感兴趣的可以微信搜索 「猿天地」,回复关键字 「学习资料」获取我整理好了的Spring Cloud,Spring Cloud Alibaba,Sharding-JDBC分库分表,任务调度框架XXL-JOB,MongoDB,爬虫等相关资料。
版权声明
本文为[尹吉欢]所创,转载请带上原文链接,感谢
http://cxytiandi.com/blog/detail/36495
边栏推荐
猜你喜欢
Using tensorflow to forecast the rental price of airbnb in New York City
基于深度学习的推荐系统
How to select the evaluation index of classification model
如何对Pandas DataFrame进行自定义排序
不能再被问住了!ReentrantLock 源码、画图一起看一看!
读取、创建和运行多个文件的3个Python技巧
mac 安装hanlp,以及win下安装与使用
【jmeter】實現介面關聯的兩種方式:正則表示式提取器和json提取器
如何选择分类模型的评价指标
梯度下降算法在机器学习中的工作原理
随机推荐
【Flutter 實戰】pubspec.yaml 配置檔案詳解
基础知识点整理
8.1.2 handling global exceptions through simplemappingexceptionresolver
vite + ts 快速搭建 vue3 專案 以及介紹相關特性
免费的专利下载教程(知网、espacenet强强联合)
计组-字长
Python 基于jwt实现认证机制流程解析
keras model.compile损失函数与优化器
安装Consul集群
【數量技術宅|金融資料系列分享】套利策略的價差序列計算,恐怕沒有你想的那麼簡單
React 高阶组件浅析
为了省钱,我用1天时间把PHP学了!
Ubuntu18.04上安裝NS-3
被产品经理怼了,线上出Bug为啥你不知道
Skywalking系列博客1-安装单机版 Skywalking
使用Asponse.Words處理Word模板
tensorflow之tf.tile\tf.slice等函数的基本用法解读
Flink on PaaSTA:Yelp运行在Kubernetes上的新流处理平台
词嵌入教程
计算机TCP/IP面试10连问,你能顶住几道?