正如Val在评论中所说,您一次发送的数据可能会超出集群的处理能力。看来您可能正在尝试在 一个* 批量请求中发送 所有
文档,但对于许多文档或大型文档而言,这可能无法正常工作。 *
使用时
_bulk,除了可以同时发送到集群的批量请求数量外,您还需要将数据以多个批量请求的形式发送到集群,并找到每个批量请求中可以发送的 最佳
文档数。
此处没有确定最佳大小的硬性规定,因为它取决于文档的复杂性,分析方式,集群硬件,集群设置,索引设置等而有所不同。
最好的做法是从一个合理的数字开始,例如在一个请求中说500个文档(或在您的上下文中有意义的某个数字),然后从那里开始。计算每个批量请求的总大小(以字节为单位)也是一种不错的方法。如果性能和吞吐量不足,请增加文档数量,请求字节大小或并发请求,直到您开始看到
es_rejected_execution_exception。
NEST 5.x附带了一个方便的助手,使用
IObservable<T>和可观察的设计模式使批量请求变得更加容易
void Main(){ var client = new ElasticClient(); // can cancel the operation by calling .Cancel() on this var cancellationTokenSource = new CancellationTokenSource(); // set up the bulk all observable var bulkAllObservable = client.BulkAll(Getdocuments(), ba => ba // number of concurrent requests .MaxDegreeOfParallelism(8) // in case of 429 response, how long we should wait before retrying .BackOffTime(TimeSpan.FromSeconds(5)) // in case of 429 response, how many times to retry before failing .BackOffRetries(2) // number of documents to send in each request .Size(500) .Index("index-name") .RefreshonCompleted(), cancellationTokenSource.Token ); var waitHandle = new ManualResetEvent(false); Exception ex = null; // what to do on each call, when an exception is thrown, and // when the bulk all completes var bulkAllObserver = new BulkAllObserver( onNext: bulkAllResponse => { // do something after each bulk request }, onError: exception => { // do something with exception thrown ex = exception; waitHandle.Set(); }, onCompleted: () => { // do something when all bulk operations complete waitHandle.Set(); }); bulkAllObservable.Subscribe(bulkAllObserver); // wait for handle to be set. waitHandle.WaitOne(); if (ex != null) { throw ex; }}// Getting documents should be lazily enumerated collection ideallypublic static IEnumerable<document> Getdocuments(){ return Enumerable.Range(1, 10000).Select(x => new document { Id = x, Name = $"document {x}" } );}public class document{ public int Id { get; set; } public string Name { get; set; }}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)