C ++ без блокировок, кольцевой буфер MPMC в C ++ 20

У меня есть критический для производительности код межпоточного обмена сообщениями на C ++. Несколько производителей, один или несколько потребителей.

Профилируя десятки итераций этого кода обмена сообщениями за несколько лет разработки, я обычно борюсь с конфликтом мьютексов. Если это не прямой мьютекс для std :: queue, то мьютекс кучи для данных, на которые указывают указатели в любом concurrent_queue, который я использую. Если я использую большие (~ 256 байт) структуры для хранения данных для потребителей, чтобы избежать или минимизировать использование кучи, я часто получаю привязку memcopy из назначения перемещения.

Пытаясь решить все свои проблемы сразу, я написал кольцевой буфер MPMC без выделения и блокировки. Объекты производятся и потребляются, даже не перемещаясь. Никаких мьютексов. Распределений нет. Копий нет. Только атомарные операции.

Я написал несколько простых тестов и реализовал их в критически важном для производительности коде. Кажется, работает и, кажется, предлагает приличное улучшение производительности в случае, если мой босс заботится. Я сомневаюсь, что это настолько хорошо, насколько я могу это понять.

Я не проверял это пытками. Я не тестировал компиляторы, отличные от MS. Я считаю, что этот код достаточно сложен, и мне действительно стоит попросить указатели, прежде чем углубляться в него.

Мне интересны любые отзывы, которые есть у кого-то.

Чрезвычайно упрощенный пример использования:

  // buffer contains 1 << 8 (256) std::strings
  RingBuffer<std::string, 8> buffer;

  // Producer example
  {
    auto producer = buffer.TryProduce();
    if (!producer)  {... handle if we produce too quickly ...}

    producer->clear(); // We re-use any existing capacity, so there may be a string here.
    producer->push_back('A');
    producer->push_back(.... lots more typically....);

    // Production completes by the destruction of the producer object.
    // As soon as this destructor runs, the std::string can be consumed.
  }
  

  // Consumer example
  {
    auto consumer = buffer.TryConsume();

    if (!consumer) {... handle buffer underrun - ie no instructions in the queue ...}

    if (!consumer->empty() &&
        (*consumer)[0] == 'A')
    {
      // Do something with this instruction that's prefixed with "A".

      // It's safe to read from any other memory in "*consumer" for as long as we have it
      // in scope.
    }

    // When consumer is destroyed, that memory is made available for producers
    // to write to.
  }

Главный заголовочный файл:

#pragma once
///////////////////////////////////////////////////////////////////////////////
//
// Implements an allocation free (after initial construction), lock-free,
// parallel, multi-producer, multi-consumer ring buffer. Any data
// type T is supported, so long as it's default constructible.
//
// The type doesn't even need to be movable / copyable, as the TryProduce
// and TryConsume methods do not force the values to be copied or moved,
// although that is a likely enough case that the TryPushBack and TryPopFront
// helpers are included. Values are typically consumed from the same address 
// they're produced in.
//
// Items added to the end of the buffer and guaranteed to be popped out in
// order, or, more strictly, consumption is guaranteed to commence in the
// same order that production commenced.
//
// Eg (single consumer):
//
//  T 1: |---Produce A--------|   |---Produce B--------|
//  T 2:  |--Produce C -----|   |----produce D------|
//  T 3: idle.................|Consume A|Consume C|.|Consume D|Consume B|
//
//  Or (multiple consumers):
//  T 3: idle.................|Consume A|............|Consume D|
//  T 4: idle..................|Consume C|.............|Consume B|
//
// LGPL - (c) Ashley.Harris@Maptek.com.au.
//
///////////////////////////////////////////////////////////////////////////////

#include <atomic>
#include <memory>

namespace rb
{
  template<class T, uint8_t BitsForItemCount> class RingBuffer
  {
  public:
    static_assert(std::is_default_constructible_v<T>);
    static_assert(BitsForItemCount > 0);
    static constexpr uint32_t ItemCount = 1 << BitsForItemCount;
    enum class State : uint8_t;

    struct Consumer;

    struct Producer;

    RingBuffer();

    // Will attempt to read from the buffer. If the buffer was empty, the
    // Consumer will return false in boolean context. While the Consumer is in
    // scope it is guaranteed that the memory your reading will remain
    // untouched and will not be re-used by any producers.
    Consumer TryConsume();

    // Will attempt to reserve a spot in the buffer for you, and if it
    // succeeded, you can take your time populating it (eg with a complex
    // assignment move operation) in safety knowing that no reader will access
    // it until the Producer goes out of scope.
    //
    // If no space in the buffer was available, the producer will
    // return false in a boolean context.
    //
    // There is no way to cancel production once this method has been called,
    // if this is an issue for you A) Don't call it until you know for sure or
    // B) set up a no-op value of T that your consumers will safely skip over.
    //
    // If you try to cancel by letting an exception escape (e.g. your move
    // assignment operator throws), that is very bad, as the T may be left in
    // an invalid state and then submitted to a consumer. This results in an
    // assertion failure in debug if attempted.
    Producer TryProduce();

    // ------------------

    // Simple helpers - it's quite common to want to use this buffer with a
    // type that implements fast move:

    // Moves Item into the write pointer of the ring buffer, if it'll fit.
    // Returns whether the move occurred.
    bool TryPushBack(T&& Item);

    // Moves an item from the front of the buffer to ItemDestination, if there
    // is one to read. Returns whether the move occurred.
    bool TryPopFront(T& ItemDestination);

    enum class State : uint8_t
    {
      Empty,
      Populating,
      Queued,
      Reading
    };

  private:

    static constexpr uint32_t ItemDiv =
      (uint64_t(uint32_t(-1)) + 1) / ItemCount;

    static_assert(
      (ItemDiv & (ItemDiv - 1)) == 0,
      "ItemDiv should always be a power of 2. Otherwise won't wrap correctly.");

    struct Consumer
    {
      explicit operator bool() const { return myValue; }

      auto& Get() const { return *myValue; }
      auto& Get() { return *myValue; }

      operator const T&() const { return Get(); }
      operator T&() { return Get(); }

      T& operator*() { return Get(); }
      const T& operator*() const { return Get(); }

      T* operator->() { return &Get(); }
      const T* operator->() const { return &Get(); }

      Consumer(const Consumer& DontCopy) = delete;
      Consumer(Consumer&& Move);

      Consumer& operator=(const Consumer& DontCopy) = delete;
      Consumer& operator=(Consumer&& Move);

      void Release();

      ~Consumer() { Release(); }
      Consumer() = default;
      Consumer(T* Data, std::atomic_uint8_t* State)
      : myValue(Data), myState(State){};

    private:

      T* myValue = nullptr;
      std::atomic_uint8_t* myState = nullptr;
    };


    struct Producer
    {
      explicit operator bool() const { return myValue; }

      auto& Get() const { return *myValue; }
      auto& Get() { return *myValue; }

      operator const T&() const { return Get(); }
      operator T&() { return Get(); }

      T& operator*() { return Get(); }
      const T& operator*() const { return Get(); }

      T* operator->() { return &Get(); }
      const T* operator->() const { return &Get(); }

      Producer(const Producer& DontCopy) = delete;
      Producer(Producer&& Move);

      Producer& operator=(const Producer& DontCopy) = delete;
      Producer& operator=(Producer&& Move);

      void Release();

      ~Producer();
      Producer() = default;
      Producer(T* Data, std::atomic_uint8_t* State)
      : myValue(Data), myState(State){};

    private:

      T* myValue = nullptr;
      std::atomic_uint8_t* myState = nullptr;
    };

    std::unique_ptr<T[]> myData;
    std::unique_ptr<std::atomic_uint8_t[]> myStates;

    std::atomic_uint32_t myNextRead = 0;
    std::atomic_uint32_t myNextWrite = 0;
  };

}

Файл реализаций шаблона:

template<class T, uint8_t BC>
  RingBuffer<T, BC>::RingBuffer()
  : myData(std::make_unique<T[]>(ItemCount)),
    myStates(std::make_unique<std::atomic_uint8_t[]>(ItemCount)),
    myNextRead(0),
    myNextWrite(0)
  {
  }

  template<class T, uint8_t BC>
  typename RingBuffer<T, BC>::Consumer RingBuffer<T, BC>::TryConsume()
  {
    uint8_t timeout = 1;
    while (timeout++)
    {
      auto toRead = myNextRead.load(std::memory_order_acquire);
      auto toWrite = myNextWrite.load(std::memory_order_acquire);

      if (toRead == toWrite)
      {
        // Buffer is empty.
        return Consumer(nullptr, nullptr);
      }

      auto readPtr = myData.get() + (toRead / ItemDiv);
      auto statePtr = myStates.get() + (toRead / ItemDiv);

      auto oldState = uint8_t(State::Queued);
      if (statePtr->compare_exchange_strong(
            oldState, uint8_t(State::Reading), std::memory_order_release))
      {
        // We've marked it as reading successfully.

        // Advance the read pointer for the next read. We do it by a large
        // power of two so that it wraps around at the size of the buffer -
        // otherwise we end up having to do a compare exchange if the counter
        // is at end.
        myNextRead.fetch_add(ItemDiv, std::memory_order_release);

        return Consumer(readPtr, statePtr);
      }

      // We were unable to mark the item for "reading" from "queued", that
      // means it was:
      //  - still being populated by a writer.
      //  - given to another consumer on a different thread and the
      //    myNextRead value was incremented by the other thread.
      //
      // Loop back around and try again a few hundred times - otherwise
      // we fail.
    }
    return Consumer(nullptr, nullptr);
  }

  template<class T, uint8_t BC>
  typename RingBuffer<T, BC>::Producer RingBuffer<T, BC>::TryProduce()
  {
    uint8_t timeout = 1;
    while (timeout++)
    {
      auto toRead = myNextRead.load(std::memory_order_acquire);
      auto toWrite = myNextWrite.load(std::memory_order_acquire);

      if (toRead == toWrite + ItemDiv)
      {
        // Buffer is full.
        return Producer(nullptr, nullptr);
      }

      auto writePtr = myData.get() + (toWrite / ItemDiv);
      auto statePtr = myStates.get() + (toWrite / ItemDiv);

      auto oldState = uint8_t(State::Empty);
      if (statePtr->compare_exchange_strong(
            oldState, uint8_t(State::Populating), std::memory_order_release))
      {
        // We've marked it as populating successfully.

        // Advance the write pointer for the next write. We do it by a a
        // large power of two so that it wraps around at the size of the
        // buffer - otherwise we end up having to do a compare exchange if
        // the counter is at end.
        myNextWrite.fetch_add(ItemDiv, std::memory_order_release);

        return Producer(writePtr, statePtr);
      }

      // We were unable to mark the item for "writing" from "empty", that
      // means it was:
      //  - still being read by a reader.
      //  - given to another producer on a different thread and the toWrite
      //    value is about to increment.
      //
      // Loop back around and try again a few hundred times - otherwise
      // we fail.
    }
    return Producer(nullptr, nullptr);
  }


  // Moves Item into the write pointer of the ring buffer, if it'll fit.
  // Returns whether the move occurred.
  template<class T, uint8_t BitsForItemCount>
  inline bool RingBuffer<T, BitsForItemCount>::TryPushBack(T&& Item)
  {
    auto producer = TryProduce();
    if (!producer) return false;
    producer.Get() = std::move(Item);
    return true;
  }

  // Moves an item from the front of the buffer to ItemDestination, if there
  // is one to read. Returns whether the move occurred.
  template<class T, uint8_t BitsForItemCount>
  inline bool RingBuffer<T, BitsForItemCount>::TryPopFront(T& ItemDestination)
  {
    auto consumer = TryConsume();
    if (!consumer) return false;
    ItemDestination = std::move(consumer.Get());
    return true;
  }


  template<class T, uint8_t BitsForItemCount>
  typename RingBuffer<T, BitsForItemCount>::Producer::Producer&
  RingBuffer<T, BitsForItemCount>::Producer::operator=(Producer&& Move)
  {
    Release();
    myValue = Move.myValue;
    myState = Move.myState;
    Move.myValue = nullptr;
    Move.myState = nullptr;
    return *this;
  }


  template<class T, uint8_t BitsForItemCount>
  inline RingBuffer<T, BitsForItemCount>::Producer::Producer(Producer&& Move)
  {
    myValue = Move.myValue;
    myState = Move.myState;
    Move.myValue = nullptr;
    Move.myState = nullptr;
  }
  template<class T, uint8_t BitsForItemCount>
  inline void RingBuffer<T, BitsForItemCount>::Producer::Release()
  {
    if (myValue)
    {
      myState->store(uint8_t(State::Queued), std::memory_order_release);
      myValue = nullptr;
    }
  }
  template<class T, uint8_t BitsForItemCount>
  inline RingBuffer<T, BitsForItemCount>::Producer::~Producer()
  {

    if (dbgN::IsDebugFull() && myValue && std::uncaught_exception())
    {
      ASSERTF_UNREACHABLE(R"(
Exception thrown during a buffer locked for production. Did a move constructor
throw? Why would you do that? This will result in partial data being
transmitted into the buffer and sent to consumers, which will probably
cause issues. (no - we can't rewind the buffer, other producers may of already
started on the next element and we can't break ordering guarentees) Don't use
exceptions to leave the scope! if you really love exceptions and can't do
without them for this tiny region of performance sensitive code - Catch, write
a no-op to the buffer that your consumers will skip over safely, Release(),
and then rethrow)");
    }

    Release();
  }
  template<class T, uint8_t BitsForItemCount>
  inline RingBuffer<T, BitsForItemCount>::Consumer::Consumer(Consumer&& Move)
  {
    myValue = Move.myValue;
    myState = Move.myState;
    Move.myValue = nullptr;
    Move.myState = nullptr;
  }

  template<class T, uint8_t BitsForItemCount>
  typename RingBuffer<T, BitsForItemCount>::Consumer&
  RingBuffer<T, BitsForItemCount>::Consumer::operator=(Consumer&& Move)
  {
    Release();
    myValue = Move.myValue;
    myState = Move.myState;
    Move.myValue = nullptr;
    Move.myState = nullptr;
  }

  template<class T, uint8_t BitsForItemCount>
  inline void RingBuffer<T, BitsForItemCount>::Consumer::Release()
  {
    if (myValue)
    {
      myState->store(uint8_t(State::Empty), std::memory_order_release);
      myValue = nullptr;
    }
  }

3 ответа
3

Я в основном собираюсь сыграть здесь защитника дьявола.

Без блокировки не значит быстро

Существует довольно стойкое заблуждение, что алгоритмы без блокировки быстрее, чем алгоритмы с блокировкой. Однако это может быть неправдой. Современные реализации мьютексов чрезвычайно быстры в неконкурентном случае, а при большом разногласии они используют системный вызов, который позволяет ядру ждать разблокировки мьютекса. Системный вызов определенно имеет много накладных расходов, но ваше решение — выполнить 255 повторений. Атомарные операции не бесплатны, поэтому в рассматриваемом случае с множеством потоков, пытающихся получить доступ, это может привести к потере много времени ЦП.

Вам действительно стоит попытаться доказать свою теорию о том, что реализация без блокировок быстрее, чем реализация с использованием мьютексов, путем выполнения тестов.

Добавлять Produce() а также Consume() функции

Если кольцевой буфер оспаривается, вы раскручиваете до 255 раз, повторяя операцию, прежде чем отказаться, а затем просто передаете проблему вызывающей стороне. Вызывающий не хочет иметь дело с этим и неизменно будет делать что-то неоптимальное, например, звонить std::this_thread::yield() или же std::this_thread::sleep_for(some_random_timeout). По сути, он либо будет ждать слишком короткое время, тратя больше циклов ЦП, либо будет ждать слишком долго, что приведет к недостаточной загрузке ЦП. Постарайтесь решить эту проблему, а затем внедрите решение в class RingBuffer.

Возможные решения вращаются бесконечно, но с использованием некоторого экспоненциального отката (это может быть сделано даже без потери циклов процессора, если ваш процессор поддерживает что-то вроде умвайт инструкция) или использовать условную переменную для ожидания после многократного вращения. Со временем было предпринято много усилий, чтобы найти наиболее оптимальный способ блокировки, см., Например, эту статью LWN о спин-блокировки билетов.

О State::Populating

Ваш дизайн позволяет производителю зарезервировать запись в кольцевом буфере, затем заполнить ее на досуге, а затем получить гарантию, что он может немедленно перенести эту запись в State::Queued. Возможно, это хорошо для производителей, но не так приятно для потребителей. Учтите, что у нас есть два производителя, A и B, и A называется TryProduce() первый. Теперь в кольцевом буфере есть две записи в состоянии State::Populating. Но теперь предположим, что B завершает заполнение своей записи намного быстрее, чем A. Потребители, к сожалению, не могут ничего сделать с записью B, им приходится ждать, пока A завершит заполнение, прежде чем они смогут продолжить. Это означает, что пропускная способность теперь ограничена производителем, который заполняет свои записи медленнее всего. Если у вас много производителей и время, необходимое для заполнения, сильно различается, это может оказаться медленнее, чем если бы производители просто выделяли память, а кольцевой буфер просто сохранял std::unique_ptr<T>с.

Есть альтернативы; вместо выделения каждой записи у каждого производителя может быть заранее выделенный массив, в который он заполняет записи. Вы можете пойти еще дальше, создать кольцевой буфер для каждого производителя и сделать TryConsumer() прозрачно попытаться выбрать элемент от производителя с непустым буфером.

Использовать if с инструкциями инициализации

Вы можете сделать код более кратким, переместив инициализацию переменных в if-высказывания, например:

if (auto consumer = TryConsume(); consumer) {
   ItemDestination = std::move(consumer.Get());
   return true;
} else {
   return false;
}

Передайте размер кольцевого буфера как std::size_t

Вместо того, чтобы пройти uint8_t BitsForItemCountпросто используйте std::size_t ItemCount как параметр размера. Это соответствует пути std::array работает, как и многие другие контейнеры фиксированного размера из сторонних библиотек. У вас уже есть static_assert это предотвратит использование размеров, отличных от POT.

В любом случае также добавьте static_assert ограничить ItemCount к $ 2 ^ {32} $, поскольку myNextRead а также myNextWrite всего 32 бита.

Именование вещей

Имена Consumer а также Producer плохо выбраны; эти классы представляют «элементы» или «элементы» кольцевого буфера. Так что возможно ConsumeItem а также ProduceItem было бы лучше, а может быть Consumable/Producable (хотя последнее тоже звучит странно).

  • 1

    umonitor / umwait могут остановить часы, но это по-прежнему означает, что они не привыкли к другому потоку. Это «не тратит впустую циклы процессора» только в том смысле, что он немного более дружественен к гиперпоточности, чем опрос в pause или же tpause петля. И я думаю, вы можете быть разбужены в магазине на этой линии другим ядром. Но если очередь опустеет, вы можете сообщить об этом ОС, чтобы она могла запланировать что-то еще или погрузиться в более глубокий сон. Зависит от вашего варианта использования, ожидаете ли вы, что производитель всегда будет ставить что-то в очередь очень скоро.

    — Питер Кордес


  • @PeterCordes Это правда, что он не позволяет запускать другой поток, возможно, я должен был сказать, что он не тратит так много энергия. Кроме того, отказ от чтения памяти в замкнутом цикле может также уменьшить трафик шины (будь то фактический доступ к памяти или трафик когерентности кеша). Я не думал о umonitor, это может быть именно то, что вы хотите здесь использовать. К сожалению, если вы хотите писать переносимый код, на это нельзя положиться.

    — Г. Сон


  template<class T, uint8_t BitsForItemCount> class RingBuffer

Это отсутствует

#include <cstdint>
using std::uint8_t

Тем не менее, мне очень не нравятся заголовки, которые заполняют глобальное пространство имен, поэтому я бы предпочел видеть std::uint8_t (и его друзья) написаны полностью. Есть ли причина, по которой нам нужен точно 8-битный тип, или std::uint_fast8_t быть лучшим выбором?

uint8_t timeout = 1;
while (timeout++)

Это похоже на for петля. Я бы предпочел обратный отсчет или, возможно, счет до константы, а не полагаться на переполнение типа для завершения цикла — явное лучше, чем неявное.

Я быстро осмотрел остальные и не заметил ничего, что бросалось бы на меня, кроме множества State переменные — возможно, предполагая, что простой enum : std::uint8_t может быть проще чем enum class для этого типа.

    Не без блокировки

    Вы заменили блокировки на ожидание занятости, но это не освобождает алгоритм от блокировки. В TryConsume, первый потребитель, выигравший гонку за подлежащий чтению элемент, затем отвечает за продвижение указателя чтения. Остальные ваши потребители ждут этого продвижения в режиме занятости — никакого прогресса.

    Чтобы алгоритм был свободным от блокировок, другим потребителям пришлось бы либо украсть элемент у первого потребителя, либо объявить элемент уже потребленным и потребить следующий.

    Проблема ABA

    Несколько потребителей могут захватить одно и то же значение указателя чтения и конкурировать за один и тот же элемент. Идея «блокировки» элемента путем атомарной установки его состояния на Reading после захвата указатель чтения, к сожалению, не работает: элемент может перейти из очереди в режим чтения, из пустой в заполнение и обратно в очередь, и все это до того, как первый потребитель попытается выполнить переход из очереди в режим чтения. В этот момент он преуспевает и возвращает неупорядоченный элемент.

    Видеть Гарантия прогресса без блокировок — даже хорошо спроектированные очереди кольцевых буферов фиксированного размера свободны от блокировок только тогда, когда они не заполняются или не опорожняются. Это не значит, что они не могут быть эффективными в обычном случае. Но похоже, что эта конкретная очередь намного хуже, чем та, с ненужной сериализацией. (Очередь MPMC liblfds также использует порядковый номер для отслеживания состояния каждой записи, что также позволяет избежать проблемы ABA. И да, как только один считыватель запросил слот для чтения, следующий считыватель потребует следующий слот, потенциально завершенный до более раннего чтения)

    — Питер Кордес


    Добавить комментарий

    Ваш адрес email не будет опубликован. Обязательные поля помечены *