Hi All,
I am currently learning how to build a custom environment, but I am struggling to understand how to properly define the observation spec for my environment.
Just a quick bit of context: when I try to learn a new coding language, I like to build a solver for an edge-matching puzzle such as a 4x4 Eternity II clone. So in this case I would like to build an environment that represents the game and then build an agent to play it.
This is the code I have so far:
import copy
import numpy as np
import string
from tf_agents.environments import py_environment
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories.time_step import StepType
from tf_agents.trajectories.time_step import TimeStep
class Tile:
    """A puzzle tile: an identifier, four edge values and a rotation counter.

    Attributes:
        id: Identifier supplied by the caller (kept unchanged by rotations).
        orientation: Integer in ``0..3`` tracking the accumulated rotation.
        sides: Sequence of edge values. ``puzzleEnv._check_states`` reads
            indices 0, 1, 2, 3 as top, right, bottom and left respectively.
    """

    def __init__(self, id, sides):
        """Store the identifier and edge values; the tile starts unrotated."""
        self.id = id
        self.orientation = 0
        self.sides = sides

    def rotate(self, rotation):
        """Rotate the tile in place.

        Args:
            rotation: Rotation code. ``0`` shifts the sides one position
                forward (``np.roll`` by +1), ``1`` shifts them one position
                backward (``np.roll`` by -1) and ``2`` shifts them by two.
                Any other value leaves the tile untouched.
        """
        if rotation == 0:
            shift = 1
        elif rotation == 1:
            shift = -1
        elif rotation == 2:
            shift = 2
        else:
            # Unknown codes are ignored rather than rejected.
            return
        # The orientation counter and the side order always move together.
        self.orientation = (self.orientation + shift) % 4
        self.sides = np.roll(self.sides, shift)

    def render(self):
        """Return the sides as lowercase letters (0 -> 'a', 1 -> 'b', ...)."""
        return "".join(string.ascii_lowercase[side] for side in self.sides)
class puzzleEnv(py_environment.PyEnvironment):
    """Edge-matching puzzle exposed as a TF-Agents Python environment.

    The board is a 2-D object array of ``Tile`` instances. The observation
    handed to the agent is a purely numeric view of that board: an ``int32``
    array of shape ``(rows, cols, 4)`` where the last axis holds each tile's
    ``sides`` in the order they are stored on the tile. The ``Tile`` objects
    themselves never leave the environment.

    An action is a length-3 integer vector ``[op, pos_a, pos_b]``:
      * ``op == 3``      -> swap the tiles at flat positions ``pos_a``/``pos_b``
      * ``op in 0..2``   -> rotate the tile at flat position ``pos_a`` using
                            ``op`` as the rotation code (``pos_b`` is ignored)
    """

    def __init__(self, tile_set, discount=1.0):
        """Build the board from ``tile_set`` and declare the specs.

        Args:
            tile_set: Re-iterable collection with one 4-element sequence of
                edge values per board cell.
            discount: Discount placed in every returned ``TimeStep``.
        """
        super(puzzleEnv, self).__init__(handle_auto_reset=True)
        self.tile_set = tile_set
        self.board = self._generate_initial_state(tile_set=tile_set)

        rows, cols = self.board.shape
        last_position = rows * cols - 1
        self._action_spec = BoundedArraySpec(
            shape=(3,), dtype=np.int32, minimum=[0, 0, 0],
            maximum=[3, last_position, last_position], name='action')
        # NOTE(review): maximum=15 is kept from the original spec; confirm it
        # is the largest edge value that can appear in `tile_set`.
        self._observation_spec = BoundedArraySpec(
            shape=(rows, cols, 4), dtype=np.int32, minimum=0, maximum=15,
            name='observation')
        self._discount = np.asarray(discount, dtype=np.float32)
        # Keep the numeric observation in sync with the board from the start
        # so that no TimeStep ever carries `None`.
        self._states = self._board_to_observation()
        self._episode_ended = False

    def action_spec(self):
        """Return the spec describing valid actions."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec describing observations."""
        return self._observation_spec

    def _board_to_observation(self):
        """Convert the board of ``Tile`` objects to a (rows, cols, 4) int32 array."""
        rows, cols = self.board.shape
        sides = [tile.sides for tile in self.board.flatten()]
        return np.asarray(sides, dtype=np.int32).reshape(rows, cols, 4)

    def _reset(self):
        """Rebuild the board and return the initial time step."""
        self.board = self._generate_initial_state(self.tile_set)
        self._states = self._board_to_observation()
        self._episode_ended = False
        return TimeStep(StepType.FIRST, np.asarray(0.0, dtype=np.float32),
                        self._discount, self._states)

    def _step(self, action):
        """Apply ``action`` and return the resulting time step.

        If the board is already solved when called, the action is not applied
        and a ``LAST`` step is returned immediately.
        """
        is_final, reward, _ = self._check_states()
        if is_final:
            self._episode_ended = True
            return TimeStep(StepType.LAST,
                            np.asarray(reward, dtype=np.float32),
                            self._discount, self._states)

        if action[0] == 3:
            self._swap_tiles(action[1], action[2])
        else:
            self._rotate_tile(action[0], action[1])

        # The board changed, so the numeric observation must be rebuilt.
        self._states = self._board_to_observation()
        is_final, reward, _ = self._check_states()
        self._episode_ended = bool(is_final)
        # A step never produces FIRST; only `_reset` starts an episode.
        step_type = StepType.LAST if is_final else StepType.MID
        # NOTE(review): self._discount is also used on LAST steps, as in the
        # original code; confirm whether a terminal discount of 0 is wanted.
        return TimeStep(step_type, np.asarray(reward, dtype=np.float32),
                        self._discount, self._states)

    def _check_states(self):
        """Score the board.

        Each tile's top edge is compared with the bottom edge of the tile
        above it and each right edge with the left edge of the tile to its
        right. Both comparisons wrap around the board, so every tile
        contributes exactly two comparisons.

        Returns:
            Tuple ``(is_final, reward, solved_edges)`` where ``reward`` is the
            fraction of matching comparisons and ``is_final`` is true when all
            of them match.
        """
        edges = self._board_to_observation()
        top_edges = edges[..., 0]
        right_edges = edges[..., 1]
        # Shift so that index [r, c] holds the neighbour's facing edge.
        bottom_of_above = np.roll(edges[..., 2], 1, axis=0)
        left_of_right = np.roll(edges[..., 3], -1, axis=1)

        solved_edges = (np.count_nonzero(top_edges == bottom_of_above)
                        + np.count_nonzero(right_edges == left_of_right))
        reward = solved_edges / (self.board.size * 2)
        is_final = reward >= 1.0
        return is_final, reward, solved_edges

    def _generate_initial_state(self, tile_set, board_size=(4, 4)):
        """Create a fresh board with one new ``Tile`` per entry of ``tile_set``.

        Tiles are placed in row-major order and numbered by their position in
        ``tile_set``.

        Raises:
            ValueError: If ``tile_set`` does not contain exactly one entry per
                board cell.
        """
        cell_count = board_size[0] * board_size[1]
        tile_sides = list(tile_set)
        if len(tile_sides) != cell_count:
            raise ValueError(
                "tile_set has {} tiles but the board has {} cells".format(
                    len(tile_sides), cell_count))
        tiles = np.empty((cell_count,), dtype=object)
        for i, sides in enumerate(tile_sides):
            tiles[i] = Tile(i, sides)
        # np.random.shuffle(tiles)
        return tiles.reshape(board_size)

    def _rotate_tile(self, rotation, board_position):
        """Rotate the tile at flat index ``board_position`` by ``rotation``."""
        row, col = np.unravel_index(board_position, self.board.shape)
        self.board[row, col].rotate(rotation)

    def _swap_tiles(self, board_position_1, board_position_2):
        """Exchange the tiles at the two flat board indices."""
        row_1, col_1 = np.unravel_index(board_position_1, self.board.shape)
        row_2, col_2 = np.unravel_index(board_position_2, self.board.shape)
        self.board[row_1, col_1], self.board[row_2, col_2] = (
            self.board[row_2, col_2], self.board[row_1, col_1])

    def get_state(self) -> TimeStep:
        """Return a deep copy of the current time step."""
        return copy.deepcopy(self._current_time_step)

    def set_state(self, time_step: TimeStep):
        """Install ``time_step`` as the current time step and observation.

        Only the stored time step and observation are replaced; ``self.board``
        is left as it is.
        """
        self._current_time_step = time_step
        self._states = time_step.observation

    def render(self, mode='bucas'):
        """Return the board as two strings.

        Args:
            mode: Only ``'bucas'`` is supported.

        Returns:
            Tuple ``(board_edges, board_pieces)``: the concatenated
            ``Tile.render()`` output of every tile, and the concatenated
            zero-padded 3-digit tile ids, both in row-major order.

        Raises:
            ValueError: If ``mode`` is not ``'bucas'``.
        """
        if mode != 'bucas':
            raise ValueError("Invalid render mode: {}".format(mode))
        tiles = self.board.flatten()
        board_edges = "".join(tile.render() for tile in tiles)
        board_pieces = "".join(str(tile.id).zfill(3) for tile in tiles)
        return board_edges, board_pieces
The game here is a 4x4 grid where each position on the grid holds a tile, and each tile has four sides, so I believe the observation spec should have shape (4, 4, 4), but my ‘board’ object is a (4, 4) array of Tile objects.
How should I be handling the observation spec?
Also happy to hear any suggestions on code improvements.
Thanks,