Многопоточный обратный вызов C ++, может отменить регистрацию

обновление: здесь размещена новая версия этого кода


В этом посте я хотел бы 1) попросить отзыв о приведенном ниже коде в его нынешнем виде:

  • применить все лучшие практики для c ++ 20?
  • это безопасно?
  • мой способ обновить обратный вызов изнутри работающего обратного вызова разумно / лучше всего?

На основе кода и рекомендаций по дизайну, найденных в stackoverflow (например, Вот), Я написал класс, который обрабатывает слушателей, регистрирующих обратные вызовы для получения сообщений в многопоточном коде (слушатели могут быть добавлены из разных потоков, отличных от потока, из которого отправляются сообщения, только один поток каждый отправляет сообщения). Этот код также нуждался в нескольких способах отмены регистрации слушателями:

  1. когда класс, владеющий получателем, разрушает или иным образом завершает получение: следовательно add_listener() возвращает куки в виде std::list::const_iterator (список, потому что итераторы ни для одного из других элементов не становятся недействительными, когда элемент удаляется, а другие файлы cookie, таким образом, остаются действительными)
  2. когда обратный вызов во время выполнения определяет, что ему больше не нужны дальнейшие сообщения, это обрабатывается путем возврата true («удали меня, пожалуйста»).

Теперь я сталкиваюсь с ситуацией, когда обратный вызов слушателя при вызове должен заменить себя другим обратным вызовом. Я не могу позвонить add_listener() поскольку это приведет к тупиковой ситуации при вызове изнутри вызванного обратного вызова. Если я не перейду использовать std::recursive_mutex вместо. Это кажется тяжелым.

Поэтому вместо этого я добавил список замены для вещательной компании, хранящей пару <listenerCookie, listener> в котором перечислены, какие обновления должны быть сделаны после того, как текущее сообщение будет передано всем слушателям. Обновления выполняются в конце notify_all функция. Это гарантирует, что старый обратный вызов больше никогда не будет вызван и что сохраненный файл cookie останется действительным, но теперь указывает на новый обратный вызов. Но этот дизайн кажется сложным и небезопасным: он требовал добавления дополнительной функции-члена к классу вещателя, который должен вызываться только из обратного вызова. И это легко использовать неправильно, если не соблюдаются комментарии кода (и ужасное имя функции: p), в которых говорится, что нужно вызывать только изнутри обратного вызова. Или есть способ гарантировать, что функция может быть вызвана только вызванным обратным вызовом?

Мне не хватает более простого решения?

Более простое, но, к сожалению, невозможное решение, которое я считал, заключалось в том, чтобы обратный вызов возвращал новый обратный вызов для регистрации, который заменил бы текущий зарегистрированный (таким образом, файл cookie остается действительным). Но я не знаю, какой будет тогда подпись функции моего слушателя, поскольку вы не можете сделать что-то вроде CRTP с помощью операторов (using listener = std::function<listener(Message...)>; является недопустимым) (NB: на самом деле, если бы этот вариант был возможен, я бы также вернул вариант с bool, так как мне также нужно поддерживать функцию самодерегистрации.)

Реализация

#include  #include  #include  #include  #include  template  class Broadcaster {public: using listener = std :: function ;  using listenerCookie = typename std :: list  :: const_iterator;  listenerCookie add_listener (слушатель && r_) {auto l = lock ();  вернуть targets.emplace (targets.cend (), std :: move (r_));  } void remove_listener (listenerCookie it_) {авто l = lock ();  remove_listener_impl (это_);  } void notify_all (Сообщение ... msg_) {auto l = lock ();  for (auto it = targets.cbegin (), e = targets.cend (); it! = e;) if ((* it) (msg _...)) it = remove_listener_impl (it);  else ++ это;  if (! update_list.empty ()) {for (auto && [c_it, r] : update_list) {// превращаем cookie в неконстантный итератор auto it = target.erase (c_it, c_it);  // NB, [c_it, c_it) is an empty range, so nothing is removed
                // update callback
                *it = std::move(r);
            }
            update_list.clear();
        }
    }

    // NB: should only be called from running callback!
    void update_callback_from_inside_callback(listenerCookie it_, listener&& r_)
    {
        update_list.emplace_back(it_, std::move(r_));
    }

private:
    auto lock()
    {
        return std::unique_lock<std::mutex>(m);
    }
    listenerCookie remove_listener_impl(listenerCookie it_)
    {
        return targets.erase(it_);
    }

private:
    std::mutex m;
    std::list<listener> targets;

    std::vector<std::pair<listenerCookie, listener>> update_list;
};

Example usage

#include <iostream>
#include <string>
#include <functional>
#include <optional>

struct test
{
    using StringBroadcaster = Broadcaster<std::string>;

    bool simpleCallback(std::string msg_)
    {
        std::cout << "from simpleCallback: " << msg_ << std::endl;
        return false;   // don't remove self: run indefinitely
    }
    bool oneShotCallback(std::string msg_)
    {
        std::cout << "from oneShotCallback: " << msg_ << std::endl;
        return true;    // remove self
    }
    bool twoStepCallback_step1(std::string msg_)
    {
        std::cout << "from twoStepCallback_step1: " << msg_ << std::endl;

        // replace callback (so don't request to delete through return argument!)
        broadcast.update_callback_from_inside_callback(*cb_twostep_cookie, std::bind(&test::twoStepCallback_step2, this, std::placeholders::_1));

        return false;   // don't remove self: we just indicated another callback be placed in our slot: slot needs to remain existent
    }
    bool twoStepCallback_step2(std::string msg_)
    {
        std::cout << "from twoStepCallback_step2: " << msg_ << std::endl;
        return false;   // don't remove self: run indefinitely
    }

    void runExample()
    {
        cb_simple_cookie  = broadcast.add_listener(std::bind(&test::simpleCallback       , this, std::placeholders::_1));
        cb_oneShot_cookie = broadcast.add_listener(std::bind(&test::oneShotCallback      , this, std::placeholders::_1));
        cb_twostep_cookie = broadcast.add_listener(std::bind(&test::twoStepCallback_step1, this, std::placeholders::_1));

        broadcast.notify_all("message 1");  // should be received by simpleCallback, oneShotCallback, twoStepCallback_step1
        // NB: cb_oneShot_cookie is an invalid iterator now, oneShotCallback removed itself through its return argument. clear it
        cb_oneShot_cookie.reset();
        broadcast.notify_all("message 2");  // should be received by simpleCallback and twoStepCallback_step2
        broadcast.remove_listener(*cb_simple_cookie);
        broadcast.notify_all("message 3");  // should be received by twoStepCallback_step2
        broadcast.remove_listener(*cb_twostep_cookie);
        broadcast.notify_all("message 4");  // should be received by none
    }

    StringBroadcaster broadcast;
    std::optional<StringBroadcaster::listenerCookie> cb_simple_cookie;
    std::optional<StringBroadcaster::listenerCookie> cb_oneShot_cookie;
    std::optional<StringBroadcaster::listenerCookie> cb_twostep_cookie;
};

int main(int argc, char **argv)
{
    test t;
    t.runExample();
    return 0;
}

1 Answer
1

Very good design! And very clean code. As-is, there’s very little I can recommend to improve things. In fact, the only code suggestions I can come up with off the top of my head are:

  • You should probably take all arguments in the callbacks (and notify function) by const&. That allows you to pass more complicated (and even non-copyable) types to callback functions.

    This may add some overhead if the compiler doesn’t optimize trivial or built-in types like int to by-val parameters, but the overhead should be so small as to be impossible to detect if the callback functions are really small, because then the callback arguments will all be hot in cache. (And if the callback functions are not really small, then you won’t notice the extra indirection anyway.)

  • You are not considering exceptions, and that’s usually not a major problem if you just write good code—and your code is well-written—but there are some places where it might be a problem.

    What happens if a callback throws? You could end up with surprising behaviour. Nothing will leak, and no UB will be triggered (at least in the code visible), but what will happen is that all remaining callbacks that were supposed to be notified won’t be. That may or may not be a serious problem, depending on the nature of the message.

    What may be more deadly is that callbacks that have replaced themselves will be surprised to discover… they haven’t been replaced. That will be at least surprising… but if the callbacks have already deleted themselves some way, under the assumption that they have replaced themselves and successfully returned, so they’re done, right?… boom.

    So how to fix this? Well, the easiest way is to require the callbacks to be noexcept. Not like the broadcaster is going to handle any failures on their part, right?

    Another way would be to toss the update_list, and instead only keep track of a single optional<pair<listenerCookie, listener>>. If it has_value(), then right after the callback—before either removing anything, or advancing the iterator—you replace the contents of *it then and there. That way, even if a callback throws, at least all previous callbacks that completed successfully and wanted to be replaced are cool.

    (I’ll also offer another option, later, in the design review.)

    That still leaves the issue that if a callback throws, no other callbacks will be notified… which will lead to non-deterministic behaviour in multi-threaded code, because there doesn’t seem to be a way to sort callbacks. In other words, if you start a notify_all(), and a callback throws, you have no way of knowing which callbacks completed and which didn’t. So you can’t even catch the exception, handle/ignore it, then continue notifying.

    You could catch any exceptions thrown by callbacks in a std::exception_ptr, continue notifying, and then, at the end, throw if the exception pointer tests true. But that would only handle a single exception. What if multiple callbacks throw? Tricky! Not impossible to handle, but damn tricky. (You might need an exception type that holds a list of exceptions!) Making callbacks noexcept is a MUCH easier solution. But that’s a design decision you’ll have to wrestle over.

  • You have a serious bug, in that callbacks can deadlock your program if they call any public member functions of the broadcast type.

    The problem is that you are allowing arbitrary code execution while holding a lock within your notify_all() function. Each callback being notified can do… bugger-well anything! There’s nothing to stop one from calling notify_all() again! Or from calling one of the other public member function of the broadcast type.

    Now, you can argue that doing so would be logically stupid, and for the most part that’s true: You already provide means to remove and replace callbacks within the notify loop, so there should never be a need to call remove_listener() from a callback. But a callback might very well reasonably want to add a listener, and there’s no other way to do that other than via add_listener(). It might also be reasonable for a callback to want to trigger a new notification, in which case it will need to call notify_all().

    I can’t tell you the “right” way to fix this problem, because it will depend on your design space. I can offer suggestions.

    1. Within notify_all(), do the lock, COPY THE LIST OF LISTENERS, then release the lock, and start notifying using the copied list. That way, all the callbacks are done without holding the lock. If you need to reacquire the lock—such as to do a remove or replace of a listener—that’s fine.

      This is not a perfect solution, because you could still end up with the situation where a listener has removed itself and returned successfully, then gets called again. (And, of course, you’ve pretty much lost everything you gained by using std::list iterators in the first place.) Fixing that issue will be tricky. (One potential solution would be to require all the callbacks to be std::shared_ptr. That’s an interesting solution in that it allows thread-safe auto-deregistration any time if you only hold a container of std::weak_ptr.)

      However, it would allow you to do the notifies concurrently, which could be a huge gain.

    2. Disallow callbacks to do anything dirty by using std::try_lock(). In each function, replace the lock line with: `auto lock = std::unique_lock{m}; if (auto res = std::try_lock(m); res == -1) { /* do your business */ } else { throw std::logic_error{«recursive lock»}; }

    3. Use a recursive mutex so callbacks won’t actually lock. No, don’t do this. It will only work if the callbacks stay in the same thread (and there’s no way to force that), and even then, it will require herculean efforts to handle race conditions.

There’s one style issue I’d like to critique, then I’d like to discuss your design. So this will be more of an overall design review than a specifically-code review.

Style issue: naming convetions

On the style front, I find your choice to mangle member function arguments… but not data members… a bit odd. Standard practice is to add an underscore to data member names (or to tag them with m_, or something else like that). Why? Because those data members are visible in multiple places (every member function, for example). But member function arguments are only visible in that one, single member function. There doesn’t seem to be any point in mangling them, because no one else will ever see them.

For example, if I see the following member function:

auto class_t::func(int a)
{
    a = 1;
    b = 2;
    _c = 3;
}

… one of the most important things I want to be able to do is see the definitions of those three variables. When I see an undecorated variable name, like a, the first place I look is within the closest scope. Then the next scope, and the next scope, and so on, until I reach function scope, at which point I check the function arguments. In this case, that’s where I’ll find a.

But I won’t find b. So the next step is to check the class scope, for static data members. If I don’t find it there, then I know b is at namespace scope—it’s a “global variable”—and start looking out in the world for it.

All of which is to say is that variables like b are a pain in the ass. Well, if they’re static data members, they’re only mildly annoying for forcing me to leave function I’m investigating and go digging through the class declaration. But if I have to go search the entire friggin’ project for a variable declaration (and even then, never be 100% sure I haven’t missed a declaration in some other namespace that might get picked up first)… yeah, I be peeved. Luckily this isn’t that great a problem in practice.

Which is why variables names like _c (or c_ or m_c; whatever the project convention) are so lovely. When I don’t find the definition locally—and, frankly, I usually don’t even bother to check locally, because, yanno, convention—I know EXACTLY where to find it. I look in the class definition, and, if it’s a const member function, add const, and… done. Simple.

In general, when I see an undecorated name that isn’t local, I know it’s a global variable of some kind—either literally global (that is, at namespace scope) or functionally global (that is, a class static member). In either case, I know without even looking whether this function is one of those functions whose side-effects are going to be a problem. If a function has only undecorated names that are local, and decorated names, then I know at a glance that its effects are all local; either completely internal to the function itself, or completely internal to the object being operated on (that is, *this).

In other words, without even the foggiest clue about the rest of the project, I can look at func() above and know that it has globally-visible side-effects, because of b. I don’t even need to know what b is, or where b is defined, or anything about class_t. Just by the convention, I can tell that this is a function that may cause surprises elsewhere in the code. I can be pretty damn sure that it’s not re-entrant (and thus not thread-safe). On the other hand, if b were removed, then I would be able to assume that func() is safe; that if I call it, it won’t touch anything other than its own internals and *this, which means it’s thread-safe (so long as *this isn’t being shared among threads, in which case, if it were, I would assume it’s being synchronized externally). That’s the power of a good convention.

Now let’s consider your convention:

auto class_t::func(int a_)
{
    a_ = 1;
    b = 2;
    c = 3;
}

Okay, so I see a_, and I know to check the local scope, and eventually find it as a function argument. Cool.

Now what happens with b and c? Where do I even begin looking? Should I assume they are non-static data members? Static data members? Globals? Is this function re-entrant? I mean, if I went hunting around for the definitions of b and c eventually I would find out… but that’s a far cry from being able to know just from a glance.

Design review

First off, I think the idea to take advantage of the invalidation characteristics of std::list’s iterators is brilliant. I admit I rarely think about std::list when reaching for a container (honestly, the only time I think of it is when testing bi-di iterators), but now you’ve inspired me to give it a second look for some design problems I’ve run into.

Now, when I considered your design questions, I have to admit I went down exactly the same trail you did. I first thought, well, let’s use a variant for the return with an optional replacement… oh, no, that won’t work because that would make the function’s type recursive… okay, what if we made the update_callback_from_inside_callback() safer… could use a recursive_mutex, oh you already thought of that…. I ended up stuck exactly where you are.

But then I had one of those face-palming brainstorms, and came up with this (note, code is not tested, probably won’t compile, and is likely broken and incomplete—this is just for illustration of an idea):

struct return_t {};
struct remove_t {};

using replace_t = std::any;

using result_t = std::variant<return_t, remove_t, replace_t>;

template <typename... Args>
class broadcaster
{
    // ... [snip] ... using listener = std :: function ;  // ... [snip] ... void notify_all (Args const & ... args) {auto l = lock ();  for (auto it = _targets.cbegin (); it! = _targets.cend ();) {// "overload" - довольно распространенный тип утилиты - например, // на странице cppreference есть версия при visit () .  // // Пока мы не получим сопоставление с образцом, это самое красивое, что мы можем сделать.  it = std :: visit (overload {// Удалить слушателя.
                    [&_targets, it](remove_t) {return _targets.erase (это);  }, // Заменим слушателя.
                    [&_targets, it](replace_t & r) {auto i = _targets.erase (оно, оно);  * я = std :: move (std :: any_cast  (r));  return ++ it;  }, // По умолчанию: просто продолжаем уведомление.
                    [it](авто &&) {return ++ it;  },}, (* это) (аргументы ...));  }} // ... [snip] ...};

Теперь, как написано, это будет работать (я полагаю!), Но потому что std::any меньше чем std::function в каждом известном мне stdlib он всегда будет запускать выделение. Теперь вам может быть все равно, потому что замена слушателя будет чертовски редкой, так что … Но если ты делать забота, тогда вы могли бы заменить std::any с настраиваемым типом стирания типа, который содержит правильно выровненный массив std::byte то есть sizeof(std::function) большой. Даже не имеет значения который реализация std::function вы используете для определения типа, поэтому вам не нужно знать Args.... На самом деле, а не std::any, вы могли бы просто использовать std::function<void()>, потому что указатели на функции могут быть преобразованы в указатели на другие функции и обратно. Но тогда вы теряете проверку типа any_cast. Как написано, это безопасно по типу, потому что any_cast обнаружит махинации. Если вы хотите, чтобы эта проверка была сделана раньше, например в пределах обратного вызова или даже во время компиляции — есть уловки, которые вы можете использовать, например, создание replace_t тип с частным конструктором и broadcast<Args...> друг, и у него есть make_replace_token(std::function<result_t(Args const&...)>) функция или что-то в этом роде.

Вы даже можете добавить другое поведение —add_t— добавляемый обратный вызов со стиранием типа. (Конечно, так не получится notify_all() в настоящее время написан, потому что вы не можете изменять список во время итерации по нему. Но может работать и с другими вариантами!) Вы даже можете добавить add_multiple_t, который позволяет обратному вызову регистрировать несколько слушателей.

Но, честно говоря, было бы лучше, если бы обратные вызовы могли безопасно модифицировать вещатель, потому что тогда они могли бы добавлять, удалять или даже уведомлять — все без забот. Тогда вам даже не понадобится ни одна из этих гимнастик, потому что обратный звонок может просто позвонить remove_listener() (и у вас может быть replace_listener(), тоже!).

Ключевой момент, о котором следует помнить, заключается в том, что выполнение произвольного кода удерживая замок это как брейк-данс на пиротехническом шоу, залитый бензином.

Лучшее возможное решение — выполнять все обратные вызовы, пока не удерживая замок. Если вы можете сделать эту работу, все будет в выигрыше: обратные вызовы могут добавлять новые обратные вызовы, удалять обратные вызовы (включая самих себя), заменять обратные вызовы, выполнять новые запуски уведомлений … в основном что-нибудь. Сложная часть — заставить эту работу работать.

Большая уловка для выполнения трюка — убедиться, что обратный вызов может знать черт возьми, уверен на 100% если он был удален (или заменен), чтобы он мог знать не могут быть вызванным снова (и, таким образом, безопасно уничтожить). Это не банально. Одно из решений — разделить собственность между вещательной компанией и всем, кто хочет знать, активен ли обратный вызов; с участием std::shared_ptr (используется правильно), нет возможности вызвать или иным образом получить доступ к уже удаленному обратному вызову. Есть и другие, более легкие решения, такие как сохранение атомарного флага «активен» для каждого обратного вызова… но вы должны быть осторожны с проблемами TOCTOU. Как я уже сказал, сложно!

Дополнительно

Проверка во время компиляции для noexcept функции

К сожалению std::function не поддерживает должным образом noexcept функции, начиная с C ++ 20. Это известная проблема, начиная с C ++ 17; Не знаю, почему до сих пор не исправили. (Ну, ладно, я догадываюсь, это потому, что это не легко исправить, и потому что пандемия действительно замедлила работу комитета. Возможно, он просто проскользнул под полосу приоритета.)

Но это не так уж и важно, потому что есть только два места, где вам нужно проверить обратный вызов: add_listener() и update_callback_from_inside_callback(). Везде вы просто использовать обратный вызов, и вы можете поставить noexcept функции в не-noexcept std::function. (Вы просто не можете сделать обратное; вы не можетеnoexcept функционировать в noexcept std::function.)

Итак, вот как вы можете исправить add_listener():

template <class... Message>
class Broadcaster
{
private:
    using listener = std::function<bool(Message...)>; // this no longer needs to be public
public:
    using listenerCookie = typename std::list<listener>::const_iterator;

    // ... [snip] ...

    // if you don't have C++20 concepts, that's fine; the concept is just
    // an extra check, not a necessary one
    template <std::invocable<Message...> F>
    listenerCookie add_listener(F&& r_)
    {
        // this is where the real check happens
        static_assert(std::is_nothrow_invocable_r_v<bool, F, Message...>);

        auto l = lock();

        // it's cool to put a noexcept function in a non-noexcept std::function
        // so we just forward what we're given into a function<bool(Message...)>

        return targets.emplace(targets.cend(), listener{std::forward<F>(r_)});
    }

    // ... [snip] ...

Та же идея для update_callback_from_inside_callback().

Это немного раздувает интерфейс, поэтому вы можете сделать следующее:

    template <std::invocable<Message...> F>
    listenerCookie add_listener(F&& r_)
    {
        static_assert(std::is_nothrow_invocable_r_v<bool, F, Message...>);

        return add_listener_impl_(listener{std::forward<F>(r_)});
    }

    // ... [snip] ...

private:
    listenerCookie add_listener_impl_(listener&& r)
    {
        auto l = lock();
        return targets.emplace(targets.cend(), std::move(r));
    }

    // ... [snip] ...

В любом случае это не повлияет на сайт вызова (за исключением, конечно, если ваш обратный вызов не noexcept, он больше не будет компилироваться). Все это должно работать:

auto foo_1(int, double) noexcept -> bool;

b.add_listener(foo_1);

auto foo_2 = [=](int, double) noexcept -> bool { return true; };

b.add_listener(foo_2);
// or
b.add_listener(std::move(foo_2));

b.add_listener([](int, double) noexcept -> bool { return true; });

struct func
{
    std::string name = {};
    int call_count = 0;

    auto operator()(int, double) noexcept
    {
        std::cout << name << " has been called " << ++call_count << " times";

        return call_count < 100;
    }
};

b.add_listener(func{.name = "steve"});

auto f = func{.name = "anya"};
b.add_listener(std::move(f));
```

  • Будьте осторожны с именами, заканчивающимися на _t — может конфликтовать с именами из заголовков C (зарезервировано для стандартной библиотеки).

    — Тоби Спейт

  • 1

    @TobySpeight AFAIK, это правило POSIX, а не правило C. Только C резервирует (u)int*_t, или что-то вроде того.

    — инди

  • @indi большое спасибо за обширный обзор! Мне понадобится время, чтобы обработать. Один вопрос: следует ли мне добавить обновленный дизайн внизу моего исходного сообщения или опубликовать его в новом вопросе, если мне нравится дальнейший обзор? Идея итераторов std :: list в качестве файлов cookie откуда-то на SO, больше не может отслеживать. Сообщение, на которое я ссылаюсь, фактически использует вашу технику блокировки копирования, а затем использования внешней блокировки (даже не заметил!). Мне нужно подумать, что это значит для заказа. Это, безусловно, упростило бы ситуацию. Я избегал weak_ptr / shared_ptr для решения проблем времени жизни, возможно, преждевременной оптимизации;)

    — Дидерик К. Нихорстер

  • У меня вопрос, который я могу задать сейчас: я бы очень хотел, чтобы мои обратные вызовы были без исключений. Могу ли я добиться этого? что-то вроде std::function<bool(Message...) noexcept> не существует.

    — Дидерик К. Нихорстер

  • Я думаю, что СОП по CR состоит в том, чтобы сделать новый пост, когда вы хотите пересмотреть существенно измененный код; вы всегда можете ссылаться на этот пост.

    — инди

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *