«Потоковая передача» сообщений брокеру из пакетного источника (например, базы данных sql), опционально, подтверждение обратно источнику

Я пишу код библиотеки, чтобы помочь другим разработчикам отправлять сообщения брокеру событий pub-sub.

Во многих случаях источник сообщения не будет «потоковым». В частности, большинство сообщений поступает из системы отслеживания измененных данных SQL Server и поэтому гораздо эффективнее считываются наборами (пакетами).

Кроме того, в большинстве случаев сообщение должно быть «гарантированным» – то есть, если мы не можем отправить сообщение брокеру, мы должны отправить его снова, даже если отправляющее приложение по какой-то причине выйдет из строя. Это означает, что источник должен знать, что сообщение было отправлено. Как и в случае с чтением из источника, ответное подтверждение также устанавливается на основе по той же причине.

Допустимо, что сообщение может быть “случайно” отправлено несколько раз в подобных исключениях (доставка с гарантированной доставкой – слишком сложная проблема, которую я могу решить в моей небольшой реализации здесь!), Но это должно быть не происходит, если все работает без сбоев.

Размеры пакетов могут быть большими, поэтому я действительно не хочу выполнять одновременно n задачи публикации для брокера, где n это количество сообщений в пакете, потому что потенциально может быть чертовски много Task объекты, которых ждет что-то вроде Task.WhenAll. Поэтому я использовал DataFlow как механизм ограничения параллелизма.

Вот созданные мной интерфейсы (я довольно доволен ими, поэтому пропустите их, если вам не нужен контекст, хотя мне интересно, AcknowledgeAsync метод должен принимать IAsyncEnumerable для симметрии). В IGuaranteedBatchProvider не распространяется IBatchProvider, потому что для подтверждения требуется только IMessage, а не IMessage<T>.


public interface IMessage
{
    /// <summary>
    /// Allows a message to be tracked locally through processing pipeline, either on the
    /// publication or subscription side, but the values sent by a publisher will not be the same
    /// as those received by a subscriber, since the LocalMessageId values for messages received
    /// by susbcribers are generated by the broker
    /// </summary>
    public long LocalMessageId { get; init; }
}

public interface IMessage<T> : IMessage
{
    public T Payload { get; init; }
}

public interface IBatchProvider<T>
{
    /// <summary>
    /// Returns a batch of unacknowledged messages from the source. If the provider is also a
    /// guaranteed provider, then all messages in a batch should be acknowledged prior to reading
    /// a new batch, otherwise the new batch will contain duplicate messages
    /// </summary>
    IAsyncEnumerable<IMessage<T>> ReadBatchAsync(ushort maxBatchSize, CancellationToken ct);
}
 
public interface IGuaranteedBatchProvider
{
    /// <summary>
    /// informs a guaranteed provider that a set of messages have been successfully transmitted.
    /// Acknowledged messages will not be returned by subsequent read operations
    /// </summary>
    Task AcknowledgeAsync(IEnumerable<IMessage> messagesToAcknowledge, CancellationToken ct);
}

public interface IPublication<T>
{
    /// <summary>
    /// sends a message to the broker, returns true if the payload is successfully published
    /// </summary>
    Task<bool> PublishAsync(T payload);
}

Код, который я хочу просмотреть, находится в моем BatchPublisher класс. Этот вспомогательный класс понимает, как работать с описанными выше интерфейсами. В Run цикл – это то, что следует. Я был бы особенно признателен за отзывы о разделе кода, отмеченном как //ugly?, будь то Channel реализация может быть более подходящей (я еще не использовал каналы) и, конечно же, какие-либо общие комментарии по поводу общего дизайна. Большая часть обработки исключений удалена для краткости.

Поставщиком здесь обычно будет CdcProvider<T> : IBatchProvider<T>, IGuaranteedBatchProvider

public async Task Run(CancellationToken ct)
{
    var results = new ConcurrentBag<(IMessage msg, bool result)>();

    // ugly?
    Func<IMessage<T>, Task> publish = provider is IGuaranteedBatchProvider
        ? async (msg) => results.Add((msg, await publication.PublishAsync(msg.Payload)))
        : async (msg) => _ = await publication.PublishAsync(msg.Payload);

    while (!ct.IsCancellationRequested)
    {
        results.Clear();

        // channels instead? Newing up the actionblock every batch seems wrong somehow
        ActionBlock<IMessage<T>> limiter = new
        (
            async (msg) =>
            {
                try { await publish(msg); }
               // if the consuming application as registered an event handler for publication
               // exceptions, call it (mostly for error logging)
                catch (Exception x) { PublishFailed?.Invoke(this, (msg, x)); }
            },
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = ct,
                EnsureOrdered = true,
                MaxDegreeOfParallelism = sendConcurrency
            }
        );

        await foreach (var message in provider.ReadBatchAsync(maxReadBatchSize, ct))
        {
            limiter.Post(message);
        }
        limiter.Complete();
        await limiter.Completion;
        if (!results.IsEmpty && provider is IGuaranteedBatchProvider p)
        {
            await p.AcknowledgeAsync(results.Select(r => r.msg), ct);
        }
    }
}

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *