Тройной буфер без блокировки

У меня один производитель и один потребитель, и производитель никогда не останавливается, но потребитель может не успевать. Нет необходимости потреблять каждый элемент, если мы всегда обращаемся к самому последнему произведенному элементу и никогда не обрабатываем элемент дважды. Другими словами, мы должны отбрасывать товары, если потребитель не успевает за ними, но ждать производителя, если это так.

#ifndef TRIPLE_BUFFER_HPP
#define TRIPLE_BUFFER_HPP

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

template<typename T>
class triple_buffer
{
    // the actual buffer
    T buffer[3] = {};

    // the roles the buffers currently have
    // read and write buffers are private to each side
    // the available buffer is passed between them
    T* readbuffer = &buffer[0];
    T* writebuffer = &buffer[1];
    std::atomic<T*> available = &buffer[2];

    // the last fully-written buffer waiting ready for reader
    std::atomic<T*> next_read_buf = nullptr;

    // When the reader catches up, it needs to wait for writer (slow path only)
    std::mutex read_queue_mutex = {};
    std::condition_variable read_queue = {};

public:

    // Writer interface

    // Writer has ownership of this buffer (this function never blocks).
    T *get_write_buffer()
    {
        return writebuffer;
    }

    // Writer releases ownership of its buffer.
    void set_write_complete()
    {
        // give back the write buffer
        auto *written = writebuffer;
        writebuffer = available.exchange(writebuffer);
        // mark it as written
        next_read_buf.store(written, std::memory_order_release);
        // notify any waiting reader
        read_queue.notify_one();
    }

    // Reader interface

    // Reader gets ownership of the buffer, until the next call of
    // get_read_buffer().
    T *get_read_buffer(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
    {
        auto const timeout_time = std::chrono::steady_clock::now() + timeout;

        // get the written buffer, waiting if necessary
        auto *b = next_read_buf.exchange(nullptr);
        while (b != readbuffer) {
            // it could be the available buffer
            readbuffer = available.exchange(readbuffer);
            if (b == readbuffer) {
                // yes, that's it
                return readbuffer;
            }
            // else we need to wait for writer
            b = nullptr;
            std::unique_lock lock{read_queue_mutex};
            auto test = [this,&b]{ b = next_read_buf.exchange(nullptr); return b; };
            if (!read_queue.wait_until(lock, timeout_time, test)) {
                return nullptr;
            }
        }

        return readbuffer;
    }


    // The unit test helper is enabled only if <gtest.h> is included before this header.
    // It's not available (or necessary) in production code.
#ifdef TEST
    // N.B. not thread-safe - only call this when reader and writer are idle
    void test_invariant(const char *file, int line) const
    {
        const std::set<const T*> buffers{&buffer[0], &buffer[1], &buffer[2]};
        const std::set<const T*> roles{readbuffer, available, writebuffer};
        auto const fail = buffers != roles
            || next_read_buf && !buffers.count(next_read_buf)
            || next_read_buf == writebuffer;
        if (fail) {
            auto name = [this](const T *slot){
                if (slot == &buffer[0]) { return "buffer[0]"; }
                if (slot == &buffer[1]) { return "buffer[1]"; }
                if (slot == &buffer[2]) { return "buffer[2]"; }
                if (slot == nullptr) { return "(null)"; }
                return "(invalid)";
            };
            ADD_FAILURE_AT(file, line) <<
                "Buffer/role mismatch:n"
                "Buffers = " << &buffer[0] << ", " << &buffer[1] << ", " << &buffer[2] << "n"
                "Read = " << readbuffer << " = " << name(readbuffer) << "n"
                "Available = " << available  << " = "<< name(available) << "n"
                "Write = " << writebuffer << " = " << name(writebuffer) << "n"
                "Next Read = " << next_read_buf << " = " << name(next_read_buf) << "n";
        }
    }
#endif
};

#endif // TRIPLE_BUFFER_HPP

Я создал это с помощью набора модульных тестов:

#include <gtest/gtest.h>

#include <triple-buffer.hpp>

#include <set>


#define EXPECT_INVARIANT(obj) (obj.test_invariant(__FILE__, __LINE__))


TEST(triple_buffer, no_write_read_returns_null)
{
    triple_buffer<int> buffer;
    EXPECT_INVARIANT(buffer);
    EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
    EXPECT_INVARIANT(buffer);
}

TEST(triple_buffer, write_once_read_once)
{
    triple_buffer<int> buffer;
    auto *write0 = buffer.get_write_buffer();
    EXPECT_INVARIANT(buffer);
    EXPECT_NE(write0, nullptr);
    EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
    EXPECT_INVARIANT(buffer);

    buffer.set_write_complete();
    EXPECT_INVARIANT(buffer);
    // having written, we can read
    EXPECT_EQ(buffer.get_read_buffer({}), write0);
    EXPECT_INVARIANT(buffer);

    // another read should block/fail
    EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
    EXPECT_INVARIANT(buffer);
}

TEST(triple_buffer, write_twice_read)
{
    triple_buffer<int> buffer;
    auto *write0 = buffer.get_write_buffer();
    EXPECT_NE(write0, nullptr);
    buffer.set_write_complete();

    auto *write1 = buffer.get_write_buffer();
    EXPECT_NE(write1, nullptr);
    EXPECT_NE(write1, write0);
    buffer.set_write_complete();

    // read should get newest
    EXPECT_EQ(buffer.get_read_buffer({}), write1);
    // another read should block/fail
    EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
}

TEST(triple_buffer, write_read_write2)
{
    triple_buffer<int> buffer;
    auto *write0 = buffer.get_write_buffer();
    EXPECT_NE(write0, nullptr);
    buffer.set_write_complete();

    // read should get newest
    auto *read0 = buffer.get_read_buffer({});
    EXPECT_EQ(read0, write0);

    auto *write1 = buffer.get_write_buffer();
    EXPECT_NE(write1, nullptr);
    EXPECT_NE(write1, write0);
    buffer.set_write_complete();

    auto *write2 = buffer.get_write_buffer();
    EXPECT_NE(write2, nullptr);
    EXPECT_NE(write2, write1);
    EXPECT_NE(write2, read0);   // don't touch reader's buffer
    buffer.set_write_complete();

    // read should get newest
    auto *read1 = buffer.get_read_buffer({});
    EXPECT_EQ(read1, write2);
}

TEST(triple_buffer, write_read_write3)
{
    triple_buffer<int> buffer;
    auto *write0 = buffer.get_write_buffer();
    EXPECT_NE(write0, nullptr);
    buffer.set_write_complete();

    // read should get newest
    auto *read0 = buffer.get_read_buffer({});
    EXPECT_EQ(read0, write0);

    auto *write1 = buffer.get_write_buffer();
    EXPECT_NE(write1, nullptr);
    EXPECT_NE(write1, write0);
    buffer.set_write_complete();

    auto *write2 = buffer.get_write_buffer();
    EXPECT_NE(write2, nullptr);
    EXPECT_NE(write2, write1);
    EXPECT_NE(write2, read0);   // don't touch reader's buffer
    buffer.set_write_complete();

    auto *write3 = buffer.get_write_buffer();
    EXPECT_EQ(write3, write1);  // we should be overwriting the old unread value
    EXPECT_NE(write3, read0);   // still don't touch reader's buffer
    buffer.set_write_complete();

    // read should get newest
    auto *read1 = buffer.get_read_buffer({});
    EXPECT_EQ(read1, write3);
}


TEST(triple_buffer, read_during_write)
{
    triple_buffer<int> buffer;
    auto *write0 = buffer.get_write_buffer();
    EXPECT_NE(write0, nullptr);
    buffer.set_write_complete();

    auto *write1 = buffer.get_write_buffer();

    // read should get complete buffer
    auto *read0 = buffer.get_read_buffer({});
    EXPECT_EQ(read0, write0);

    buffer.set_write_complete();
    auto *write2 = buffer.get_write_buffer();
    EXPECT_NE(write2, nullptr);
    EXPECT_NE(write2, write1);
    EXPECT_NE(write2, read0);   // don't touch reader's buffer

    auto *read1 = buffer.get_read_buffer({});
    EXPECT_EQ(read1, write1);

    buffer.set_write_complete();
}

Очевидно, что модульные тесты ограничены и являются однопоточными. В частности, они не покрывают wait_until() вызывать или провоцировать любые скачки. Я чувствую, что мне, наверное, стоит создать модуль тесты, которые могут быть медленнее, чем модульные (я считаю, что время выполнения 1 мс является абсолютным верхним пределом для модульного теста), но у меня нет опыта использования таких более медленных тестов.

В любом случае, мои основные проблемы с кодом (как классом, так и его тестами) следующие:

  1. Правильность. Я считаю, что это правильно во всех случаях, но о параллельном коде, как известно, трудно полностью рассуждать, и я был бы признателен за любое понимание того, что я пропустил.
  2. Читаемость. Может ли это понять и изменить кто-то еще (в частности, я-будущее)?
  3. Учитывая, что я знаю, что есть только один читатель, мне действительно нужно read_queue_mutex быть участником, или мне сойдет с рук создание нового экземпляра в каждом get_read_buffer() вызов? Если да, то должен ли я?

1 ответ
1

Ответы на ваши вопросы

Правильность. Я считаю, что это правильно во всех случаях, но о параллельном коде, как известно, сложно полностью рассуждать, и я был бы признателен за любое понимание того, что я пропустил.

Не видит никаких проблем с правильностью.

Читаемость. Может ли это понять и изменить кто-то еще (в частности, я-будущее)?

Код без блокировки всегда сложно соблюдать. я думаю get_write_buffer() а также set_write_complete() функции относительно просты для понимания. Сложнее всего уследить get_read_buffer(). Я бы объяснил инварианты в коде:

  • readbuffer всегда принадлежит потребителю
  • writebuffer всегда принадлежит производителю
  • available никогда не принадлежит ни
  • вы можете атомарно обменивать read/writebuffer с участием available без нарушения указанных выше инвариантов

А затем объясните трюк: next_read_buf устанавливается производителем после завершения работы с буфером, а также меняет местами writebuffer а также available, после которого next_read_buf == available. Потребитель может атомарно захватить available в таком случае.

Ожидание необходимо в двух случаях: next_read_buf является nullptr (производитель еще ничего не производил с момента последнего потребления), или если он просто обменял available а также writebuffer, но не писал next_read_buf еще.

Учитывая, что я знаю, что существует только один читатель, действительно ли мне нужно, чтобы read_queue_mutex был членом, или я могу уйти с созданием нового экземпляра в каждом вызове get_read_buffer ()? Если да, то должен ли я?

Вы можете создавать новый экземпляр при каждом вызове. Либо вы платите небольшую цену за создание нового экземпляра std::mutex каждый звонок, или вы платите цену за std::mutex вокруг все время. Я не думаю, что это имеет большое значение для производительности. Я бы оставил все как есть, чтобы мьютекс и переменная условия в исходном коде находились близко друг к другу.

Эта проблема решена в C ++ 20 с помощью std::atomic_wait().

Рассмотрите возможность использования RAII для передачи владения буфером

Производитель должен сделать два шага: получить указатель на буфер для записи и затем освободить этот буфер, когда это будет сделано. Это похоже на работу для RAII. Похожий на std::lock_guardрассмотрите возможность добавления способа получения дескриптора буфера записи, который будет автоматически выполнять set_write_complete() когда его время жизни закончится. Вы даже можете сделать то же самое для буфера чтения ради симметрии.

  • Именно на такой обзор я надеялся — спасибо. Я не знала std::atomic_wait() — выглядит полезным, но жаль, что нет версии с тайм-аутом (иногда мне нужно опрашивать, чтобы проверить, хочет ли программа завершить работу). Я помню, как раньше думал об интерфейсе RAII, но никогда не думал об этом — спасибо, что напомнил мне подумать об этом еще раз.

    — Тоби Спейт

  • 2

    Хм, хороший момент по поводу тайм-аута. Но если это необходимо для проверки необходимости завершения программы, не делайте этого! Обычный способ избежать опроса — поставить в очередь новый элемент, который каким-то образом сигнализирует о том, что потребитель должен немедленно выйти. Или в этом случае, например, у вас может быть манекен T что вы указываете next_read_buf чтобы (например, использовать &buffer[3]), и вы можете проверить это на медленном пути в get_read_buffer().

    — Г. Сон


  • Хорошие предложения для обработчика прерывания. Думаю, я все еще могу изменить это и учесть ваш совет. У моих предметов уже есть довольно много участников (и я планирую добавить некоторых, чтобы отслеживать, какая часть предметов падает, и как долго остальные тратят время на ожидание чтения), поэтому добавление флага не должно быть проблемой.

    — Тоби Спейт


Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *