Я обрабатываю объект генератора неизвестной «длины». Я должен держать вещи «ленивыми» из-за управления памятью. Обработка требует больших вычислительных ресурсов, поэтому ее решение в многопроцессорном стиле является решением (по крайней мере, мне так кажется).
Я решил эту проблему multiproc на объекте-генераторе с помощью комбинации патча обезьяны и ограниченной очереди.
Что меня действительно чешет, так это пластырь с обезьяной … Как вы думаете, это нормально применять imap()
на объекте генератора? Как бы ты это сделал?
Подчеркну, что основное внимание уделяется вычислить выходы генератора параллельно.
С точки зрения этого «минимального примера»:
process_line, process_line_init, process_first_n_line
это функции, о которых мне больше всего интересно ваше мнение.
import multiprocessing as mp
import psutil
import queue
from typing import Any, Dict, Iterable, Set
def yield_n_line(n: int)-> Iterable[Dict[str, str]]:
for i in range(n):
yield {'body': "Never try to 'put' without a timeout sec declared"}
def get_unique_words(x: Dict[str, str])-> Set[str]:
return set(x['body'].split())
def process_line(x:Dict[str, str])-> Set[str]:
try:
process_line.q.put(x, block=True, timeout=2)
except queue.Full:
pass
return get_unique_words(x)
def process_line_init(q: mp.Queue)-> None:
process_line.q = q
def process_first_n_line(number_of_lines: int)-> Any:
n_line = yield_n_line(number_of_lines)
if psutil.cpu_count(logical=False) > 4:
cpu_count = psutil.cpu_count(logical=False)-2
else:
cpu_count = psutil.cpu_count(logical=False)
q = mp.Queue(maxsize=8000)
p = mp.Pool(cpu_count, process_line_init, [q])
results = p.imap(process_line, n_line)
for _ in range(number_of_lines):
try:
q.get(timeout=2)
except queue.Empty:
q.close()
q.join_thread()
yield results.next()
p.close()
p.terminate()
p.join()
pass
def yield_uniqueword_chunks(
n_line: int = 10_000_000,
chunksize: int = 1_787_000)-> Iterable[Set[str]]:
chunk = set()
for result in process_first_n_line(n_line):
chunk.update(result)
if len(chunk) > chunksize:
yield chunk
chunk = set()
yield chunk
def main()-> None:
for chunk in yield_uniqueword_chunks(
n_line=1000, #Number of total comments to process
chunksize=200 #number of unique words in a chunk (around 32MB)
):
print(chunk)
#export(chunk)
if __name__ == "__main__":
main()
1 ответ
Вот краткая иллюстрация того, как достичь поставленной цели, а именно: вычислять выходы генератора параллельно. Я предлагаю это, потому что я не мог понять цель большей части сложности в вашем текущем коде. Я подозреваю, что есть проблемы, которые вы нам не объяснили или которые я не смог сделать (если да, то этот ответ может быть неправильным). В любом случае иногда полезно взглянуть на проблему в упрощенной форме, чтобы увидеть, может ли ваш реальный вариант использования нормально работать с этими параметрами.
import multiprocessing as mp
def main():
for chunk in process_data(1000, 50):
print()
print(len(chunk), chunk)
def process_data(n, chunksize):
# Set up the Pool using a context manager.
# This relieves you of the hassle of join/close.
with mp.Pool() as p:
s = set()
# Just iterate over the results as they come in.
# No need to check for empty queues, etc.
for res in p.imap(worker, source_gen(n)):
s.update(res)
if len(s) >= chunksize:
yield s
s = set()
def source_gen(n):
# A data source that will yield N values.
# To get the data in this example:
# curl 'https://www.gutenberg.org/files/2600/2600-0.txt' -o war-peace.txt
with open('war-peace.txt') as fh:
for i, line in enumerate(fh):
yield line
if i >= n:
break
def worker(line):
# A single-argument worker function.
# If you need multiple args, bundle them in tuple/dict/etc.
return [word.lower() for word in line.split()]
if __name__ == "__main__":
main()
Этот пример иллюстрирует разновидность распараллеливания:
Источник данных работает в одном процессе (родительский).
Последующие вычисления выполняются в нескольких дочерних процессах.
И окончательная агрегация / отчетность выполняется в одном процессе (также родительском).
За кулисами, Pool.imap()
использует травление и очереди для передачи данных между процессами. Если ваш реальный вариант использования требует распараллеливания генерации данных и / или отчетности, необходим другой подход.
Спасибо, за быстрый ответ, я проверил ваш пример, и сам по себе он работает и заслуживает внимания. Через день я смогу проверить / реализовать его в моем конкретном случае использования и посмотреть, где я усложнил эту проблему. Шаблон ‘for res in p.imap (worker, source_gen (n))’ как-то всегда давал мне ошибку, поэтому я выбрал свое решение. Спасибо, вернусь чуть позже!
— Поттер А
Работает как шарм. Раньше я использовал шаблон «С» для бассейна, но только для списка. Я слишком усложнил эту задачу.
— Поттер А