Написание потокобезопасной кольцевой очереди на C ++ 17

Я пробовал реализовать потокобезопасную кольцевую очередь на C ++. Я совершенно новичок в перемещении семантики и C ++ 11/14/17 в целом.

#ifndef THREAD_SAFE_RING_QUEUE_HPP_
#define THREAD_SAFE_RING_QUEUE_HPP_

#include <mutex>
#include <optional>
#include <vector>

namespace structure {
class MaximumCapacityReachedError : public std::runtime_error {
 public:
  MaximumCapacityReachedError(std::size_t capacity)
      : runtime_error("Structure instance is full (" +
                      std::to_string(capacity) + ")") {}
};

class EmptyError : public std::runtime_error {
 public:
  EmptyError() : runtime_error("Structure instance is empty") {}
};

template <class T>
class RingQueue {
 public:
  static constexpr std::size_t kDefaultCapacity_ = 10;

  RingQueue(std::size_t = kDefaultCapacity_);

  bool IsEmpty() const;
  std::size_t GetSize() const;
  void Push(T&& element);
  void TryPush(T&& element);
  T& ConsumeNext();
  std::optional<T> TryConsumeNext();
  void Clear();

  std::size_t GetCapacity() const noexcept;
  bool IsFull() const noexcept;

 private:
  mutable std::recursive_mutex mutex_;
  std::size_t capacity_ = 0;
  std::size_t head_ = 0;
  std::size_t tail_ = 0;
  std::vector<T> elements_;
  bool is_full_ = false;
};

template <class T>
inline RingQueue<T>::RingQueue(std::size_t capacity)
    : capacity_(capacity),
      elements_(std::vector<T>(capacity_)),
      is_full_(capacity == 0) {}

template <class T>
inline bool RingQueue<T>::IsEmpty() const {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);
  return capacity_ == 0 || (!is_full_ && head_ == tail_);
}

template <class T>
inline std::size_t RingQueue<T>::GetSize() const {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  if (is_full_) {
    return capacity_;
  }

  if (tail_ >= head_) {
    return tail_ - head_;
  }

  return capacity_ - (head_ - tail_);
}

template <class T>
inline void RingQueue<T>::Push(T&& element) {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  if (IsFull()) {
    throw MaximumCapacityReachedError(capacity_);
  }

  elements_[tail_] = std::forward<T>(element);
  tail_ = (tail_ + 1) % capacity_;
  is_full_ = head_ == tail_;
}

template <class T>
inline void RingQueue<T>::TryPush(T&& element) {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  if (IsFull()) {
    return;
  }

  elements_[tail_] = std::forward<T>(element);
  tail_ = (tail_ + 1) % capacity_;
  is_full_ = head_ == tail_;
}

template <class T>
inline T& RingQueue<T>::ConsumeNext() {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  if (IsEmpty()) {
    throw EmptyError();
  }

  const auto previous_head = head_;
  head_ = (head_ + 1) % capacity_;
  is_full_ = false;
  return std::forward<T>(elements_[previous_head]);
}

template <class T>
inline std::optional<T> RingQueue<T>::TryConsumeNext() {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  if (IsEmpty()) {
    return std::nullopt;
  }

  const auto previous_head = head_;
  head_ = (head_ + 1) % capacity_;
  is_full_ = false;
  return std::forward<T>(elements_[previous_head]);
}

template <class T>
inline void RingQueue<T>::Clear() {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);

  head_ = 0;
  tail_ = 0;
  is_full_ = false;
}

template <class T>
inline std::size_t RingQueue<T>::GetCapacity() const noexcept {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);
  return capacity_;
}

template <class T>
inline bool RingQueue<T>::IsFull() const noexcept {
  std::scoped_lock<std::recursive_mutex> lock(mutex_);
  return is_full_;
}
}  // namespace structure

#endif  // THREAD_SAFE_RING_QUEUE_HPP_

Я протестировал свой код, и кажется, что он работает, но многое может пойти не так …

РЕДАКТИРОВАТЬ: вот тесты, которые я сделал, используя Поймать2. DummyObject это просто класс, содержащий int (значение может быть передано в конструктор и равно 0 по умолчанию).

#include <memory>
#include "catch.hpp"
#include "dummies.hpp"
#include "utils/structure/ring_queue.hpp"

TEST_CASE("Ring queue creation with default capacity", "[structure]") {
  auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>();

  SECTION("Properties after creation with default capacity.") {
    REQUIRE(queue.GetCapacity() == queue.kDefaultCapacity_);
    REQUIRE(queue.GetSize() == 0);
    REQUIRE(!queue.IsFull());
    REQUIRE(queue.IsEmpty());
  }
}

TEST_CASE("Ring queue creation with specific capacity", "[structure]") {
  const std::size_t capacity = 15;

  auto queue =
      structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(capacity);

  SECTION("Properties after creation with specific capacity.") {
    REQUIRE(queue.GetCapacity() == capacity);
    REQUIRE(queue.GetSize() == 0);
    REQUIRE(!queue.IsFull());
    REQUIRE(queue.IsEmpty());
  }
}

TEST_CASE("Ring queue push operations in single thread", "[structure]") {
  const std::size_t capacity = 15;

  SECTION("Properties after pushing one element.") {
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    const auto dummy1 = dummy::DummyObject(0);
    queue.Push(std::make_unique<dummy::DummyObject>(0));

    REQUIRE(queue.GetSize() == 1);
    REQUIRE(!queue.IsFull());
    REQUIRE(!queue.IsEmpty());
  }

  SECTION("Properties after pushing two elements.") {
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    queue.Push(std::make_unique<dummy::DummyObject>(0));
    queue.Push(std::make_unique<dummy::DummyObject>(0));
    REQUIRE(queue.GetSize() == 2);
    REQUIRE(!queue.IsFull());
    REQUIRE(!queue.IsEmpty());
  }

  SECTION("Properties after pushing the maximum amount of elements.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(1);

    queue.Push(std::make_unique<dummy::DummyObject>(0));
    REQUIRE(queue.GetSize() == 1);
    REQUIRE(queue.IsFull());
    REQUIRE(!queue.IsEmpty());
  }

  SECTION("Properties after pushing too many elements.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(1);

    queue.Push(std::make_unique<dummy::DummyObject>(0));

    bool is_exception = false;

    try {
      queue.Push(std::make_unique<dummy::DummyObject>(0));
    } catch (const structure::MaximumCapacityReachedError& error) {
      is_exception = true;
    }

    REQUIRE(is_exception);
    REQUIRE(queue.GetSize() == 1);
    REQUIRE(queue.IsFull());
    REQUIRE(!queue.IsEmpty());
  }

  SECTION("Specific error case: pushing one element in a 0-capacity queue.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(0);

    bool is_exception = false;

    try {
      queue.Push(std::make_unique<dummy::DummyObject>(0));
    } catch (const structure::MaximumCapacityReachedError& error) {
      is_exception = true;
    }

    REQUIRE(is_exception);
  }

  SECTION("Try pushing an element.") {
    std::size_t capacity = 5;
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    for (std::size_t index = 0; index < capacity + 1; index++) {
      queue.TryPush(std::make_unique<dummy::DummyObject>(0));
    }

    REQUIRE(queue.GetSize() == capacity);
  }
}

TEST_CASE("Ring queue consume operations in single thread",
          "[structure]") {
  const std::size_t capacity = 15;

  SECTION("Properties after consuming one element.") {
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    queue.Push(std::make_unique<dummy::DummyObject>(42));
    REQUIRE(queue.ConsumeNext()->GetValue() == 42);
    REQUIRE(queue.GetSize() == 0);
    REQUIRE(!queue.IsFull());
    REQUIRE(queue.IsEmpty());
  }

  SECTION("Properties after consuming two elements.") {
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    queue.Push(std::make_unique<dummy::DummyObject>(42));
    REQUIRE(queue.ConsumeNext()->GetValue() == 42);
    REQUIRE(queue.GetSize() == 0);
    REQUIRE(!queue.IsFull());
    REQUIRE(queue.IsEmpty());
  }

  SECTION("Properties after consuming one element in a two-element queue.") {
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    queue.Push(std::make_unique<dummy::DummyObject>(0));
    queue.Push(std::make_unique<dummy::DummyObject>(0));
    REQUIRE(queue.ConsumeNext());
    REQUIRE(queue.GetSize() == 1);
    REQUIRE(!queue.IsFull());
    REQUIRE(!queue.IsEmpty());
  }

  SECTION("Properties after consuming too many elements.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(1);

    queue.Push(std::make_unique<dummy::DummyObject>(42));
    queue.ConsumeNext();
    bool is_exception = false;

    try {
      queue.ConsumeNext();
    } catch (const structure::EmptyError& error) {
      is_exception = true;
    }

    REQUIRE(is_exception);
    REQUIRE(queue.GetSize() == 0);
    REQUIRE(!queue.IsFull());
    REQUIRE(queue.IsEmpty());
  }

  SECTION("Order is respected.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(3);

    queue.Push(std::make_unique<dummy::DummyObject>(1));
    queue.Push(std::make_unique<dummy::DummyObject>(2));
    queue.Push(std::make_unique<dummy::DummyObject>(3));

    REQUIRE(queue.ConsumeNext().get()->GetValue() == 1);
    REQUIRE(queue.ConsumeNext().get()->GetValue() == 2);
    REQUIRE(queue.ConsumeNext().get()->GetValue() == 3);
  }

  SECTION("Specific error case: consuming one element in a 0-capacity queue.") {
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(0);

    bool is_exception = false;

    try {
      queue.ConsumeNext();
    } catch (const structure::EmptyError& error) {
      is_exception = true;
    }

    REQUIRE(is_exception);
  }

  SECTION("Try consuming an element.") {
    std::size_t capacity = 5;
    auto queue = structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(
        capacity);

    for (std::size_t index = 0; index < capacity; index++) {
      queue.Push(std::make_unique<dummy::DummyObject>(0));
    }

    for (std::size_t index = 0; index < capacity + 1; index++) {
      const auto element = queue.TryConsumeNext();
      const bool is_element = element.has_value();
      const bool is_okay = index < capacity ? is_element : !is_element;
      REQUIRE(is_okay);
    }

    REQUIRE(queue.GetSize() == 0);
  }
}

TEST_CASE("Ring queue pushing operations in multiple threads",
          "[structure]") {
  const std::size_t capacity = 15;
  SECTION("Usage in threads: push operations.") {
    std::size_t thread_number = 5;
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(5);

    std::vector<std::thread> threads;

    for (std::size_t index = 0; index < thread_number; index++) {
      threads.push_back(std::thread([&queue]() {
        queue.Push(std::make_unique<dummy::DummyObject>(0));
      }));
    }

    std::for_each(threads.begin(), threads.end(),
                  [](std::thread& thread) { thread.join(); });

    REQUIRE(queue.GetSize() == thread_number);
  }
}

TEST_CASE("Ring queue consume operations in multiple threads",
          "[structure]") {
  SECTION("Usage in threads: consume operations.") {
    std::size_t thread_number = 5;
    auto queue =
        structure::RingQueue<std::shared_ptr<dummy::DummyObject>>(5);

    std::vector<std::thread> threads;

    for (std::size_t index = 0; index < thread_number; index++) {
      queue.Push(std::make_unique<dummy::DummyObject>(0));
    }

    for (std::size_t index = 0; index < thread_number; index++) {
      threads.push_back(std::thread([&queue]() { queue.ConsumeNext(); }));
    }

    std::for_each(threads.begin(), threads.end(),
                  [](std::thread& thread) { thread.join(); });

    REQUIRE(queue.GetSize() == 0);
  }
}
```

1 ответ
1

ConsumeNext() не должен возвращать ссылку

Проблема с ConsumeNext() в том, что он возвращает ссылку на объект. Однако этот элемент также помечается как потребленный. Таким образом, вызывающий не может безопасно получить доступ к этому элементу, поскольку он может быть перезаписан вызовом Push() из другого потока.

Есть два варианта решения этой проблемы:

  1. Иметь ConsumeNext() возврат по значению, как и TryConsumeNext() делает
  2. Расколоть ConsumeNext() в GetNext() который возвращает ссылку, но не использует, а ConsumeNext() что возвращается void.

Возврат по значению имеет недостаток, заключающийся в том, что T необходимо скопировать, что может оказаться дорогостоящим или невозможным, в зависимости от конкретного типа. Поэтому я рекомендую выбрать вариант 2. Это, кстати, именно то, что STL делает для таких контейнеров, как std::stack.

Подумайте о том, чтобы избежать head == tail двусмысленность

Проблема с указателем головы и хвоста или указателем для кольцевого буфера заключается в том, что когда head == tail, это может быть связано с тем, что кольцевой буфер пуст или полностью заполнен. Вы решили это, добавив дополнительную переменную is_full_, и проверяю это во многих местах.

На мой взгляд, более изящное решение — не иметь tail_ индекс, а скорее size_ переменная, которая отслеживает, сколько элементов фактически используется. Это значительно упрощает код. Конечно, теперь вам нужно получить индекс хвоста в Push() и TryPush(), вот так:

elements_[(head_ + size_++) % capacity_] = std::forward<T>(element);

Бесполезная блокировка GetCapacity()

Вам не нужно брать замок GetCapacity(), так как емкость никогда не может измениться после построения RingQueue. Вы даже можете сделать capacity_ сам по себе const Переменная.

Удалять IsEmpty() и IsFull()

Функции IsEmpty() и IsFull() не возвращайте полезную информацию пользователям вашего класса. К тому времени, когда они вернутся true или же false, это значение может не отражать фактическое состояние RingQueue больше, так как другой поток мог добавлять или удалять элементы из него. Сохраняя их доступными как public функции приглашает ТОКТУ ошибки.

Вы можете оставить их как private функции, но тогда они больше не нуждаются в блокировке, и вы можете изменить mutex_ из std::recursive_mutex к чуть более эффективному обычному std::mutex.

Именование вещей

Я рекомендую вам давать имена функций-членов, которые соответствуют именам контейнеров STL, в частности, тем из std::queue, так что проще пользоваться RingQueues в коде, который также использует STL. Так:

  • Push() -> push() и emplace()
  • ConsumeNext() -> front() и pop()
  • Clear() -> clear()
  • GetCapacity() -> capacity()

Добавить функцию для ожидания возможности нажать / вытолкнуть

Приятно, что эта функция потокобезопасна, но это означает, что вы хотите использовать ее из нескольких потоков. В частности, рассмотрим сценарий производитель-потребитель, когда один поток хочет добавить элементы в очередь, а другой — удалить их. Они могут работать с разной скоростью, поэтому, если очередь заполняется, вы в идеале хотите, чтобы производитель спал до тех пор, пока очередь не заполнится, и, наоборот, если очередь пуста, вы хотите, чтобы потребитель спал до тех пор, пока не появится элемент. был добавлен. Это можно легко реализовать, используя std::condition_variable.

  • Большое спасибо за подробное объяснение! Я внесу изменения и считаю, что соблюдение соглашения об именах STL определенно намного лучше. Я не могу вас отблагодарить!

    — Cloud7001

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *