Я пишу простой кольцевой буфер для собственного обучения. Ниже приведен пример стратегии, описанной в http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf : Производитель и Потребитель хранят локальные копии индексов записи и чтения в разных строках кэша и, насколько это возможно, избегают касания общих версий тех же самых.
Производительность кажется на уровне очереди boost spspc. Любые указания о том, как это улучшить, приветствуются. Я использую volatile-переменные, а не std :: atomic, потому что по непонятным мне причинам производительность лучше с volatile.
Любое понимание там было бы очень кстати. Я понимаю, что без std :: atomic код может работать только на x86_64.
#include <array>
#include <atomic>
#include <limits>
#include <uchar.h>
// Single Producer Single Consumer ring buffer.
// based on http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf
template <class T, size_t MAX_SZ = 10>
class RingBuffer {
static const size_t cache_line = 64;
public:
RingBuffer()
: // Shared control variables
shared_r(0), shared_w(0),
// Consumer state
consumer_w(0), consumer_r(0),
// Producer state
producer_r(0), producer_w(0), uncommited_writes(0) {}
// Called only by the single producer thread
// -----------------------------------------
template <class... ARG>
bool emplace_enqueue_one(ARG &&... arg) {
auto result = emplace_enqueue_batch(std::forward<ARG>(arg)...);
commit_writes();
return result;
}
template <class... ARG>
bool emplace_enqueue_batch(ARG &&... arg) {
// Where would the write position be after we enqueue this element?
size_t next_w = calc_next(producer_w);
// We always keep an empty slot between the read and write
// positions, rather than fill our entire buffer. We do this to
// be able to distinguish between empty (w == r) and full
// (next(w) == r) buffers. Since we are consulting the
// producer's copy of the shared read position (producer_r), not
// the actual read position (shared_r), we might get a false
// positive (that is we might think we are full when we are not)
// but not a false negative (that is we think the queue is not
// full we are right)
if (next_w == producer_r) {
// At this point we might be full. To be sure we need to do
// the more expensive read of the shared read position
// variable
size_t actual_r = get_shared_r();
if (next_w == actual_r) {
// We are indeed full. At this point we might have to
// force a commit so that the consumer can see (and drain)
// uncommited writes.
commit_writes();
return false;
} else
// We are not actually full, update our local copy of the
// read position and carry on.
producer_r = actual_r;
}
// Enqueue
new (&buffer[producer_w]) T(std::forward<ARG>(arg)...);
// Update our copy of the write position but do not actually
// update the shared write position. We leave it up to the
// caller as to when the writes should be visible to the
// consumer. This allows the caller to amortize the expensive
// update fo the shared_w variable over multiple writes.
producer_w = next_w;
uncommited_writes++;
return true;
}
void commit_writes() {
if (uncommited_writes) {
uncommited_writes = 0;
set_shared_w(producer_w);
}
}
// Called only by the single consumer thread
// -----------------------------------------
template <class C>
size_t consume_one(C &&c) {
return consume_(std::forward<C>(c), 1);
}
template <class C>
size_t consume_all(C &&c) {
return consume_(std::forward<C>(c), std::numeric_limits<size_t>::max());
}
private:
template <class C>
size_t consume_(C c, size_t max_consume_count) {
size_t consumed_count = 0;
while (consumed_count < max_consume_count) {
// Could we be empty?
if (consumer_w == consumer_r) {
// We could, but to be sure we have to do the expensive
// read of the shared write position.
size_t actual_w = get_shared_w();
if (consumer_r == actual_w) {
// We are actually empty. If we managed to read
// anything so far then update the shared read
// position.
if (consumed_count)
set_shared_r(consumer_r);
return consumed_count;
} else
// We were not actually empty. Update our copy of the
// write position. We will do the read below.
consumer_w = actual_w;
}
consumed_count++;
c(buffer[consumer_r]);
buffer[consumer_r].~T();
consumer_r = calc_next(consumer_r);
}
// If we reach this point that means we were able to consume
// max_consume_count items, so we need to update the shared_r
// position.
set_shared_r(consumer_r);
return consumed_count;
}
size_t calc_next(size_t p) const {
if (p < (MAX_SZ - 1))
return p + 1;
else
return 0;
}
size_t get_shared_r() { return shared_r; }
void set_shared_r(size_t r) { shared_r = r; }
size_t get_shared_w() { return shared_w; }
void set_shared_w(size_t w) { shared_w = w; }
// cacheline 1 : shared control variables
// read position is known to be larger or equal than this
volatile size_t shared_r;
// write position is known to be larger or equal than this
volatile size_t shared_w;
char padding1[cache_line - 2 * sizeof(size_t)];
// cacheline 2: consumer state
size_t consumer_w; // last known write position (to the consumer)
size_t consumer_r; // current consumer read position
char padding2[cache_line - 2 * sizeof(size_t)];
// cacheline 3: producer state
size_t producer_r; // last known read position (to the producer)
size_t producer_w; // current producer write position
size_t uncommited_writes; // how far ahead is producer_w from shared_w
char padding3[cache_line - 3 * sizeof(size_t)];
// cache line 5: start of actual buffer
std::array<T, MAX_SZ> buffer;
};
РЕДАКТИРОВАТЬ: В ответ на отзывы я изменил приведенный выше код, чтобы использовать атомики следующим образом:
size_t get_shared_r() { return shared_r.load(std::memory_order_acquire); }
void set_shared_r(size_t r) { shared_r.store(r, std::memory_order_release); }
size_t get_shared_w() { return shared_w.load(std::memory_order_acquire); }
void set_shared_w(size_t w) { shared_w.store(w, std::memory_order_release); }
Я считаю, что мне не нужно ничего более сильного, чем это (хотя я могу ошибаться — я новичок в подобном программировании). Однако я понимаю, и я подтвердил это, проверив сгенерированный ассемблер, что в x86_64 загрузки всегда получают, а хранилища всегда освобождают. Таким образом, я ожидал, что производительность будет такой же, как и в случае нестабильного. К сожалению, это не так, атомарная версия примерно в два раза медленнее, чем изменчивая. Я ломаю голову над этим.
2 ответа
Производительность с volatile выше, потому что это небезопасно, поскольку оно не создает барьер памяти и, следовательно, имеет гонку данных и не должно использоваться в многопоточном контексте.
Таким образом, код нарушен по прямому назначению и может рассматриваться как тема для этого сайта. Однако я отвечу, так как это общая ошибка, лежащая в основе многих ошибок:
volatile
не означает потокобезопасный или атомарный!
Безопасность потоков включает в себя множество аспектов, таких как обеспечение того, чтобы ядра ЦП видели согласованное представление об основной памяти (например, отключение или обновление строк кэша между ядрами) и обеспечение того, чтобы операции с памятью выполнялись вообще (вы были бы удивлены, насколько компиляторы могут оптимизировать, особенно лязг) и в правильном порядке и атомарно. Из этих свойств volatile
только гарантирует порядок и доступность. Для остального вам нужны барьеры памяти и т. Д.
Только изменчивость означает, что компилятор будет передавать все операции чтения и записи в переменные в том виде, в каком они записаны, таким образом игнорируя возможности оптимизации и не переупорядочивая операции в энергозависимой памяти.
Единственная причина использовать volatile — это когда вы взаимодействуете с оборудованием, таким как регистры микроконтроллера или драйверы устройств.
Я повторяю, volatile
почти всегда неверно И / ИЛИ медленно. Если вы не пишете драйверы или прошивки, забудьте, что они когда-либо существовали.
std::size_t
постоянно пишется с ошибками во всем коде. Кажется, ваша реализация объявляет size_t
а также std::size_t
(что разрешено), но вы не должны зависеть от этого.
Как говорит Эмили, неправильно использовать volatile
где вы должны использовать атомарный тип.
std::array<T, MAX_SZ> buffer;
Если T
дорого строить, то это плохой выбор. Вам нужно будет выделить неинициализированную память и построить на месте как std::vector
делает. На самом деле логика здесь повсюду, потому что я вижу, что вы строите на месте, но перезаписать существующие объекты не разрушая их. Это полностью неправильно и опасно.
template <class T, std::size_t MAX_SZ = 10>
Используйте именование ALL_CAPS только для макросов. Использование для корректных идентификаторов C ++ разбавляет предупреждающее сообщение, передаваемое заглавными буквами.
// cacheline 2: consumer state std::size_t consumer_w; // last known write position (to the consumer) std::size_t consumer_r; // current consumer read position char padding2[cache_line - 2 * sizeof(std::size_t)];
Вместо того, чтобы считать размер переменных в строке кэша, проще использовать анонимное объединение:
// cacheline 2: consumer state
union {
char padding2[cache_line];
struct {
std::size_t w = 0; // last known write position (to the consumer)
std::size_t r = 0; // current consumer read position
} consumer;
};
Аналогично для других блоков, выровненных по кешу. Или рассмотрите возможность явно указать выравнивание с помощью alignas()
.
Не знаю, почему у нас есть эти закрытые члены для простых выражений:
std::size_t get_shared_r(); void set_shared_r(std::size_t r); std::size_t get_shared_w(); void set_shared_w(std::size_t w);
Просто напишите выражения прямо в коде, а не загромождайте класс этими функциями.
Я согласен с тем, что «volatile не означает потокобезопасность или атомарность!» и что большинство случаев использования volatile a обычно неверно, но я думаю, что volatile полезен в немного большем количестве контекстов, чем взаимодействие с оборудованием. любая операция с памятью, которая не требует аппаратной синхронизации, но непредсказуема для компилятора, должна быть изменчивой. это также необходимо при использовании обработчика сигналов и с некоторыми схемами отображения памяти (например, несколько страниц виртуальной памяти с использованием одной и той же базовой страницы в физической памяти) и в других ситуациях, о которых я не думал.
— Тайкер
Я знал, что использование volatile было подозрительным, но я видел в статье, на которую я ссылался, и тестирование / профилирование было многообещающим. Я не на 100% уверен, что это неверно для конкретной платформы, которая меня интересует (x86_64), которая имеет действительно сильную модель памяти, но я обязательно установлю это, прежде чем помещать это где-нибудь в производство. Спасибо за вклад.
— samwise
@Tyker Если бы вам когда-либо приходилось программировать драйвер устройства для машины с DEC Alpha, вы бы знали, что
volatile
бесполезен даже для взаимодействия с оборудованием. Он все еще используется, но я думаю, что почти во всех случаях использование атомарных переменных лучше. Если вы знаете, когда безопасно использоватьvolatile
то вы, вероятно, сможете получить ту же производительность с атомиксом, если пройдете правильную порядок памяти параметр дляload()
иstore()
.— Г. Сон
@samwise Я прочитал газету и у меня возникли подозрения. Они не объяснили, почему это будет правильно, и не упомянули о проверке или проверке правильности в своей оценке. Они удобно опускают упоминание о том, что им пришлось использовать сходство потоков для достижения наиболее заметного результата, когда кеш L2 не используется совместно, выгода, по-видимому, заключается только в пакетной обработке …
— Эмили Л.
Далее они упоминают, что требуется, чтобы считывание управляющих переменных было неделимым, и заявляют, что это обычно так, без примеров. К тому же невыровненное чтение / запись на x86, например, не является атомарным, есть множество особых сценариев, которые необходимо учитывать. Они также удобно игнорируют тот факт, что наиболее вероятный случай, когда это вообще сработает для них, — это очень удобная модель согласованности памяти на X86 (-64). Излишне говорить, что существует множество предположений, которые должны быть правильными, и попытки написать такой код — буквально минное поле …
— Эмили Л.
Показать 4 больше комментариев