Created
December 21, 2015 12:04
-
-
Save rikkit/93ca50a3c20ed6610de6 to your computer and use it in GitHub Desktop.
Bulk Nest Publisher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using JetBrains.Annotations; | |
using Nest; | |
using TheFilter.Core.Entity.Customers.Managers; | |
/// <summary>
/// Publishes bulk ES operations in batches.
/// </summary>
/// <remarks>
/// Operations queued through <see cref="CreateAsync{T}"/>, <see cref="IndexAsync{T}"/>,
/// <see cref="UpdateAsync{T,U}"/> and <see cref="DeleteAsync{T}"/> are buffered and sent as one
/// bulk request when the publish frequency elapses, the buffer limit is reached, or
/// <see cref="FlushAsync"/> is called. Each caller is released as soon as its batch has been
/// answered, whether the batch succeeded or failed.
/// NOTE(review): <c>Subject.OnNext</c> is invoked directly from every caller; Rx expects
/// notifications to be serialized, so confirm whether concurrent callers need synchronization.
/// </remarks>
public class NestBulkPublisher : INestBulkPublisher
{
    /// <summary>
    /// A queued bulk operation together with the state used to hand its outcome back to the caller.
    /// </summary>
    internal class NestBulkOperation
    {
        /// <summary>Adds this operation to the shared bulk descriptor.</summary>
        public Func<BulkDescriptor, BulkDescriptor> Operation { get; set; }

        /// <summary>The response item for this operation, set once the batch has been answered.</summary>
        public BulkOperationResponseItem Response { get; set; }

        /// <summary>Cancelled by the publisher to wake the caller waiting in <c>BulkAsync</c>.</summary>
        public CancellationTokenSource Cts { get; set; }

        /// <summary>The failure to report to the caller when the batch could not be completed.</summary>
        public Exception Error { get; set; }
    }

    private readonly Subject<NestBulkOperation> _bulkSubject;
    private readonly Subject<System.Reactive.Unit> _flushSubject;
    private readonly TimeSpan _maxTaskWait;
    private readonly TimeSpan _bufferTimeout;
    private readonly bool _shouldRefresh;

    /// <summary>
    /// Raised after a bulk request has returned; the argument is the number of operations in the batch.
    /// </summary>
    public event EventHandler<int> Published;

    /// <summary>The Elasticsearch client used to send bulk requests.</summary>
    public IElasticClient Client { get; }

    /// <summary>
    /// Creates a publisher whose batch size, publish frequency and tolerance come from <paramref name="config"/>.
    /// </summary>
    public NestBulkPublisher(IElasticClient nest, BulkPublisherConfig config)
        : this(nest, config.BulkSize, config.PublishFrequency, config.Tolerance)
    {
    }

    /// <summary>
    /// Creates a publisher.
    /// </summary>
    /// <param name="nest">Client used to send bulk requests.</param>
    /// <param name="bufferLimit">Number of queued operations that triggers a publish.</param>
    /// <param name="publishFrequency">Time span after which queued operations are published.</param>
    /// <param name="tolerance">Maximum time a caller waits for its operation to be answered.</param>
    /// <param name="refresh">Value passed to <c>BulkDescriptor.Refresh</c> for every bulk request.</param>
    public NestBulkPublisher(IElasticClient nest, int bufferLimit, TimeSpan publishFrequency, TimeSpan tolerance, bool refresh = true)
    {
        Client = nest;
        _bulkSubject = new Subject<NestBulkOperation>();
        _flushSubject = new Subject<System.Reactive.Unit>();
        _bufferTimeout = publishFrequency;
        _maxTaskWait = tolerance;
        _shouldRefresh = refresh;

        // Publish items after x ms, y items, or when FlushAsync() is called
        var bufferTrigger = _bulkSubject
            .Buffer(publishFrequency, bufferLimit)
            .Select(op => System.Reactive.Unit.Default)
            .Merge(_flushSubject);

        _bulkSubject
            .Buffer(() => bufferTrigger)
            .Subscribe(PublishBuffer);
    }

    /// <summary>Queues a bulk create operation and completes once its batch has been answered.</summary>
    public Task<Result<BulkCreateResponseItem>> CreateAsync<T>(Func<BulkCreateDescriptor<T>, BulkCreateDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkCreateResponseItem>(bulk => bulk.Create(operation));
    }

    /// <summary>Queues a bulk index operation and completes once its batch has been answered.</summary>
    public Task<Result<BulkIndexResponseItem>> IndexAsync<T>(Func<BulkIndexDescriptor<T>, BulkIndexDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkIndexResponseItem>(bulk => bulk.Index(operation));
    }

    /// <summary>Queues a bulk update operation and completes once its batch has been answered.</summary>
    public Task<Result<BulkUpdateResponseItem>> UpdateAsync<T, U>(Func<BulkUpdateDescriptor<T, U>, BulkUpdateDescriptor<T, U>> operation)
        where T : class
        where U : class
    {
        return BulkAsync<BulkUpdateResponseItem>(bulk => bulk.Update(operation));
    }

    /// <summary>Queues a bulk delete operation and completes once its batch has been answered.</summary>
    public Task<Result<BulkDeleteResponseItem>> DeleteAsync<T>(Func<BulkDeleteDescriptor<T>, BulkDeleteDescriptor<T>> operation)
        where T : class
    {
        return BulkAsync<BulkDeleteResponseItem>(bulk => bulk.Delete(operation));
    }

    /// <summary>
    /// Add the bulk operation to the queue, and return when done.
    /// </summary>
    /// <returns>
    /// The typed response item when the operation was valid; otherwise a result carrying the
    /// recorded batch failure, the response error, or a <see cref="TaskCanceledException"/>
    /// when no answer arrived within the tolerance.
    /// </returns>
    private async Task<Result<T>> BulkAsync<T>(Func<BulkDescriptor, BulkDescriptor> operation) where T : BulkOperationResponseItem
    {
        var bulkOp = new NestBulkOperation
        {
            Operation = operation,
            Cts = new CancellationTokenSource()
        };

        // Publish the operation, this will be picked up by the subscribing observable.
        _bulkSubject.OnNext(bulkOp);

        // This isn't great.
        // Delay for longer than it will take the buffer to be published. Expect a cancellation.
        try
        {
            await Task.Delay(_maxTaskWait, bulkOp.Cts.Token).ConfigureAwait(false);
        }
        catch (TaskCanceledException)
        {
            // Expected: the publisher cancels the token to signal that this operation was answered.
        }

        // A recorded failure wins over everything else; Response is not populated in that case.
        if (bulkOp.Error != null)
        {
            return Result<T>.SetException(bulkOp.Error);
        }

        if (!bulkOp.Cts.IsCancellationRequested || bulkOp.Response == null)
        {
            return Result<T>.SetException(new TaskCanceledException("The bulk request was not sent within the expected time limit."));
        }

        return !bulkOp.Response.IsValid
            ? Result<T>.SetException(new ApplicationException(bulkOp.Response.Error)) // TODO throw different kinds of exception based on actual error
            : Result<T>.SetValue((T) bulkOp.Response);
    }

    /// <summary>
    /// Sends the buffered operations as a single bulk request and releases every waiting caller.
    /// </summary>
    private void PublishBuffer(IList<NestBulkOperation> currentBuffer)
    {
        if (!currentBuffer.Any())
        {
            return;
        }

        IEnumerable<BulkOperationResponseItem> responseItems;
        Exception failure;

        try
        {
            var bulkDescriptor = new BulkDescriptor();
            foreach (var bulkOperation in currentBuffer)
            {
                bulkOperation.Operation(bulkDescriptor);
            }

            bulkDescriptor.Refresh(_shouldRefresh);

            var bulkResponse = Client.Bulk(bulkDescriptor);
            responseItems = bulkResponse.Items;
            failure = bulkResponse.ConnectionStatus?.OriginalException;
        }
        catch (Exception ex)
        {
            // Report the failure to the callers of this batch instead of letting it escape the
            // subscription callback and leaving them to wait for the timeout.
            FailAll(currentBuffer, ex);
            return;
        }

        Published?.Invoke(this, currentBuffer.Count);

        if (responseItems == null)
        {
            // Release the callers immediately rather than making them wait for the timeout.
            FailAll(currentBuffer, failure ?? new ApplicationException("The bulk request returned no response items."));
            return;
        }

        // Set response values and release all waiting tasks. Responses are matched to operations
        // by position; an operation without a matching item is failed so its caller is still released.
        var items = responseItems.ToList();
        for (var i = 0; i < currentBuffer.Count; i++)
        {
            var bulkOperation = currentBuffer[i];
            if (i < items.Count)
            {
                bulkOperation.Response = items[i];
            }
            else
            {
                bulkOperation.Error = new ApplicationException("The bulk response did not contain an item for this operation.");
            }

            bulkOperation.Cts.Cancel();
        }
    }

    /// <summary>
    /// Records <paramref name="error"/> on every operation and wakes its waiting caller.
    /// </summary>
    private static void FailAll(IEnumerable<NestBulkOperation> operations, Exception error)
    {
        foreach (var operation in operations)
        {
            // Error must be set before the token is cancelled so the woken caller observes it.
            operation.Error = error;
            operation.Cts.Cancel();
        }
    }

    /// <summary>
    /// Flush all contents of the buffer.
    /// </summary>
    /// <remarks>
    /// Signals the flush trigger and then waits for one publish frequency interval.
    /// </remarks>
    public async Task FlushAsync()
    {
        _flushSubject.OnNext(System.Reactive.Unit.Default);
        await Task.Delay(_bufferTimeout).ConfigureAwait(false);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment