Производитель-потребитель C ++ с использованием буфера блокировки

Я работаю над упражнением по программированию (университет, ничего связанного с промышленностью), в котором в основном предлагается реализовать буфер, который будет использоваться двумя потоками (производителем и потребителем). Первый ставит в очередь вызов данных next(T t), в то время как другой получает самое старое значение (в механизме FIFO), вызывая consume() или ждет, если буфер пуст. Производитель может отправить сигнал остановки, чтобы объявить о завершении постановки в очередь. Текст также требует fail() на случай, если что-то пойдет не так, но я хотел бы проигнорировать его в этом вопросе. Это мое решение

template <typename T>
class Buffer {
    std::mutex m;
    std::condition_variable cv;
    std::queue<T> values;
    bool stop, failed;
    std::exception_ptr _eptr;

public:
    Buffer() : stop(false), failed(false) {}

    void fail (const std::exception_ptr &eptr){
        { 
          std::unique_lock ul{m};
          failed = true;
          _eptr = eptr;
        }
        cv.notify_all();
    }

    void terminate(){
        {
          std::unique_lock ul {m};
          if (stop || failed ) throw std::runtime_error("enqueing has stopped");
          stop = true;
        }
        cv.notify_one(); // notify stop signal
    }

    void next(T t) {
        {
        std::unique_lock ul{m};
        if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
        values.push
        }
        cv.notify_one(); // notify the consumer (if waiting)
    }

    std::optional<T> consume(){
        std::unique_lock ul{m};
        cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
        if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
            if (stop)
                return std::nullopt;
            else
                std::rethrow_exception(_eptr);
        }
        // extract the value to consume
        T val = values.front();
        values.pop();
        return std::optional<T>(val);
    }
};

Вот как, я думаю, можно использовать буфер (я все еще игнорирую fail() метод)

#define N 10000

  Buffer<int> buf;

    std::thread prod([&buf](){
        for(int i = 0 ; i < N; ++i) {
            std::cout << "enqueing: " << i << std::endl;
            buf.next(i);
        }

        buf.terminate();
    });

    std::thread cons([&buf](){
        for(int i = 0; i < N; ++i)
            std::cout << "consuming: " << buf.consume().value() << std::endl;
    });

    prod.join();
    cons.join();

У меня есть вопросы:

  • Вы согласны, что это не что иное, как очередь на блокировку, или я что-то упустил?

  • мне нужно реализовать деструктор? Если это так, не могли бы вы показать мне пример использования, для которого он нужен?

  • Что произойдет, если объект выйдет из области видимости и никто не позвонит terminate() ? Стоит ли мне позаботиться об этой проблеме? В любом случае это Buffer проблема или программист, использующий этот класс, должен заботиться об этом? Не могли бы вы показать мне пример, когда это происходит (я думал о том, что потоки будут отсоединены, а не объединены, это подходит?)?

1 ответ
1

Ага. Похоже на очередь. Может, вся многопоточность действительно очередь? … философски гладит бороду


Именование:

  • terminate() вероятно следует назвать finished() или же done() или что-то подобное. (Чтобы дистанцироваться от std::terminate). stop() также было бы неправильно, поскольку это означает, что мы говорим потокам прекратить обработку независимо от того, есть ли еще ввод. На самом деле мы просто говорим о том, что больше не нужно вводить данные для обработки.
  • next() должен называться push()… потому что это то, что он делает.
  • consume() было бы лучше назвать что-то вроде wait_and_pop().

void terminate(){
    {
      std::unique_lock ul {m};
      if (stop || failed ) throw std::runtime_error("enqueing has stopped");
      stop = true;
    }
    cv.notify_one(); // notify stop signal
}

Я не думаю, что нам нужно выдавать ошибку, если очередь уже остановлена ​​(или не работает). Мы просто ничего не можем сделать и вернуться. (Подобно вызову close для уже закрытого файла. Это нормально, просто нечего делать). Тем более, что у нас нет возможности проверить, остановлена ​​ли очередь!

(Возможно, стоит добавить bool is_stopped() const. Обратите внимание, что выполнение этой функции const означало бы сделать мьютекс mutable переменная (что нормально)).

Мы должны позвонить notify_all вместо уведомления. Предположительно, мы хотим, чтобы все потоки сразу же перестали ждать.

Мы могли бы использовать std::lock_guard вместо std::unique_lock (нам не нужны дополнительные функции std::unique_lock).


void next(T t) {
    {
    std::unique_lock ul{m};
    if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
    values.push
    }
    cv.notify_one(); // notify the consumer (if waiting)
}

Мы могли бы использовать std::move при помещении значения в очередь: values.push(std::move

Опять же, мы могли бы использовать std::lock_guard вместо std::unique_lock.


std::optional<T> consume(){
    std::unique_lock ul{m};
    cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
    if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
        if (stop)
            return std::nullopt;
        else
            std::rethrow_exception(_eptr);
    }
    // extract the value to consume
    T val = values.front();
    values.pop();
    return std::optional<T>(val);
}

Здесь нам действительно нужно std::unique_lock. 🙂

Снова мы можем сделать T val = std::move(values.front());

Я не уверен, что логика здесь правильная. Если failed установлен флаг, нам, вероятно, нужно что-то с этим сделать, даже если очередь не пустой? (Я не знаю, что говорит ваша спецификация).


Разрушение:

Ага, нам нужен деструктор. Это вполне возможно для Buffer быть уничтоженным, пока потребительские потоки все еще пытаются читать из него. Надуманный пример:

auto consumers = std::vector<std::thread>();

{
    auto buf = Buffer<int>();

    for (auto i = 0; i != 5; ++i)
        consumers.emplace_back([&] () { while (true) { auto value = buf.consume(); if (!value) return; std::cout << value.value(); } });

    for (auto i = 0; i != 5000; ++i)
        buf.next(i);
    
    buf.terminate();

} // buf goes out of scope here! but we don't know that consumers have finished consuming!

for (auto& c : consumers)
    c.join();

У нас есть два варианта:

  • Сделайте это очевидной программной ошибкой (распечатайте сообщение об ошибке на std::cerr и позвони std::terminate при необходимости (когда failed не установлено, или когда stop установлен, но очередь не пуста).
  • Сделайте его менее опасным (предположим, что ввод завершен, и мы хотим завершить его обработку, поэтому установите stop в деструкторе, а затем дождитесь, пока очередь не станет пустой).

Я думаю, что второй вариант лучше - в зависимости от того, закончили ли потоки работу, так как условие ошибки может закончиться вызовом std::terminate совершенно случайно.

  • Я не понимаю, почему, используя ваш пример, программа выходит с 0 и ожидаемым результатом, даже если буфер вышел за рамки. Примечание: я заменил while(true) с for повторяя N связей, я представил только 1 потребителя и 1 производителя.

    - Это


  • Это неопределенное поведение, поэтому оно может работать, а может и нет. Может быть, потребитель быстро закончил работу, возможно, потребитель все еще обращается к памяти, в которой есть правильные значения, даже если буфер уничтожен ... и т. Д.

    - user673679

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *