У меня один производитель и один потребитель, и производитель никогда не останавливается, но потребитель может не успевать. Нет необходимости потреблять каждый элемент, если мы всегда обращаемся к самому последнему произведенному элементу и никогда не обрабатываем элемент дважды. Другими словами, мы должны отбрасывать товары, если потребитель не успевает за ними, но ждать производителя, если это так.
#ifndef TRIPLE_BUFFER_HPP
#define TRIPLE_BUFFER_HPP
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
template<typename T>
class triple_buffer
{
// the actual buffer
T buffer[3] = {};
// the roles the buffers currently have
// read and write buffers are private to each side
// the available buffer is passed between them
T* readbuffer = &buffer[0];
T* writebuffer = &buffer[1];
std::atomic<T*> available = &buffer[2];
// the last fully-written buffer waiting ready for reader
std::atomic<T*> next_read_buf = nullptr;
// When the reader catches up, it needs to wait for writer (slow path only)
std::mutex read_queue_mutex = {};
std::condition_variable read_queue = {};
public:
// Writer interface
// Writer has ownership of this buffer (this function never blocks).
T *get_write_buffer()
{
return writebuffer;
}
// Writer releases ownership of its buffer.
void set_write_complete()
{
// give back the write buffer
auto *written = writebuffer;
writebuffer = available.exchange(writebuffer);
// mark it as written
next_read_buf.store(written, std::memory_order_release);
// notify any waiting reader
read_queue.notify_one();
}
// Reader interface
// Reader gets ownership of the buffer, until the next call of
// get_read_buffer().
T *get_read_buffer(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
{
auto const timeout_time = std::chrono::steady_clock::now() + timeout;
// get the written buffer, waiting if necessary
auto *b = next_read_buf.exchange(nullptr);
while (b != readbuffer) {
// it could be the available buffer
readbuffer = available.exchange(readbuffer);
if (b == readbuffer) {
// yes, that's it
return readbuffer;
}
// else we need to wait for writer
b = nullptr;
std::unique_lock lock{read_queue_mutex};
auto test = [this,&b]{ b = next_read_buf.exchange(nullptr); return b; };
if (!read_queue.wait_until(lock, timeout_time, test)) {
return nullptr;
}
}
return readbuffer;
}
// The unit test helper is enabled only if <gtest.h> is included before this header.
// It's not available (or necessary) in production code.
#ifdef TEST
// N.B. not thread-safe - only call this when reader and writer are idle
void test_invariant(const char *file, int line) const
{
const std::set<const T*> buffers{&buffer[0], &buffer[1], &buffer[2]};
const std::set<const T*> roles{readbuffer, available, writebuffer};
auto const fail = buffers != roles
|| next_read_buf && !buffers.count(next_read_buf)
|| next_read_buf == writebuffer;
if (fail) {
auto name = [this](const T *slot){
if (slot == &buffer[0]) { return "buffer[0]"; }
if (slot == &buffer[1]) { return "buffer[1]"; }
if (slot == &buffer[2]) { return "buffer[2]"; }
if (slot == nullptr) { return "(null)"; }
return "(invalid)";
};
ADD_FAILURE_AT(file, line) <<
"Buffer/role mismatch:n"
"Buffers = " << &buffer[0] << ", " << &buffer[1] << ", " << &buffer[2] << "n"
"Read = " << readbuffer << " = " << name(readbuffer) << "n"
"Available = " << available << " = "<< name(available) << "n"
"Write = " << writebuffer << " = " << name(writebuffer) << "n"
"Next Read = " << next_read_buf << " = " << name(next_read_buf) << "n";
}
}
#endif
};
#endif // TRIPLE_BUFFER_HPP
Я создал это с помощью набора модульных тестов:
#include <gtest/gtest.h>
#include <triple-buffer.hpp>
#include <set>
#define EXPECT_INVARIANT(obj) (obj.test_invariant(__FILE__, __LINE__))
TEST(triple_buffer, no_write_read_returns_null)
{
triple_buffer<int> buffer;
EXPECT_INVARIANT(buffer);
EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
EXPECT_INVARIANT(buffer);
}
TEST(triple_buffer, write_once_read_once)
{
triple_buffer<int> buffer;
auto *write0 = buffer.get_write_buffer();
EXPECT_INVARIANT(buffer);
EXPECT_NE(write0, nullptr);
EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
EXPECT_INVARIANT(buffer);
buffer.set_write_complete();
EXPECT_INVARIANT(buffer);
// having written, we can read
EXPECT_EQ(buffer.get_read_buffer({}), write0);
EXPECT_INVARIANT(buffer);
// another read should block/fail
EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
EXPECT_INVARIANT(buffer);
}
TEST(triple_buffer, write_twice_read)
{
triple_buffer<int> buffer;
auto *write0 = buffer.get_write_buffer();
EXPECT_NE(write0, nullptr);
buffer.set_write_complete();
auto *write1 = buffer.get_write_buffer();
EXPECT_NE(write1, nullptr);
EXPECT_NE(write1, write0);
buffer.set_write_complete();
// read should get newest
EXPECT_EQ(buffer.get_read_buffer({}), write1);
// another read should block/fail
EXPECT_EQ(buffer.get_read_buffer({}), nullptr);
}
TEST(triple_buffer, write_read_write2)
{
triple_buffer<int> buffer;
auto *write0 = buffer.get_write_buffer();
EXPECT_NE(write0, nullptr);
buffer.set_write_complete();
// read should get newest
auto *read0 = buffer.get_read_buffer({});
EXPECT_EQ(read0, write0);
auto *write1 = buffer.get_write_buffer();
EXPECT_NE(write1, nullptr);
EXPECT_NE(write1, write0);
buffer.set_write_complete();
auto *write2 = buffer.get_write_buffer();
EXPECT_NE(write2, nullptr);
EXPECT_NE(write2, write1);
EXPECT_NE(write2, read0); // don't touch reader's buffer
buffer.set_write_complete();
// read should get newest
auto *read1 = buffer.get_read_buffer({});
EXPECT_EQ(read1, write2);
}
TEST(triple_buffer, write_read_write3)
{
triple_buffer<int> buffer;
auto *write0 = buffer.get_write_buffer();
EXPECT_NE(write0, nullptr);
buffer.set_write_complete();
// read should get newest
auto *read0 = buffer.get_read_buffer({});
EXPECT_EQ(read0, write0);
auto *write1 = buffer.get_write_buffer();
EXPECT_NE(write1, nullptr);
EXPECT_NE(write1, write0);
buffer.set_write_complete();
auto *write2 = buffer.get_write_buffer();
EXPECT_NE(write2, nullptr);
EXPECT_NE(write2, write1);
EXPECT_NE(write2, read0); // don't touch reader's buffer
buffer.set_write_complete();
auto *write3 = buffer.get_write_buffer();
EXPECT_EQ(write3, write1); // we should be overwriting the old unread value
EXPECT_NE(write3, read0); // still don't touch reader's buffer
buffer.set_write_complete();
// read should get newest
auto *read1 = buffer.get_read_buffer({});
EXPECT_EQ(read1, write3);
}
TEST(triple_buffer, read_during_write)
{
triple_buffer<int> buffer;
auto *write0 = buffer.get_write_buffer();
EXPECT_NE(write0, nullptr);
buffer.set_write_complete();
auto *write1 = buffer.get_write_buffer();
// read should get complete buffer
auto *read0 = buffer.get_read_buffer({});
EXPECT_EQ(read0, write0);
buffer.set_write_complete();
auto *write2 = buffer.get_write_buffer();
EXPECT_NE(write2, nullptr);
EXPECT_NE(write2, write1);
EXPECT_NE(write2, read0); // don't touch reader's buffer
auto *read1 = buffer.get_read_buffer({});
EXPECT_EQ(read1, write1);
buffer.set_write_complete();
}
Очевидно, что модульные тесты ограничены и являются однопоточными. В частности, они не покрывают wait_until()
вызывать или провоцировать любые скачки. Я чувствую, что мне, наверное, стоит создать модуль тесты, которые могут быть медленнее, чем модульные (я считаю, что время выполнения 1 мс является абсолютным верхним пределом для модульного теста), но у меня нет опыта использования таких более медленных тестов.
В любом случае, мои основные проблемы с кодом (как классом, так и его тестами) следующие:
- Правильность. Я считаю, что это правильно во всех случаях, но о параллельном коде, как известно, трудно полностью рассуждать, и я был бы признателен за любое понимание того, что я пропустил.
- Читаемость. Может ли это понять и изменить кто-то еще (в частности, я-будущее)?
- Учитывая, что я знаю, что есть только один читатель, мне действительно нужно
read_queue_mutex
быть участником, или мне сойдет с рук создание нового экземпляра в каждомget_read_buffer()
вызов? Если да, то должен ли я?
1 ответ
Ответы на ваши вопросы
Правильность. Я считаю, что это правильно во всех случаях, но о параллельном коде, как известно, сложно полностью рассуждать, и я был бы признателен за любое понимание того, что я пропустил.
Не видит никаких проблем с правильностью.
Читаемость. Может ли это понять и изменить кто-то еще (в частности, я-будущее)?
Код без блокировки всегда сложно соблюдать. я думаю get_write_buffer()
а также set_write_complete()
функции относительно просты для понимания. Сложнее всего уследить get_read_buffer()
. Я бы объяснил инварианты в коде:
readbuffer
всегда принадлежит потребителюwritebuffer
всегда принадлежит производителюavailable
никогда не принадлежит ни- вы можете атомарно обменивать
read/writebuffer
с участиемavailable
без нарушения указанных выше инвариантов
А затем объясните трюк: next_read_buf
устанавливается производителем после завершения работы с буфером, а также меняет местами writebuffer
а также available
, после которого next_read_buf == available
. Потребитель может атомарно захватить available
в таком случае.
Ожидание необходимо в двух случаях: next_read_buf
является nullptr
(производитель еще ничего не производил с момента последнего потребления), или если он просто обменял available
а также writebuffer
, но не писал next_read_buf
еще.
Учитывая, что я знаю, что существует только один читатель, действительно ли мне нужно, чтобы read_queue_mutex был членом, или я могу уйти с созданием нового экземпляра в каждом вызове get_read_buffer ()? Если да, то должен ли я?
Вы можете создавать новый экземпляр при каждом вызове. Либо вы платите небольшую цену за создание нового экземпляра std::mutex
каждый звонок, или вы платите цену за std::mutex
вокруг все время. Я не думаю, что это имеет большое значение для производительности. Я бы оставил все как есть, чтобы мьютекс и переменная условия в исходном коде находились близко друг к другу.
Эта проблема решена в C ++ 20 с помощью std::atomic_wait()
.
Рассмотрите возможность использования RAII для передачи владения буфером
Производитель должен сделать два шага: получить указатель на буфер для записи и затем освободить этот буфер, когда это будет сделано. Это похоже на работу для RAII. Похожий на std::lock_guard
рассмотрите возможность добавления способа получения дескриптора буфера записи, который будет автоматически выполнять set_write_complete()
когда его время жизни закончится. Вы даже можете сделать то же самое для буфера чтения ради симметрии.
Именно на такой обзор я надеялся — спасибо. Я не знала
std::atomic_wait()
— выглядит полезным, но жаль, что нет версии с тайм-аутом (иногда мне нужно опрашивать, чтобы проверить, хочет ли программа завершить работу). Я помню, как раньше думал об интерфейсе RAII, но никогда не думал об этом — спасибо, что напомнил мне подумать об этом еще раз.— Тоби Спейт
Хм, хороший момент по поводу тайм-аута. Но если это необходимо для проверки необходимости завершения программы, не делайте этого! Обычный способ избежать опроса — поставить в очередь новый элемент, который каким-то образом сигнализирует о том, что потребитель должен немедленно выйти. Или в этом случае, например, у вас может быть манекен
T
что вы указываетеnext_read_buf
чтобы (например, использовать&buffer[3]
), и вы можете проверить это на медленном пути вget_read_buffer()
.— Г. Сон
Хорошие предложения для обработчика прерывания. Думаю, я все еще могу изменить это и учесть ваш совет. У моих предметов уже есть довольно много участников (и я планирую добавить некоторых, чтобы отслеживать, какая часть предметов падает, и как долго остальные тратят время на ожидание чтения), поэтому добавление флага не должно быть проблемой.
— Тоби Спейт