注意
转到末尾下载完整的示例代码。
使用 Replay Buffers¶
作者: Vincent Moens
重放缓冲区是任何 RL 或控制算法的核心部分。 监督学习方法通常以训练循环为特征 其中,数据从静态数据集中随机提取并连续馈送 添加到模型和损失函数中。 在 RL 中,情况通常略有不同:数据是使用 模型,然后临时存储在动态结构中(体验 replay buffer),它用作 loss 模块的 dataset。
与往常一样,使用缓冲区的上下文极大地限制了 它是如何构建的:有些人可能希望在其他人想要的时候存储轨迹 来存储单个过渡。特定的采样策略可能更可取 在上下文中:某些项可以比其他项具有更高的优先级,或者可以 重要的是有或没有更换的样品。 计算因素也可能起作用,例如缓冲区的大小 这可能会超过可用的 RAM 存储空间。
由于这些原因,TorchRL 的重放缓冲区是完全可组合的:尽管 它们带有 “附带电池”,只需最少的工作量即可构建, 它们还支持许多自定义,例如存储类型、 采样策略或数据转换。
在本教程中,您将学习:
- 如何构建 Replay Buffer (RB) 并将其与 任何数据类型; 
- 如何自定义缓冲区的存储; 
- 如何将 RB 与 TensorDict 一起使用; 
- 如何从重放缓冲区采样或迭代, 以及如何定义采样策略; 
- 如何使用优先重放缓冲区; 
- 如何转换传入和传出的数据 缓冲区; 
- 如何在缓冲区中存储轨迹。 
基础知识:构建原版重放缓冲区¶
TorchRL 的重放缓冲区旨在优先考虑模块化、 可组合性、效率和简单性。例如,创建一个基本的 Replay Buffer 是一个简单的过程,如下所示 例:
import tempfile
from torchrl.data import ReplayBuffer
buffer = ReplayBuffer()
默认情况下,此重放缓冲区的大小为 1000。让我们检查一下
通过使用extend()方法:
print("length before adding elements:", len(buffer))
buffer.extend(range(2000))
print("length after adding elements:", len(buffer))
length before adding elements: 0
length after adding elements: 1000
我们使用了extend()方法,即
旨在一次添加多个项目。如果传递的对象
to 具有多个维度,则其第一个维度为
被认为是要在缓冲区中拆分为单独元素的那个。extend
这实质上意味着,当添加多维张量或 tensordicts 添加到缓冲区中,缓冲区将只查看第一维 计算它保存在内存中的元素时。 如果对象传递给它 not iterable,则会引发异常。
要一次添加一个项目,add()方法
应该改用。
自定义存储¶
我们看到缓冲区的上限已限制为前 1000 个元素,我们 传递给它。 要更改大小,我们需要自定义我们的存储。
TorchRL 提出了三种类型的存储:
- stores 元素独立位于 列表。它支持任何数据类型,但这种灵活性是有代价的 效率; - ListStorage
- 存储张量数据 结构。 它与 (或 ) 自然合作 对象。存储在每个张量的基础上是连续的,这意味着 采样会比使用列表时更有效,但是 隐式限制是传递给它的任何数据都必须具有相同的 基本属性(如 shape 和 dtype)作为第一批数据,其中 用于实例化缓冲区。 传递不符合此要求的数据将引发 exception 或导致某些未定义的行为。 - LazyTensorStorage- TensorDict- tensorclass
- 它的工作原理是因为它是 lazy (即 it 期望第一批数据被实例化),并且它需要 data ,则每个存储的批次的 shape 和 dtype 都匹配。是什么造就了 存储的独特之处在于它指向磁盘文件(或使用文件系统 存储),这意味着它可以支持非常大的数据集,同时仍然 以连续方式访问数据。 - LazyMemmapStorage- LazyTensorStorage
让我们看看如何使用这些存储中的每一个:
from torchrl.data import LazyMemmapStorage, LazyTensorStorage, ListStorage
# We define the maximum size of the buffer
size = 100
具有 list storage buffer 的缓冲区可以存储任何类型的数据(但我们必须
更改 the ,因为默认值需要数字数据):collate_fn
buffer_list = ReplayBuffer(storage=ListStorage(size), collate_fn=lambda x: x)
buffer_list.extend(["a", 0, "b"])
print(buffer_list.sample(3))
['a', 0, 'b']
因为它是假设量最低的那个,所以它是 TorchRL 中的默认存储。ListStorage
A 可以连续存储数据。
在处理复杂但
中等大小的不变数据结构:LazyTensorStorage
buffer_lazytensor = ReplayBuffer(storage=LazyTensorStorage(size))
让我们创建一批具有 2 个张量的大小数据
存储在其中:torch.Size([3])
import torch
from tensordict import TensorDict
data = TensorDict(
    {
        "a": torch.arange(12).view(3, 4),
        ("b", "c"): torch.arange(15).view(3, 5),
    },
    batch_size=[3],
)
print(data)
TensorDict(
    fields={
        a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([3]),
            device=None,
            is_shared=False)},
    batch_size=torch.Size([3]),
    device=None,
    is_shared=False)
对extend()将
实例化 Storage。数据的第一个维度被解绑到
单独的数据点:
buffer_lazytensor.extend(data)
print(f"The buffer has {len(buffer_lazytensor)} elements")
The buffer has 3 elements
让我们从缓冲区中采样,并打印数据:
sample = buffer_lazytensor.sample(5)
print("samples", sample["a"], sample["b", "c"])
samples tensor([[ 0,  1,  2,  3],
        [ 0,  1,  2,  3],
        [ 0,  1,  2,  3],
        [ 8,  9, 10, 11],
        [ 0,  1,  2,  3]]) tensor([[ 0,  1,  2,  3,  4],
        [ 0,  1,  2,  3,  4],
        [ 0,  1,  2,  3,  4],
        [10, 11, 12, 13, 14],
        [ 0,  1,  2,  3,  4]])
A 的创建方式相同:LazyMemmapStorage
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazytensor.sample(5)
print("samples: a=", sample["a"], "\n('b', 'c'):", sample["b", "c"])
The buffer has 3 elements
samples: a= tensor([[ 8,  9, 10, 11],
        [ 4,  5,  6,  7],
        [ 0,  1,  2,  3],
        [ 4,  5,  6,  7],
        [ 0,  1,  2,  3]])
('b', 'c'): tensor([[10, 11, 12, 13, 14],
        [ 5,  6,  7,  8,  9],
        [ 0,  1,  2,  3,  4],
        [ 5,  6,  7,  8,  9],
        [ 0,  1,  2,  3,  4]])
我们还可以自定义磁盘上的存储位置:
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size, scratch_dir=tempdir))
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
print("the 'a' tensor is stored in", buffer_lazymemmap._storage._storage["a"].filename)
print(
    "the ('b', 'c') tensor is stored in",
    buffer_lazymemmap._storage._storage["b", "c"].filename,
)
The buffer has 3 elements
the 'a' tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpq45hu6fp'>/a.memmap
the ('b', 'c') tensor is stored in /pytorch/rl/docs/source/reference/generated/tutorials/<TemporaryDirectory '/tmp/tmpq45hu6fp'>/b/c.memmap
与 TensorDict 集成¶
张量位置遵循与 TensorDict 相同的结构,其中 包含它们:这使得在训练期间保存和加载缓冲区变得容易。
充分利用数据载体
potential、TensorDictTensorDictReplayBuffer类可以
被使用。
它的主要优势之一是它能够处理抽样的组织
数据,以及可能需要的任何其他信息
(例如样本索引)。
它可以按照与标准相同的方式构建ReplayBuffer并且可以
通常可以互换使用。
from torchrl.data import TensorDictReplayBuffer
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = TensorDictReplayBuffer(
    storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 3 elements
sample: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([12, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([12, 5]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([12]),
            device=cpu,
            is_shared=False),
        index: Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False)},
    batch_size=torch.Size([12]),
    device=cpu,
    is_shared=False)
我们的示例现在有一个额外的键,用于指示哪些索引
进行了抽样。
让我们来看看这些指数:"index"
print(sample["index"])
tensor([0, 2, 0, 2, 2, 1, 1, 0, 1, 0, 1, 0])
与 tensorclass 集成¶
ReplayBuffer 类和关联的子类也可以与类本机一起使用,这些类可以方便地用于
以更明确的方式对数据集进行编码:tensorclass
from tensordict import tensorclass
@tensorclass
class MyData:
    images: torch.Tensor
    labels: torch.Tensor
data = MyData(
    images=torch.randint(
        255,
        (10, 64, 64, 3),
    ),
    labels=torch.randint(100, (10,)),
    batch_size=[10],
)
tempdir = tempfile.TemporaryDirectory()
buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size, scratch_dir=tempdir), batch_size=12
)
buffer_lazymemmap.extend(data)
print(f"The buffer has {len(buffer_lazymemmap)} elements")
sample = buffer_lazymemmap.sample()
print("sample:", sample)
The buffer has 10 elements
sample: MyData(
    images=Tensor(shape=torch.Size([12, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([12]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([12]),
    device=cpu,
    is_shared=False)
不出所料。数据具有适当的类和形状!
与其他张量结构 (PyTrees) 集成¶
TorchRL 的重放缓冲区也适用于任何 pytree 数据结构。
PyTree 是由字典、列表和/或
元组,其中叶子是张量。
这意味着可以在连续内存中存储任何这样的树结构!
可以使用各种存储:TensorStorage,LazyMemmapStorage或LazyTensorStorage所有人都接受这个
类型的数据。
以下是此功能的简要演示:
from torch.utils._pytree import tree_map
让我们在磁盘上构建我们的重放缓冲区:
rb = ReplayBuffer(storage=LazyMemmapStorage(size))
data = {
    "a": torch.randn(3),
    "b": {"c": (torch.zeros(2), [torch.ones(1)])},
    30: -torch.ones(()),  # non-string keys also work
}
rb.add(data)
# The sample has a similar structure to the data (with a leading dimension of 10 for each tensor)
sample = rb.sample(10)
使用 pytrees ,任何可调用对象都可以用作转换:
def transform(x):
    # Zeros all the data in the pytree
    return tree_map(lambda y: y * 0, x)
rb.append_transform(transform)
sample = rb.sample(batch_size=12)
让我们检查一下我们的 transform 是否完成了它的工作:
def assert0(x):
    assert (x == 0).all()
tree_map(assert0, sample)
{'a': None, 'b': {'c': (None, [None])}, 30: None}
对缓冲区进行采样和迭代¶
Replay Buffers 支持多种采样策略:
- 如果 batch-size 是固定的并且可以在构造时定义,则可以 作为 keyword 参数传递给缓冲区; 
- 使用固定的 batch-size,可以迭代重放缓冲区以收集 样品; 
- 如果 batch-size 是动态的,则可以将其传递给 - sample方法 即时。
可以使用多线程进行采样,但这与 last 选项(它要求缓冲区提前知道 next batch) 的
让我们看几个例子:
固定批量大小¶
如果在构造期间传递了 batch-size,则应在 采样:
data = MyData(
    images=torch.randint(
        255,
        (200, 64, 64, 3),
    ),
    labels=torch.randint(100, (200,)),
    batch_size=[200],
)
buffer_lazymemmap = ReplayBuffer(storage=LazyMemmapStorage(size), batch_size=128)
buffer_lazymemmap.extend(data)
buffer_lazymemmap.sample()
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)
这批数据的大小符合我们的预期 (128)。
要启用多线程采样,只需在构造期间将正整数传递给 keyword 参数即可。这应该会加快速度
采样相当多(例如,当
使用优先采样器):prefetch
buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), batch_size=128, prefetch=10
)  # creates a queue of 10 elements to be prefetched in the background
buffer_lazymemmap.extend(data)
print(buffer_lazymemmap.sample())
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)
迭代具有固定 batch-size 的缓冲区¶
我们也可以像使用常规 dataloader 中,只要 batch-size 是预定义的:
for i, data in enumerate(buffer_lazymemmap):
    if i == 3:
        print(data)
        break
MyData(
    images=Tensor(shape=torch.Size([128, 64, 64, 3]), device=cpu, dtype=torch.int64, is_shared=False),
    labels=Tensor(shape=torch.Size([128]), device=cpu, dtype=torch.int64, is_shared=False),
    batch_size=torch.Size([128]),
    device=cpu,
    is_shared=False)
由于我们的抽样技术是完全随机的,并且不会
防止替换,则有问题的迭代器是无限的。但是,我们可以
使用SamplerWithoutReplacement相反,它会将我们的缓冲区转换为有限迭代器:
from torchrl.data.replay_buffers.samplers import SamplerWithoutReplacement
buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), batch_size=32, sampler=SamplerWithoutReplacement()
)
我们创建一个足够大的数据来获取几个样本
data = TensorDict(
    {
        "a": torch.arange(64).view(16, 4),
        ("b", "c"): torch.arange(128).view(16, 8),
    },
    batch_size=[16],
)
buffer_lazymemmap.extend(data)
for _i, _ in enumerate(buffer_lazymemmap):
    continue
print(f"A total of {_i+1} batches have been collected")
A total of 1 batches have been collected
动态批量大小¶
与我们之前看到的相反,关键字
参数可以省略并直接传递给方法:batch_sizesample
buffer_lazymemmap = ReplayBuffer(
    storage=LazyMemmapStorage(size), sampler=SamplerWithoutReplacement()
)
buffer_lazymemmap.extend(data)
print("sampling 3 elements:", buffer_lazymemmap.sample(3))
print("sampling 5 elements:", buffer_lazymemmap.sample(5))
sampling 3 elements: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([3, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([3]),
            device=cpu,
            is_shared=False)},
    batch_size=torch.Size([3]),
    device=cpu,
    is_shared=False)
sampling 5 elements: TensorDict(
    fields={
        a: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.int64, is_shared=False),
        b: TensorDict(
            fields={
                c: Tensor(shape=torch.Size([5, 8]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
优先重放缓冲区¶
TorchRL 还为优先重放缓冲区提供了一个接口。 此缓冲区类根据传递的优先级信号对数据进行采样 通过数据。
尽管此工具与非 tensordict 数据兼容,但我们鼓励 改用 TensorDict,因为它可以将元数据导入 毫不费力地从缓冲区中出来。
让我们首先看看如何在泛型 箱。和 超参数 必须手动设置:
from torchrl.data.replay_buffers.samplers import PrioritizedSampler
size = 100
rb = ReplayBuffer(
    storage=ListStorage(size),
    sampler=PrioritizedSampler(max_capacity=size, alpha=0.8, beta=1.1),
    collate_fn=lambda x: x,
)
扩展重放缓冲区将返回 items 索引,我们将需要 稍后更新优先级:
indices = rb.extend([1, "foo", None])
采样器期望每个元素都有一个优先级。添加到 buffer,则 priority 设置为默认值 1。一旦优先级具有 被计算(通常通过 loss),则必须在缓冲区中更新。
这是通过 method 完成的,该方法需要 indices 和 priority。
我们人为地为数据集中的第二个样本分配了高优先级
要观察它对采样的影响,请执行以下作:update_priority()
rb.update_priority(index=indices, priority=torch.tensor([0, 1_000, 0.1]))
我们观察到,从缓冲区采样主要返回第二个样本
():"foo"
sample, info = rb.sample(10, return_info=True)
print(sample)
['foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo']
该信息包含项目的相对权重以及索引。
print(info)
{'_weight': tensor([2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10,
        2.0893e-10, 2.0893e-10, 2.0893e-10, 2.0893e-10]), 'index': tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])}
我们看到,使用优先重放缓冲区需要一系列额外的 训练循环中的步骤与常规缓冲区的比较:
- 在收集数据并扩展缓冲区后, 项目必须更新; 
- 在计算损失并从中获得“优先信号”之后,我们必须 再次更新缓冲区中项目的优先级。 这需要我们跟踪指数。 
这极大地阻碍了缓冲区的可重用性:如果要编写 一个训练脚本,其中 prioritized 缓冲区和 regular 缓冲区都可以是 created,她必须添加大量的控制流来确保 在适当的位置调用适当的方法,如果和 仅当使用优先缓冲区时。
让我们看看如何使用 .
我们看到,TensorDictTensorDictReplayBuffer返回数据
使用它们的相对存储索引进行增强。我们没有提到的一个功能
是这个类还保证了优先级
信号会自动解析为优先采样器(如果存在),则
外延。
这些功能的组合以多种方式简化了作: - 当扩展缓冲区时,优先级信号会自动为
如果存在,则解析,并且将准确分配优先级;
- 索引将存储在采样的 tensordict 中,因此很容易 在损失计算后更新 Priority。 
- 在计算损失时,优先信号将记录在 tensordict 传递给 loss 模块,从而可以更新 毫不费力的重量: - ..代码 - block::P ython - >>> data = replay_buffer.sample() >>> loss_val = loss_module(data) >>> replay_buffer.update_tensordict_priority(data) 
下面的代码说明了这些概念。我们使用 优先采样器,并在构造函数中指示其中 应该获取 priority 信号:
rb = TensorDictReplayBuffer(
    storage=ListStorage(size),
    sampler=PrioritizedSampler(size, alpha=0.8, beta=1.1),
    priority_key="td_error",
    batch_size=1024,
)
我们选择一个与存储索引成正比的优先级信号:
data["td_error"] = torch.arange(data.numel())
rb.extend(data)
sample = rb.sample()
较高的指数应该更频繁地出现:
from matplotlib import pyplot as plt
plt.hist(sample["index"].numpy())

(array([108.,  54., 128.,  80., 124.,  80., 127.,  56., 127., 140.]), array([ 0. ,  1.5,  3. ,  4.5,  6. ,  7.5,  9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)
处理完示例后,我们使用
方法。
为了展示它是如何工作的,让我们恢复
采样项目:torchrl.data.TensorDictReplayBuffer.update_tensordict_priority()
sample = rb.sample()
sample["td_error"] = data.numel() - sample["index"]
rb.update_tensordict_priority(sample)
现在,较高的索引出现的频率应该较低:
sample = rb.sample()
from matplotlib import pyplot as plt
plt.hist(sample["index"].numpy())

(array([223., 102., 189.,  67., 145.,  53., 107.,  47.,  58.,  33.]), array([ 0. ,  1.5,  3. ,  4.5,  6. ,  7.5,  9. , 10.5, 12. , 13.5, 15. ]), <BarContainer object of 10 artists>)
使用转换¶
存储在重放缓冲区中的数据可能尚未准备好呈现给
loss 模块。
在某些情况下,收集器生成的数据可能太重而无法承受
按原样保存。这方面的示例包括将图像从 转换为
浮点张量,或者在使用
决策转换器。uint8
只需将 适当的转换。 以下是一些示例:
保存 Raw 图像¶
uint8-typed 张量的内存消耗比
我们通常提供给模型的浮点张量。因此,
保存 Raw 图像可能很有用。
以下脚本显示了如何构建一个仅返回
原始图像,但使用转换后的图像进行推理,以及这些图像如何
转换可以在 Replay Buffer 中回收:
from torchrl.collectors import SyncDataCollector
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
    Compose,
    GrayScale,
    Resize,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.envs.utils import RandomPolicy
env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True),
    Compose(
        ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
        Resize(in_keys=["pixels_trsf"], w=64, h=64),
        GrayScale(in_keys=["pixels_trsf"]),
    ),
)
让我们看一下推出:
print(env.rollout(3))
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([3]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([3, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([3, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([3]),
    device=None,
    is_shared=False)
我们刚刚创建了一个生成像素的环境。这些图像 进行处理以馈送到策略。 我们希望存储原始图像,而不是它们的转换。 为此,我们将向收集器附加一个转换以选择键 我们希望看到出现:
from torchrl.envs.transforms import ExcludeTransform
collector = SyncDataCollector(
    env,
    RandomPolicy(env.action_spec),
    frames_per_batch=10,
    total_frames=1000,
    postproc=ExcludeTransform("pixels_trsf", ("next", "pixels_trsf"), "collector"),
)
让我们看一下一批数据,并控制键是否已被丢弃:"pixels_trsf"
for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)
我们创建一个具有与环境相同转换的重放缓冲区。
然而,有一个细节需要解决:转换
在没有环境的情况下使用时,不会注意到数据结构。
将转换附加到环境时,嵌套 tensordict 中的数据首先被转换,然后在
转出执行。在处理静态数据时,情况并非如此。
尽管如此,我们的数据带有一个嵌套的 “next” tensordict,它将是
如果我们没有明确指示它处理
它。我们手动将这些键添加到转换中:"next"
t = Compose(
    ToTensorImage(
        in_keys=["pixels", ("next", "pixels")],
        out_keys=["pixels_trsf", ("next", "pixels_trsf")],
    ),
    Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
    GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(1000), transform=t, batch_size=16)
rb.extend(data)
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
我们可以检查方法是否看到转换后的图像重新出现:sample
print(rb.sample())
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([16, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        index: Tensor(shape=torch.Size([16]), device=cpu, dtype=torch.int64, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([16]),
            device=cpu,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([16, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([16, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([16, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([16]),
    device=cpu,
    is_shared=False)
更复杂的示例:使用 CatFrames¶
这CatFramestransform 展开观察结果
随着时间的推移,创建过去事件的 n-back 记忆,使模型
将过去的事件考虑在内(对于 POMDP 或
循环策略,例如 Decision Transformers)。将这些串联存储
帧可能会消耗大量内存。它也可以是
当 n-back 窗口需要不同时(通常更长)时出现问题
在训练和推理期间。我们通过在两个阶段中分别执行转换来解决此问题。CatFrames
from torchrl.envs import CatFrames, UnsqueezeTransform
我们为返回基于像素的环境创建一个标准的转换列表 观察:
env = TransformedEnv(
    GymEnv("CartPole-v1", from_pixels=True),
    Compose(
        ToTensorImage(in_keys=["pixels"], out_keys=["pixels_trsf"]),
        Resize(in_keys=["pixels_trsf"], w=64, h=64),
        GrayScale(in_keys=["pixels_trsf"]),
        UnsqueezeTransform(-4, in_keys=["pixels_trsf"]),
        CatFrames(dim=-4, N=4, in_keys=["pixels_trsf"]),
    ),
)
collector = SyncDataCollector(
    env,
    RandomPolicy(env.action_spec),
    frames_per_batch=10,
    total_frames=1000,
)
for data in collector:
    print(data)
    break
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([10]),
            device=None,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([10]),
    device=None,
    is_shared=False)
缓冲区转换看起来与环境转换非常相似,但使用
像以前一样额外的键:("next", ...)
t = Compose(
    ToTensorImage(
        in_keys=["pixels", ("next", "pixels")],
        out_keys=["pixels_trsf", ("next", "pixels_trsf")],
    ),
    Resize(in_keys=["pixels_trsf", ("next", "pixels_trsf")], w=64, h=64),
    GrayScale(in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
    UnsqueezeTransform(-4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
    CatFrames(dim=-4, N=4, in_keys=["pixels_trsf", ("next", "pixels_trsf")]),
)
rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(size), transform=t, batch_size=16)
data_exclude = data.exclude("pixels_trsf", ("next", "pixels_trsf"))
rb.add(data_exclude)
0
让我们从缓冲区中采样一批。变换后的形状 像素键的长度应为 4,沿第 4 个维度从 结束:
s = rb.sample(1)  # the buffer has only one element
print(s)
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1, 10, 2]), device=cpu, dtype=torch.int64, is_shared=False),
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([1, 10]),
            device=cpu,
            is_shared=False),
        done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        index: Tensor(shape=torch.Size([1, 10]), device=cpu, dtype=torch.int64, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
                pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([1, 10]),
            device=cpu,
            is_shared=False),
        pixels: Tensor(shape=torch.Size([1, 10, 400, 600, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
        pixels_trsf: Tensor(shape=torch.Size([1, 10, 4, 1, 64, 64]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        truncated: Tensor(shape=torch.Size([1, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([1, 10]),
    device=cpu,
    is_shared=False)
经过一些处理(不包括未使用的 key 等),我们看到 线上线下生成的数据匹配!
assert (data.exclude("collector") == s.squeeze(0).exclude("index", "collector")).all()
存储轨迹¶
在许多情况下,最好从缓冲区访问 trajectes 而不是 比简单的过渡。TorchRL 提供了多种实现此目的的方法。
目前首选的方法是沿第一个
维度,并使用 a 来
对这些批次的数据进行采样。此类只需要几个信息
关于您的数据结构来完成它的工作(并不是说到目前为止它只是
compatible with tensordict-structured data):切片的数量或其
length 和一些关于
episodes 中(例如,回想一下,使用 DataCollector 时,轨迹 ID 存储在 中)。在这个简单的示例中,我们构造一个数据
具有 4 个连续的短轨迹,并从中采样 4 个切片,每个切片
长度 2(因为批量大小为 8,8 个项目 // 4 个切片 = 2 个时间步)。
我们也标记了步骤。SliceSampler("collector", "traj_ids")
from torchrl.data import SliceSampler
rb = TensorDictReplayBuffer(
    storage=LazyMemmapStorage(size),
    sampler=SliceSampler(traj_key="episode", num_slices=4),
    batch_size=8,
)
episode = torch.zeros(10, dtype=torch.int)
episode[:3] = 1
episode[3:5] = 2
episode[5:7] = 3
episode[7:] = 4
steps = torch.cat([torch.arange(3), torch.arange(2), torch.arange(2), torch.arange(3)])
data = TensorDict(
    {
        "episode": episode,
        "obs": torch.randn((3, 4, 5)).expand(10, 3, 4, 5),
        "act": torch.randn((20,)).expand(10, 20),
        "other": torch.randn((20, 50)).expand(10, 20, 50),
        "steps": steps,
    },
    [10],
)
rb.extend(data)
sample = rb.sample()
print("episode are grouped", sample["episode"])
print("steps are successive", sample["steps"])
episode are grouped tensor([1, 1, 4, 4, 3, 3, 3, 3], dtype=torch.int32)
steps are successive tensor([0, 1, 1, 2, 0, 1, 0, 1])
结论¶
我们已经看到了如何在 TorchRL 中使用重放缓冲区,从最简单的 用于需要转换或存储数据的更高级的 特别是以特定的方式。 您现在应该能够:
- 创建 Replay Buffer,自定义其存储、采样器和变换; 
- 为您的问题选择最佳存储类型(基于列表、内存或磁盘); 
- 最大限度地减少缓冲区的内存占用。 
后续步骤¶
- 查看数据 API 参考,了解 TorchRL 中的离线数据集。 它们基于我们的 Replay Buffer API; 
- 检查其他采样器,例如 、 和 或其他写入器 如。 - SamplerWithoutReplacement- PrioritizedSliceSampler- SliceSamplerWithoutReplacement- TensorDictMaxValueWriter
- 在文档中查看如何对 ReplayBuffers 进行检查点作。 
脚本总运行时间:(3 分 8.128 秒)
估计内存使用量:471 MB