27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173 | class ABCO(BCO):
"""Augmented Behavioural Cloning from Observation method based on (Monteiro et. al., 2020)"""
__version__ = "1.0.0"
__author__ = "Monteiro et. al."
__method_name__ = "Augmented Behavioural Cloning from Observation"
def __init__(
self,
environment: Env,
enjoy_criteria: int = 100,
verbose: bool = False,
config_file: str = None
) -> None:
if config_file is None:
config_file = CONFIG_FILE
super().__init__(environment, enjoy_criteria, verbose, config_file)
self.save_path = f"./tmp/abco/{self.environment_name}/"
def train(
self,
n_epochs: int,
train_dataset: Dict[str, DataLoader],
eval_dataset: Dict[str, DataLoader] = None,
folder: str = None
) -> Self:
if folder is None:
folder = f"../benchmark_results/abco/{self.environment_name}"
super().train(
n_epochs,
train_dataset,
eval_dataset,
folder
)
return self
def _append_samples(self, train_dataset: DataLoader) -> DataLoader:
"""Append samples to DataLoader.
Args:
train_dataset (DataLoader): current train dataset.
Returns:
train_dataset (DataLoader): new train dataset.
"""
metrics, i_pos = self._enjoy(return_ipos=True)
i_pos_ratio = metrics.get('success_rate', 0)
idm_ratio = 1 - i_pos_ratio
if i_pos_ratio == 0:
return train_dataset
i_pos_size = i_pos["states"].shape[0]
idm_size = train_dataset['idm_dataset'].dataset.states.shape[0]
i_pos_k = max(0, int(i_pos_size * i_pos_ratio))
idm_k = max(0, int(idm_size * idm_ratio))
i_pos_idx = torch.multinomial(torch.tensor(range(i_pos_size)).float(), i_pos_k)
try:
idm_idx = torch.multinomial(torch.tensor(range(idm_size)).float(), idm_k)
except RuntimeError:
idm_idx = []
train_dataset['idm_dataset'].dataset.states = torch.cat((
train_dataset['idm_dataset'].dataset.states[idm_idx],
torch.from_numpy(i_pos['states'])[i_pos_idx]),
dim=0
)
train_dataset['idm_dataset'].dataset.next_states = torch.cat((
train_dataset['idm_dataset'].dataset.next_states[idm_idx],
torch.from_numpy(i_pos['next_states'])[i_pos_idx]),
dim=0
)
train_dataset['idm_dataset'].dataset.actions = torch.cat((
train_dataset['idm_dataset'].dataset.actions[idm_idx],
torch.from_numpy(i_pos['actions'].reshape((-1, 1)))[i_pos_idx]),
dim=0
)
return train_dataset
def _enjoy(
self,
render: bool = False,
teacher_reward: Number = None,
random_reward: Number = None,
return_ipos: bool = False,
) -> Union[Metrics, Tuple[Metrics, Dict[str, List[float]]]]:
"""Function for evaluation of the policy in the environment
Args:
render (bool): Whether it should render. Defaults to False.
teacher_reward (Number): reward for teacher policy.
random_reward (Number): reward for a random policy.
return_ipos (bool): whether it should return data to append to I_pos.
Returns:
Metrics:
aer (Number): average reward for 100 episodes.
aer_std (Number): standard deviation for aer.
performance (Number): if teacher_reward and random_reward are
informed than the performance metric is calculated.
perforamance_std (Number): standard deviation for performance.
success_rate (float): percentage that the agent reached the goal.
I_pos:
states (List[Number]): states before action.
actions (List[Number]): action given states.
next_states (List[Number]): next state given states and actions.
"""
environment = GymWrapper(self.environment)
average_reward = []
i_pos = defaultdict(list)
success_rate = []
for _ in range(100):
done = False
obs = environment.reset()
accumulated_reward = 0
goal = False
while not done:
if render:
environment.render()
action = self.predict(obs)
i_pos['states'].append(obs)
i_pos['actions'].append(action)
gym_return = environment.step(action)
obs, reward, done, *_ = gym_return
accumulated_reward += reward
goal |= reached_goal(self.environment_name, gym_return, accumulated_reward)
i_pos['next_states'].append(obs)
average_reward.append(accumulated_reward)
success_rate.append(goal)
metrics = average_episodic_reward(average_reward)
if teacher_reward is not None and random_reward is not None:
metrics.update(performance(
average_reward, teacher_reward, random_reward))
metrics['success_rate'] = np.mean(success_rate)
i_pos = {key: np.array(value) for key, value in i_pos.items()}
if return_ipos:
return metrics, i_pos
return metrics
|