Я работаю над упражнением по программированию (университет, ничего связанного с промышленностью), в котором в основном предлагается реализовать буфер, который будет использоваться двумя потоками (производителем и потребителем). Первый ставит в очередь вызов данных next(T t)
, в то время как другой получает самое старое значение (в механизме FIFO), вызывая consume()
или ждет, если буфер пуст. Производитель может отправить сигнал остановки, чтобы объявить о завершении постановки в очередь. Текст также требует fail()
на случай, если что-то пойдет не так, но я хотел бы проигнорировать его в этом вопросе. Это мое решение
template <typename T>
class Buffer {
std::mutex m;
std::condition_variable cv;
std::queue<T> values;
bool stop, failed;
std::exception_ptr _eptr;
public:
Buffer() : stop(false), failed(false) {}
void fail (const std::exception_ptr &eptr){
{
std::unique_lock ul{m};
failed = true;
_eptr = eptr;
}
cv.notify_all();
}
void terminate(){
{
std::unique_lock ul {m};
if (stop || failed ) throw std::runtime_error("enqueing has stopped");
stop = true;
}
cv.notify_one(); // notify stop signal
}
void next(T t) {
{
std::unique_lock ul{m};
if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
values.push
}
cv.notify_one(); // notify the consumer (if waiting)
}
std::optional<T> consume(){
std::unique_lock ul{m};
cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
if (stop)
return std::nullopt;
else
std::rethrow_exception(_eptr);
}
// extract the value to consume
T val = values.front();
values.pop();
return std::optional<T>(val);
}
};
Вот как, я думаю, можно использовать буфер (я все еще игнорирую fail()
метод)
#define N 10000
Buffer<int> buf;
std::thread prod([&buf](){
for(int i = 0 ; i < N; ++i) {
std::cout << "enqueing: " << i << std::endl;
buf.next(i);
}
buf.terminate();
});
std::thread cons([&buf](){
for(int i = 0; i < N; ++i)
std::cout << "consuming: " << buf.consume().value() << std::endl;
});
prod.join();
cons.join();
У меня есть вопросы:
Вы согласны, что это не что иное, как очередь на блокировку, или я что-то упустил?
мне нужно реализовать деструктор? Если это так, не могли бы вы показать мне пример использования, для которого он нужен?
Что произойдет, если объект выйдет из области видимости и никто не позвонит
terminate()
? Стоит ли мне позаботиться об этой проблеме? В любом случае этоBuffer
проблема или программист, использующий этот класс, должен заботиться об этом? Не могли бы вы показать мне пример, когда это происходит (я думал о том, что потоки будут отсоединены, а не объединены, это подходит?)?
1 ответ
Ага. Похоже на очередь. Может, вся многопоточность действительно очередь? … философски гладит бороду
Именование:
terminate()
вероятно следует назватьfinished()
или жеdone()
или что-то подобное. (Чтобы дистанцироваться отstd::terminate
).stop()
также было бы неправильно, поскольку это означает, что мы говорим потокам прекратить обработку независимо от того, есть ли еще ввод. На самом деле мы просто говорим о том, что больше не нужно вводить данные для обработки.next()
должен называтьсяpush()
… потому что это то, что он делает.consume()
было бы лучше назвать что-то вродеwait_and_pop()
.
void terminate(){
{
std::unique_lock ul {m};
if (stop || failed ) throw std::runtime_error("enqueing has stopped");
stop = true;
}
cv.notify_one(); // notify stop signal
}
Я не думаю, что нам нужно выдавать ошибку, если очередь уже остановлена (или не работает). Мы просто ничего не можем сделать и вернуться. (Подобно вызову close для уже закрытого файла. Это нормально, просто нечего делать). Тем более, что у нас нет возможности проверить, остановлена ли очередь!
(Возможно, стоит добавить bool is_stopped() const
. Обратите внимание, что выполнение этой функции const
означало бы сделать мьютекс mutable
переменная (что нормально)).
Мы должны позвонить notify_all
вместо уведомления. Предположительно, мы хотим, чтобы все потоки сразу же перестали ждать.
Мы могли бы использовать std::lock_guard
вместо std::unique_lock
(нам не нужны дополнительные функции std::unique_lock
).
void next(T t) {
{
std::unique_lock ul{m};
if ( stop || failed ) throw std::runtime_error ("enqueing has stopped");
values.push
}
cv.notify_one(); // notify the consumer (if waiting)
}
Мы могли бы использовать std::move
при помещении значения в очередь: values.push(std::move
Опять же, мы могли бы использовать std::lock_guard
вместо std::unique_lock
.
std::optional<T> consume(){
std::unique_lock ul{m};
cv.wait(ul, [this]() { return !values.empty() || stop || failed; });
if (values.empty()) { // if got notified and the queue is empty, then stop or failed have been sent
if (stop)
return std::nullopt;
else
std::rethrow_exception(_eptr);
}
// extract the value to consume
T val = values.front();
values.pop();
return std::optional<T>(val);
}
Здесь нам действительно нужно std::unique_lock
. 🙂
Снова мы можем сделать T val = std::move(values.front());
Я не уверен, что логика здесь правильная. Если failed
установлен флаг, нам, вероятно, нужно что-то с этим сделать, даже если очередь не пустой? (Я не знаю, что говорит ваша спецификация).
Разрушение:
Ага, нам нужен деструктор. Это вполне возможно для Buffer
быть уничтоженным, пока потребительские потоки все еще пытаются читать из него. Надуманный пример:
auto consumers = std::vector<std::thread>();
{
auto buf = Buffer<int>();
for (auto i = 0; i != 5; ++i)
consumers.emplace_back([&] () { while (true) { auto value = buf.consume(); if (!value) return; std::cout << value.value(); } });
for (auto i = 0; i != 5000; ++i)
buf.next(i);
buf.terminate();
} // buf goes out of scope here! but we don't know that consumers have finished consuming!
for (auto& c : consumers)
c.join();
У нас есть два варианта:
- Сделайте это очевидной программной ошибкой (распечатайте сообщение об ошибке на
std::cerr
и позвониstd::terminate
при необходимости (когдаfailed
не установлено, или когдаstop
установлен, но очередь не пуста). - Сделайте его менее опасным (предположим, что ввод завершен, и мы хотим завершить его обработку, поэтому установите
stop
в деструкторе, а затем дождитесь, пока очередь не станет пустой).
Я думаю, что второй вариант лучше - в зависимости от того, закончили ли потоки работу, так как условие ошибки может закончиться вызовом std::terminate
совершенно случайно.
Я не понимаю, почему, используя ваш пример, программа выходит с 0 и ожидаемым результатом, даже если буфер вышел за рамки. Примечание: я заменил
while(true)
сfor
повторяя N связей, я представил только 1 потребителя и 1 производителя.- Это
Это неопределенное поведение, поэтому оно может работать, а может и нет. Может быть, потребитель быстро закончил работу, возможно, потребитель все еще обращается к памяти, в которой есть правильные значения, даже если буфер уничтожен ... и т. Д.
- user673679