Чтение stdout и stderr подпроцесса с использованием Win32, перекрывающийся ввод-вывод

Я пишу код Windows на C ++, который запускает процесс, а затем считывает его stdout и stderr по отдельности в буферы.

Если я попытаюсь просто прочитать потоки по одному, он может зависнуть, если дочерний процесс пытается записать в другой поток. Итак, я узнал, что мне нужно создавать именованные каналы, а затем вызывать ReadFile со структурой OVERLAPPED и ждать их.

Доступно полностью рабочее решение в GitHub. Однако я прошу обзор в основном файла ReadMultiplePipes.cpp, указанного ниже.

Мне нужно поддерживать только 2 канала (stdout и stderr), но я сделал более общую функцию, которая получает вектор дескрипторов каналов и возвращает вектор символов той же длины. Он читает их, пока все они не станут ожидающими (или закрытыми), а затем ожидает всех ожидающих и снова возвращается к чтению.

Вот что меня беспокоит:

  • Правильно ли выполняет код Overlapped I / O, то есть он не зависнет, если подпроцесс записывает выходные данные, он не потеряет байты и т. Д.
  • Правильная ли обработка ошибок и управление ресурсами, т.е. отсутствие утечек.
  • Есть ли более короткий способ сделать это, я не думал, что для чтения двух потоков потребуется столько кода.

ReadMultiplePipes.cpp

#include <algorithm>
#include <iostream>
#include <list>
#include <sstream>
#include <vector>

#include <tchar.h>
#include <Windows.h>

#include "Shared.h"

struct ReadPipeInfo
{
    ReadPipeInfo(HANDLE handle) : readHandle(handle)
    {
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
        if (!(overlapped.hEvent = CreateEvent(nullptr, false, true, nullptr)))
            throw std::runtime_error("Call CreateEvent failed.");
    }
    ~ReadPipeInfo()
    {
        CloseHandle(overlapped.hEvent);
    }
    HANDLE readHandle = nullptr;
    std::vector<char> outputBuffer;
    OVERLAPPED overlapped = {};
    UCHAR buf[1024 * 4] = {};
    // 0   - open
    // 997 - ERROR_IO_PENDING  - Overlapped I/O operation is in progress.
    // 107 - ERROR_BROKEN_PIPE - The pipe has been ended.
    DWORD state = 0;
};

static DWORD ReadAndInsertOverlappedResult(ReadPipeInfo& readPipe);

std::vector<std::vector<char>> ReadFromMultiplePipes(const std::list<HANDLE>& handles)
{
    std::vector<std::shared_ptr<ReadPipeInfo>> readPipes;
    std::transform(handles.begin(), handles.end(), std::back_inserter(readPipes), [](HANDLE x) { return std::make_shared<ReadPipeInfo>(x); });

    while (std::any_of(readPipes.begin(), readPipes.end(), [](auto& i) { return i->state != ERROR_BROKEN_PIPE; }) )
    {
        // Read until all pipes are pending or broken
        std::vector<std::shared_ptr<ReadPipeInfo>>::iterator it;
        while ((it = std::find_if(readPipes.begin(), readPipes.end(), [](auto& i) { return i->state == 0; })) != readPipes.end())
        {
            auto& currOpenPipe = **it;
            BOOL rfResult = ::ReadFile(currOpenPipe.readHandle, currOpenPipe.buf, sizeof(currOpenPipe.buf), nullptr, &currOpenPipe.overlapped);
            if (rfResult)
                CHECK_WIN32_ERROR(ReadAndInsertOverlappedResult(currOpenPipe));
            else
            {
                DWORD lastError = ::GetLastError();
                if (lastError != ERROR_IO_PENDING && lastError != ERROR_BROKEN_PIPE) // 997 109
                    throw std::runtime_error("ReadFile failed");
                else
                {
                    currOpenPipe.state = lastError;
                    ::SetLastError(0);
                }
            }
        }

        // Wait for pending IO
        std::vector<std::shared_ptr<ReadPipeInfo>> pendingReadPipes;
        std::copy_if(readPipes.begin(), readPipes.end(), std::back_inserter(pendingReadPipes), [](auto& x) { return x->state == ERROR_IO_PENDING; });
        if (pendingReadPipes.size() > 0)
        {
            std::vector<HANDLE> eventsToWait;
            std::transform(pendingReadPipes.begin(), pendingReadPipes.end(), std::back_inserter(eventsToWait), [](auto& x) { return x->overlapped.hEvent; });
            DWORD waitResult = ::WaitForMultipleObjects((DWORD)eventsToWait.size(), eventsToWait.data(), FALSE, INFINITE);
            if (waitResult >= WAIT_OBJECT_0 && waitResult < WAIT_OBJECT_0 + pendingReadPipes.size())
            {
                auto& currPendingPipe = *pendingReadPipes[waitResult];
                CHECK_WIN32_ERROR(ReadAndInsertOverlappedResult(currPendingPipe));
            }
            else
                throw std::runtime_error("WaitForMultipleObjects failed");
        }
    }

    std::vector<std::vector<char>> ret;
    std::transform(readPipes.begin(), readPipes.end(), std::back_inserter(ret), [](auto& x) { return x->outputBuffer; });
    return ret;
}

static DWORD ReadAndInsertOverlappedResult(ReadPipeInfo& readPipe)
{
    DWORD lpNumberOfBytesTransferred = 0;
    BOOL gor = ::GetOverlappedResult(readPipe.readHandle, &readPipe.overlapped, &lpNumberOfBytesTransferred, false);
    if (gor)
    {
        readPipe.state = 0;
        readPipe.outputBuffer.insert(readPipe.outputBuffer.end(), readPipe.buf, readPipe.buf + lpNumberOfBytesTransferred);
        return TRUE;
    }
    else
    {
        if (::GetLastError() != ERROR_BROKEN_PIPE) // 109
            return FALSE;
        readPipe.state = ERROR_BROKEN_PIPE;
        ::SetLastError(0);
        return TRUE;
    }
}

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *