Безблокирующий кольцевой буфер с одним производителем и одним потребителем

Я пишу простой кольцевой буфер для собственного обучения. Ниже приведен пример стратегии, описанной в http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf : Производитель и Потребитель хранят локальные копии индексов записи и чтения в разных строках кэша и, насколько это возможно, избегают касания общих версий тех же самых.

Производительность кажется на уровне очереди boost spspc. Любые указания о том, как это улучшить, приветствуются. Я использую volatile-переменные, а не std :: atomic, потому что по непонятным мне причинам производительность лучше с volatile.

Любое понимание там было бы очень кстати. Я понимаю, что без std :: atomic код может работать только на x86_64.

#include <array>
#include <atomic>
#include <limits>
#include <uchar.h>

// Single Producer Single Consumer ring buffer.
// based on http://www.cse.cuhk.edu.hk/~pclee/www/pubs/ancs09poster.pdf

template <class T, size_t MAX_SZ = 10>
class RingBuffer {

   static const size_t cache_line = 64;

 public:
   RingBuffer()
       : // Shared control variables
         shared_r(0), shared_w(0),
         // Consumer state
         consumer_w(0), consumer_r(0),
         // Producer state
         producer_r(0), producer_w(0), uncommited_writes(0) {}

   // Called only by the single producer thread
   // -----------------------------------------
   template <class... ARG>
   bool emplace_enqueue_one(ARG &&... arg) {
      auto result = emplace_enqueue_batch(std::forward<ARG>(arg)...);
      commit_writes();
      return result;
   }

   template <class... ARG>
   bool emplace_enqueue_batch(ARG &&... arg) {

      // Where would the write position be after we enqueue this element?
      size_t next_w = calc_next(producer_w);

      // We always keep an empty slot between the read and write
      // positions, rather than fill our entire buffer. We do this to
      // be able to distinguish between empty (w == r) and full
      // (next(w) == r) buffers. Since we are consulting the
      // producer's copy of the shared read position (producer_r), not
      // the actual read position (shared_r), we might get a false
      // positive (that is we might think we are full when we are not)
      // but not a false negative (that is we think the queue is not
      // full we are right)
      if (next_w == producer_r) {
         // At this point we might be full. To be sure we need to do
         // the more expensive read of the shared read position
         // variable
         size_t actual_r = get_shared_r();
         if (next_w == actual_r) {
            // We are indeed full. At this point we might have to
            // force a commit so that the consumer can see (and drain)
            // uncommited writes.
            commit_writes();
            return false;
         } else
            // We are not actually full, update our local copy of the
            // read position and carry on.
            producer_r = actual_r;
      }

      // Enqueue
      new (&buffer[producer_w]) T(std::forward<ARG>(arg)...);

      // Update our copy of the write position but do not actually
      // update the shared write position. We leave it up to the
      // caller as to when the writes should be visible to the
      // consumer. This allows the caller to amortize the expensive
      // update fo the shared_w variable over multiple writes.
      producer_w = next_w;
      uncommited_writes++;
      return true;
   }

   void commit_writes() {
      if (uncommited_writes) {
         uncommited_writes = 0;
         set_shared_w(producer_w);
      }
   }

   // Called only by the single consumer thread
   // -----------------------------------------
   template <class C>
   size_t consume_one(C &&c) {
      return consume_(std::forward<C>(c), 1);
   }

   template <class C>
   size_t consume_all(C &&c) {
      return consume_(std::forward<C>(c), std::numeric_limits<size_t>::max());
   }

 private:
   template <class C>
   size_t consume_(C c, size_t max_consume_count) {
      size_t consumed_count = 0;
      while (consumed_count < max_consume_count) {
         // Could we be empty?
         if (consumer_w == consumer_r) {
            // We could, but to be sure we have to do the expensive
            // read of the shared write position.
            size_t actual_w = get_shared_w();
            if (consumer_r == actual_w) {
               // We are actually empty. If we managed to read
               // anything so far then update the shared read
               // position.
               if (consumed_count)
                  set_shared_r(consumer_r);
               return consumed_count;
            } else
               // We were not actually empty. Update our copy of the
               // write position. We will do the read below.
               consumer_w = actual_w;
         }
         consumed_count++;
         c(buffer[consumer_r]);
         buffer[consumer_r].~T();
         consumer_r = calc_next(consumer_r);
      }
      // If we reach this point that means we were able to consume
      // max_consume_count items, so we need to update the shared_r
      // position.
      set_shared_r(consumer_r);
      return consumed_count;
   }
   size_t calc_next(size_t p) const {
      if (p < (MAX_SZ - 1))
         return p + 1;
      else
         return 0;
   }

   size_t get_shared_r() { return shared_r; }
   void set_shared_r(size_t r) { shared_r = r; }
   size_t get_shared_w() { return shared_w; }
   void set_shared_w(size_t w) { shared_w = w; }

   // cacheline 1 : shared control variables
   // read position is known to be larger or equal than this
   volatile size_t shared_r;
   // write position is known to be larger or equal than this
   volatile size_t shared_w;
   char padding1[cache_line - 2 * sizeof(size_t)];

   // cacheline 2: consumer state
   size_t consumer_w; // last known write position (to the consumer)
   size_t consumer_r; // current consumer read position
   char padding2[cache_line - 2 * sizeof(size_t)];

   // cacheline 3: producer state
   size_t producer_r;        // last known read position (to the producer)
   size_t producer_w;        // current producer write position
   size_t uncommited_writes; // how far ahead is producer_w from shared_w
   char padding3[cache_line - 3 * sizeof(size_t)];

   // cache line 5: start of actual buffer
   std::array<T, MAX_SZ> buffer;
};

РЕДАКТИРОВАТЬ: В ответ на отзывы я изменил приведенный выше код, чтобы использовать атомики следующим образом:

   size_t get_shared_r() { return shared_r.load(std::memory_order_acquire); }
   void set_shared_r(size_t r) { shared_r.store(r, std::memory_order_release); }
   size_t get_shared_w() { return shared_w.load(std::memory_order_acquire); }
   void set_shared_w(size_t w) { shared_w.store(w, std::memory_order_release); }

Я считаю, что мне не нужно ничего более сильного, чем это (хотя я могу ошибаться – я новичок в подобном программировании). Однако я понимаю, и я подтвердил это, проверив сгенерированный ассемблер, что в x86_64 загрузки всегда получают, а хранилища всегда освобождают. Таким образом, я ожидал, что производительность будет такой же, как и в случае нестабильного. К сожалению, это не так, атомарная версия примерно в два раза медленнее, чем изменчивая. Я ломаю голову над этим.

2 ответа
2

Производительность с volatile выше, потому что это небезопасно, поскольку оно не создает барьер памяти и, следовательно, имеет гонку данных и не должно использоваться в многопоточном контексте.

Таким образом, код нарушен по прямому назначению и может рассматриваться как тема для этого сайта. Однако я отвечу, так как это общая ошибка, лежащая в основе многих ошибок:

volatile не означает потокобезопасный или атомарный!

Безопасность потоков включает в себя множество аспектов, таких как обеспечение того, чтобы ядра ЦП видели согласованное представление об основной памяти (например, отключение или обновление строк кэша между ядрами) и обеспечение того, чтобы операции с памятью выполнялись вообще (вы были бы удивлены, насколько компиляторы могут оптимизировать, особенно лязг) и в правильном порядке и атомарно. Из этих свойств volatile только гарантирует порядок и доступность. Для остального вам нужны барьеры памяти и т. Д.

Только изменчивость означает, что компилятор будет передавать все операции чтения и записи в переменные в том виде, в каком они записаны, таким образом игнорируя возможности оптимизации и не переупорядочивая операции в энергозависимой памяти.

Единственная причина использовать volatile – это когда вы взаимодействуете с оборудованием, таким как регистры микроконтроллера или драйверы устройств.

Я повторяю, volatile почти всегда неверно И / ИЛИ медленно. Если вы не пишете драйверы или прошивки, забудьте, что они когда-либо существовали.

  • Я согласен с тем, что «volatile не означает потокобезопасность или атомарность!» и что большинство случаев использования volatile a обычно неверно, но я думаю, что volatile полезен в немного большем количестве контекстов, чем взаимодействие с оборудованием. любая операция с памятью, которая не требует аппаратной синхронизации, но непредсказуема для компилятора, должна быть изменчивой. это также необходимо при использовании обработчика сигналов и с некоторыми схемами отображения памяти (например, несколько страниц виртуальной памяти с использованием одной и той же базовой страницы в физической памяти) и в других ситуациях, о которых я не думал.

    – Тайкер


  • Я знал, что использование volatile было подозрительным, но я видел в статье, на которую я ссылался, и тестирование / профилирование было многообещающим. Я не на 100% уверен, что это неверно для конкретной платформы, которая меня интересует (x86_64), которая имеет действительно сильную модель памяти, но я обязательно установлю это, прежде чем помещать это где-нибудь в производство. Спасибо за вклад.

    – samwise

  • @Tyker Если бы вам когда-либо приходилось программировать драйвер устройства для машины с DEC Alpha, вы бы знали, что volatile бесполезен даже для взаимодействия с оборудованием. Он все еще используется, но я думаю, что почти во всех случаях использование атомарных переменных лучше. Если вы знаете, когда безопасно использовать volatile то вы, вероятно, сможете получить ту же производительность с атомиксом, если пройдете правильную порядок памяти параметр для load() и store().

    – Г. Сон

  • @samwise Я прочитал газету и у меня возникли подозрения. Они не объяснили, почему это будет правильно, и не упомянули о проверке или проверке правильности в своей оценке. Они удобно опускают упоминание о том, что им пришлось использовать сходство потоков для достижения наиболее заметного результата, когда кеш L2 не используется совместно, выгода, по-видимому, заключается только в пакетной обработке …

    – Эмили Л.

  • Далее они упоминают, что требуется, чтобы считывание управляющих переменных было неделимым, и заявляют, что это обычно так, без примеров. К тому же невыровненное чтение / запись на x86, например, не является атомарным, есть множество особых сценариев, которые необходимо учитывать. Они также удобно игнорируют тот факт, что наиболее вероятный случай, когда это вообще сработает для них, – это очень удобная модель согласованности памяти на X86 (-64). Излишне говорить, что существует множество предположений, которые должны быть правильными, и попытки написать такой код – буквально минное поле …

    – Эмили Л.


std::size_t постоянно пишется с ошибками во всем коде. Кажется, ваша реализация объявляет size_t а также std::size_t (что разрешено), но вы не должны зависеть от этого.

Как говорит Эмили, неправильно использовать volatile где вы должны использовать атомарный тип.

std::array<T, MAX_SZ> buffer;

Если T дорого строить, то это плохой выбор. Вам нужно будет выделить неинициализированную память и построить на месте как std::vector делает. На самом деле логика здесь повсюду, потому что я вижу, что вы строите на месте, но перезаписать существующие объекты не разрушая их. Это полностью неправильно и опасно.

template <class T, std::size_t MAX_SZ = 10>

Используйте именование ALL_CAPS только для макросов. Использование для корректных идентификаторов C ++ разбавляет предупреждающее сообщение, передаваемое заглавными буквами.

   // cacheline 2: consumer state
   std::size_t consumer_w; // last known write position (to the consumer)
   std::size_t consumer_r; // current consumer read position
   char padding2[cache_line - 2 * sizeof(std::size_t)];

Вместо того, чтобы считать размер переменных в строке кэша, проще использовать анонимное объединение:

// cacheline 2: consumer state
union {
    char padding2[cache_line];
    struct {
        std::size_t w = 0; // last known write position (to the consumer)
        std::size_t r = 0; // current consumer read position
    } consumer;
};

Аналогично для других блоков, выровненных по кешу. Или рассмотрите возможность явно указать выравнивание с помощью alignas().

Не знаю, почему у нас есть эти закрытые члены для простых выражений:

   std::size_t get_shared_r();
   void set_shared_r(std::size_t r);
   std::size_t get_shared_w();
   void set_shared_w(std::size_t w);

Просто напишите выражения прямо в коде, а не загромождайте класс этими функциями.

  • Спасибо за ваш вклад. Я согласен с неправильным использованием std :: array , и мне нравится ваш трюк с объединением. Маленькие помощники использовались, чтобы я мог переключаться на атомикс и обратно во время профилирования.

    – samwise

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *