深耕ElasticSearch - 乐观锁解决并发冲突问题

深耕ElasticSearch - 乐观锁解决并发冲突问题,第1张

深耕ElasticSearch - 乐观锁解决并发冲突问题

文章目录
    • 1. 并发冲突问题
    • 2. ES的乐观锁并发控制
    • 3. _version元数据
    • 4. Replica Shard 数据同步机制
    • 5. 基于internal_version进行乐观锁并发控制实战
    • 6. 基于external_version进行乐观锁并发控制实战

1. 并发冲突问题

当我们更新文档时 ,可以一次性读取原始文档,做我们的修改,然后重新索引整个文档 。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在ES中。如果其他人同时更改这个文档,他们的更改将丢失。很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到ES中并使其可被搜索。 也许两个人同时更改相同的文档的几率很小。或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。

但有时丢失了一个变更就是非常严重的。试想我们使用ES存储我们网上商城商品库存的数量, 每次我们卖一个商品的时候,我们在ES中将库存数量减少。有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个线程并发运行,每一个都同时处理所有商品的销售,如图:

正常的情况下,我们希望Thread-A将库存100-1,设置为99件,然后Thread-B将库存99-1,设置为98件,最终ES中的库存应该是98件。

但是实际上,在高并发的情况下,Thread-A将库存设置为99件,然后Thread-B再次将库存设置为99件,导致结果就出错了,Thread-A对库存做的更改已经丢失,因为Thread-B不知道它的库存拷贝已经过期,结果我们会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望。

变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更,因为这段时间里很可能数据在ES中已经修改了,那么我们拿到的就是旧数据,基于旧数据去 *** 作,后面结果肯定错了。


2. ES的乐观锁并发控制

在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:

① 悲观并发控制

这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。

② 乐观并发控制

ElasticSearch中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的 *** 作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

如图所示,ElasticSearch的乐观锁并发控制:

1、Thread-A写数据的时候,会判断当前数据的版本号version=1和ES中的数据版本号version=1是否一致? 结果一致,Thread-A更新库存成功,此时ES中的库存=99件,版本号变成version=2。

2、Thread-B写数据的时候,会判断当前数据的版本号version=1和ES中数据的版本号version=2是否一致?结果不一致,线程B更新库存失败,但是Thread-B会重新获取ES中的最新库存数据,重新执行上述流程。

3、 Thread-B再次写数据的时候,会判断当前数据的版本号version=2和ES中的数据版本哈version=2是否一致?结果一致,Thread-B更新库存成功,此时ES中的库存=98件,版本号变成version=3。

我们可以利用 _version 号来确保应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。

乐观锁和悲观锁的优缺点分别为:

1、悲观锁的优点是方便,直接加锁,对应用程序来说,透明,不需要做额外的 *** 作;缺点是并发能力很低,同一时间只能有一条线程 *** 作数据。

2、乐观锁的优点是并发能力很高,不给数据加锁,大量数据并发 *** 作;缺点是麻烦,每次更新的时候,都需要比对版本号,然后可能还需要重新加载数据,再次修改,再写入数据,这个过程可能还要重复好几遍。


3. _version元数据

第一次创建一个document的时候,它的_version内部版本号就是1;以后每次对这个document执行修改或者删除 *** 作,都会将这个_version版本号自动加1;哪怕是删除,也会对这条数据的版本号加1。

PUT /text_index/_doc/1
{
  "test_field": "test test"
}
{
  "_index" : "text_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,         // 第一次创建,version=1
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

修改这条数据,版本号_version自动加1:

PUT /text_index/_doc/2
{
  "test_field":"test test2"
}
{
  "_index" : "text_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 14,
  "_primary_term" : 1
}

删除这条数据,版本号_version自动加1:

DELETE /text_index/_doc/2
{
  "_index" : "text_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 3,
  "result" : "deleted",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 15,
  "_primary_term" : 1
}

再次这条数据,会在delete version基础之上,再把version号加1:

PUT /text_index/_doc/2
{
  "test_field": "test test"
}
{
  "_index" : "text_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 4,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 16,
  "_primary_term" : 1
}

我们会发现,在删除一个document之后,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1。


4. Replica Shard 数据同步机制

ES是分布式的。每当primary shard中的文档创建、更新或删除时,新版本的文档必须同步到集群中其他节点的replica shard中。ES也是异步和并发的,这意味着这些同步请求被并行发送,如果primary shard中的同一个文档进行了多次修改,到达replica shard时也许顺序是乱的,可能会出现"后发先至"的情况。 ES需要一种方法确保文档的旧版本不会覆盖新的版本。

Java客户端向ES某条文档发起更新请求,共发出3次,Java端有严谨的并发请求控制,在ES的primary shard中写入的结果是正确的,但ES内部数据同步时,顺序不能保证都是先修改的先到,有可能先修改的后到了,后修改的先到了,比如下图中第三次更新请求比第二次更新请求先到了replica shard中:

如果ES内部没有并发控制,这个文档在primary shard中的数据是Filed=text3,在replica的数据可能是text2,这样肯定错了。预期的更新顺序应该是text1-->text2-->text3,最终的正确结果是text3。而实际的更新结果是text1-->text3-->text2那ES内部是如何做的呢?

Shard的数据同步也是基于内置的_version进行乐观锁并发控制的。

当我们之前讨论 index , GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。 ES使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

ES内部在更新文档时,会比较一下version,如果请求的version等于文档的version,就做更新,如果请求的version不等于文档的version,说明此数据已经被后到的线程更新过了,此时会丢弃当前的请求。Field=text3,version=2这条数据在修改时,会比较文档的版本号version=3当前请求数据的版本号version=2,发现不相等直接将请求数据丢掉。因此最终的结果为text3,此时的更新顺序为text1-->text3。


5. 基于internal_version进行乐观锁并发控制实战

1、先构造一条数据出来

PUT /test_index/_doc/5
{
  "test field":"test test"
}

2、模拟两个客户端,都获取到了统一条数据

GET /test_index/_doc/6
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "6",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test field" : "test test"
  }
}

3、其中一个kibana客户端,先更新了一下这个数据,同时带上数据的版本号,确保es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改。

PUT /test_index/_doc/6?version=1
{
  "test field":"test client 1"
}

一些老的版本es使用version,但是新版本不支持了,会报这个错误,提示我们用if_seq_no和if_primary_term:

  • _version表示当前数据新的版本号;
  • _seq_no其实和version同一个道理,一旦数据发生更改,数据也一直是累计的;
  • _primary_term表示是由谁分配的,意思说如果文档在一个集群里面,文档肯定会被分配一个位置,_primary_term表示的就是一个位置;
{
  "error": {
    "root_cause": [
      {
        "type": "action_request_validation_exception",
        "reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
      }
    ],
    "type": "action_request_validation_exception",
    "reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
  },
  "status": 400
}

_seq_no和_primary_term是对_version的优化,7.X版本的ES默认使用这种方式控制版本,所以当在高并发环境下使用乐观锁机制修改文档时,要带上当前文档的_seq_no和_primary_term进行更新:

PUT /test_index/_doc/6?if_seq_no=1&if_primary_term=1
{
  "test field":"test client 1"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "6",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

4、另外一个kibana客户端,尝试基于_seq_no=1和_primary_term=1的数据去进行修改,进行乐观锁的并发控制,发现更新失败。

PUT /test_index/_doc/6?if_seq_no=1&if_primary_term=1
{
  "test field":"test client 2"
}
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[6]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]",
        "index_uuid": "1q7xuNvBQAelYOZVjN0Luw",
        "shard": "0",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[6]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]",
    "index_uuid": "1q7xuNvBQAelYOZVjN0Luw",
    "shard": "0",
    "index": "test_index"
  },
  "status": 409
}

5、基于最新的_seq_no=2和_primary_term=1进行修改,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下

PUT /test_index/_doc/6?if_seq_no=2&if_primary_term=1
{
  "test field":"test client 2"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "6",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

6. 基于external_version进行乐观锁并发控制实战

一个常见的设置是使用其它数据库作为主要的数据存储,使用ES做数据检索, 这意味着主数据库的所有更改发生时都需要被复制到ES ,如果多个进程负责这一数据同步,你可能遇到类似于之前描述的并发问题。

如果你的主数据库已经有了版本号, 那么你就可以在 ES中通过增加 version_type=external 到查询字符串的方式重用这些相同的版本号, 版本号必须是大于零的整数, 且小于 9.2E+18 — 一个 Java 中 long 类型的正值。

?version=1
?version=1&version_type=external

外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同,ES不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否小于请求中指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。

外部版本号不仅在索引和删除请求是可以指定,而且在创建新文档时也可以指定。

es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,比如说?version=2&version_type=external

1、先构造一条数据

PUT /test_index/_doc/7
{
  "test field":"test test"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "7",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 4,
  "_primary_term" : 1
}

2、模拟两个客户端同时查询到这条数据

GET /test_index/_doc/7
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "7",
  "_version" : 1,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test field" : "test test"
  }
}

3、第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2

PUT /test_index/_doc/7?version=2&version_type=external
{
  "test field":"test client 1"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "7",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 5,
  "_primary_term" : 1
}

4、模拟第二个客户端,同时拿到了自己数据库中维护的那个版本号,也是2,同时基于version=2发起了修改

PUT /test_index/_doc/7?version=2&version_type=external
{
  "test field":"test client 2"
}
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[7]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "1q7xuNvBQAelYOZVjN0Luw",
        "shard": "0",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[7]: version conflict, current version [2] is higher or equal to the one provided [2]",
    "index_uuid": "1q7xuNvBQAelYOZVjN0Luw",
    "shard": "0",
    "index": "test_index"
  },
  "status": 409
}

5、在并发控制成功后,重新基于最新的版本号发起更新

PUT /test_index/_doc/7?version=3&version_type=external
{
  "test field":"test client 2"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "7",
  "_version" : 3,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 6,
  "_primary_term" : 1
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5655856.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存