Вычислить выходы генератора параллельно

Я обрабатываю объект генератора неизвестной «длины». Я должен держать вещи «ленивыми» из-за управления памятью. Обработка требует больших вычислительных ресурсов, поэтому ее решение в многопроцессорном стиле является решением (по крайней мере, мне так кажется).

Я решил эту проблему multiproc на объекте-генераторе с помощью комбинации патча обезьяны и ограниченной очереди.

Что меня действительно чешет, так это пластырь с обезьяной … Как вы думаете, это нормально применять imap() на объекте генератора? Как бы ты это сделал?

Подчеркну, что основное внимание уделяется вычислить выходы генератора параллельно.
С точки зрения этого «минимального примера»:

process_line, process_line_init, process_first_n_line

это функции, о которых мне больше всего интересно ваше мнение.

import multiprocessing as mp
import psutil
import queue 
from typing import Any, Dict, Iterable, Set

def yield_n_line(n: int)-> Iterable[Dict[str, str]]:
    for i in range(n):
        yield {'body': "Never try to 'put' without a timeout sec declared"}

def get_unique_words(x: Dict[str, str])-> Set[str]:
    return set(x['body'].split())

def process_line(x:Dict[str, str])-> Set[str]:
    try:
        process_line.q.put(x, block=True, timeout=2)
    except queue.Full:
        pass
    return get_unique_words(x)

def process_line_init(q: mp.Queue)-> None:
    process_line.q = q

def process_first_n_line(number_of_lines: int)-> Any:
    n_line = yield_n_line(number_of_lines)
    
    if psutil.cpu_count(logical=False) > 4:
        cpu_count = psutil.cpu_count(logical=False)-2
    else:
        cpu_count = psutil.cpu_count(logical=False)
    q = mp.Queue(maxsize=8000)
    p = mp.Pool(cpu_count, process_line_init, [q])
    results = p.imap(process_line, n_line)
    for _ in range(number_of_lines):
        try:
            q.get(timeout=2)
        except queue.Empty:
            q.close()
            q.join_thread()
        yield results.next()
    p.close()
    p.terminate()
    p.join()
    pass

def yield_uniqueword_chunks(
    n_line: int = 10_000_000,
    chunksize: int = 1_787_000)-> Iterable[Set[str]]:
    chunk = set()
    for result in process_first_n_line(n_line):
        chunk.update(result)
        if len(chunk) > chunksize:
            yield chunk
            chunk = set()
    yield chunk

def main()-> None:
    for chunk in yield_uniqueword_chunks(
        n_line=1000, #Number of total comments to process
        chunksize=200 #number of unique words in a chunk (around 32MB)
        ):
        print(chunk)
        #export(chunk)
    
if __name__ == "__main__":
    main()

1 ответ
1

Вот краткая иллюстрация того, как достичь поставленной цели, а именно: вычислять выходы генератора параллельно. Я предлагаю это, потому что я не мог понять цель большей части сложности в вашем текущем коде. Я подозреваю, что есть проблемы, которые вы нам не объяснили или которые я не смог сделать (если да, то этот ответ может быть неправильным). В любом случае иногда полезно взглянуть на проблему в упрощенной форме, чтобы увидеть, может ли ваш реальный вариант использования нормально работать с этими параметрами.

import multiprocessing as mp

def main():
    for chunk in process_data(1000, 50):
        print()
        print(len(chunk), chunk)
    
def process_data(n, chunksize):
    # Set up the Pool using a context manager.
    # This relieves you of the hassle of join/close.
    with mp.Pool() as p:
        s = set()
        # Just iterate over the results as they come in.
        # No need to check for empty queues, etc.
        for res in p.imap(worker, source_gen(n)):
            s.update(res)
            if len(s) >= chunksize:
                yield s
                s = set()

def source_gen(n):
    # A data source that will yield N values.
    # To get the data in this example:
    # curl 'https://www.gutenberg.org/files/2600/2600-0.txt' -o war-peace.txt
    with open('war-peace.txt') as fh:
        for i, line in enumerate(fh):
            yield line
            if i >= n:
                break

def worker(line):
    # A single-argument worker function.
    # If you need multiple args, bundle them in tuple/dict/etc.
    return [word.lower() for word in line.split()]

if __name__ == "__main__":
    main()

Этот пример иллюстрирует разновидность распараллеливания:

  • Источник данных работает в одном процессе (родительский).

  • Последующие вычисления выполняются в нескольких дочерних процессах.

  • И окончательная агрегация / отчетность выполняется в одном процессе (также родительском).

За кулисами, Pool.imap() использует травление и очереди для передачи данных между процессами. Если ваш реальный вариант использования требует распараллеливания генерации данных и / или отчетности, необходим другой подход.

  • 1

    Спасибо, за быстрый ответ, я проверил ваш пример, и сам по себе он работает и заслуживает внимания. Через день я смогу проверить / реализовать его в моем конкретном случае использования и посмотреть, где я усложнил эту проблему. Шаблон ‘for res in p.imap (worker, source_gen (n))’ как-то всегда давал мне ошибку, поэтому я выбрал свое решение. Спасибо, вернусь чуть позже!

    — Поттер А

  • Работает как шарм. Раньше я использовал шаблон «С» для бассейна, но только для списка. Я слишком усложнил эту задачу.

    — Поттер А

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *