Опрос FD во вспомогательном потоке, динамическое добавление / удаление дескрипторов

Когда я писал, я понял, что у меня есть несколько классов, опрашивающих дескрипторы файлов в своем собственном потоке – много дублированного кода, такого как синхронизация, создание массива опросов и т. Д.

Я решил удалить повторяющийся код, написав следующий класс, который занимается опросом файловых файлов в фоновом режиме. Есть предположения?

template<typename Callback, typename ReadFn>
class Poller
{
public:
    using storage_type = std::vector<struct pollfd>;
    using callbacks_type = std::map<int, Callback>;

    explicit Poller(ReadFn&& readFn) : readFn(std::forward(readFn)) {}
    ~Poller()
    {
        if (worker.joinable())
        {
            executeInContext([](storage_type& storage, callbacks_type& callbacks){
                storage.erase(storage.begin()+1, storage.end());
                callbacks.clear();
            });

            worker.join();
            close(_fd[0]);
            close(_fd[1]);
        }
    }

    void addDescriptor(int fd, Callback cb)
    {
        if (!worker.joinable())
        {
            pipe(_fd);
            _storage.push_back({_fd[0], POLLPRI | POLLIN, 0});
            _storage.push_back({fd, POLLPRI | POLLIN, 0});
            _callbacks[fd] = cb;

            worker = std::thread([this]{threadFunc();});
        }
        else
        {
            executeInContext([fd, cb](storage_type& storage, callbacks_type& callbacks){
                auto it = std::find_if(storage.begin(), storage.end(),
                                       [fd](struct pollfd i){ return fd == i.fd; });
                if (it == storage.end())
                {
                    storage.push_back({fd, POLLPRI | POLLIN, 0});
                }
                callbacks[fd] = cb;
            });
        }
    }

    void removeDescriptor(int fd)
    {
        executeInContext([fd](storage_type& storage, callbacks_type& callbacks){
            if (auto it = std::find_if(storage.begin(), storage.end(), [fd](struct pollfd i){
                                            return i.fd == fd;
                                        }); it != storage.end())
            {
                callbacks.erase(it->fd);
                storage.erase(it);
            }
        });

        if (_storage.size() == 1 && worker.joinable())
        {
            worker.join();

            close(_fd[0]);
            close(_fd[1]);
            _storage.clear();
        }
    }

    template<typename Callable>
    void executeInContext(Callable&& func)
    {
        ::write(_fd[1], "suse", 5);
        {
            std::unique_lock lock(mutex);
            std::invoke(std::forward(func),
                        std::ref(_storage), std::ref(_callbacks));
        }
        cv.notify_all();
    }

protected:
    void threadFunc()
    {
        std::unique_lock lock(mutex);
        while (true)
        {
            if (int rv = poll(_storage.data(), _storage.size(), -1); rv > 0)
            {
                /* Pipe sync request */
                if (_storage.begin()->revents)
                {
                    char buff[0x10];
                    ::read(_fd[0], buff, sizeof(buff));

                    /* Wait for main thread to execute its stuff */
                    cv.wait(lock);

                    if (_storage.size() == 1)
                    {
                        /* All fds have been removed except the pipe, exit */
                        break;
                    }
                }

                for (auto& pfd : _storage)
                {
                    if (pfd.revents)
                    {
                        std::invoke(readFn,
                                    pfd.fd, _callbacks.at(pfd.fd));
                    }
                }
            }
            else if (errno == EINTR)
            {
                continue;
            }
            else
            { /* Debug trace breakpoint */ }
        }
    }

private:
    std::thread worker{};
    std::vector<struct pollfd> _storage{};
    std::map<int, Callback> _callbacks{};
    std::condition_variable cv{};
    std::mutex mutex{};
    int _fd[2]{};

    ReadFn readFn;
};

Пример использования:

using callback_type = std::function<void(int)>;
auto readfn = [](int fd, callback_type cb){
    struct event_structure ev{};
    ::read(fd, &ev, sizeof(ev));

    std::invoke(cb, ev.property);
};
Poller<callback_type, decltype(readfn)> poller(readfn);

int fd = /* open FD */;
poller.addDescriptor(fd, [](int){/* Handle property change */});
/* More descriptors with unique callbacks added/removed during program execution */

1 ответ
1

Запустить поток в конструкторе

Запуск и остановка потока по запросу требует осторожности, чтобы не вызвать каких-либо условий гонки. Но нет никаких причин не запускать поток все время, если он просто заблокирован в ожидании команды на sync fd, он не использует процессорного времени. Поэтому я рекомендую вам запустить поток в конструкторе и остановить его в деструкторе.

Отсутствует обработка ошибок

Призывы к ::read() а также ::write() может потерпеть неудачу; обязательно правильно обрабатывать сбои.

Использовать std::vector для _callbacks

Должна быть возможность хранить обратные вызовы в std::vector, что соответствует его порядку с порядком _storage. Таким образом, вам не придется выполнять довольно дорогостоящий поиск, чтобы получить обратный вызов; вы просто повторяете _storage а также _callbacks одновременно в threadFunc().

Безопасность потоков

Код, который у вас есть, может работать для вашего приложения, но он довольно хрупкий. Вы написали его таким образом, что добавлять и удалять дескрипторы в Poller в том же потоке, где вы его создали. Самое главное, обратный вызов не может безопасно удалить себя. Можно было бы сделать его более надежным за счет добавления немного большей сложности.

Одна из причин заключается в том, что добавление элементов или удаление элементов из _storage сделает недействительными любые итераторы, в том числе те, которые используются за кулисами for-зациклиться threadFunc(). Возможное решение – установить флаг при добавлении или удалении элементов из цикла и после вызова обратного вызова немедленно выйти из цикла, если этот флаг установлен.

Другая проблема в том, что призыв executeInContext() изнутри потока опроса приведет к тупиковой ситуации. Потенциальное решение – позволить определить, executeInContext() определить, вызывается ли он из того же потока, что и threadFunc(), и если это так, не пытайтесь заблокировать мьютекс.

  • Я согласен с тем, что обратный вызов не может удалить себя, но почему я могу добавлять элементы только из потока, который его создал?

    – Квест

  • Причина в том, что итераторы, которые все еще используются, таким образом аннулируются, и возникает ситуация тупика. Я обновил ответ.

    – Г. Сон



Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *