Understanding the observation spec for a custom environment

Hi all,
I am currently learning how to build a custom environment, but I am struggling to understand how to properly define the observation spec for it.

Just a quick bit of context: when I try to learn a new coding language I like to build a solver for an edge-matching puzzle, like a 4x4 Eternity II clone. So in this case I would like to build an environment that represents the game and then build an agent to play it.

This is the code I have so far:

import copy
import numpy as np
import string

from tf_agents.environments import py_environment
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories.time_step import StepType
from tf_agents.trajectories.time_step import TimeStep


class Tile:
    def __init__(self, id, sides):
        self.id = id
        self.orientation = 0
        self.sides = sides

    def rotate(self, rotation):
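        # sides are stored [top, right, bottom, left], so rolling the array by
        # k positions is the same as rotating the tile by k quarter turns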
        if rotation == 0:
            self.orientation = (self.orientation + 1) % 4
            self.sides = np.roll(self.sides, 1)
        elif rotation == 1:
            self.orientation = (self.orientation - 1) % 4
            self.sides = np.roll(self.sides, -1)
        elif rotation == 2:
            self.orientation = (self.orientation + 2) % 4
            self.sides = np.roll(self.sides, 2)

    def render(self):
        side_string = ""
        for side in self.sides:
            side_string += string.ascii_lowercase[side]
        return side_string


class puzzleEnv(py_environment.PyEnvironment):
    def __init__(self, tile_set, discount=1.0):
        super(puzzleEnv, self).__init__(handle_auto_reset=True)

        self.tile_set = tile_set
        self.board = self._generate_initial_state(tile_set=tile_set)

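        # action = [op, pos1, pos2]: op 0-2 rotates the tile at pos1
        # (see Tile.rotate), op 3 swaps the tiles at pos1 and pos2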
        self._action_spec = BoundedArraySpec(
            shape=(3, ), dtype=np.int32, minimum=[0, 0, 0], maximum=[3, 15, 15], name='action')
        self._observation_spec = BoundedArraySpec(
            shape=(4, 4, 4), dtype=np.int32, minimum=0, maximum=15, name='observation')

        self._discount = np.asarray(discount, dtype=np.float32)
        self._states = None
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        """Return initial_time_step."""
        self.board = self._generate_initial_state(self.tile_set)
        return TimeStep(StepType.FIRST, np.asarray(0.0, dtype=np.float32),
                        self._discount, self._states)

    def _step(self, action):
        """Apply action and return new time_step."""
        is_final, reward, _ = self._check_states()
        if is_final:
            return TimeStep(StepType.LAST, reward, self._discount, self._states)

        if action[0] == 3:
            self._swap_tiles(action[1], action[2])
        else:
            self._rotate_tile(action[0], action[1])

        is_final, reward, _ = self._check_states()

        step_type = StepType.MID
        if np.all(self._states == 0):
            step_type = StepType.FIRST
        elif is_final:
            step_type = StepType.LAST

        return TimeStep(step_type, reward, self._discount, self._states)

    def _check_states(self):
        flat_board = self.board.flatten()
        top_edges =      np.asarray([e.sides[0] for e in flat_board]).reshape(4,4)
        right_edges =    np.asarray([e.sides[1] for e in flat_board]).reshape(4,4)
        bottom_edges =   np.asarray([e.sides[2] for e in flat_board]).reshape(4,4)
        left_edges =     np.asarray([e.sides[3] for e in flat_board]).reshape(4,4)

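        # align neighbours: after the rolls, each tile's top edge is compared
        # with the bottom edge of the tile above it, and each right edge with
        # the left edge of the tile to its right (np.roll wraps around, so the
        # border rows/columns are compared with the opposite side of the board)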
        bottom_edges = np.roll(bottom_edges, 1, axis=0)
        left_edges = np.roll(left_edges, -1, axis=1)

        solved_edges = np.count_nonzero(top_edges == bottom_edges) + np.count_nonzero(right_edges == left_edges)

        reward = solved_edges / (4 * 4 * 2)
        is_final = reward >= 1.0
        return is_final, reward, solved_edges

    def _generate_initial_state(self, tile_set, board_size=(4, 4)):
        tiles = np.ndarray((board_size[0]*board_size[1], ), dtype=object)
        for i, tile in enumerate(tile_set):
            tiles[i] = Tile(i, tile)
        # np.random.shuffle(tiles)
        board = tiles.reshape(board_size)
        return board

    def _rotate_tile(self, rotation, board_position):
        tile1_x, tile1_y = np.unravel_index(board_position, self.board.shape)
        self.board[tile1_x, tile1_y].rotate(rotation)

    def _swap_tiles(self, board_position_1, board_position_2):
        tile1_x, tile1_y = np.unravel_index(board_position_1, self.board.shape)
        tile2_x, tile2_y = np.unravel_index(board_position_2, self.board.shape)
        # swap the tiles
        self.board[tile1_x, tile1_y], self.board[tile2_x, tile2_y] = self.board[tile2_x, tile2_y], self.board[tile1_x, tile1_y]

    def get_state(self) -> TimeStep:
        # Returning an unmodifiable copy of the state.
        return copy.deepcopy(self._current_time_step)

    def set_state(self, time_step: TimeStep):
        self._current_time_step = time_step
        self._states = time_step.observation

    def render(self, mode='bucas'):
        if mode == 'bucas':
            board_edges = ""
            board_pieces = ""
            for i, tile in enumerate(self.board.flatten()):
                board_pieces += str(tile.id).zfill(3)
                board_edges += tile.render()
            return board_edges, board_pieces
        else:
            raise ValueError("Invalid render mode: {}".format(mode))

The game here is a 4x4 grid, with each place on the grid holding a tile, and each tile having four sides, so I believe the observation spec should be shaped (4, 4, 4), but my 'board' object is a (4, 4) array of Tile objects.

How should I be handling the observation spec?
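
One idea I am toying with is to keep the board as Tile objects internally and copy the edge values into an int32 array whenever an observation is needed. A minimal sketch (the helper name is just a placeholder):

    def _board_to_state(self):
        # copy each tile's four edge values into a (4, 4, 4) int32 array
        # shaped like the observation spec
        state = np.zeros((4, 4, 4), dtype=np.int32)
        for i in range(4):
            for j in range(4):
                state[i, j, :] = self.board[i, j].sides
        return state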

Also happy to hear any suggestions for code improvements.

Thanks,

I have made some incremental progress. The environment now mirrors the board's edge values into a (4, 4, 4) int32 _state array via a _board_to_state helper, and looks like this:

import copy
import numpy as np
import string

from tf_agents.environments import py_environment
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories.time_step import StepType
from tf_agents.trajectories.time_step import TimeStep


class Tile:
    def __init__(self, id, sides):
        self.id = id
        self.orientation = 0
        self.sides = sides

    def rotate(self, rotation):
        if rotation == 0:
            self.orientation = (self.orientation + 1) % 4
            self.sides = np.roll(self.sides, 1)
        elif rotation == 1:
            self.orientation = (self.orientation - 1) % 4
            self.sides = np.roll(self.sides, -1)
        elif rotation == 2:
            self.orientation = (self.orientation + 2) % 4
            self.sides = np.roll(self.sides, 2)

    def render(self):
        side_string = ""
        for side in self.sides:
            side_string += string.ascii_lowercase[side]
        return side_string


class puzzleEnv(py_environment.PyEnvironment):
    def __init__(self, tile_set, discount=1.0):
        super(puzzleEnv, self).__init__(handle_auto_reset=True)

        self.tile_set = tile_set
        self.board = self._generate_initial_state(tile_set=tile_set)

        self._action_spec = BoundedArraySpec(
            shape=(3, ), dtype=np.int32, minimum=[0, 0, 0], maximum=[3, 15, 15], name='action')
        self._observation_spec = BoundedArraySpec(
            shape=(4, 4, 4), dtype=np.int32, minimum=0, maximum=21, name='observation')

        self._discount = np.asarray(discount, dtype=np.float32)
        self._state = np.zeros((4, 4, 4), dtype=np.int32)
        self._board_to_state()
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        """Return initial_time_step."""
        self.board = self._generate_initial_state(self.tile_set)
        self._board_to_state()
        return TimeStep(StepType.FIRST, np.asarray(0.0, dtype=np.float32),
                        self._discount, self._state)

    def _step(self, action):
        """Apply action and return new time_step."""
        is_final, reward, _ = self._check_state()
        if is_final:
            return TimeStep(StepType.LAST, reward, self._discount, self._state)

        if action[0] == 3:
            self._swap_tiles(action[1], action[2])
        else:
            self._rotate_tile(action[0], action[1])

        is_final, reward, _ = self._check_state()

        step_type = StepType.MID
        if np.all(self._state == 0):
            step_type = StepType.FIRST
        elif is_final:
            step_type = StepType.LAST

        return TimeStep(step_type, reward, self._discount, self._state)

    def _check_state(self):
        self._board_to_state()
        flat_board = self.board.flatten()
        top_edges =      np.asarray([e.sides[0] for e in flat_board]).reshape(4,4)
        right_edges =    np.asarray([e.sides[1] for e in flat_board]).reshape(4,4)
        bottom_edges =   np.asarray([e.sides[2] for e in flat_board]).reshape(4,4)
        left_edges =     np.asarray([e.sides[3] for e in flat_board]).reshape(4,4)

        bottom_edges = np.roll(bottom_edges, 1, axis=0)
        left_edges = np.roll(left_edges, -1, axis=1)

        solved_edges = np.count_nonzero(top_edges == bottom_edges) + np.count_nonzero(right_edges == left_edges)

        reward = solved_edges / (4 * 4 * 2)
        is_final = reward >= 1.0
        return is_final, reward, solved_edges

    def _generate_initial_state(self, tile_set, board_size=(4, 4)):
        tiles = np.ndarray((board_size[0]*board_size[1], ), dtype=object)
        for i, tile in enumerate(tile_set):
            tiles[i] = Tile(i, tile)
        # np.random.shuffle(tiles)
        board = tiles.reshape(board_size)
        return board

    def _board_to_state(self):
        for i in range(4):
            for j in range(4):
                self._state[i, j, :] = self.board[i, j].sides

    def _rotate_tile(self, rotation, board_position):
        tile1_x, tile1_y = np.unravel_index(board_position, self.board.shape)
        self.board[tile1_x, tile1_y].rotate(rotation)

    def _swap_tiles(self, board_position_1, board_position_2):
        tile1_x, tile1_y = np.unravel_index(board_position_1, self.board.shape)
        tile2_x, tile2_y = np.unravel_index(board_position_2, self.board.shape)
        # swap the tiles
        self.board[tile1_x, tile1_y], self.board[tile2_x, tile2_y] = self.board[tile2_x, tile2_y], self.board[tile1_x, tile1_y]

    def get_state(self) -> TimeStep:
        # Returning an unmodifiable copy of the state.
        return copy.deepcopy(self._current_time_step)

    def set_state(self, time_step: TimeStep):
        self._current_time_step = time_step
        # self._state = time_step.observation
        self._board_to_state()

    def render(self, mode='bucas'):
        if mode == 'bucas':
            board_edges = ""
            board_pieces = ""
            for i, tile in enumerate(self.board.flatten()):
                board_pieces += str(tile.id).zfill(3)
                board_edges += tile.render()
            return board_edges, board_pieces
        else:
            raise ValueError("Invalid render mode: {}".format(mode))

and have hopefully worked out how to validate it with:

import Environment
import numpy as np

from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils

def load_pieces(file_path):
    pieces = []
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                pieces.append(np.array([int(x) for x in line.split(' ')]))
    return pieces

tile_set = load_pieces('./pieces4x4.txt')
env = Environment.puzzleEnv(tile_set=tile_set)
utils.validate_py_environment(env, episodes=5)
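
A quick sanity check before running the full validation is to print the specs, so they can be compared against whatever error comes out:

print(env.time_step_spec())
print(env.action_spec())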

and am currently working through the error:

Exception has occurred: ValueError
Given `time_step`: TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([[[ 1, 17,  0,  0],
        [ 1,  5,  0,  0],
        [ 9, 17,  0,  0],
        [17,  9,  0,  0]],

       [[12, 13,  0,  9],
        [21,  1,  0,  9],
        [ 8, 17,  0, 13],
        [16, 13,  0, 17]],

       [[15,  9,  0,  5],
        [20,  5,  0,  5],
        [10,  1,  0, 13],
        [ 8,  9,  0, 17]],

       [[ 6, 20, 10, 21],
        [ 8, 15,  6,  8],
        [21,  8, 14, 16],
        [12,  8, 14, 21]]], dtype=int32),
 'reward': 0.03125,
 'step_type': array(1, dtype=int32)}) does not match expected `time_step_spec`: TimeStep(
{'discount': BoundedArraySpec(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0),
 'observation': BoundedArraySpec(shape=(4, 4, 4), dtype=dtype('int32'), name='observation', minimum=0, maximum=21),
 'reward': ArraySpec(shape=(), dtype=dtype('float32'), name='reward'),
 'step_type': ArraySpec(shape=(), dtype=dtype('int32'), name='step_type')})
  File "/workspaces/git/puzzle/data/environment_debug.py", line 19, in <module>
    utils.validate_py_environment(env, episodes=5)

I'm not sure, but it looks to be either the discount or the reward. Comparing the output with the spec, I think it is the reward: 0.03125 is a plain Python float, while the spec expects a float32 scalar array.
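
If that is the problem, casting the reward before it goes into the TimeStep should fix it; a minimal sketch of the change in _check_state:

        reward = solved_edges / (4 * 4 * 2)
        is_final = reward >= 1.0
        return is_final, np.asarray(reward, dtype=np.float32), solved_edges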

I got the environment working and can run an agent against it, but I would still like to know if there are things I should be doing better, thanks. (This is probably turning into a 'review my environment' topic.)

Anyway, here is my current environment. The main changes: the reward is now cast to float32, out-of-range actions are penalised and end the episode, and episodes are capped at 48 steps:

import copy
import numpy as np
import string

from tf_agents.environments import py_environment
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories.time_step import StepType
from tf_agents.trajectories.time_step import TimeStep

class Tile:
    def __init__(self, id, sides):
        self.id = id
        self.orientation = 0
        self.sides = sides

    def rotate(self, rotation):
        if rotation == 0:
            self.orientation = (self.orientation + 1) % 4
            self.sides = np.roll(self.sides, 1)
        elif rotation == 1:
            self.orientation = (self.orientation - 1) % 4
            self.sides = np.roll(self.sides, -1)
        elif rotation == 2:
            self.orientation = (self.orientation + 2) % 4
            self.sides = np.roll(self.sides, 2)

    def render(self):
        side_string = ""
        for side in self.sides:
            side_string += string.ascii_lowercase[side]
        return side_string


class puzzleEnv(py_environment.PyEnvironment):
    def __init__(self, tile_set, discount=1.0):
        super(puzzleEnv, self).__init__(handle_auto_reset=True)

        self.tile_set = tile_set
        self.board = self._generate_initial_state(tile_set=tile_set)

        self._number_of_steps = 0

        self._action_spec = BoundedArraySpec(
            shape=(3, ), dtype=np.int32, minimum=0, maximum=15, name='action')
        self._observation_spec = BoundedArraySpec(
            shape=(4, 4, 4), dtype=np.int32, minimum=0, maximum=22, name='observation')

        self._discount = np.asarray(discount, dtype=np.float32)
        self._state = np.zeros((4, 4, 4), dtype=np.int32)
        self._board_to_state()
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        """Return initial_time_step."""
        self._number_of_steps = 0
        self._episode_ended = False
        self.board = self._generate_initial_state(self.tile_set)
        self._board_to_state()
        return TimeStep(StepType.FIRST, np.asarray(0.0, dtype=np.float32),
                        self._discount, self._state)

    def _step(self, action):
        """Apply action and return new time_step."""
        # Illegal action: penalise it slightly and end the episode
        illegal_tile = action[1] < 0 or action[1] > 15 or action[2] < 0 or action[2] > 15
        illegal_action = action[0] < 0 or action[0] > 3
        if illegal_tile or illegal_action:
            return TimeStep(StepType.LAST, np.asarray(-0.001, dtype=np.float32),
                            self._discount, self._state)

        is_final, reward, _ = self._check_state()
        if is_final or self._episode_ended:
            return TimeStep(StepType.LAST, reward, self._discount, self._state)

        self._number_of_steps += 1

        if action[0] == 3:
            self._swap_tiles(action[1], action[2])
        else:
            self._rotate_tile(action[0], action[1])

        is_final, reward, _ = self._check_state()

        step_type = StepType.MID
        if np.all(self._state == 0):
            step_type = StepType.FIRST
        elif is_final:
            step_type = StepType.LAST

        return TimeStep(step_type, reward, self._discount, self._state)

    def _check_state(self):
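        # hard-cap the episode at 4 * 4 * 3 = 48 steps so it always terminates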
        self._episode_ended = self._number_of_steps >= (4 * 4 * 3)
        self._board_to_state()
        flat_board = self.board.flatten()
        top_edges = np.asarray([e.sides[0] for e in flat_board]).reshape(4, 4)
        right_edges = np.asarray([e.sides[1] for e in flat_board]).reshape(4, 4)
        bottom_edges = np.asarray([e.sides[2] for e in flat_board]).reshape(4, 4)
        left_edges = np.asarray([e.sides[3] for e in flat_board]).reshape(4, 4)

        bottom_edges = np.roll(bottom_edges, 1, axis=0)
        left_edges = np.roll(left_edges, -1, axis=1)

        solved_edges = np.count_nonzero(
            top_edges == bottom_edges) + np.count_nonzero(right_edges == left_edges)

        reward = solved_edges / (4 * 4 * 2)
        is_final = reward >= 1.0
        return is_final, np.asarray(reward, dtype=np.float32), solved_edges

    def _generate_initial_state(self, tile_set, board_size=(4, 4)):
        tiles = np.ndarray((board_size[0]*board_size[1], ), dtype=object)
        for i, tile in enumerate(tile_set):
            tiles[i] = Tile(i, tile)
        # np.random.shuffle(tiles)
        board = tiles.reshape(board_size)
        return board

    def _board_to_state(self):
        for i in range(4):
            for j in range(4):
                self._state[i, j, :] = self.board[i, j].sides

    def _rotate_tile(self, rotation, board_position):
        tile1_x, tile1_y = np.unravel_index(board_position, self.board.shape)
        self.board[tile1_x, tile1_y].rotate(rotation)

    def _swap_tiles(self, board_position_1, board_position_2):
        tile1_x, tile1_y = np.unravel_index(board_position_1, self.board.shape)
        tile2_x, tile2_y = np.unravel_index(board_position_2, self.board.shape)
        # swap the tiles
        self.board[tile1_x, tile1_y], self.board[tile2_x, tile2_y] = \
            self.board[tile2_x, tile2_y], self.board[tile1_x, tile1_y]

    def get_state(self) -> TimeStep:
        # Returning an unmodifiable copy of the state.
        return copy.deepcopy(self._current_time_step)

    def set_state(self, time_step: TimeStep):
        self._current_time_step = time_step
        # self._state = time_step.observation
        self._board_to_state()

    def render(self, mode='bucas'):
        if mode == 'bucas':
            board_edges = ""
            board_pieces = ""
            for i, tile in enumerate(self.board.flatten()):
                board_pieces += str(tile.id).zfill(3)
                board_edges += tile.render()
            return board_edges, board_pieces
        else:
            raise ValueError("Invalid render mode: {}".format(mode))
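
For anyone following along, this is how I am smoke-testing the environment with a random policy before wiring up a real agent. A minimal sketch: RandomPyPolicy is the stock TF-Agents policy, and the random tile_set here is just placeholder data (normally I load pieces4x4.txt as above):

import numpy as np

import Environment
from tf_agents.policies import random_py_policy

tile_set = [np.random.randint(0, 23, size=4) for _ in range(16)]  # placeholder tiles
env = Environment.puzzleEnv(tile_set=tile_set)

policy = random_py_policy.RandomPyPolicy(
    time_step_spec=env.time_step_spec(), action_spec=env.action_spec())

time_step = env.reset()
while not time_step.is_last():
    action_step = policy.action(time_step)
    time_step = env.step(action_step.action)

print(env.render())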

Thanks for having a look.