当前位置:网站首页>恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
2020-11-06 01:29:00 【尹吉欢】
背景
ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。
大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。
但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。
正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。
单条更新
ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。
首先来回顾下单条数据的更新是怎么做的,代码如下:
UpdateRequest updateRequest = new UpdateRequest(index, type, id);updateRequest.doc(documentJson, XContentType.JSON);restHighLevelClient.update(updateRequest, options);
构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。
条件更新
首先我们准备几条测试数据,如下:
{id: 1,title: "Java怎么学",type: 1,userId: 1,tags: ["java"],textContent: "我要学Java",status: 1,heat: 100}{id: 2,title: "Java怎么学",type: 1,userId: 1,tags: ["java"],textContent: "我要学Java",status: 1,heat: 100}
假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。
下面看看按条件更新是如何使用的,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query{"script": {"source":"ctx._source['status']=0;"},"query": {"term": {"userId": 1}}}
按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。
详细使用文档:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html
在 Java 代码中如何实现条件更新呢?
UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");request.setQuery(new TermQueryBuilder("userId", 1));request.setScript(new Script("ctx._source['status']=0;"));restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。
条件更新数组
比如我们的需求是要移除 tags 中的 java,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query{"script": {"source":"ctx._source['tags'].removeIf(item -> item == 'java');"},"query": {"term": {"userId": 1}}}
新增的话只需要将 removeIf 改成 add 就可以了。
ctx._source['tags'].add('java');
如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query{"script": {"source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}"},"query": {"term": {"userId": 1}}}
封装通用的条件更新
大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。
下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。
public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) {UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);updateByQueryRequest.setQuery(query);StringBuilder script = new StringBuilder();Set<String> keys = document.keySet();for (String key : keys) {String appendValue = "";Object value = document.get(key);if (value instanceof Number) {appendValue = value.toString();} else if (value instanceof String) {appendValue = "'" + value.toString() + "'";} else if (value instanceof List){appendValue = JsonUtils.toJson(value);} else {appendValue = value.toString();}script.append("ctx._source.").append(key).append("=").append(appendValue).append(";");}updateByQueryRequest.setScript(new Script(script.toString()));return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);}public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {Map<String, Object> catData = new HashMap<>(1);catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());return CatTransactionManager.newTransaction(() -> {try {return restHighLevelClient.updateByQuery(updateByQueryRequest, options);}catch (IOException e) {throw new RuntimeException(e);}}, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);}
如果有了这么一个方法,那么使用方式如下:
@Testpublic void testUpdate5() {Map<String, Object> document = new HashMap<>();document.put("title", "Java");document.put("status", 0);document.put("tags", Lists.newArrayList("JS", "CSS"));kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);}
关于作者 :尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。
我整理了一份很全的学习资料,感兴趣的可以微信搜索 「猿天地」,回复关键字 「学习资料」获取我整理好了的Spring Cloud,Spring Cloud Alibaba,Sharding-JDBC分库分表,任务调度框架XXL-JOB,MongoDB,爬虫等相关资料。

版权声明
本文为[尹吉欢]所创,转载请带上原文链接,感谢
http://cxytiandi.com/blog/detail/36495
边栏推荐
- 聆听无声的话语:手把手教你用ModelArts实现手语识别
- 适合时间序列数据的计算脚本
- 一文带你了解 Jest 单元测试
- 自然语言处理-错字识别(基于Python)kenlm、pycorrector
- 【C/C++ 2】Clion配置与运行C语言
- 面经手册 · 第16篇《码农会锁,ReentrantLock之公平锁讲解和实现》
- 按指定基准对齐的分组运算
- Azure Data Factory(三)整合 Azure Devops 實現CI/CD
- 使用ES5实现ES6的Class
- 8.1.2 handling global exceptions through simplemappingexceptionresolver
猜你喜欢
随机推荐
如何对Pandas DataFrame进行自定义排序
用git2consul从Git同步配置到Consul
写一个通用的幂等组件,我觉得很有必要
8.2.3 implementation of interceptors (interceptors and filters) through handlerinterceptor
聆听无声的话语:手把手教你用ModelArts实现手语识别
业务策略、业务规则、业务流程和业务主数据之间关系 - modernanalyst
Using tensorflow to forecast the rental price of airbnb in New York City
Vue.js移动端左滑删除组件
6.8 multipartresolver file upload parser (in-depth analysis of SSM and project practice)
windows10 tensorflow(二)原理实战之回归分析,深度学习框架(梯度下降法求解回归参数)
JUC 包下工具类,它的名字叫 LockSupport !你造么?
前端模組化簡單總結
安装Anaconda3 后,怎样使用 Python 2.7?
NodeJs爬虫抓取古代典籍,共计16000个页面心得体会总结及项目分享
Clean架构能够解决哪些问题? - jbogard
深度解读智能推荐系统搭建之路 | 会展云技术揭秘
看完这篇就看懂了很多webpack脚手架
面经手册 · 第16篇《码农会锁,ReentrantLock之公平锁讲解和实现》
OPTIMIZER_TRACE详解
vite + ts 快速搭建 vue3 專案 以及介紹相關特性







