Асинхронный парсер в Python

Я написал веб-скрапер и хотел бы, чтобы он работал как можно быстрее. Выскабливание не тривиально; Я очищаю несколько веб-страниц, собираю с них ссылки, очищаю их, затем собираю ссылки с этих, затем снова очищаю их (проиллюстрировано тремя различными функциями, показывающими многоуровневый процесс). Поскольку стандартные http-запросы в python блокируются, я сделал это с помощью async/await, используя метод httpx библиотека с использованием очереди заданий и рабочих процессов.

Вот код, который я написал. Я удалил некоторый доменный код и импорт, чтобы сделать код немного короче и более точным. Функции могут показаться похожими с удаленным промежуточным кодом, но они зависят от веб-страницы и, по сути, очень разные. Они просто демонстрируют, что это рекурсивная процедура.

@dataclass
class TaskSpec:
    func: Callable
    args: list = field(default_factory=list)
    kwargs: dict = field(default_factory=dict)


@dataclass
class TaskResult:
    result: Any
    new_tasks: list[TaskSpec] = field(default_factory=list)


async def get_page_1(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    ...

    new_jobs = []
    for row in html_table_rows:
        url = ...

        new_jobs.append(TaskSpec(get_page_2, (url, client)))

    return TaskResult(result, new_jobs)


async def get_page_2(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    ...

    new_jobs = []
    for row in html_table_rows:
        url = ...

        new_jobs.append(TaskSpec(get_page_3, (url, client)))

    return TaskResult(result, new_jobs)


async def get_page_3(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    return TaskResult(result)


async def process_queue(queue: asyncio.Queue, result_set: list):
    while True:
        try:
            spec: TaskSpec = await queue.get()
            result: TaskResult = await spec.func(*spec.args, **spec.kwargs)

            for new_task in result.new_tasks:
                queue.put_nowait(new_task)

            result_set.append(result.result)

        # If the task failed, re-add the task to the queue
        except httpx.ReadTimeout:
            print("Request timed out. Re-adding task to queue...")
            queue.put_nowait(spec)

        except Exception as e:
            pass
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    result_set = []

    num_workers = 20
    httpx_limits = httpx.Limits(max_connections=num_workers)

    async with httpx.AsyncClient(timeout=10, limits=httpx_limits) as client:
        for url in urls:
            queue.put_nowait(TaskSpec(get_page_1, (url, client)))

        workers = []
        for i in range(num_workers):
            workers.append(asyncio.create_task(process_queue(queue, result_set)))

        # Wait for the queue to be emptied
        await queue.join()

    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

Несколько моментов, касающихся моего кода, о которых я хотел бы получить отзывы:

  1. Причина использования класса TaskSpec в том, что я могу легко повторно добавить ту же задачу обратно в очередь. я получаю большое количество httpx.ReadTimeout ошибки, предположительно из-за моего медленного соединения или, возможно, из-за того, что фактический анализ занимает некоторое время, а некоторые запросы слишком долго остаются в ожидании. Поэтому иногда мне нужно повторно поставить в очередь задание по очистке веб-страницы, и это был самый простой способ, который я мог придумать. Я также пытался создать объекты задач и повторно добавить сопрограмму обратно в очередь, используя task.get_coro(), но это не сработало, поскольку сопрограмма уже была выполнена. Кроме того, таким образом, фактические функции синтаксического анализа get_page_{1,3} на самом деле не нужно знать об очереди.

  2. Я использовал 20 рабочих, потому что увеличение этого числа, казалось, приводило к большому количеству httpx.ReadTimeout ошибки. Я не знаю, почему это так. Мое предположение заключалось в том, что, возможно, httpx полностью возвращал ответы, но тогда мой код выполнялся слишком долго и возвращал управление циклу событий, так что они не могли быть обработаны в течение периода ожидания. Хотя это чистое предположение. Я хотел бы иметь возможность увеличить количество рабочих, чтобы, надеюсь, он работал быстрее, поскольку время выполнения всего скрипта по-прежнему зависит в основном от силы моего интернет-соединения.

  3. Я не доволен тем, что должен пройти result_set перечислить в process_queue. Это очень похоже на C, и мне не нравится делать это в Python. Тем не менее, я не вижу другого способа иметь process_queue сохранить результаты, а затем вернуть их обратно в main(). Это связано с тем, что рабочие на самом деле не знают, когда очередь пуста. Я жду, когда очередь опустеет main() с использованием queue.join(), а затем отмените рабочие задания. И если я их отменю, они ничего не смогут вернуть. Верно?

Это мой первый раз, когда я действительно играю с async/await в Python, поэтому могут быть гораздо лучшие способы сделать то же самое, и я был бы рад услышать любые отзывы. Мне также очень интересно, есть ли лучший способ повторно поставить в очередь невыполненные задания для выполнения, и может ли кто-нибудь пролить свет на httpx.ReadTimeout ошибки, я тоже был бы признателен.

ПОЯСНЕНИЕ: Чтобы было ясно, я спрашиваю не о том, в порядке ли сам код, а в порядке ли структура парсера и правильно ли я использую async/await. Я бы использовал эту структуру и для других парсеров, поэтому код, специфичный для веб-страницы, здесь не имеет значения. Это будет просто основой скребка.

2 ответа
2

Выбранный вами дизайн определенно работает, хотя он больше напоминает традиционный многопоточный дизайн, чем асинхронный. Ваш текущий дизайн также будет отлично работать с блокирующей библиотекой HTTP, при условии, что ваши 20 process_queue() каждый экземпляр выполнялся в отдельном потоке (но см. примечание о завершении очереди ниже).

Что мне действительно нравится в вашем дизайне, так это то, что вы разделяете get_page_n() функции из кода управления задачами. Однако управление задачами может быть излишне сложным, потому что вы думаете о сопрограммах как о легковесных потоках, тогда как часто полезнее думать о будущем: объектах, которые в конечном итоге разрешаются в значение и могут ожидаться.

Прежде всего, я хочу поговорить об управлении лимиты незавершенного производства. В настоящее время вы достигаете этого, начав 20 process_queue() задачи, которые вытягивают работу из очереди. Вместо этого вы можете запустить любое количество задач, которые напрямую работают с каким-либо элементом, и вместо этого управлять лимитом WIP с помощью примитива синхронизации, такого как семафор. Очередь работ стала бы ненужной, и работа этой функции переместилась бы на управление повторными попытками и возвращенными new_tasks. Например:

async def run_with_retry(spec: TaskSpec, limit: Semaphore) -> TaskResult:
      while True:  # potentially retry this task
        try:
            async with limit:
                return await spec.func(*spec.args, **spec.kwargs)
        except httpx.ReadTimeout:
            print("Request timed out. Retrying later...")
            continue

async def process_task_spec(spec: TaskSpec, limit: Semaphore, result_set: list) -> None:
    result = await run_with_retry(spec, limit)

    result_set.append(result.result)

    new_tasks = [
        asyncio.create_task(process_task_spec(new_task, limit, result_set))
        for new_task in result.new_tasks
    ]
    await asyncio.gather(new_tasks)

async def main():
    result_set = []

    num_workers = 20
    httpx_limits = httpx.Limits(max_connections=num_workers)
    task_limits = asyncio.Semaphore(num_workers)

    async with httpx.AsyncClient(timeout=10, limits=httpx_limits) as client:
        initial_tasks = [
            process_task_spec(
                TaskSpec(get_page_1, (url, client)),
                task_limits,
                result_set,
            )
            for url in urls
        ]
        await asyncio.gather(initial_tasks)

Обратите внимание, что сохранение очереди задач по-прежнему очень разумно, если вы хотите избежать этой рекурсивной структуры.

TaskSpec учебный класс была задача, как и в вашем первоначальном дизайне, являющаяся объектом, который в конечном итоге будет выполнен и преобразован в значение. Вместо того, чтобы использовать один из встроенных исполнителей Python, вы создали свой собственный исполнитель. Но теперь это не более чем отложенный вызов функции — класс можно заменить на Callable[[], Future[TaskResult]] тип и вызов вида:

TaskSpec(get_page_1, (url, client))

может быть записано как:

lambda: get_page_1(url, client)

В принципе, эту лямбду тоже можно было бы убрать совсем, т.к. get_page_1(url, client) является сопрограммой. Однако сопрограмма будет выполняться немедленно до первой точки ожидания. Таким образом, сохранение одного уровня косвенности все еще может быть желательным, чтобы контролировать, когда функция начинает выполняться, что важно для обеспечения соблюдения ваших ограничений WIP.

Далее мы можем избавиться от своего result_set Если хочешь. В моем пересмотренном дизайне process_task_spec() управляет выполнением спецификации задачи и всех подзадач. Поэтому его можно обновить, чтобы он возвращал список результатов, которые основная функция может объединить:

async def process_task_spec(spec: TaskSpec, limit: Semaphore) -> list:
    ...
    
    all_results = [result.result]

    new_tasks = [...]
    for sub_results in await asyncio.gather(new_tasks):
        all_results.extend(sub_results)
    
async def main():
    ...

        all_results = [*r for r in await asyncio.gather(initial_tasks)]

Это изменит порядок результатов с порядка завершения в вашем дизайне на порядок создания задач в моем дизайне. Это может быть, а может и не быть актуальным.

А примечание по обработке исключений: исключения и параллелизм сложны. asyncio.gather() дает нам два режима работы с исключениями:

  • Режимы по умолчанию не позволяют собрать первое исключение, но не отменяют другие задачи. Таким образом, await asyncio.gather(tasks) возможно, придется записать так:

    try:
      await asyncio.gather(tasks)
    except:
      for t in tasks:
        t.cancel()
      raise
    
  • Кроме того, вы можете установить return_exceptions=True. Это также не приведет к какой-либо отмене, но вернет один результат для каждой задачи, заданной сбору. Результатом будет либо фактический результат, либо объект исключения. По сути, это оборачивает любую задачу следующим образом:

    async def return_exception(task):
      try:
        return await task
      except Exception as e:
        return e
    

    Это может быть уместно, если вы позже просматриваете результаты и агрегируете / сообщаете об исключениях, не желая быстро завершать программу.

А примечание о завершении очереди: В своем вопросе вы отметили проблему, заключающуюся в том, что вы ждали, пока очередь опустеет, а затем должны отменить своих воркеров. В многопоточном дизайне типичным подходом будет ожидание, пока очередь ввода не опустеет и все задачи не дадут результаты, а затем отправка нового ввода, который вызовет упорядоченное завершение рабочих процессов. В Python это часто делается путем постановки в очередь Noneа рабочий поток проверяет, является ли удаленный из очереди элемент None.

Понимание таймаутов. Если у вас возникли проблемы со сроками, первым шагом будет сбор полезной статистики — сколько времени на самом деле требуется для обработки каждой задачи? Вы можете потратить время непосредственно перед и после await spec.func(...). Для исследования задержки обычно используют процентили (например, 50%/медиана для представления «типичной» задержки или процентили 95% для представления «максимальной» задержки). Вы можете поэкспериментировать, чтобы увидеть, как меняется задержка с разными num_workers пределы. Было бы лучше, если бы эти показатели были значительно ниже настроенного тайм-аута в 10 секунд.

Тайм-ауты могут быть вызваны рядом причин, таких как ограничение скорости на сервере, к которому вы подключаетесь, потеря пакета или насыщение пропускной способности вашего соединения или блокировка операций в ваших сопрограммах. Также стоит отметить, что у вас не должно быть больше активных воркеров, чем доступных подключений.

Заметка о скрапе. Если вы очищаете чужие серверы, правила этикета предполагают, что вы должны избегать чрезмерной нагрузки. Это означает ограничение скорости ваших запросов, как правило, до одного запроса каждые несколько секунд на целевой сервер. Если вы сканируете ссылки, также важно соблюдать robots.txt. Несоблюдение такого этикета может привести к срабатыванию правил брандмауэра против вас.

Вы можете управлять такими ограничениями скорости в цикле повторных попыток, но до раздела с ограничением WIP. Например, у вас может быть общий last_accessed словарь, который предоставляет метку времени для каждого домена. Доступ к словарю будет защищен замком:

from contextlib import asynccontextmanager

async def run_with_retry(spec: TaskSpec, wip_limit: Semaphore, rate_limit: RateLimit) -> TaskResult:
    domain = get_domain(spec.url)
    while True:  # potentially retry this task
        try:
            async with rate_limit(domain), wip_limit:
                return await spec.func(*spec.args, **spec.kwargs)
        ...

class RateLimit:
  def __init__(self, delay_seconds: float) -> None:
    assert delay_seconds > 0.0
    # explict "None" indicates domain is currently processed by someone else
    self._last_accessed: dict[str, Optional[float]] = {}
    self._lock: asyncio.Lock()
    self._delay_seconds = delay_seconds

  @asynccontextmanager
  async def __call__(self, domain: str):
    while True:
      async with self._lock:
        last_accessed = self._last_accessed.get(domain, -math.inf)

        if last_accessed is None:
          # Domain currently processed by someone else:
          # must wait at least this long anyway
          sleep_for = self._delay_seconds
        else:
          sleep_for = last_accessed + self._delay_seconds - time.monotonic()

        if sleep_for <= 0:
          self._last_accessed[domain] = None  # mark domain as in-use
          break

      await asyncio.sleep(sleep_for)

    try:
      yield
    finally:
      async with self._lock:
        assert self._last_accessed[domain] is None
        self._last_accessed[domain] = time.monotonic()     

Этот дизайн требует паузы не менее delay_seconds между запросами, эффективно упорядочивая запросы к одному и тому же домену. Если вместо этого вы хотите ограничить среднюю скорость запросов (что может разрешить одновременные запросы), реализацию можно упростить, избавившись от None статус. Вместо того, чтобы помечать домен как используемый, запись будет обновлена ​​до time.monotonic() до выхода, и никакой дальнейшей бухгалтерии не потребуется после выхода.

Примечание: этот ответ был первоначально опубликован на Code Review SE.

get_page_{1,3} почти идентичны. Вы можете заменить их одной функцией, которая принимает depth как параметр. Вы называете это с depth=2 из main() а затем выполните рекурсию при уменьшении depth на 1 на каждом шаге. Если depth == 0 не продолжайте рекурсию.

Никогда не пиши

except Exception as e:
    pass

Вы замалчиваете все возможные ошибки, которые могут произойти в try блокировать. Если что-то пойдет не так неожиданным образом, вы никогда не узнаете об этом, и это будет невозможно отладить.

Иметь дело с httpx.ReadTimeouts на нижнем уровне, внутри get_page() метод. Просто повторите попытку получить response пока вы не получите его (вы можете написать метод для этого). Это избавит вас от повторного добавления задачи в очередь.

Сами ошибки, скорее всего, вызваны вашей ограниченной скоростью интернета, когда вы пытаетесь загрузить страницы с 20 разных воркеров, так что это узкое место для производительности программы.

Прохождение result_set перечислить в process_queue в полном порядке.

Делиться

Улучшить этот ответ

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *