Сокращение времени выполнения создания поля интеграции в алгоритме поля потока (python)

Поле интеграции необходимо пересчитывать каждый раз, когда меняются начальные или конечные координаты. Я хотел использовать алгоритм поля потока для задачи, которая включает в себя большие лабиринты с изменением начальной или конечной точки каждые 0,5 секунды. В настоящий момент алгоритм слишком медленный, чтобы быть полезным, потому что поле интегрирования необходимо пересчитывать каждый раз, когда изменяется начальная или конечная точка.

Есть у кого-то идеи, как сократить время работы метода create_integration_field в классе FlowField? Рад каждой подсказке! (Если вы хорошо разбираетесь в numpy, я также был бы признателен за возможное решение, использующее его.) Заранее спасибо:

import numpy as np
import matplotlib.pyplot as plt

show_animation = True


def draw_horizontal_line(start_x, start_y, length, o_x, o_y, o_dict, path):
    for i in range(start_x, start_x + length):
        for j in range(start_y, start_y + 2):
            o_x.append(i)
            o_y.append(j)
            o_dict[(i, j)] = path


def draw_vertical_line(start_x, start_y, length, o_x, o_y, o_dict, path):
    for i in range(start_x, start_x + 2):
        for j in range(start_y, start_y + length):
            o_x.append(i)
            o_y.append(j)
            o_dict[(i, j)] = path


class FlowField:
    def __init__(self, obs_grid, goal_x, goal_y, start_x, start_y,
                 limit_x, limit_y):
        self.start_pt = [start_x, start_y]
        self.goal_pt = [goal_x, goal_y]
        self.obs_grid = obs_grid
        self.limit_x, self.limit_y = limit_x, limit_y
        self.cost_field = {}
        self.integration_field = {}
        self.vector_field = {}

    def find_path(self):
        self.create_cost_field()
        self.create_integration_field()
        self.assign_vectors()
        self.follow_vectors()

    def create_cost_field(self):
        """Assign cost to each grid which defines the energy
        it would take to get there."""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                if self.obs_grid[(i, j)] == 'free':
                    self.cost_field[(i, j)] = 1
                elif self.obs_grid[(i, j)] == 'medium':
                    self.cost_field[(i, j)] = 7
                elif self.obs_grid[(i, j)] == 'hard':
                    self.cost_field[(i, j)] = 20
                elif self.obs_grid[(i, j)] == 'obs':
                    continue

                if [i, j] == self.goal_pt:
                    self.cost_field[(i, j)] = 0

    def create_integration_field(self):
        """Start from the goal node and calculate the value
        of the integration field at each node. Start by
        assigning a value of infinity to every node except
        the goal node which is assigned a value of 0. Put the
        goal node in the open list and then get its neighbors
        (must not be obstacles). For each neighbor, the new
        cost is equal to the cost of the current node in the
        integration field (in the beginning, this will simply
        be the goal node) + the cost of the neighbor in the
        cost field + the extra cost (optional). The new cost
        is only assigned if it is less than the previously
        assigned cost of the node in the integration field and,
        when that happens, the neighbor is put on the open list.
        This process continues until the open list is empty."""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                if self.obs_grid[(i, j)] == 'obs':
                    continue
                self.integration_field[(i, j)] = np.inf
                if [i, j] == self.goal_pt:
                    self.integration_field[(i, j)] = 0

        open_list = [(self.goal_pt, 0)]
        while open_list:
            curr_pos, curr_cost = open_list[0]
            curr_x, curr_y = curr_pos
            for i in range(-1, 2):
                for j in range(-1, 2):
                    x, y = curr_x + i, curr_y + j
                    if self.obs_grid[(x, y)] == 'obs':
                        continue
                    if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
                        e_cost = 10
                    else:
                        e_cost = 14
                    neighbor_energy = self.cost_field[(x, y)]
                    neighbor_old_cost = self.integration_field[(x, y)]
                    neighbor_new_cost = curr_cost + neighbor_energy + e_cost
                    if neighbor_new_cost < neighbor_old_cost:
                        self.integration_field[(x, y)] = neighbor_new_cost
                        open_list.append(([x, y], neighbor_new_cost))
            del open_list[0]

    def assign_vectors(self):
        """For each node, assign a vector from itself to the node with
        the lowest cost in the integration field. An agent will simply
        follow this vector field to the goal"""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                if self.obs_grid[(i, j)] == 'obs':
                    continue
                if [i, j] == self.goal_pt:
                    self.vector_field[(i, j)] = (None, None)
                    continue
                offset_list = [(i + a, j + b)
                               for a in range(-1, 2)
                               for b in range(-1, 2)]
                neighbor_list = [{'loc': pt,
                                  'cost': self.integration_field[pt]}
                                 for pt in offset_list
                                 if self.obs_grid[pt] != 'obs']
                neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
                best_neighbor = neighbor_list[0]['loc']
                self.vector_field[(i, j)] = best_neighbor
    
    def follow_vectors(self):
        curr_x, curr_y = self.start_pt
        while curr_x is not None and curr_y is not None:
            curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
            if curr_x is None or curr_y is None:
                break

            if show_animation:
                plt.plot(curr_x, curr_y, "b*")
                plt.pause(0.001)

        if show_animation:
            plt.show()


def main():
    # set obstacle positions
    obs_dict = {}
    for i in range(51):
        for j in range(51):
            obs_dict[(i, j)] = 'free'
    o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []

    s_x = 5.0
    s_y = 5.0
    g_x = 35.0
    g_y = 45.0

    # draw outer border of maze
    draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')

    # draw inner walls
    all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
    all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
    all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
    all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
    all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')

    # Some points are assigned a slightly higher energy value in the cost
    # field. For example, if an agent wishes to go to a point, it might
    # encounter different kind of terrain like grass and dirt. Grass is
    # assigned medium difficulty of passage (color coded as green on the
    # map here). Dirt is assigned hard difficulty of passage (color coded
    # as brown here). Hence, this algorithm will take into account how
    # difficult it is to go through certain areas of a map when deciding
    # the shortest path.

    # draw paths that have medium difficulty (in terms of going through them)
    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [10, 45]
    all_y = [22, 20]
    all_len = [8, 5]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [20, 30, 42] + [47] * 5
    all_y = [35, 30, 38] + [37 + i for i in range(2)]
    all_len = [5, 7, 3] + [1] * 3
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')

    # draw paths that have hard difficulty (in terms of going through them)
    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [15, 20, 35]
    all_y = [45, 20, 35]
    all_len = [3, 5, 7]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [30] + [47] * 5
    all_y = [10] + [37 + i for i in range(2)]
    all_len = [5] + [1] * 3
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')

    if show_animation:
        plt.plot(o_x, o_y, "sr")
        plt.plot(m_x, m_y, "sg")
        plt.plot(h_x, h_y, "sy")
        plt.plot(s_x, s_y, "og")
        plt.plot(g_x, g_y, "o")
        plt.grid(True)

    flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
    flow_obj.find_path()


if __name__ == '__main__':
    main()

1 ответ
1

Код, по крайней мере, с точки зрения стиля, уже довольно приличный. Рассматривать:

  • Добавить подсказки типа PEP484

  • Ваш draw_*_line методы имеют запутанное название — на самом деле они ничего не рисуют; это процедуры построения / настройки данных. Кроме того, вместо того, чтобы видоизменять o_x, o_y и o_dict, полностью исключить побочные эффекты этой функции просто — просто верните два списка и dict, а на вызывающей стороне вызовите два extends и an update, соответственно.

  • start_pt и goal_pt следует использовать кортежи, а не списки

  • Для этого кода:

      for i in range(self.limit_x):
          for j in range(self.limit_y):
    

Поскольку это повторяется несколько раз, подумайте о том, чтобы преобразовать его в функцию генератора, которая дает кортежи координат.

  • Рассмотрите возможность замены вашего 'obs'и т. д. строки с Enum значения
  • Используйте набор вместо списка для проверки членства в if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
  • Для тебя neighbor_listвместо слабо типизированного словаря сделайте класс — a @dataclass подойдет

Реализация, которая выполняет некоторые из вышеперечисленных:

from typing import Tuple, Dict, List

import numpy as np
import matplotlib.pyplot as plt

show_animation = True


Coord = Tuple[int, int]


def draw_horizontal_line(
    start_x: int, start_y: int, length: int,
    o_x: List[int], o_y: List[int],
    o_dict: Dict[Coord, str],
    path: str,
) -> None:
    for i in range(start_x, start_x + length):
        for j in range(start_y, start_y + 2):
            o_x.append(i)
            o_y.append(j)
            o_dict[(i, j)] = path


def draw_vertical_line(
    start_x: int, start_y: int, length: int,
    o_x: List[int], o_y: List[int],
    o_dict: Dict[Coord, str],
    path: str,
) -> None:
    for i in range(start_x, start_x + 2):
        for j in range(start_y, start_y + length):
            o_x.append(i)
            o_y.append(j)
            o_dict[(i, j)] = path


class FlowField:
    def __init__(
        self,
        obs_grid: Dict[Coord, str],
        goal_x: float, goal_y: float,
        start_x: float, start_y: float,
        limit_x: int, limit_y: int,
    ):
        self.start_pt = (start_x, start_y)
        self.goal_pt = (goal_x, goal_y)
        self.obs_grid = obs_grid
        self.limit_x, self.limit_y = limit_x, limit_y
        self.cost_field: Dict[Coord, int] = {}
        self.integration_field: Dict[Coord, int] = {}
        self.vector_field: Dict[Coord, Coord] = {}

    def find_path(self) -> None:
        self.create_cost_field()
        self.create_integration_field()
        self.assign_vectors()
        self.follow_vectors()

    def create_cost_field(self) -> None:
        """Assign cost to each grid which defines the energy
        it would take to get there."""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                coord = (i, j)
                if self.obs_grid[coord] == 'free':
                    self.cost_field[coord] = 1
                elif self.obs_grid[coord] == 'medium':
                    self.cost_field[coord] = 7
                elif self.obs_grid[coord] == 'hard':
                    self.cost_field[coord] = 20
                elif self.obs_grid[coord] == 'obs':
                    continue

                if coord == self.goal_pt:
                    self.cost_field[coord] = 0

    def create_integration_field(self) -> None:
        """Start from the goal node and calculate the value
        of the integration field at each node. Start by
        assigning a value of infinity to every node except
        the goal node which is assigned a value of 0. Put the
        goal node in the open list and then get its neighbors
        (must not be obstacles). For each neighbor, the new
        cost is equal to the cost of the current node in the
        integration field (in the beginning, this will simply
        be the goal node) + the cost of the neighbor in the
        cost field + the extra cost (optional). The new cost
        is only assigned if it is less than the previously
        assigned cost of the node in the integration field and,
        when that happens, the neighbor is put on the open list.
        This process continues until the open list is empty."""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                if self.obs_grid[(i, j)] == 'obs':
                    continue
                self.integration_field[(i, j)] = np.inf
                if (i, j) == self.goal_pt:
                    self.integration_field[(i, j)] = 0

        open_list = [(self.goal_pt, 0)]
        while open_list:
            curr_pos, curr_cost = open_list[0]
            curr_x, curr_y = curr_pos
            for i in range(-1, 2):
                for j in range(-1, 2):
                    x, y = curr_x + i, curr_y + j
                    if self.obs_grid[(x, y)] == 'obs':
                        continue
                    if (i, j) in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
                        e_cost = 10
                    else:
                        e_cost = 14
                    neighbor_energy = self.cost_field[(x, y)]
                    neighbor_old_cost = self.integration_field[(x, y)]
                    neighbor_new_cost = curr_cost + neighbor_energy + e_cost
                    if neighbor_new_cost < neighbor_old_cost:
                        self.integration_field[(x, y)] = neighbor_new_cost
                        open_list.append(([x, y], neighbor_new_cost))
            del open_list[0]

    def assign_vectors(self) -> None:
        """For each node, assign a vector from itself to the node with
        the lowest cost in the integration field. An agent will simply
        follow this vector field to the goal"""
        for i in range(self.limit_x):
            for j in range(self.limit_y):
                if self.obs_grid[(i, j)] == 'obs':
                    continue
                if (i, j) == self.goal_pt:
                    self.vector_field[(i, j)] = (None, None)
                    continue
                offset_list = [(i + a, j + b)
                               for a in range(-1, 2)
                               for b in range(-1, 2)]
                neighbor_list = [{'loc': pt,
                                  'cost': self.integration_field[pt]}
                                 for pt in offset_list
                                 if self.obs_grid[pt] != 'obs']
                neighbor_list = sorted(neighbor_list, key=lambda x: x['cost'])
                best_neighbor = neighbor_list[0]['loc']
                self.vector_field[(i, j)] = best_neighbor

    def follow_vectors(self) -> None:
        curr_x, curr_y = self.start_pt
        while curr_x is not None and curr_y is not None:
            curr_x, curr_y = self.vector_field[(curr_x, curr_y)]
            if curr_x is None or curr_y is None:
                break

            if show_animation:
                plt.plot(curr_x, curr_y, "b*")
                plt.pause(0.001)

        if show_animation:
            plt.show()


def main():
    # set obstacle positions
    obs_dict = {}
    for i in range(51):
        for j in range(51):
            obs_dict[(i, j)] = 'free'
    o_x, o_y, m_x, m_y, h_x, h_y = [], [], [], [], [], []

    s_x = 5.0
    s_y = 5.0
    g_x = 35.0
    g_y = 45.0

    # draw outer border of maze
    draw_vertical_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_vertical_line(48, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_horizontal_line(0, 0, 50, o_x, o_y, obs_dict, 'obs')
    draw_horizontal_line(0, 48, 50, o_x, o_y, obs_dict, 'obs')

    # draw inner walls
    all_x = [10, 10, 10, 15, 20, 20, 30, 30, 35, 30, 40, 45]
    all_y = [10, 30, 45, 20, 5, 40, 10, 40, 5, 40, 10, 25]
    all_len = [10, 10, 5, 10, 10, 5, 20, 10, 25, 10, 35, 15]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, o_x, o_y, obs_dict, 'obs')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [35, 40, 15, 10, 45, 20, 10, 15, 25, 45, 10, 30, 10, 40]
    all_y = [5, 10, 15, 20, 20, 25, 30, 35, 35, 35, 40, 40, 45, 45]
    all_len = [10, 5, 10, 10, 5, 5, 10, 5, 10, 5, 10, 5, 5, 5]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, o_x, o_y, obs_dict, 'obs')

    # Some points are assigned a slightly higher energy value in the cost
    # field. For example, if an agent wishes to go to a point, it might
    # encounter different kind of terrain like grass and dirt. Grass is
    # assigned medium difficulty of passage (color coded as green on the
    # map here). Dirt is assigned hard difficulty of passage (color coded
    # as brown here). Hence, this algorithm will take into account how
    # difficult it is to go through certain areas of a map when deciding
    # the shortest path.

    # draw paths that have medium difficulty (in terms of going through them)
    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [10, 45]
    all_y = [22, 20]
    all_len = [8, 5]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, m_x, m_y, obs_dict, 'medium')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [20, 30, 42] + [47] * 5
    all_y = [35, 30, 38] + [37 + i for i in range(2)]
    all_len = [5, 7, 3] + [1] * 3
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, m_x, m_y, obs_dict, 'medium')

    # draw paths that have hard difficulty (in terms of going through them)
    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [15, 20, 35]
    all_y = [45, 20, 35]
    all_len = [3, 5, 7]
    for x, y, l in zip(all_x, all_y, all_len):
        draw_vertical_line(x, y, l, h_x, h_y, obs_dict, 'hard')

    all_x[:], all_y[:], all_len[:] = [], [], []
    all_x = [30] + [47] * 5
    all_y = [10] + [37 + i for i in range(2)]
    all_len = [5] + [1] * 3
    for x, y, l in zip(all_x, all_y, all_len):
        draw_horizontal_line(x, y, l, h_x, h_y, obs_dict, 'hard')

    if show_animation:
        plt.plot(o_x, o_y, "sr")
        plt.plot(m_x, m_y, "sg")
        plt.plot(h_x, h_y, "sy")
        plt.plot(s_x, s_y, "og")
        plt.plot(g_x, g_y, "o")
        plt.grid(True)

    flow_obj = FlowField(obs_dict, g_x, g_y, s_x, s_y, 50, 50)
    flow_obj.find_path()


if __name__ == '__main__':
    main()

    Добавить комментарий

    Ваш адрес email не будет опубликован. Обязательные поля помечены *