imitation_datasets

controller

Controller for running experiments.

Source code in src/imitation_datasets/controller.py
class Controller:
    """Controller for running experiments."""

    def __init__(
            self,
            enjoy: EnjoyFunction,
            collate: CollateFunction,
            amount: int,
            threads: int = 1,
            path: str = './dataset/'
    ) -> None:
        """Initialize the controller.

        Args:
            enjoy (EnjoyFunction): Function to run the expert.
            collate (CollateFunction): Function to collate the data.
            amount (int): Amount of episodes to run.
            threads (int, optional): Amount of threads to use. Defaults to 1.
            path (str, optional): Path to save the dataset. Defaults to './dataset/'.
        """
        self.enjoy = enjoy
        self.collate = collate
        self.threads = CPUS(threads)
        self.experiments = Experiment(amount)
        self.path = path

        self.pbar = None
        set_start_method('spawn', force=True)

    def create_folder(self, path: str) -> None:
        """Create a folder if it does not exist.

        Args:
            path (str): Path to the folder.
        """
        if not os.path.exists(path):
            os.makedirs(path)

    async def set_cpu(self, cpu: int) -> None:
        """Set the cpu affinity for the current process.

        Args:
            cpu (int): CPU index to use.
        """
        try:
            proc = psutil.Process()
            proc.cpu_affinity([int(cpu)])
            if 'linux' in platform:
                os.sched_setaffinity(proc.pid, [int(cpu)])
        except OSError:
            pass

    def enjoy_closure(self, opt: Namespace) -> EnjoyFunction:
        """Create a closure for the enjoy function.

        Args:
            opt (Namespace): Namespace with the arguments.

        Returns:
            EnjoyFunction: Enjoy function with part of the arguments.
        """
        os.system("set LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libGLEW.so")
        os.system("set LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/nvidia")
        return partial(self.enjoy, expert=Experts.get_expert(opt.game))

    def collate_closure(self, opt: Namespace) -> CollateFunction:
        """Create a closure for the collate function.

        Args:
            opt (Namespace): Namespace with the arguments.

        Returns:
            CollateFunction: Collate function with part of the arguments.
        """
        path = f'{self.path}{opt.game}/'
        files = list(listdir(path))
        return partial(self.collate, data=files, path=path)

    async def enjoy_sequence(self, future: EnjoyFunction, executor: ProcessPoolExecutor) -> bool:
        """_summary_

        Args:
            future (EnjoyFunction): Enjoy function already with async future.
            executor (ProcessPoolExecutor): Executor to run the future.

        Returns:
            bool: Result of the future.
                  True if the expert was able to solve the game. False otherwise.
        """
        # Pre
        cpu = await self.threads.cpu_allock()
        await self.experiments.start()
        await self.set_cpu(cpu)

        # Enjoy
        result = await asyncio.get_event_loop().run_in_executor(executor, future)

        # Post
        self.threads.cpu_release(cpu)
        await self.experiments.stop(result)
        self.pbar.update(1 if result else 0)

        return result if result else await asyncio.gather(self.enjoy_sequence(future, executor))

    async def run(self, opt) -> None:
        """Run the experiments.

        Args:
            opt (Namespace): Namespace with the arguments.
        """
        path = f'{self.path}{opt.game}/'
        self.create_folder(path)

        tasks = []
        with ProcessPoolExecutor() as executor:
            for idx in range(self.experiments.amount):
                enjoy = self.enjoy_closure(opt)
                enjoy = partial(enjoy, path=path, context=Context(self.experiments, idx))
                task = asyncio.ensure_future(
                    self.enjoy_sequence(
                        enjoy,
                        executor
                    )
                )
                tasks.append(task)
            await asyncio.gather(*tasks)

    def start(self, opt: Namespace):
        """Start the experiments.

        Args:
            opt (Namespace): Namespace with the arguments.

        Raises:
            exception: Exception (general) raised during the execution.
        """
        try:
            if opt.mode in ['all', 'play']:
                self.pbar = tqdm(range(self.experiments.amount), desc='Running episodes')
                asyncio.run(self.run(opt))

            if opt.mode in ['all', 'collate']:
                self.pbar = tqdm(range(self.experiments.amount), desc='Running collate')
                collate = self.collate_closure(opt)
                collate()
        except Exception as exception:
            self.experiments.add_log(-99, exception)
            raise exception
        finally:
            self.experiments.write_log()

__init__(enjoy, collate, amount, threads=1, path='./dataset/')

Initialize the controller.

Parameters:
  • enjoy (EnjoyFunction) –

    Function to run the expert.

  • collate (CollateFunction) –

    Function to collate the data.

  • amount (int) –

    Amount of episodes to run.

  • threads (int, default: 1 ) –

    Amount of threads to use. Defaults to 1.

  • path (str, default: './dataset/' ) –

    Path to save the dataset. Defaults to './dataset/'.

src/imitation_datasets/controller.py
def __init__(
        self,
        enjoy: EnjoyFunction,
        collate: CollateFunction,
        amount: int,
        threads: int = 1,
        path: str = './dataset/'
) -> None:
    """Initialize the controller.

    Args:
        enjoy (EnjoyFunction): Function to run the expert.
        collate (CollateFunction): Function to collate the data.
        amount (int): Amount of episodes to run.
        threads (int, optional): Amount of threads to use. Defaults to 1.
        path (str, optional): Path to save the dataset. Defaults to './dataset/'.
    """
    self.enjoy = enjoy
    self.collate = collate
    self.threads = CPUS(threads)
    self.experiments = Experiment(amount)
    self.path = path

    self.pbar = None
    set_start_method('spawn', force=True)

collate_closure(opt)

Create a closure for the collate function.

Parameters:
  • opt (Namespace) –

    Namespace with the arguments.

Returns:
  • CollateFunction( CollateFunction ) –

    Collate function with part of the arguments.

src/imitation_datasets/controller.py
def collate_closure(self, opt: Namespace) -> CollateFunction:
    """Create a closure for the collate function.

    Args:
        opt (Namespace): Namespace with the arguments.

    Returns:
        CollateFunction: Collate function with part of the arguments.
    """
    path = f'{self.path}{opt.game}/'
    files = list(listdir(path))
    return partial(self.collate, data=files, path=path)

create_folder(path)

Create a folder if it does not exist.

Parameters:
  • path (str) –

    Path to the folder.

src/imitation_datasets/controller.py
def create_folder(self, path: str) -> None:
    """Create a folder if it does not exist.

    Args:
        path (str): Path to the folder.
    """
    if not os.path.exists(path):
        os.makedirs(path)

enjoy_closure(opt)

Create a closure for the enjoy function.

Parameters:
  • opt (Namespace) –

    Namespace with the arguments.

Returns:
  • EnjoyFunction( EnjoyFunction ) –

    Enjoy function with part of the arguments.

src/imitation_datasets/controller.py
def enjoy_closure(self, opt: Namespace) -> EnjoyFunction:
    """Create a closure for the enjoy function.

    Args:
        opt (Namespace): Namespace with the arguments.

    Returns:
        EnjoyFunction: Enjoy function with part of the arguments.
    """
    os.system("set LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libGLEW.so")
    os.system("set LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/nvidia")
    return partial(self.enjoy, expert=Experts.get_expert(opt.game))

enjoy_sequence(future, executor) async

Run the enjoy function in the process pool: allocate a CPU, run the episode, release the resources, and retry the episode if the expert fails.

Parameters:
  • future (EnjoyFunction) –

    Enjoy function already with async future.

  • executor (ProcessPoolExecutor) –

    Executor to run the future.

Returns:
  • bool( bool ) –

    Result of the future. True if the expert was able to solve the game. False otherwise.

src/imitation_datasets/controller.py
async def enjoy_sequence(self, future: EnjoyFunction, executor: ProcessPoolExecutor) -> bool:
    """_summary_

    Args:
        future (EnjoyFunction): Enjoy function already with async future.
        executor (ProcessPoolExecutor): Executor to run the future.

    Returns:
        bool: Result of the future.
              True if the expert was able to solve the game. False otherwise.
    """
    # Pre
    cpu = await self.threads.cpu_allock()
    await self.experiments.start()
    await self.set_cpu(cpu)

    # Enjoy
    result = await asyncio.get_event_loop().run_in_executor(executor, future)

    # Post
    self.threads.cpu_release(cpu)
    await self.experiments.stop(result)
    self.pbar.update(1 if result else 0)

    return result if result else await asyncio.gather(self.enjoy_sequence(future, executor))

run(opt) async

Run the experiments.

Parameters:
  • opt (Namespace) –

    Namespace with the arguments.

src/imitation_datasets/controller.py
async def run(self, opt) -> None:
    """Run the experiments.

    Args:
        opt (Namespace): Namespace with the arguments.
    """
    path = f'{self.path}{opt.game}/'
    self.create_folder(path)

    tasks = []
    with ProcessPoolExecutor() as executor:
        for idx in range(self.experiments.amount):
            enjoy = self.enjoy_closure(opt)
            enjoy = partial(enjoy, path=path, context=Context(self.experiments, idx))
            task = asyncio.ensure_future(
                self.enjoy_sequence(
                    enjoy,
                    executor
                )
            )
            tasks.append(task)
        await asyncio.gather(*tasks)

set_cpu(cpu) async

Set the cpu affinity for the current process.

Parameters:
  • cpu (int) –

    CPU index to use.

src/imitation_datasets/controller.py
async def set_cpu(self, cpu: int) -> None:
    """Set the cpu affinity for the current process.

    Args:
        cpu (int): CPU index to use.
    """
    try:
        proc = psutil.Process()
        proc.cpu_affinity([int(cpu)])
        if 'linux' in platform:
            os.sched_setaffinity(proc.pid, [int(cpu)])
    except OSError:
        pass

start(opt)

Start the experiments.

Parameters:
  • opt (Namespace) –

    Namespace with the arguments.

Raises:
  • exception

    Exception (general) raised during the execution.

src/imitation_datasets/controller.py
def start(self, opt: Namespace):
    """Start the experiments.

    Args:
        opt (Namespace): Namespace with the arguments.

    Raises:
        exception: Exception (general) raised during the execution.
    """
    try:
        if opt.mode in ['all', 'play']:
            self.pbar = tqdm(range(self.experiments.amount), desc='Running episodes')
            asyncio.run(self.run(opt))

        if opt.mode in ['all', 'collate']:
            self.pbar = tqdm(range(self.experiments.amount), desc='Running collate')
            collate = self.collate_closure(opt)
            collate()
    except Exception as exception:
        self.experiments.add_log(-99, exception)
        raise exception
    finally:
        self.experiments.write_log()
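
A minimal usage sketch (not taken from the package's own documentation): it assumes the bundled baseline_enjoy and baseline_collate functions, import paths that follow the source layout shown on this page, and an expert identifier that exists in the Experts register. The opt attributes mirror the opt.game and opt.mode fields read by enjoy_closure and start above.

from argparse import Namespace

from imitation_datasets.controller import Controller
from imitation_datasets.functions import baseline_enjoy, baseline_collate

controller = Controller(
    enjoy=baseline_enjoy,      # EnjoyFunction: records one expert episode
    collate=baseline_collate,  # CollateFunction: merges the episode files
    amount=100,                # episodes to record
    threads=4,                 # CPUs used in parallel
    path="./dataset/",
)

# 'cartpole' is a placeholder identifier; it must match a registered expert.
opt = Namespace(game="cartpole", mode="all")
controller.start(opt)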

experts

Policy

Policy dataclass to load and use expert policies.

Source code in src/imitation_datasets/experts.py
@dataclass
class Policy:
    """Policy dataclass to load and use expert policies."""

    name: str
    repo_id: str
    filename: str
    threshold: float
    algo: BaseAlgorithm
    policy: BaseAlgorithm = field(init=False, default=None)
    internal_state: Any = field(init=False, default=None)
    environment: Any = field(init=False, default=None)

    def load(self) -> BaseAlgorithm:
        """
        Load policy from HuggingFace hub.
        It uses a custom_object to replicate stable_baselines behaviour.

        custom_objects = {
            "learning_rate": 0.0,
            "lr_schedule": lambda _: 0.0,
            "clip_range": lambda _: 0.0
        }

        Returns:
            BaseAlgorithm: Stable baseline policy loaded from HuggingFace hub.
        """
        checkpoint = load_from_hub(
            repo_id=self.repo_id,
            filename=self.filename,
        )

        custom_objects = {
            "learning_rate": 0.0,
            "lr_schedule": lambda _: 0.0,
            "clip_range": lambda _: 0.0
        }

        self.policy = self.algo.load(
            checkpoint,
            custom_objects=custom_objects
        )
        return self.policy

    def predict(
            self,
            obs: List[Union[int, float]],
            deterministic: bool = True
    ) -> Tuple[
        Union[int, float, List[Union[int, float]]],
        Union[int, float, List[Union[int, float]]]
    ]:
        """
        Predict action given observation.

        Args:
            obs (List[int | float]): observation from environment.
            deterministic (bool, optional): Use exploration to predict action. Defaults to True.

        Returns:
            action (Union[int, float, List[Union[int, float]]]):
                action predicted by the policy.
            internal_states (Union[int, float, List[Union[int, float]]]):
                internal states of the policy.

        Note: typing depends on the environment.
        """
        action, internal_states = self.policy.predict(
            obs,
            state=self.internal_state,
            deterministic=deterministic,
        )
        self.internal_state = internal_states
        return action, internal_states

    def get_environment(self) -> str:
        """Return environment name.

        Returns:
            str: environment name.
        """
        if self.environment is None:
            self.environment = gym.make(self.name, render_mode="rgb_array")
        return self.environment

get_environment()

Return environment name.

Returns:
  • str( str ) –

    environment name.

src/imitation_datasets/experts.py
def get_environment(self) -> str:
    """Return environment name.

    Returns:
        str: environment name.
    """
    if self.environment is None:
        self.environment = gym.make(self.name, render_mode="rgb_array")
    return self.environment

load()

Load policy from the HuggingFace Hub. It uses custom_objects to replicate the Stable Baselines behaviour:

custom_objects = { "learning_rate": 0.0, "lr_schedule": lambda _: 0.0, "clip_range": lambda _: 0.0 }

Returns:
  • BaseAlgorithm( BaseAlgorithm ) –

    Stable baseline policy loaded from HuggingFace hub.

src/imitation_datasets/experts.py
def load(self) -> BaseAlgorithm:
    """
    Load policy from HuggingFace hub.
    It uses a custom_object to replicate stable_baselines behaviour.

    custom_objects = {
        "learning_rate": 0.0,
        "lr_schedule": lambda _: 0.0,
        "clip_range": lambda _: 0.0
    }

    Returns:
        BaseAlgorithm: Stable baseline policy loaded from HuggingFace hub.
    """
    checkpoint = load_from_hub(
        repo_id=self.repo_id,
        filename=self.filename,
    )

    custom_objects = {
        "learning_rate": 0.0,
        "lr_schedule": lambda _: 0.0,
        "clip_range": lambda _: 0.0
    }

    self.policy = self.algo.load(
        checkpoint,
        custom_objects=custom_objects
    )
    return self.policy

predict(obs, deterministic=True)

Predict action given observation.

Parameters:
  • obs (List[int | float]) –

    observation from environment.

  • deterministic (bool, default: True ) –

    Whether the prediction should be deterministic (no exploration). Defaults to True.

Returns:
  • action( Union[int, float, List[Union[int, float]]] ) –

    action predicted by the policy.

  • internal_states( Union[int, float, List[Union[int, float]]] ) –

    internal states of the policy.

Note: typing depends on the environment.

src/imitation_datasets/experts.py
def predict(
        self,
        obs: List[Union[int, float]],
        deterministic: bool = True
) -> Tuple[
    Union[int, float, List[Union[int, float]]],
    Union[int, float, List[Union[int, float]]]
]:
    """
    Predict action given observation.

    Args:
        obs (List[int | float]): observation from environment.
        deterministic (bool, optional): Use exploration to predict action. Defaults to True.

    Returns:
        action (Union[int, float, List[Union[int, float]]]):
            action predicted by the policy.
        internal_states (Union[int, float, List[Union[int, float]]]):
            internal states of the policy.

    Note: typing depends on the environment.
    """
    action, internal_states = self.policy.predict(
        obs,
        state=self.internal_state,
        deterministic=deterministic,
    )
    self.internal_state = internal_states
    return action, internal_states
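
A sketch of loading and querying a Policy directly, outside the Controller. The repo_id and filename below are placeholders for a Stable-Baselines3 checkpoint on the HuggingFace Hub, and GymWrapper (documented further down) is used the same way the bundled enjoy functions use it.

from stable_baselines3 import PPO

from imitation_datasets.experts import Policy
from imitation_datasets.utils import GymWrapper

policy = Policy(
    name="CartPole-v1",
    repo_id="sb3/ppo-CartPole-v1",   # placeholder Hub repository
    filename="ppo-CartPole-v1.zip",  # placeholder checkpoint file
    threshold=475.0,
    algo=PPO,
)
policy.load()  # download the checkpoint and build the SB3 policy

env = GymWrapper(policy.get_environment(), version="newest")
state = env.reset()
action, _ = policy.predict(state)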

Experts

Helper class to register and get expert policies.

Source code in src/imitation_datasets/experts.py
class Experts:
    """Helper class to register and get expert policies."""
    experts: Dict[str, Policy] = {
        key: Policy(**value) for env in [atari, classic, mujoco] for key, value in env.items()
    }

    @classmethod
    def register(cls, identifier: str, policy: Policy) -> None:
        """Register a new policy."""
        if not isinstance(policy.threshold, float):
            policy.threshold = float(policy.threshold)

        cls.experts[identifier] = policy

    @classmethod
    def get_expert(cls, identifier: str) -> Policy:
        """Return expert policy.

        Args:
            identifier (str): identifier of the policy.

        Returns:
            Policy: dataclass with expert policy information.
        """
        try:
            return cls.experts[identifier]
        except KeyError:
            return None

    @classmethod
    def get_register(cls) -> None:
        """Print entire register of expert policies."""
        return cls.experts

get_expert(identifier) classmethod

Return expert policy.

Parameters:
  • identifier (str) –

    identifier of the policy.

Returns:
  • Policy( Policy ) –

    dataclass with expert policy information.

src/imitation_datasets/experts.py
@classmethod
def get_expert(cls, identifier: str) -> Policy:
    """Return expert policy.

    Args:
        identifier (str): identifier of the policy.

    Returns:
        Policy: dataclass with expert policy information.
    """
    try:
        return cls.experts[identifier]
    except KeyError:
        return None

get_register() classmethod

Return the entire register of expert policies.

src/imitation_datasets/experts.py
@classmethod
def get_register(cls) -> None:
    """Print entire register of expert policies."""
    return cls.experts

register(identifier, policy) classmethod

Register a new policy.

src/imitation_datasets/experts.py
@classmethod
def register(cls, identifier: str, policy: Policy) -> None:
    """Register a new policy."""
    if not isinstance(policy.threshold, float):
        policy.threshold = float(policy.threshold)

    cls.experts[identifier] = policy
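
A sketch of registering a custom expert and fetching it back. The identifier, environment name, and Hub coordinates are placeholders; any Stable-Baselines3 algorithm class can be passed as algo.

from stable_baselines3 import SAC

from imitation_datasets.experts import Experts, Policy

Experts.register(
    "walker",  # arbitrary identifier, later used as opt.game
    Policy(
        name="Walker2d-v4",
        repo_id="sb3/sac-Walker2d-v3",   # placeholder Hub repository
        filename="sac-Walker2d-v3.zip",  # placeholder checkpoint file
        threshold=4000.0,
        algo=SAC,
    ),
)

expert = Experts.get_expert("walker")  # returns the Policy, or None if unknown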

functions

enjoy

This is a simple enjoy function example. It has three arguments and should return a boolean.

src/imitation_datasets/functions.py
def enjoy(expert: Policy, path: str, context: Context) -> bool:
    """
    This is a simple enjoy function example.
    It has three arguments and should return a boolean.
    """
    done = False
    expert.load()

    env = GymWrapper(expert.get_environment(), version="newest")

    states, actions = [], []
    acc_reward, state = 0, env.reset()
    while not done:
        action, _ = expert.predict(state)
        state, reward, done, _ = env.step(action)
        acc_reward += reward
        states.append(state)
        actions.append(action)
    env.close()

    episode = {
        'states': np.array(states),
        'actions': np.array(actions)
    }
    if acc_reward >= expert.threshold:
        np.savez(f'{path}{context.index}', **episode)
        context.add_log(f'Accumulated reward {acc_reward}')
    return acc_reward >= expert.threshold

baseline_enjoy

Enjoy following StableBaseline output.

src/imitation_datasets/functions.py
def baseline_enjoy(expert: Policy, path: str, context: Context) -> bool:
    """Enjoy following StableBaseline output."""
    done = False
    expert.load()

    env = GymWrapper(expert.get_environment(), version="newest")

    states = []
    actions = []
    rewards = []
    state = env.reset()
    acc_reward = 0

    while not done:
        action, _ = expert.predict(state)
        states.append(state)
        actions.append(action)

        state, reward, done, _ = env.step(action)
        acc_reward += reward
        rewards.append(reward)
    env.close()

    episode_returns = np.array([acc_reward])

    episode = {
        'obs': np.array(states),
        'actions': np.array(actions),
        'rewards': np.array(rewards),
        'episode_returns': episode_returns
    }
    if acc_reward >= expert.threshold:
        np.savez(f'{path}{context.index}', **episode)
        context.add_log(f'Accumulated reward {acc_reward}')
    return acc_reward >= expert.threshold

collate

This function is a simple collate function.

src/imitation_datasets/functions.py
def collate(path, data) -> bool:
    """This function is a simple collate function."""
    episodes_starts = []
    states, actions = [], []

    for file in data:
        episode = np.load(f'{path}{file}')
        states.append(episode['states'])
        actions.append(episode['actions'])

        episode_starts = np.zeros(episode['actions'].shape)
        episode_starts[0] = 1
        episodes_starts.append(episode_starts)

    states = np.array(states)
    states = states.reshape((-1, states.shape[-1]))
    actions = np.array(actions).reshape(-1)
    episodes_starts = np.array(episodes_starts).reshape(-1)

    episode = {
        'states': states,
        'actions': actions,
        'episode_starts': episodes_starts
    }
    np.savez(f'{path}teacher', **episode)

    for file in data:
        os.remove(f'{path}{file}')

    return True

baseline_collate

Collate that outputs the same as StableBaseline.

src/imitation_datasets/functions.py
def baseline_collate(path: str, data: List[str]) -> bool:
    """Collate that outputs the same as StableBaseline."""
    episode = np.load(f'{path}{data[0]}')
    observation_space = episode["obs"].shape[1]

    states = np.ndarray(shape=(0, observation_space))
    episodes_starts = []
    actions = []
    rewards = []
    episode_returns = []

    for file in data:
        episode = np.load(f'{path}{file}')
        states = np.append(states, episode['obs'], axis=0)
        actions += episode['actions'].tolist()
        rewards += episode['rewards'].tolist()
        episode_returns += episode['episode_returns'].tolist()

        episode_starts = np.zeros(episode['actions'].shape)
        episode_starts[0] = 1
        episodes_starts += episode_starts.tolist()

    states = states.reshape((-1, states.shape[-1]))

    actions = np.array(actions).reshape(-1)
    episodes_starts = np.array(episodes_starts).reshape(-1)

    rewards = np.array(rewards).reshape(-1)

    episode_returns = np.array(episode_returns).squeeze()

    episode = {
        'obs': states,
        'actions': actions,
        'rewards': rewards,
        'episode_returns': episode_returns,
        'episode_starts': episodes_starts
    }
    np.savez(f'{path}teacher', **episode)

    for file in data:
        os.remove(f'{path}{file}')

    return True

utils

Experiment

Experiment dataclass to keep track of the experiments.

Source code in src/imitation_datasets/utils.py
@dataclass
class Experiment:
    """Experiment dataclass to keep track of the experiments."""

    amount: int
    path: str = './logs.txt'
    waiting: int = field(
        init=False,
        default_factory=int
    )
    logs: DefaultDict[int, list] = field(
        init=False,
        default_factory=lambda: defaultdict(list)
    )
    experiment_semaphore: asyncio.Lock = field(
        init=False,
        default=asyncio.BoundedSemaphore(value=1)
    )

    def __post_init__(self) -> None:
        """Write in log file that the dataset creation has started."""
        if os.path.exists(self.path):
            os.remove(self.path)

        if not os.path.exists(self.path):
            with open(self.path, 'w', encoding='utf8') as log_file:
                log_file.write('#### Starting dataset creation ####\n')

    def is_done(self) -> bool:
        """Check if the experiment is done.

        Returns:
            bool: True if the experiment is done, False otherwise.
        """
        return self.amount == 0

    async def start(self, amount: int = 1) -> Tuple[bool, int]:
        """Start an experiment.

        Args:
            amount (int, optional): How many experiments are left to run. Defaults to 1.

        Returns:
            status (bool): True if the experiment can be started, False otherwise.
            amount (int): How many experiments are left to run.
        """
        await self.experiment_semaphore.acquire()
        if self.amount > 0:
            self.waiting += amount
            self.amount -= amount
            self.experiment_semaphore.release()
            return True, self.amount

        self.experiment_semaphore.release()
        return False, -1

    async def stop(self, status: bool, amount: int = 1) -> None:
        """Stop an experiment.

        Args:
            status (bool): True if the experiment was successful, False otherwise.
            amount (int, optional): How many experiments are left to run. Defaults to 1.
        """
        await self.experiment_semaphore.acquire()
        self.amount += 0 if status else amount
        self.waiting -= amount
        self.experiment_semaphore.release()

    def add_log(self, experiment: int, log: str) -> None:
        """Add a log to the experiment.

        Args:
            experiment (int): Experiment index.
            log (str): Log to add.
        """
        self.logs[experiment].append(log)

    def write_log(self) -> None:
        """Write the logs in the log file."""
        with open('./logs.txt', 'a', encoding='utf8') as log_file:
            for idx, logs in self.logs.items():
                for log in logs:
                    log_file.write(f'\nExperiment {idx}: {log}')
                log_file.write('\n')

__post_init__()

Write in log file that the dataset creation has started.

src/imitation_datasets/utils.py
def __post_init__(self) -> None:
    """Write in log file that the dataset creation has started."""
    if os.path.exists(self.path):
        os.remove(self.path)

    if not os.path.exists(self.path):
        with open(self.path, 'w', encoding='utf8') as log_file:
            log_file.write('#### Starting dataset creation ####\n')

add_log(experiment, log)

Add a log to the experiment.

Parameters:
  • experiment (int) –

    Experiment index.

  • log (str) –

    Log to add.

src/imitation_datasets/utils.py
def add_log(self, experiment: int, log: str) -> None:
    """Add a log to the experiment.

    Args:
        experiment (int): Experiment index.
        log (str): Log to add.
    """
    self.logs[experiment].append(log)

is_done()

Check if the experiment is done.

Returns:
  • bool( bool ) –

    True if the experiment is done, False otherwise.

src/imitation_datasets/utils.py
def is_done(self) -> bool:
    """Check if the experiment is done.

    Returns:
        bool: True if the experiment is done, False otherwise.
    """
    return self.amount == 0

start(amount=1) async

Start an experiment.

Parameters:
  • amount (int, default: 1 ) –

    How many experiments are left to run. Defaults to 1.

Returns:
  • status( bool ) –

    True if the experiment can be started, False otherwise.

  • amount( int ) –

    How many experiments are left to run.

src/imitation_datasets/utils.py
async def start(self, amount: int = 1) -> Tuple[bool, int]:
    """Start an experiment.

    Args:
        amount (int, optional): How many experiments are left to run. Defaults to 1.

    Returns:
        status (bool): True if the experiment can be started, False otherwise.
        amount (int): How many experiments are left to run.
    """
    await self.experiment_semaphore.acquire()
    if self.amount > 0:
        self.waiting += amount
        self.amount -= amount
        self.experiment_semaphore.release()
        return True, self.amount

    self.experiment_semaphore.release()
    return False, -1

stop(status, amount=1) async

Stop an experiment.

Parameters:
  • status (bool) –

    True if the experiment was successful, False otherwise.

  • amount (int, default: 1 ) –

    How many experiments are left to run. Defaults to 1.

src/imitation_datasets/utils.py
async def stop(self, status: bool, amount: int = 1) -> None:
    """Stop an experiment.

    Args:
        status (bool): True if the experiment was successful, False otherwise.
        amount (int, optional): How many experiments are left to run. Defaults to 1.
    """
    await self.experiment_semaphore.acquire()
    self.amount += 0 if status else amount
    self.waiting -= amount
    self.experiment_semaphore.release()

write_log()

Write the logs in the log file.

src/imitation_datasets/utils.py
def write_log(self) -> None:
    """Write the logs in the log file."""
    with open('./logs.txt', 'a', encoding='utf8') as log_file:
        for idx, logs in self.logs.items():
            for log in logs:
                log_file.write(f'\nExperiment {idx}: {log}')
            log_file.write('\n')

Context

Context dataclass to keep track of the context of the experiment.

Source code in src/imitation_datasets/utils.py
@dataclass
class Context:
    """Context dataclass to keep track of the context of the experiment."""
    experiments: Experiment
    index: int

    def add_log(self, log: str) -> None:
        """Add a log to the experiment."""
        self.experiments.add_log(self.index, log)

add_log(log)

Add a log to the experiment.

src/imitation_datasets/utils.py
def add_log(self, log: str) -> None:
    """Add a log to the experiment."""
    self.experiments.add_log(self.index, log)

CPUS

CPUS dataclass to keep track of the available CPUs.

Source code in src/imitation_datasets/utils.py
@dataclass
class CPUS(metaclass=Singleton):
    """CPUS dataclass to keep track of the available CPUs."""

    available_cpus: int = field(default_factory=multiprocessing.cpu_count())
    cpus: DefaultDict[int, bool] = field(init=False, default_factory=lambda: defaultdict(bool))
    cpu_semaphore: asyncio.Lock = field(init=False)

    def __post_init__(self) -> None:
        """Initialize the cpu_semaphore."""
        if self.available_cpus > multiprocessing.cpu_count() - 1:
            self.available_cpus = multiprocessing.cpu_count() - 1
        self.cpu_semaphore = asyncio.BoundedSemaphore(value=self.available_cpus)

    async def cpu_allock(self) -> int:
        """Acquire a CPU.

        Returns:
            int: CPU index.
        """
        await self.cpu_semaphore.acquire()
        for idx in range(self.available_cpus):
            if not self.cpus[idx]:
                self.cpus[idx] = True
                return idx

    def cpu_release(self, cpu_idx: int) -> None:
        """Release a CPU.

        Args:
            cpu_idx (int): CPU index.
        """
        try:
            self.cpus[cpu_idx] = False
            self.cpu_semaphore.release()
        except ValueError:
            pass

__post_init__()

Initialize the cpu_semaphore.

src/imitation_datasets/utils.py
def __post_init__(self) -> None:
    """Initialize the cpu_semaphore."""
    if self.available_cpus > multiprocessing.cpu_count() - 1:
        self.available_cpus = multiprocessing.cpu_count() - 1
    self.cpu_semaphore = asyncio.BoundedSemaphore(value=self.available_cpus)

cpu_allock() async

Acquire a CPU.

Returns:
  • int( int ) –

    CPU index.

src/imitation_datasets/utils.py
async def cpu_allock(self) -> int:
    """Acquire a CPU.

    Returns:
        int: CPU index.
    """
    await self.cpu_semaphore.acquire()
    for idx in range(self.available_cpus):
        if not self.cpus[idx]:
            self.cpus[idx] = True
            return idx

cpu_release(cpu_idx)

Release a CPU.

Parameters:
  • cpu_idx (int) –

    CPU index.

src/imitation_datasets/utils.py
def cpu_release(self, cpu_idx: int) -> None:
    """Release a CPU.

    Args:
        cpu_idx (int): CPU index.
    """
    try:
        self.cpus[cpu_idx] = False
        self.cpu_semaphore.release()
    except ValueError:
        pass

GymWrapper

Wrapper for a gym environment. Since Gymnasium and Gym version 0.26, some environments that worked under Gym v0.21 stopped working. This wrapper makes sure that the environment's output always matches the API version the user wants.

Source code in src/imitation_datasets/utils.py
class GymWrapper:
    """
        Wrapper for gym environment. Since Gymnasium and Gym version 0.26
        there are some environments that were working under Gym-v.0.21 stopped 
        working. This wrapper just makes sure that the output for the environment 
        will always work with the version the user wants.
    """

    def __init__(self, environment: Any, version: str = "newest") -> None:
        """
        Args:
            name: gym environment name
            version: ["newest", "older"] refers to the compatibility version.

        In this case, "newest" is 0.26 and "older" is 0.21.
        """
        if version not in ["newest", "older"]:
            raise ValueError("Version has to be : ['newest', 'older']")

        self.env = environment
        state = environment.reset()
        if version == "older" and not isinstance(state[0], np.floating):
            raise WrapperException("Incopatible environment version and wrapper version.")
        if version == "newest" and not isinstance(state[0], np.ndarray):
            raise WrapperException("Incopatible environment version and wrapper version.")

        self.version = version

    @property
    def action_space(self):
        """Map gym action_space attribute to wrapper."""
        return self.env.action_space

    @property
    def observation_space(self):
        """Map gym env_space attribute to wrapper."""
        return self.env.observation_space

    def set_seed(self, seed: int) -> None:
        """Set seed for all packages (Pytorch, Numpy and Python).

        Args:
            seed (optional, int): seed number to use for the random generator.
        """
        torch.manual_seed(seed)
        np.random.seed(seed)
        random.seed(seed)

    def reset(self) -> Union[Tuple[List[float], Dict[str, Any]], List[float]]:
        """Resets the framework and return the appropriate return."""
        state = self.env.reset()
        if self.version == "newest":
            return state[0]
        return state

    def step(
            self,
            action: Union[float, int]
    ) -> Union[
        Tuple[List[float], float, bool, bool, Dict[str, Any]],
        Tuple[List[float], float, bool, Dict[str, Any]]
    ]:
        """
        Perform an action in the environment and return the appropriate return
        according to version.
        """
        gym_return = self.env.step(action)
        if self.version == "newest":
            state, reward, terminated, truncated, info = gym_return
            return state, reward, terminated or truncated, info

        return gym_return

    def render(self, mode="rgb_array"):
        """Return the render for the environment."""
        if self.version == "newest":
            state = self.env.render()
            if state is None and self.env.render_mode != "human":
                raise WrapperException("No render mode set.")
            return state

        return self.env.render(mode)

    def close(self) -> None:
        """Close the environment."""
        self.env.close()

action_space property

Map gym action_space attribute to wrapper.

observation_space property

Map gym env_space attribute to wrapper.

__init__(environment, version='newest')

Parameters:
  • environment (Any) –

    gym environment instance.

  • version (str, default: 'newest' ) –

    ["newest", "older"] refers to the compatibility version.

In this case, "newest" is 0.26 and "older" is 0.21.

src/imitation_datasets/utils.py
def __init__(self, environment: Any, version: str = "newest") -> None:
    """
    Args:
        name: gym environment name
        version: ["newest", "older"] refers to the compatibility version.

    In this case, "newest" is 0.26 and "older" is 0.21.
    """
    if version not in ["newest", "older"]:
        raise ValueError("Version has to be : ['newest', 'older']")

    self.env = environment
    state = environment.reset()
    if version == "older" and not isinstance(state[0], np.floating):
        raise WrapperException("Incopatible environment version and wrapper version.")
    if version == "newest" and not isinstance(state[0], np.ndarray):
        raise WrapperException("Incopatible environment version and wrapper version.")

    self.version = version

close()

Close the environment.

src/imitation_datasets/utils.py
def close(self) -> None:
    """Close the environment."""
    self.env.close()

render(mode='rgb_array')

Return the render for the environment.

src/imitation_datasets/utils.py
def render(self, mode="rgb_array"):
    """Return the render for the environment."""
    if self.version == "newest":
        state = self.env.render()
        if state is None and self.env.render_mode != "human":
            raise WrapperException("No render mode set.")
        return state

    return self.env.render(mode)

reset()

Reset the environment and return the observation in the format matching the configured version.

src/imitation_datasets/utils.py
def reset(self) -> Union[Tuple[List[float], Dict[str, Any]], List[float]]:
    """Resets the framework and return the appropriate return."""
    state = self.env.reset()
    if self.version == "newest":
        return state[0]
    return state

set_seed(seed)

Set seed for all packages (Pytorch, Numpy and Python).

Parameters:
  • seed (int) –

    seed number to use for the random generator.

src/imitation_datasets/utils.py
def set_seed(self, seed: int) -> None:
    """Set seed for all packages (Pytorch, Numpy and Python).

    Args:
        seed (optional, int): seed number to use for the random generator.
    """
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

step(action)

Perform an action in the environment and return the appropriate return according to version.

src/imitation_datasets/utils.py
def step(
        self,
        action: Union[float, int]
) -> Union[
    Tuple[List[float], float, bool, bool, Dict[str, Any]],
    Tuple[List[float], float, bool, Dict[str, Any]]
]:
    """
    Perform an action in the environment and return the appropriate return
    according to version.
    """
    gym_return = self.env.step(action)
    if self.version == "newest":
        state, reward, terminated, truncated, info = gym_return
        return state, reward, terminated or truncated, info

    return gym_return
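
A usage sketch, assuming a Gymnasium (gym >= 0.26) environment created with a render_mode, as Policy.get_environment does. With version="newest" the wrapper drops the info dict from reset and folds terminated/truncated into a single done flag, so the classic 4-tuple loop below works unchanged.

import gymnasium as gym

from imitation_datasets.utils import GymWrapper

env = GymWrapper(gym.make("CartPole-v1", render_mode="rgb_array"), version="newest")

done = False
state = env.reset()  # observation only, info dict dropped
while not done:
    action = env.action_space.sample()
    state, reward, done, info = env.step(action)
env.close()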

WrapperException

Bases: Exception

Wrapper exception for all exceptions related to the wrapper.

Source code in src/imitation_datasets/utils.py
class WrapperException(Exception):
    """Wrapper exception for all exceptions related to the wrapper."""

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(self.message)

dataset

BaselineDataset

Bases: Dataset

Teacher dataset for IL methods.

Source code in src/imitation_datasets/dataset/dataset.py
class BaselineDataset(Dataset):
    """Teacher dataset for IL methods."""

    def __init__(
        self,
        path: str,
        source: str = "local",
        hf_split: str = "train",
        split: str = "train",
        n_episodes: int = None,
        transform: Callable[[torch.Tensor], torch.Tensor] = None
    ) -> None:
        """Initialize dataset.

        Args:
            path (Str): path to the dataset.
            source (str): whether is a HuggingFace or a local dataset.
                Defaults to 'local'.
            hf_split (str): HuggingFace split to use. Defaults to 'train'.
            split (str): split to use. Defaults to 'train'.
            n_episodes (int): number of episodes to use. Defaults to None.
            transform (Callable[[torch.Tensor], torch.Tensor]): transform to apply to the data. Defaults to None.

        Raises:
            ValueError: if path does not exist.
        """
        self.transform = transform

        if source == "local" and not os.path.exists(path):
            raise ValueError(f"No dataset at: {path}")

        if source == "local":
            self.data = np.load(path, allow_pickle=True)
            self.average_reward = np.mean(self.data["episode_returns"])
        else:
            dataset = load_dataset(path, split=hf_split)
            self.data = huggingface_to_baseline(dataset)
            if len(self.data["obs"].shape) == 1:
                self.data["obs"] = self.data["obs"].reshape((-1, 1))
            self.average_reward = []

        shape = [1] if isinstance(self.data["obs"][0], str) else self.data["obs"].shape[1:]
        self.states = np.ndarray(shape=(0, *shape))
        self.next_states = np.ndarray(shape=(0, *shape))

        if len(self.data["actions"].shape) == 1:
            action_size = 1
        else:
            action_size = self.data["actions"].shape[-1]
        self.actions = np.ndarray(shape=(0, action_size))

        episode_starts = list(np.where(self.data["episode_starts"] == 1)[0])
        episode_starts.append(len(self.data["episode_starts"]))

        if n_episodes is not None:
            if split == "train":
                episode_starts = episode_starts[:n_episodes + 1]
            else:
                episode_starts = episode_starts[n_episodes:]

        for start, end in zip(episode_starts, tqdm(episode_starts[1:], desc="Creating dataset")):
            episode = self.data["obs"][start:end]
            actions = self.data["actions"][start:end].reshape((-1, 1))
            self.actions = np.append(self.actions, actions[:-1], axis=0)
            self.states = np.append(self.states, episode[:-1], axis=0)
            self.next_states = np.append(self.next_states, episode[1:], axis=0)

            if source != "local":
                self.average_reward.append(self.data["rewards"][start:end].sum())

        if isinstance(self.average_reward, list):
            self.average_reward = np.mean(self.average_reward)

        assert self.states.shape[0] == self.actions.shape[0] == self.next_states.shape[0]

        if not isinstance(self.states[0, 0], str):
            self.states = torch.from_numpy(self.states)
            self.next_states = torch.from_numpy(self.next_states)
        self.actions = torch.from_numpy(self.actions)

    def __len__(self) -> int:
        """Dataset length.

        Returns:
            length (int): length.
        """
        return self.states.shape[0]

    def __getitem__(self, index: int) -> Tuple[torch.Tensor]:
        """Get item from dataset.

        Args:
            index (int): index.

        Returns:
            state (torch.Tensor): state for timestep t.
            action (torch.Tensor): action for timestep t.
            next_state (torch.Tensor): state for timestep t + 1.
        """
        state = self.states[index]
        next_state = self.next_states[index]
        if isinstance(state[0], str):
            state = ToTensor()(Image.open(state[0]))
            next_state = ToTensor()(Image.open(next_state[0]))

        if self.transform is not None:
            state = self.transform(state)
            next_state = self.transform(next_state)

        action = self.actions[index]
        return state, action, next_state

__getitem__(index)

Get item from dataset.

Parameters:
  • index (int) –

    index.

Returns:
  • state( Tensor ) –

    state for timestep t.

  • action( Tensor ) –

    action for timestep t.

  • next_state( Tensor ) –

    state for timestep t + 1.

src/imitation_datasets/dataset/dataset.py
def __getitem__(self, index: int) -> Tuple[torch.Tensor]:
    """Get item from dataset.

    Args:
        index (int): index.

    Returns:
        state (torch.Tensor): state for timestep t.
        action (torch.Tensor): action for timestep t.
        next_state (torch.Tensor): state for timestep t + 1.
    """
    state = self.states[index]
    next_state = self.next_states[index]
    if isinstance(state[0], str):
        state = ToTensor()(Image.open(state[0]))
        next_state = ToTensor()(Image.open(next_state[0]))

    if self.transform is not None:
        state = self.transform(state)
        next_state = self.transform(next_state)

    action = self.actions[index]
    return state, action, next_state

__init__(path, source='local', hf_split='train', split='train', n_episodes=None, transform=None)

Initialize dataset.

Parameters:
  • path (str) –

    path to the dataset.

  • source (str, default: 'local' ) –

    whether the dataset is a HuggingFace or a local dataset. Defaults to 'local'.

  • hf_split (str, default: 'train' ) –

    HuggingFace split to use. Defaults to 'train'.

  • split (str, default: 'train' ) –

    split to use. Defaults to 'train'.

  • n_episodes (int, default: None ) –

    number of episodes to use. Defaults to None.

  • transform (Callable[[Tensor], Tensor], default: None ) –

    transform to apply to the data. Defaults to None.

Raises:
  • ValueError

    if path does not exist.

src/imitation_datasets/dataset/dataset.py
def __init__(
    self,
    path: str,
    source: str = "local",
    hf_split: str = "train",
    split: str = "train",
    n_episodes: int = None,
    transform: Callable[[torch.Tensor], torch.Tensor] = None
) -> None:
    """Initialize dataset.

    Args:
        path (Str): path to the dataset.
        source (str): whether is a HuggingFace or a local dataset.
            Defaults to 'local'.
        hf_split (str): HuggingFace split to use. Defaults to 'train'.
        split (str): split to use. Defaults to 'train'.
        n_episodes (int): number of episodes to use. Defaults to None.
        transform (Callable[[torch.Tensor], torch.Tensor]): transform to apply to the data. Defaults to None.

    Raises:
        ValueError: if path does not exist.
    """
    self.transform = transform

    if source == "local" and not os.path.exists(path):
        raise ValueError(f"No dataset at: {path}")

    if source == "local":
        self.data = np.load(path, allow_pickle=True)
        self.average_reward = np.mean(self.data["episode_returns"])
    else:
        dataset = load_dataset(path, split=hf_split)
        self.data = huggingface_to_baseline(dataset)
        if len(self.data["obs"].shape) == 1:
            self.data["obs"] = self.data["obs"].reshape((-1, 1))
        self.average_reward = []

    shape = [1] if isinstance(self.data["obs"][0], str) else self.data["obs"].shape[1:]
    self.states = np.ndarray(shape=(0, *shape))
    self.next_states = np.ndarray(shape=(0, *shape))

    if len(self.data["actions"].shape) == 1:
        action_size = 1
    else:
        action_size = self.data["actions"].shape[-1]
    self.actions = np.ndarray(shape=(0, action_size))

    episode_starts = list(np.where(self.data["episode_starts"] == 1)[0])
    episode_starts.append(len(self.data["episode_starts"]))

    if n_episodes is not None:
        if split == "train":
            episode_starts = episode_starts[:n_episodes + 1]
        else:
            episode_starts = episode_starts[n_episodes:]

    for start, end in zip(episode_starts, tqdm(episode_starts[1:], desc="Creating dataset")):
        episode = self.data["obs"][start:end]
        actions = self.data["actions"][start:end].reshape((-1, 1))
        self.actions = np.append(self.actions, actions[:-1], axis=0)
        self.states = np.append(self.states, episode[:-1], axis=0)
        self.next_states = np.append(self.next_states, episode[1:], axis=0)

        if source != "local":
            self.average_reward.append(self.data["rewards"][start:end].sum())

    if isinstance(self.average_reward, list):
        self.average_reward = np.mean(self.average_reward)

    assert self.states.shape[0] == self.actions.shape[0] == self.next_states.shape[0]

    if not isinstance(self.states[0, 0], str):
        self.states = torch.from_numpy(self.states)
        self.next_states = torch.from_numpy(self.next_states)
    self.actions = torch.from_numpy(self.actions)

__len__()

Dataset length.

Returns:
  • length( int ) –

    length.

src/imitation_datasets/dataset/dataset.py
def __len__(self) -> int:
    """Dataset length.

    Returns:
        length (int): length.
    """
    return self.states.shape[0]
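
A sketch of loading a locally collated teacher file and batching it with a PyTorch DataLoader. The file path is a placeholder following the layout written by baseline_collate, and the import path is assumed from the source layout shown above.

from torch.utils.data import DataLoader

from imitation_datasets.dataset.dataset import BaselineDataset

dataset = BaselineDataset("./dataset/cartpole/teacher.npz", source="local")
loader = DataLoader(dataset, batch_size=32, shuffle=True)

for state, action, next_state in loader:
    ...  # (s_t, a_t, s_t+1) batches for an imitation learning model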

huggingface

baseline_to_huggingface

Loads a baseline dataset from an NpzFile, converts it into a dict, and saves it into a JSONL file for upload.

Parameters:
  • dataset_path (str) –

    path to the npz file.

  • new_path (str) –

    path to the new dataset.

Raises:
  • ValueError

    if one of the paths does not exist.

src/imitation_datasets/dataset/huggingface.py
def baseline_to_huggingface(dataset_path: str, new_path: str, keys: List[str] = None) -> None:
    """Loads baseline dataset from NpzFile, converts into a dict and save it
    into a JSONL file for upload.

    Args:
        dataset_path (str): path to the npz file.
        new_path (str): path to the new dataset.

    Raises:
        ValueError: if one of the paths does not exist.
    """
    path = "/".join(dataset_path.split("/")[:-1])
    if not os.path.exists(path):
        raise ValueError(f"'{path}' does not exist.")

    path = "/".join(new_path.split("/")[:-1])
    if not os.path.exists(path):
        raise ValueError(f"'{path}' does not exist.")

    dataset = np.load(dataset_path, allow_pickle=True)
    dataset = convert_baseline_dataset_to_dict(dataset, keys)
    save_dataset_into_huggingface_format(dataset, new_path)
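
A sketch of converting a collated teacher.npz into a JSONL file ready for upload to the Hub. Both paths are placeholders, their parent directories must already exist, and the import path is assumed from the source layout shown above.

from imitation_datasets.dataset.huggingface import baseline_to_huggingface

baseline_to_huggingface(
    dataset_path="./dataset/cartpole/teacher.npz",
    new_path="./dataset/cartpole/teacher.jsonl",
)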

metrics

performance

Compute the performance for the agent. Performance normalises the agent's reward between the random and expert policies' rewards, where a performance of 0 corresponds to the random policy and 1 to the expert policy.

performance = (X - X_min) / (X_max - X_min),

where X_min is the random_reward, and X_max is the teacher_reward.

Parameters:
  • agent_reward (Number) –

    agent accumulated reward.

  • teacher_reward (Number) –

    teacher accumulated reward.

  • random_reward (Number) –

    random agent accumulated reward.

Raises:
  • ValueError

    if the teacher reward is lower than the random agent's reward.

  • ValueError

    Teacher and Random rewards should be Numbers.

Returns:
  • performance( Number ) –

    performance metric.

src/imitation_datasets/dataset/metrics.py
def performance(
    agent_reward: Union[Number, List, np.ndarray],
    teacher_reward: Number,
    random_reward: Number
) -> Dict[str, Number]:
    """Compute the performance for the agent. Performance normalises between
    random and expert policies rewards, where performance 0 corresponds to
    random policy performance, and 1 are for expert policy performance.

    performance = (X - X_min) / (X_max - X_min),

    where X_min is the random_reward, and X_max is the teacher_reward.

    Args:
        agent_reward (Number): agent accumulated reward.
        teacher_reward (Number): teacher accumulated reward.
        random_reward (Number): random agent accumulated reward.

    Raises:
        ValueError: if the teacher reward is inferior to the random agent.
        ValueError: Teacher and Random rewards should be Numbers.

    Returns:
        performance (Number): performance metric.
    """
    if isinstance(teacher_reward, (list, np.ndarray)):
        raise ValueError("Teacher reward should not be a list")

    if isinstance(random_reward, (list, np.ndarray)):
        raise ValueError("Random reward should not be a list")

    if teacher_reward < random_reward:
        raise ValueError("Random reward should lesser than the teacher's.")

    if isinstance(agent_reward, list):
        agent_reward = np.array(agent_reward)

    perf = (agent_reward - random_reward) / (teacher_reward - random_reward)
    if isinstance(perf, np.ndarray):
        return {"performance": perf.mean(), "performance_std": perf.std()}
    return {"performance": perf, "performance_std": 0}

average_episodic_reward

Compute the average episodic reward for the agent. AER is the average accumulated reward over 'n' episodes for each agent in each environment.

Parameters:
  • agent_reward (List[Number]) –

    list of each episode accumulated reward.

Returns:
  • AER( Number ) –

    average episodic reward metric.

src/imitation_datasets/dataset/metrics.py
def average_episodic_reward(agent_reward: List[Number]) -> Dict[str, Number]:
    """Compute the average episodic reward for the agent. AER is the average
    of 'n' episodes for each agent in each environment.

    Args:
        agent_reward (List[Number]): list of each episode accumulated reward.

    Returns:
        AER (Number): average episodic reward metric.
    """
    if isinstance(agent_reward, list):
        agent_reward = np.array(agent_reward)
    return {"aer": agent_reward.mean(), "aer_std": agent_reward.std()}

accuracy

Compute the accuracy for a model. The accuracy returned is a percentage from 0 to 100.

Parameters:
  • prediction (Tensor) –

    logits from a model.

  • ground_truth (Tensor) –

    ground truth class.

Raises:
  • ValueError

    if predictions and ground_truth are not torch.Tensor.

  • ValueError

    if predictions are not two dimensional.

  • ValueError

    if ground_truth is not one dimensional.

Returns:
  • accuracy( Number ) –

    accuracy between 0 and 100 for a model.

src/imitation_datasets/dataset/metrics.py
def accuracy(prediction: Tensor, ground_truth: Tensor) -> Number:
    """Compute the accuracy for a model. The accuracy returned is the percentage from 0 to 100.

    Args:
        prediction (torch.Tensor): logits from a model.
        ground_truth (torch.Tensor): ground truth class.

    Raises:
        ValueError: if predictions and ground_truth are not torch.Tensor.
        ValueError: if predictions are not two dimensional.
        ValueError: if ground_truth is not one dimensional.

    Returns:
        accuracy (Number): accuracy between 0 and 100 for a model.
    """

    if not isinstance(prediction, Tensor) or not isinstance(ground_truth, Tensor):
        raise ValueError("'prediction' and 'ground truth' should be a tensor")

    if len(prediction.size()) != 2:
        raise ValueError("'prediction' and 'ground truth' need to be 2 dimensional.")

    if len(ground_truth.size()) != 1:
        raise ValueError("'ground truth' need to be 1 dimensional.")

    return ((argmax(prediction, 1) == ground_truth).sum().item() / ground_truth.size(0)) * 100
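
A sketch with placeholder tensors: accuracy expects 2-dimensional logits and a 1-dimensional tensor of class indices, and returns the share of correct argmax predictions as a percentage.

import torch

from imitation_datasets.dataset.metrics import accuracy

logits = torch.tensor([[2.0, 0.1], [0.2, 1.5], [0.9, 0.3]])  # placeholder logits
targets = torch.tensor([0, 1, 1])
print(accuracy(logits, targets))  # 66.66...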