amago.loading#

Load training data.

Functions

RLData_pad_collate(samples)

Pads the sequence data of a list of RLData gathered by dataloaders.

get_path_to_trajs(dset_root, dset_name, fifo)

load_traj_from_disk(path)

Loads a trajectory from disk.

Classes

Batch(obs, rl2s, rews, dones, actions, time_idxs)

Keeps data organized during a training step.

DiskTrajDataset(dset_root, dset_name, ...[, ...])

Imitates the typical FIFO replay buffer but stores sequence data on disk.

DoNothingDataset()

A dataset that does nothing and is never ready for training.

MixtureOfDatasets(datasets, sampling_weights)

Sample from a mixture of datasets.

RLData(obs, rews, dones, actions, time_idxs)

Stores an individual training example (sequence).

RLDataset([dset_name])

Abstract base class that defines what Experiment expects from its pytorch Dataset.

class Batch(obs, rl2s, rews, dones, actions, time_idxs)[source]#

Bases: object

Keeps data organized during a training step.

actions: Tensor#
dones: Tensor#
obs: dict[str, Tensor]#
rews: Tensor#
rl2s: Tensor#
time_idxs: Tensor#
to(device)[source]#
class DiskTrajDataset(dset_root, dset_name, dset_max_size, dset_min_size=1, relabeler=None)[source]#

Bases: RLDataset

Imitates the typical FIFO replay buffer but stores sequence data on disk.

Tip

This class is @gin.configurable. Default values of kwargs can be overridden using gin.

Tells the parallel actors to write finished trajectories to a buffer directory and then deletes the oldest trajectories when the dataset exceeds a max size. Also creates a “protected” directory where trajectory data from previous runs or demonstrations can be stored and sampled from without being deleted (in the style of DQfD and so on).

Creates a dataset in the structure:

dset_root/
└── dset_name/
    └── buffer/
        ├── fifo/
        │   └── # .traj or .npz files here
        └── protected/
            └── # .traj or .npz files here
Parameters:
  • dset_root (str) – The root directory to store the dataset.

  • dset_name (str) – The name of the dataset. Used to create a buffer subdirectory and to identify the dataset in logging metrics.

  • dset_max_size (int) – The maximum number of trajectories to keep in the FIFO buffer.

  • dset_min_size (int) – The minimum number of trajectories that need to be present before training updates on this dataset can begin.

  • relabeler (Relabeler | None) – A function that edits/relabels trajectory data when loaded. (amago.hindsight.Relabeler).
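
A minimal construction sketch (the import path follows this page; the root directory, dataset name, and size values are hypothetical):

from amago.loading import DiskTrajDataset

dset = DiskTrajDataset(
    dset_root="/tmp/amago_buffers",  # hypothetical root; holds dset_name/buffer/{fifo, protected}
    dset_name="my_run",              # also used to identify the dataset in logging metrics
    dset_max_size=10_000,            # oldest fifo trajectories are deleted beyond this count
    dset_min_size=500,               # training updates wait for at least 500 trajectories
)

# Parallel actors write finished trajectories here (the fifo buffer directory);
# files copied into the protected/ directory are sampled but never deleted.
print(dset.save_new_trajs_to)

Because the class is @gin.configurable, the same kwargs can also be overridden through gin instead of in code.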

delete(delete_protected=False)[source]#

Called when the user wants to delete the dataset (e.g., remove its files from disk).

get_description()[source]#

Returns a string describing the dataset. Printed as part of the hparam summary when the experiment starts.

Return type:

str

on_end_of_collection(experiment)[source]#

Implements the FIFO buffer behavior.

Return type:

dict[str, Any]

property ready_for_training: bool#

Whether the dataset is ready to be used for training. If False, Experiment will keep collecting data until learning updates can begin.

sample_random_trajectory()[source]#

Prepare a (random) full trajectory, or at least a sequence covering the dataset’s maximum sequence length. Sampling random subsets up to the policy’s sequence length is handled automatically.

Return type:

RLData

property save_new_trajs_to: str | None#

Tells the parallel actors where to write new trajectories.

If None, the actors will not write new trajectories to disk.

class DoNothingDataset[source]#

Bases: RLDataset

A dataset that does nothing and is never ready for training.

Useful for repurposing Experiment to manage inference or eval only.

get_description()[source]#

Returns a string describing the dataset. Printed as part of the hparam summary when the experiment starts.

property ready_for_training#

Whether the dataset is ready to be used for training. If False, Experiment will keep collecting data until learning updates can begin.

sample_random_trajectory()[source]#

Prepare a (random) full trajectory, or at least a sequence covering the dataset’s maximum sequence length. Sampling random subsets up to the policy’s sequence length is handled automatically.

Return type:

RLData

property save_new_trajs_to#

Tells the parallel actors where to write new trajectories.

If None, the actors will not write new trajectories to disk.

class MixtureOfDatasets(datasets, sampling_weights, smooth_sudden_starts=None, dset_name=None)[source]#

Bases: RLDataset

Sample from a mixture of datasets.

Parameters:
  • datasets (list[RLDataset]) – A list of RLDataset objects.

  • sampling_weights (list[float]) – Probability of sampling from each dataset. Must sum to 1.

  • smooth_sudden_starts (int | None) – When a dataset becomes ready for training midway through training, anneal its sampling weight from 0 to its assigned sampling_weights[i] over smooth_sudden_starts epochs.

  • dset_name (str | None) – The name of the dataset. Used to identify the dataset in logging metrics. Defaults to the class name.

Note

Only samples from datasets that are currently ready_for_training(). For example, a mixture of some custom offline dataset and the DiskTrajDataset will only sample from the offline dataset until the DiskTrajDataset has written trajectories to disk. Sampling weights are renormalized amongst the datasets that are ready for training.

Only one dataset can direct the Experiment’s parallel actors to save new trajectories to disk (only one dataset’s dset.save_new_trajs_to is not None).
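A hedged sketch mixing two on-disk datasets (names and weights are hypothetical; per the note above, at most one of the mixed datasets should actually direct actors to write new trajectories):

from amago.loading import DiskTrajDataset, MixtureOfDatasets

# Two hypothetical on-disk datasets: one of pre-collected demos, one filled online.
demos = DiskTrajDataset(dset_root="/tmp/amago_buffers", dset_name="demos", dset_max_size=5_000)
online = DiskTrajDataset(dset_root="/tmp/amago_buffers", dset_name="my_run", dset_max_size=10_000)

mixture = MixtureOfDatasets(
    datasets=[demos, online],
    sampling_weights=[0.25, 0.75],  # must sum to 1; renormalized over datasets that are ready
    smooth_sudden_starts=5,         # anneal a newly-ready dataset's weight to its target over 5 epochs
)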

configure_from_experiment(experiment)[source]#

Call configure() with settings inferred from the main Experiment.

delete()[source]#

Called when the user wants to delete the dataset (e.g., remove its files from disk).

get_description()[source]#

Returns a string describing the dataset. Printed as part of the hparam summary when the experiment starts.

Return type:

str

on_end_of_collection(experiment)[source]#

Callback for Experiment to call after each round of environment interaction / data collection, but before gradient updates resume.

Parameters:

experiment – The Experiment training on this dataset (in case behavior depends on the current epoch or some other schedule)

Return type:

dict[str, Any]

Returns:

A dictionary of metrics to log.

property ready_for_training: bool#

Whether the dataset is ready to be used for training. If False, Experiment will keep collecting data until learning updates can begin.

sample_random_trajectory()[source]#

Prepare a (random) full trajectory, or at least a sequence covering the dataset’s maximum sequence length. Sampling random subsets up to the policy’s sequence length is handled automatically.

Return type:

RLData

property save_new_trajs_to: str | None#

Tells the parallel actors where to write new trajectories.

If None, the actors will not write new trajectories to disk.

update_dset_weights(epoch)[source]#
class RLData(obs, rews, dones, actions, time_idxs, rl2s=None)[source]#

Bases: object

Stores an individual training example (sequence).

Parameters:
  • obs (dict[str, torch.Tensor]) – A dictionary of observation tensors. The shape of each value is (T, Any), where T is the length of the sequence in timesteps.

  • rews (torch.FloatTensor) – A tensor of rewards. Shape: (T - 1, 1). There is no reward before the first timestep.

  • dones (torch.BoolTensor) – A tensor of done flags. Shape: (T - 1, 1). There is no done flag before the first timestep.

  • actions (torch.FloatTensor) – A tensor of actions. Shape: (T - 1, action dim). There is no action on the last (terminal) timestep.

  • time_idxs (torch.LongTensor) – A tensor of time indices. Shape: (T, 1). The global timestep counter of each timestep in the episode. If this is an entire episode, time_idxs is range(0, T).

  • rl2s (Optional[torch.FloatTensor]) – A tensor of previous reward/action pairs. Shape: (T, 1 + action dim). The previous reward concatenated with the previous action, provided as input to the policy. If not provided, they will be generated by shifting the actions and rewards.
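
A construction sketch with toy shapes (the observation and action dimensions are hypothetical) following the layout above:

import torch
from amago.loading import RLData

T, action_dim = 16, 4  # hypothetical sequence length and action dimension

example = RLData(
    obs={"observation": torch.zeros(T, 7)},          # each value shaped (T, ...)
    rews=torch.zeros(T - 1, 1),                       # no reward before the first timestep
    dones=torch.zeros(T - 1, 1, dtype=torch.bool),    # no done flag before the first timestep
    actions=torch.zeros(T - 1, action_dim),           # no action on the last (terminal) timestep
    time_idxs=torch.arange(T).long().unsqueeze(-1),   # (T, 1); range(0, T) for a full episode
    # rl2s omitted: previous reward/action pairs are generated by shifting rews and actions
)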

actions: FloatTensor#
dones: BoolTensor#
obs: dict[str, Tensor]#
random_slice(length, padded_sampling='none')[source]#

Randomly slices the sequence to a given length.

Parameters:
  • length (int) – The length of the sequence to sample.

  • padded_sampling (str) – The padding mode to use when slicing:

    “none” – sample a random start index and take the next length timesteps. Can bias against the timesteps at the end of the trajectory.

    “both” – sample without bias against the start and end of the trajectory, but can lead to much more of the batch being padded/masked.

    “left” – sample while effectively padding the left side of the sequence, for cases where the start of the training sequence is not always the first timestep of the trajectory.

    “right” – sample while effectively padding the right side of the sequence, for cases where the end of the trajectory may be undersampled.

rews: FloatTensor#
rl2s: FloatTensor | None = None#
time_idxs: LongTensor#
RLData_pad_collate(samples)[source]#

Pads the sequence data of a list of RLData gathered by dataloaders.

Parameters:

samples (list[RLData]) – A list of RLData objects.

Return type:

Batch

Returns:

A Batch object.
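
A hedged sketch of collating two sequences of different lengths into a single padded Batch. The toy_rldata helper and its shapes are hypothetical, and the in-place behavior of Batch.to() is an assumption:

import torch
from amago.loading import RLData, RLData_pad_collate

def toy_rldata(T, obs_dim=7, action_dim=4):
    # Hypothetical helper: builds an RLData with the shapes documented above.
    return RLData(
        obs={"observation": torch.zeros(T, obs_dim)},
        rews=torch.zeros(T - 1, 1),
        dones=torch.zeros(T - 1, 1, dtype=torch.bool),
        actions=torch.zeros(T - 1, action_dim),
        time_idxs=torch.arange(T).long().unsqueeze(-1),
    )

# Sequences of different lengths are padded into one Batch.
batch = RLData_pad_collate([toy_rldata(T=12), toy_rldata(T=20)])
batch.to("cpu")  # Batch.to(device) moves every tensor field (assumed in place)

In practice this is the collate function used by the dataloaders that iterate over an RLDataset during training.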

class RLDataset(dset_name=None)[source]#

Bases: ABC, Dataset

Abstract base class that defines what Experiment expects from its pytorch Dataset.

Allows the main training loop to be used with existing data sources (offline RL) that were not generated by our environment wrappers.

RLDatasets have no real length or fixed indices like typical pytorch Datasets. The “length” is determined by the update : data ratio of the experiment.

Datasets have a reference to the Experiment that is training on them, which can be used to access the Accelerator and other useful information.

Parameters:

dset_name (str | None) – The name of the dataset. Used to identify the dataset in logging metrics. Defaults to the class name.

check_configured()[source]#
configure(items_per_epoch, max_seq_len, padded_sampling, has_edit_rights)[source]#

Configure the dataset with additional hyperparameters.

We’ve basically split the constructor into two parts, because in practice we will be grabbing these settings from the main Experiment.

Parameters:
  • items_per_epoch (int) – The number of trajectories to sample each epoch. Defines the end of dataloader iteration.

  • max_seq_len (int) – The maximum sequence length to sample.

  • padded_sampling – Strategy for sampling from trajectories longer than max_seq_len. See random_slice() or Experiment docs for more details.

  • has_edit_rights (bool) – Whether the dataset has total authority to edit the underlying data (e.g., to delete trajectory files from disk). Sometimes turned off for async setups.

configure_from_experiment(experiment)[source]#

Call configure() with settings inferred from the main Experiment.

delete()[source]#

Called when the user wants to delete the dataset (e.g., remove its files from disk).

abstract get_description()[source]#

Returns a string describing the dataset. Printed as part of the hparam summary when the experiment starts.

Return type:

str

on_end_of_collection(experiment)[source]#

Callback for Experiment to call after each round of environment interaction / data collection, but before gradient updates resume.

Parameters:

experiment – The Experiment training on this dataset (in case behavior depends on the current epoch or some other schedule)

Return type:

dict[str, Any]

Returns:

A dictionary of metrics to log.

property ready_for_training: bool#

Whether the dataset is ready to be used for training. If False, Experiment will keep collecting data until learning updates can begin.

abstract sample_random_trajectory()[source]#

Prepare a (random) full trajectory, or at least a sequence covering the dataset’s maximum sequence length. Sampling random subsets up to the policy’s sequence length is handled automatically.

Return type:

RLData

abstract property save_new_trajs_to: str | None#

Tells the parallel actors where to write new trajectories.

If None, the actors will not write new trajectories to disk.
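
A minimal sketch of a purely offline subclass implementing the abstract interface above. The file format, field names, and class name are all hypothetical:

import torch
from amago.loading import RLData, RLDataset

class SavedTensorDataset(RLDataset):
    # Hypothetical offline dataset over pre-converted .pt files, one sequence per file.

    def __init__(self, paths):
        super().__init__(dset_name="saved_tensors")
        self.paths = paths

    def get_description(self) -> str:
        return f"SavedTensorDataset ({len(self.paths)} sequences)"

    @property
    def save_new_trajs_to(self):
        # Purely offline: the parallel actors should not write new trajectories to disk.
        return None

    def sample_random_trajectory(self) -> RLData:
        idx = torch.randint(len(self.paths), size=()).item()
        data = torch.load(self.paths[idx])  # assumed to hold the fields below
        return RLData(
            obs=data["obs"],
            rews=data["rews"],
            dones=data["dones"],
            actions=data["actions"],
            time_idxs=data["time_idxs"],
        )

A dataset like this could then be combined with an online DiskTrajDataset through MixtureOfDatasets, as sketched earlier.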

get_path_to_trajs(dset_root, dset_name, fifo)[source]#

Return type:

str

load_traj_from_disk(path)[source]#

Loads a trajectory from disk.

Switches between the available file formats (“.traj” and “.npz”).

Parameters:

path (str) – The path to the trajectory file saved by our environment wrappers as part of online data collection in Experiment.

Return type:

Trajectory | FrozenTraj

Returns:

A Trajectory or FrozenTraj object.
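
A hedged usage sketch of the two helpers above, assuming get_path_to_trajs returns the buffer subdirectory shown in the DiskTrajDataset tree (paths and dataset name are hypothetical):

import os
from amago.loading import get_path_to_trajs, load_traj_from_disk

# Directory of trajectories written by online collection for a DiskTrajDataset.
fifo_dir = get_path_to_trajs("/tmp/amago_buffers", "my_run", fifo=True)

# Load every trajectory currently in the fifo buffer (".traj" or ".npz" files).
for fname in os.listdir(fifo_dir):
    traj = load_traj_from_disk(os.path.join(fifo_dir, fname))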