In case you have any remarks or questions on these tutorials they are always welcome, preferably via the slack channel wiki-content-feedback. (You can notify the education committee specifically by adding @educo to your message.) You may also send us an email at education@serpentineai.nl.
In this lesson we are going to talk about pathfinding for images. If you have followed the Pommerman tutorial, this version will be very similar; if not, no worries.
The main goal will be to create a path from a predetermined starting point to an end point. If you are unfamiliar with BFS or A* (a star), take a look at the following tutorial.
Following good programming practice, it is generally a good idea to make a separate class for the pathfinding. This makes it easier to reuse in other cases, and lets agent.py contain only the main logic.
Implement BFS (or/and A*) and test it on the following image, with start and end coordinates as (10, 10) and (500, 500) respectively. Assume that you can't walk on the black tiles, and have to use the white ones only.
Tips:
- The skeleton tab contains a template for the pathfinding class.
- The image can be loaded as gray scale with cv2.imread('image.png', cv2.IMREAD_GRAYSCALE).
- The skeleton - solved tab contains the solution per subsection, so you can look at one part without seeing the full solution.
import colorsys
import heapq
from collections import deque
from typing import List, Tuple

import cv2
import numpy as np

Point = Tuple[int, int]


class Pathfinder:
    def __init__(self, threshold=127):
        """ Initialize threshold. """

    def in_bounds(self, col: int, row: int) -> bool:
        """ Return True if the position is positive and inside the bounds. """

    def passable(self, img: np.ndarray, col: int, row: int) -> bool:
        """ Return True if the board position is passable. """

    def neighbors(self, img: np.ndarray, col: int, row: int, dist: int = 1) -> List[Point]:
        """ Return all valid neighbours of a selected point. """

    def bfs(self, image: np.ndarray, start: Point, end: Point, dist=1) -> Tuple[dict, dict]:
        """ Perform a bfs to locate the end point. """

    def reverse_path(self, came_from: dict, end: Point) -> List[Point]:
        """ Return a list of directions to take to reach the end point (empty if unreachable). """
    def __init__(self, threshold=127):
        self.threshold = np.array(threshold)
        self.height, self.width = None, None  # Placeholders, will be calculated on the fly.

    def in_bounds(self, col: int, row: int) -> bool:
        """ Return True if the column and row are in bounds for the specified width and height. """
        return 0 <= col < self.width and 0 <= row < self.height
In this code the possibility of an RGB image is taken into account. For gray scale images only line 4 is required.
    def passable(self, img: np.ndarray, col: int, row: int) -> bool:
        """ Return True if the image is passable at that location. """
        if self.threshold.size == 1:
            return img[row, col] > self.threshold
        return all(img[row, col] > self.threshold)  # Handling RGB images is a bonus.
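To see what the two branches of `passable` compare, the check can be tried on single pixels. This is only a small sketch: the helper name `is_passable` and the pixel values are made up for illustration, and `np.all` is used so that one line handles both the gray scale and the colour case.

```python
import numpy as np


def is_passable(pixel, threshold) -> bool:
    """Hypothetical stand-alone version of the check done in `passable`."""
    threshold = np.array(threshold)
    # A gray scale pixel gives one boolean, a colour pixel one boolean per channel.
    # np.all accepts both, so every channel has to be above the threshold.
    return bool(np.all(np.asarray(pixel) > threshold))


gray_white, gray_black = np.uint8(255), np.uint8(0)
bgr_white = np.array([255, 255, 255], dtype=np.uint8)
bgr_red = np.array([0, 0, 255], dtype=np.uint8)  # OpenCV stores colours as B, G, R

print(is_passable(gray_white, 127))               # True
print(is_passable(gray_black, 127))               # False
print(is_passable(bgr_white, (127, 127, 127)))    # True
print(is_passable(bgr_red, (127, 127, 127)))      # False, two channels are too dark
```

With a colour threshold, a pixel only counts as walkable when all of its channels are bright, so saturated colours are treated like walls.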
    def neighbors(self, img: np.ndarray, col: int, row: int, dist: int = 1) -> List[Point]:
        """ Returns all directly valid adjacent neighbors (horizontal and vertical neighbors). """
        neighbors = [(col + dist, row), (col - dist, row), (col, row - dist), (col, row + dist)]  # E W N S
        if (col + row) % 2 == 0:
            neighbors.reverse()  # S N W E
        results = [(x, y) for x, y in neighbors if self.in_bounds(x, y)]
        results = [(x, y) for x, y in results if self.passable(img, x, y)]
        return results
When we always go over the neighbors in the same order, the search has a preference for exploring the first occurring direction (Right or E(ast)). This leads to paths that go around the outside. We humans usually prefer a more direct-looking path, and therefore we reverse the neighbor directions after every other step. Visually it will look like this:
X X X X     X X 0 0
0 0 0 X     0 X X 0
0 0 0 X     0 0 X X
0 0 0 X     0 0 0 X
Here the left grid shows the pathfinding without any changes, and the right one shows it with a neighbors.reverse part added after every other step. There is a quick trick for getting the right pattern: reverse the neighbors where the sum of the x and y index is even. Every step changes either x or y by one (with the default dist of 1), so the parity of x+y flips at every step: if x+y is even, any step will change that number to odd, and vice versa. In code it will be similar to:
neighbors = [(col + dist, row), ...]  # E W N S
if (col + row) % 2 == 0: neighbors.reverse()  # S N W E
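The effect of the parity trick can be reproduced on a small open grid. The sketch below is a simplified stand-in for the class in this lesson: there are no walls, the grid is 4x4, and the function names `bfs_path` and `show` are made up for the illustration.

```python
from collections import deque


def neighbors(col, row, size, alternate):
    """Neighbours on an open size x size grid, in E W N S order."""
    result = [(col + 1, row), (col - 1, row), (col, row - 1), (col, row + 1)]
    if alternate and (col + row) % 2 == 0:
        result.reverse()  # S N W E on the cells where col + row is even
    return [(c, r) for c, r in result if 0 <= c < size and 0 <= r < size]


def bfs_path(size, alternate):
    """BFS from the top-left to the bottom-right corner, returning the path."""
    start, end = (0, 0), (size - 1, size - 1)
    came_from = {start: None}
    frontier = deque([start])
    while frontier:
        current = frontier.popleft()
        if current == end:
            break
        for next_node in neighbors(*current, size, alternate):
            if next_node not in came_from:
                came_from[next_node] = current
                frontier.append(next_node)
    path, node = [], end
    while node is not None:  # walk back from the end to the start
        path.append(node)
        node = came_from[node]
    return path[::-1]


def show(path, size):
    for row in range(size):
        print(" ".join("X" if (col, row) in path else "0" for col in range(size)))


for alternate in (False, True):
    print("alternating order" if alternate else "fixed order")
    show(bfs_path(4, alternate), 4)
```

Both paths have the same length, since BFS always finds a shortest path on this grid. Only the choice between equally short paths changes: the fixed order runs along the edge, the alternating order produces a staircase.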
For both pathfinding algorithms, BFS and A*, we also keep track of a cost: the distance, in steps, from our start position to every point on the image that was reached. This is optional and not required, but it helps visualize the points that the algorithms have visited.
    def bfs(self, image: np.ndarray, start: Point, end: Point, dist=1) -> Tuple[dict, dict]:
        """ Perform a BFS on an image, going from start to end. """
        frontier = deque()
        frontier.appendleft(start)

        came_from = dict()
        came_from[start] = None

        came_from_distance = dict()
        came_from_distance[start] = 0

        self.height, self.width = image.shape[:2]

        while len(frontier):
            current = frontier.popleft()
            if current == end:
                break

            for next_node in self.neighbors(image, *current, dist=dist):
                if next_node not in came_from:
                    frontier.append(next_node)
                    came_from[next_node] = current
                    came_from_distance[next_node] = came_from_distance[current] + 1

        return came_from, came_from_distance
    def a_star(self, image: np.ndarray, start: Point, end: Point, dist=1) -> Tuple[dict, dict]:
        """ Perform A* on an image, going from start to end. """
        frontier = []
        heapq.heappush(frontier, (0, start))  # Elements, (priority, key)

        came_from = dict()
        cost_so_far = dict()
        came_from[start] = None
        cost_so_far[start] = 0

        self.height, self.width = image.shape[:2]

        while len(frontier):
            cost, current = heapq.heappop(frontier)
            if current == end:
                break

            for next_node in self.neighbors(image, *current, dist=dist):
                new_cost = cost_so_far[current] + 1
                if next_node not in cost_so_far or new_cost < cost_so_far[next_node]:
                    cost_so_far[next_node] = new_cost
                    came_from[next_node] = current
                    # Priority = cost so far + heuristic (squared Euclidean distance to the end point).
                    priority = new_cost + (next_node[0] - end[0]) ** 2 + (next_node[1] - end[1]) ** 2
                    heapq.heappush(frontier, (priority, next_node))

        return came_from, cost_so_far
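The priority in `a_star` adds the squared Euclidean distance to the cost so far. That number can be far larger than the number of steps that are really left, so the search leans heavily towards the end point and is not guaranteed to return the shortest path. A common alternative on a grid with only horizontal and vertical steps is the Manhattan distance, which never overestimates the remaining steps. The comparison below is a sketch; the helper names `manhattan` and `squared_euclidean` are not part of the tutorial code.

```python
from typing import Tuple

Point = Tuple[int, int]


def manhattan(node: Point, end: Point, dist: int = 1) -> float:
    """Hypothetical heuristic: the number of steps left if there were no walls."""
    return (abs(node[0] - end[0]) + abs(node[1] - end[1])) / dist


def squared_euclidean(node: Point, end: Point) -> int:
    """The heuristic term used in the priority of `a_star`."""
    return (node[0] - end[0]) ** 2 + (node[1] - end[1]) ** 2


start, end = (10, 10), (500, 500)
print(manhattan(start, end))          # 980.0, the fewest steps possible without walls
print(squared_euclidean(start, end))  # 480200, much larger than that step count
```

Trying it out would mean changing the priority line to `new_cost + manhattan(next_node, end, dist)`. The squared version usually visits fewer points, which is a reasonable trade-off when any short-looking path is good enough.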
    def reverse_path(self, came_from: dict, end: Point) -> List[Point]:
        """ Generate a path based on a dictionary of parent relations and the original end goal. """
        if end not in came_from:  # The end point was never reached, so there is no path.
            return []
        path, parent = [end], came_from[end]
        while parent is not None:
            path.append(parent)
            parent = came_from[parent]
        return list(reversed(path))
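How the parent dictionary turns into a path is easiest to see with a hand-written example. The sketch below uses a stand-alone function with the made-up name `walk_back`, and it returns an empty list for an end point that was never reached, as the skeleton docstring asks for.

```python
from typing import Dict, List, Optional, Tuple

Point = Tuple[int, int]


def walk_back(came_from: Dict[Point, Optional[Point]], end: Point) -> List[Point]:
    """Hypothetical stand-alone version: follow the parents from the end to the start."""
    if end not in came_from:  # the search never reached the end point
        return []
    path, node = [], end
    while node is not None:  # the start is the only point whose parent is None
        path.append(node)
        node = came_from[node]
    return path[::-1]


# Hand-written parents for a 2x2 grid that was searched from (0, 0).
came_from = {(0, 0): None, (1, 0): (0, 0), (0, 1): (0, 0), (1, 1): (1, 0)}

print(walk_back(came_from, (1, 1)))  # [(0, 0), (1, 0), (1, 1)]
print(walk_back(came_from, (5, 5)))  # []
```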
class Pathfinder:
    """
    Contains BFS and A* implementations for images.

    Parameters
    ----------
    - threshold : Tuple[int, ...]
        Color that is used for determining if a pixel is passable.
        If the pixel value is larger than the threshold it can be passed,
        otherwise it cannot. By default it assumes an RGB image.
    """

    def __init__(self, threshold=(127, 127, 127)):
        self.threshold = np.array(threshold)
        self.height, self.width = None, None  # Placeholders, will be calculated on the fly.

    def in_bounds(self, x: int, y: int) -> bool:
        """ Return True if the x and y coordinates are in bounds for the specified width and height. """
        return 0 <= x < self.width and 0 <= y < self.height

    def passable(self, img: np.ndarray, x: int, y: int) -> bool:
        """ Return True if the image is passable at that location. """
        if self.threshold.size == 1:
            return img[y, x] > self.threshold
        return all(img[y, x] > self.threshold)

    def neighbors(self, img: np.ndarray, x: int, y: int, dist: int = 1) -> List[Point]:
        """ Returns all directly valid adjacent neighbors (horizontal and vertical neighbors). """
        neighbors = [(x + dist, y), (x - dist, y), (x, y - dist), (x, y + dist)]  # E W N S
        if (x + y) % 2 == 0:
            neighbors.reverse()  # S N W E
        results = [(x, y) for x, y in neighbors if self.in_bounds(x, y)]
        results = [(x, y) for x, y in results if self.passable(img, x, y)]
        return results

    def bfs(self, image: np.ndarray, start: Point, end: Point, dist=1) -> Tuple[dict, dict]:
        """ Perform a BFS on an image, going from start to end. """
        frontier = deque()
        frontier.appendleft(start)

        came_from = dict()
        came_from[start] = None

        came_from_distance = dict()
        came_from_distance[start] = 0

        self.height, self.width = image.shape[:2]

        while len(frontier):
            current = frontier.popleft()
            if current == end:
                break

            for next_node in self.neighbors(image, *current, dist=dist):
                if next_node not in came_from:
                    frontier.append(next_node)
                    came_from[next_node] = current
                    came_from_distance[next_node] = came_from_distance[current] + 1

        return came_from, came_from_distance

    def a_star(self, image: np.ndarray, start: Point, end: Point, dist=1) -> Tuple[dict, dict]:
        """ Perform A* on an image, going from start to end. """
        frontier = []
        heapq.heappush(frontier, (0, start))  # Elements, (priority, key)

        came_from = dict()
        cost_so_far = dict()
        came_from[start] = None
        cost_so_far[start] = 0

        self.height, self.width = image.shape[:2]

        while len(frontier):
            cost, current = heapq.heappop(frontier)
            if current == end:
                break

            for next_node in self.neighbors(image, *current, dist=dist):
                new_cost = cost_so_far[current] + 1
                if next_node not in cost_so_far or new_cost < cost_so_far[next_node]:
                    cost_so_far[next_node] = new_cost
                    came_from[next_node] = current
                    # Priority = cost so far + heuristic (squared Euclidean distance to the end point).
                    priority = new_cost + (next_node[0] - end[0]) ** 2 + (next_node[1] - end[1]) ** 2
                    heapq.heappush(frontier, (priority, next_node))

        return came_from, cost_so_far

    def reverse_path(self, came_from: dict, end: Point) -> List[Point]:
        """ Generate a path based on a dictionary of parent relations and the original end goal. """
        if end not in came_from:  # The end point was never reached, so there is no path.
            return []
        path, parent = [end], came_from[end]
        while parent is not None:
            path.append(parent)
            parent = came_from[parent]
        return list(reversed(path))
In order to help visualize the pathfinding the following functions can color in the path and display the cost (if provided) for every cell.
    def color_path(self, image, path, padding=0, color=(255, 0, 0)):
        """
        Color an image path in a specific color. (Displaying the found path)

        Parameters
        ----------
        - image : A gray scale or color image
            Image on which the path is drawn.
        - path : List[Tuple[int, int]]
            Points of the path, as returned by the `reverse_path` method.
        - padding : int
            Extra padding on the side of the path, by default the path has a width of 1 pixel.
        - color : Tuple[int, int, int]
            The path will be drawn using this color.

        Returns
        --------
        The image with the drawing of the path on top of it.
        If you do not want this, make sure to copy the image
        before inputting it to this method.
        """
        if len(image.shape) < 3:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

        for x, y in path:
            # max(0, ...) keeps a padded square at the border from wrapping around.
            image[max(0, y - padding):y + padding + 1, max(0, x - padding):x + padding + 1] = color
        return image
    def color_cost(self, image: np.ndarray, cost: dict, padding=0) -> np.ndarray:
        """
        Color every visited point based on its cost. (Displaying the found cost)

        Parameters
        ----------
        - image : A gray scale or color image
            Image on which the cost is drawn.
        - cost : Dict[Tuple[int, int], int]
            Contains the cost (distance from the start) for every visited location.
        - padding : int
            Extra padding around every point, by default a point has a width of 1 pixel.

        Returns
        --------
        The image with the drawing of the cost on top of it.
        If you do not want this, make sure to copy the image
        before inputting it to this method.
        """
        if len(image.shape) < 3:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)

        cost_maximum = max(cost.values()) * 3  # Only use the first third of the hue range.
        for (x, y), value in cost.items():
            color = colorsys.hsv_to_rgb(value / (cost_maximum + 1), 1, 1)
            color = list(reversed([i * 255 for i in color]))  # RGB -> BGR
            image[max(0, y - padding):y + padding + 1, max(0, x - padding):x + padding + 1] = color
        return image
import cv2

from core.pathfinder import Pathfinder

if __name__ == '__main__':
    """
    Minimal working example of testing the pathfinding.
    The commented lines are for a downsampled maze.
    """
    maze = cv2.imread('maze-clean.png', cv2.IMREAD_GRAYSCALE)
    # maze = cv2.resize(maze, (25, 25), interpolation=cv2.INTER_LINEAR_EXACT)

    pathfinder = Pathfinder(threshold=127)
    came_from, came_from_distance = pathfinder.bfs(maze, (10, 10), (500, 500))
    path = pathfinder.reverse_path(came_from, (500, 500))
    # came_from, came_from_distance = pathfinder.bfs(maze, (0, 0), (24, 24))
    # path = pathfinder.reverse_path(came_from, (24, 24))

    maze = pathfinder.color_cost(maze, came_from_distance)
    maze = pathfinder.color_path(maze, path, padding=0)
    # maze = cv2.resize(maze, (512, 512), interpolation=cv2.INTER_NEAREST_EXACT)

    cv2.imshow('Result', maze)
    cv2.waitKey(0)
The left shows the result of the unscaled image, the right shows the result of a downscaled image. The advantage of the downscaled version is that the pathfinding is orders of magnitude faster, since the grid has gone from 512x512 (262,144 pixels) to only 25x25 (625 pixels).
Now that you have an example of the pathfinding, it needs to be implemented in the agent. A small thing you might want to consider is that we are using a downsampled image instead of the original image. This is no problem, because in the previous lesson we also scaled the start and end location (the xy-location) of the sprites.
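As a reminder of what that scaling amounts to, the sketch below maps a pixel position on the 512x512 image to a cell of the 25x25 grid and back. It assumes plain proportional scaling; the function names `to_grid` and `to_image` are made up here, and the code from the previous lesson may do this differently.

```python
from typing import Tuple

Point = Tuple[int, int]


def to_grid(point: Point, image_size: int = 512, grid_size: int = 25) -> Point:
    """Map a pixel position on the full image to a cell of the downscaled grid."""
    x, y = point
    return x * grid_size // image_size, y * grid_size // image_size


def to_image(cell: Point, image_size: int = 512, grid_size: int = 25) -> Point:
    """Map a grid cell back to the pixel at (roughly) the centre of that cell."""
    col, row = cell
    scale = image_size / grid_size
    return int((col + 0.5) * scale), int((row + 0.5) * scale)


print(to_grid((10, 10)))     # (0, 0)
print(to_grid((500, 500)))   # (24, 24)
print(to_image((24, 24)))    # (501, 501)
```

The start and end points (10, 10) and (500, 500) of the full image land on the cells (0, 0) and (24, 24), which are the coordinates used in the commented lines of the example.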
Besides determining a path, we also have to translate the found path into an action to execute. For this you can use the following information:
Action | Info | Int | Row diff | Column diff |
---|---|---|---|---|
Stop | This action is a pass. | 0 | 0 | 0 |
Left | Move left on the maze. | 1 | 0 | -1 |
Down | Move down on the maze. | 3 | 1 | 0 |
Up | Move up on the maze. | 5 | -1 | 0 |
Right | Move right on the maze. | 7 | 0 | 1 |
Using the above, it is possible to determine the difference between the current position of the player and the next location according to the calculated path. Finalize the agent code by implementing pathfinding, using the sprites' centers as start and end coordinates, and determine the right action from the path.
Hint: it is good practice to separate the pathfinding from the computation of the action. For this you can use the following skeleton methods.
from typing import Tuple, Dict, List
...

class Agent:
    def extract_path(self, maze: np.ndarray, centers: Dict[str, Point], draw_on_image=False) -> List[Point]:
        """ Perform pathfinding on the maze, and create a path from the player to the goal. """

    def extract_action(self, path: List[Point]) -> int:
        """ Determines the action to execute based on the current and next step in the path. """
from core.pathfinding import Pathfinder


class Agent:
    def __init__(self, venv: procgen.ProcgenEnv):
        self.action_space = venv.action_space.n
        self.pathfinder = Pathfinder(threshold=127)
    ...

    def compute_action(self, observation: np.ndarray) -> np.ndarray:
        """ Calculate the best action for the agent. """
        maze, scale = self.extract_maze(observation, grid_size=(25, 25))
        sprites = self.extract_sprites(observation, scale)

        if len(sprites) == 2:
            path = self.extract_path(maze, sprites, draw_on_image=True)
            action = self.extract_action(path)
            return np.array([action])
        return np.random.randint(0, self.action_space, 1)

    def extract_path(self, maze: np.ndarray, centers: Dict[str, Point], draw_on_image=False) -> List[Point]:
        """ Perform pathfinding on the maze, and create a path from the player to the goal. """
        for x, y in centers.values():
            self.remove_sprite(maze, (x, y, 1, 1))

        came_from, came_from_distance = self.pathfinder.bfs(maze, centers['player'], centers['goal'])
        path = self.pathfinder.reverse_path(came_from, centers['goal'])

        if draw_on_image:
            maze = self.pathfinder.color_cost(maze, came_from_distance)
            maze = self.pathfinder.color_path(maze, path, padding=0)
            maze = cv2.resize(maze, (512, 512), interpolation=cv2.INTER_NEAREST_EXACT)
            self.show(maze, window_name='Full maze')
        return path

    def extract_action(self, path: List[Point]) -> int:
        """ Determines the action to execute based on the current and next step in the path. """
        if len(path) < 2:  # No next step: no path was found, or the player is already on the goal.
            return 0
        direction_col, direction_row = np.array(path[1]) - np.array(path[0])
        if abs(direction_row) > 0:
            return 3 if direction_row > 0 else 5  # Down, Up
        if abs(direction_col) > 0:
            return 7 if direction_col > 0 else 1  # Right, Left
        return 0
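The same translation can also be written as a lookup table, which keeps the mapping from step to action in one place. This is a sketch of an alternative, not the tutorial's solution: the names `ACTIONS` and `action_from_path` are made up, and the integers are the ones returned by `extract_action`.

```python
from typing import Dict, List, Tuple

Point = Tuple[int, int]

# (column diff, row diff) -> action integer, the same integers as in extract_action.
ACTIONS: Dict[Point, int] = {
    (0, 0): 0,   # Stop
    (-1, 0): 1,  # Left
    (0, 1): 3,   # Down (the row index grows downwards in an image)
    (0, -1): 5,  # Up
    (1, 0): 7,   # Right
}


def action_from_path(path: List[Point]) -> int:
    """Return the action for the first step of the path, or Stop if there is no step."""
    if len(path) < 2:
        return 0
    (col, row), (next_col, next_row) = path[0], path[1]
    return ACTIONS.get((next_col - col, next_row - row), 0)


print(action_from_path([(3, 3), (4, 3)]))  # 7
print(action_from_path([(3, 3), (3, 4)]))  # 3
print(action_from_path([(3, 3)]))          # 0
```

Steps that are not in the table, such as a diagonal one, fall back to Stop instead of raising an error.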
If you have followed the tutorial up to this point, you should see a result similar to the one on the left side. The right side displays how the pathfinding is used to create a path from the mouse to the cheese.
In this lesson we have taken a look at implementing a pathfinding algorithm for images, and at how we can follow the resulting path. By implementing the code of this lesson, you should have a working agent that solves the environment consistently and relatively quickly.
The main goal of General Video Game Playing is that you can make agents that work on all kinds of different environments. Therefore, in the next lesson we are going to see how we can adapt this code to solve a different procgen game, namely Heist.