转载自公众号:敢敢AUTOHUB
0. 摘要
本文深入解析由Meta AI、纽约大学和伯克利AI研究院联合提出的Navigation World Models (NWM)技术。NWM是一种可控视频生成模型,能够根据过去的视觉观测和导航动作预测未来的视觉状态。通过引入条件扩散Transformer架构(CDiT),NWM实现了10亿参数规模的训练,并在已知和未知环境中展现出强大的导航规划能力。本文将从技术原理、架构设计、代码实现到实验结果进行全面剖析,帮助读者理解这一前沿技术的核心思想与实现细节。
论文地址:https://arxiv.org/abs/2412.03572
GitHub项目地址:https://github.com/facebookresearch/nwm
1. 引言:什么是视觉导航?
1.1 导航的本质
导航是任何具有视觉和运动能力的智能体最基本的技能之一。无论是人类、动物还是机器人,要想在环境中自由移动,都需要解决一个核心问题:如何根据当前看到的画面,决定下一步应该往哪里走?对于人类来说,这似乎是一件非常自然的事情——我们看到前方有一扇门,就知道可以走过去;看到墙壁,就知道需要绕开。但对于机器人来说,这个看似简单的任务实际上涉及到复杂的感知、理解和决策过程。
目前主流的机器人视觉导航方法(如NoMaD、GNM等)采用的是监督学习的方式:收集大量人类或机器人的导航数据,然后训练一个神经网络来模仿这些行为。这种方法虽然在很多场景下效果不错,但存在两个根本性的问题。第一个问题是策略的"硬编码"特性——一旦模型训练完成,它的行为模式就固定了。如果我们希望机器人在某些情况下遵守新的规则(比如"不允许左转"或"必须绕开某个区域"),传统模型无法在运行时动态适应这些约束。第二个问题是计算资源无法灵活分配——面对简单任务和复杂任务,模型都使用相同的计算量,无法像人类那样"多想一会儿"来解决困难问题。
Navigation World Models (NWM)正是基于这一思想构建的导航系统。与之前的世界模型(如用于游戏的DIAMOND、GameNGen)不同,NWM面向的是真实世界的机器人导航场景,需要处理更加复杂和多变的环境。NWM的核心创新包括:提出了高效的条件扩散Transformer架构(CDiT),使得模型能够扩展到10亿参数规模;在多种机器人平台和环境上进行联合训练,实现了跨环境、跨机器人的泛化能力;支持约束感知的规划,可以在运行时动态引入新的导航约束;通过利用无标签的人类视频数据,提升了对未知环境的适应能力。
2. 预备知识:理解NWM的基础概念
2.1 什么是扩散模型?
在深入NWM之前,我们需要先理解扩散模型(Diffusion Model)这一关键技术。扩散模型是近年来在图像生成领域取得巨大成功的一类生成模型,其核心思想非常直观:想象你有一张清晰的照片,然后逐渐往上面加噪声,最终变成一团完全随机的雪花点;扩散模型要学习的就是这个过程的"逆过程"——如何从一团噪声中逐步恢复出清晰的图像。数学上,前向加噪过程可以表示为:给定原始数据 ,在时间步 的噪声版本为 ,其中 是标准高斯噪声, 是控制噪声程度的系数。模型的任务是学习一个去噪网络,能够从 预测出 或者噪声 。
2.2 什么是Transformer?
Transformer是另一个需要理解的关键概念。它最初是为自然语言处理设计的神经网络架构,后来被证明在图像、视频等多种数据类型上都非常有效。Transformer的核心机制是"注意力"(Attention) :它允许模型在处理序列中的每个元素时,能够"关注"到序列中的其他相关元素。例如,在处理一个句子时,模型可以学会将"它"这个代词与前面出现的"猫"联系起来。在图像处理中,我们通常将图像分割成若干小块(称为patch),然后将这些patch作为序列输入Transformer。注意力的计算复杂度与序列长度的平方成正比,即 ,这在处理长序列时会带来计算上的挑战。
2.3 什么是VAE?
变分自编码器(Variational Autoencoder,简称VAE)是NWM中用于图像压缩的工具。直接在原始像素空间处理高分辨率图像计算量巨大,VAE提供了一种解决方案:它学习将图像压缩到一个低维的"隐空间"(latent space),在这个空间中进行处理,然后再解压回像素空间。例如,一张 的RGB图像包含约20万个数值,但经过VAE编码后可能只剩下几千个数值,大大降低了后续处理的计算量。NWM使用的是Stable Diffusion项目中预训练的VAE,它将图像压缩为原尺寸的1/8,即 的图像变成 的隐表示。
2.4 导航动作的表示
在NWM中,导航动作被定义为一个三维向量,其中 表示平移(前后移动和左右移动), 表示偏航角(yaw)的变化,即机器人转向的角度。这是一个简化但实用的动作表示:它假设机器人在一个平面上移动,不考虑上下移动(如爬楼梯)和俯仰、翻滚等复杂运动。这种3自由度(3-DoF)的动作空间足以描述大多数地面机器人的基本运动,同时保持了问题的可处理性。在实际应用中,这些动作可以是精确测量的(如在仿真环境中),也可以是根据机器人位置变化近似计算的。
3. NWM的核心公式化
3.1 问题定义
现在我们正式定义NWM要解决的问题。假设我们有一个由第一人称视角视频和对应导航动作组成的数据集:
其中 是时刻 的RGB图像观测, 是时刻 执行的导航动作。我们的目标是学习一个世界模型,它能够根据过去的观测和当前的动作,预测未来的观测。用数学语言表达,就是学习一个条件概率分布 ,其中 是图像 经过VAE编码后的隐表示, 是过去 帧的隐表示序列。
3.2 时间偏移机制
NWM引入了一个重要的扩展:时间偏移(time shift)参数 。传统的世界模型只预测"下一帧",但NWM可以预测"未来第k帧"或者"过去第k帧"。扩展后的动作表示变为 ,其中 指定了要预测的时间偏移量。这带来了几个好处:首先,模型可以学习不同时间尺度的环境动态;其次,在规划时可以灵活选择预测的时间粒度;最后,通过设置负的 值,模型甚至可以"回忆"过去的场景。在实践中,NWM允许最多 秒的时间偏移。
3.3 动作累加
当时间偏移 时,需要将多个时间步的动作累加起来。具体来说,从时刻 到时刻 的累积动作计算如下:
平移动作直接相加,而旋转角度需要取模 以保持在有效范围内。这种累加方式的物理含义很直观:如果你连续向前走了3步,每步1米,那么总的前进距离就是3米;如果你连续右转了3次,每次30度,那么总的转向角度就是90度。
3.4 学习目标
NWM的训练采用标准的扩散模型损失函数。给定当前状态 、上下文状态序列 、动作 和目标状态 ,首先对目标状态加噪得到 ,然后训练模型预测原始状态:
这个损失函数的含义是:模型需要从被噪声污染的状态 中恢复出干净的目标状态 ,同时这个恢复过程需要以上下文 和动作 为条件。通过在不同噪声水平 上训练,模型学会了完整的去噪过程,从而能够从纯噪声生成符合条件的未来状态。
4. 条件扩散Transformer架构(CDiT)
4.1 设计动机
NWM的核心技术创新是条件扩散Transformer(Conditional Diffusion Transformer,简称CDiT)。为什么需要设计新的架构,而不是直接使用现有的Diffusion Transformer(DiT)?问题出在计算效率上。在导航任务中,模型需要根据多帧历史图像(上下文)来预测未来图像。如果使用标准DiT,需要将所有上下文帧的tokens和目标帧的tokens拼接在一起做自注意力计算。假设每帧有 个tokens,共有 帧,标准注意力的计算复杂度为 ,随着上下文长度二次增长。这意味着使用更多的历史帧会导致计算量急剧增加,限制了模型利用长期历史信息的能力。
4.2 CDiT的核心设计
CDiT通过一个巧妙的设计解决了这个问题:将自注意力限制在目标帧的tokens之间,然后通过交叉注意力(Cross-Attention)来整合上下文信息。具体来说,CDiT Block包含三个主要组件:第一是自注意力层,只在目标帧的tokens之间计算注意力,复杂度为 ;第二是交叉注意力层,目标帧的tokens作为Query,上下文帧的tokens作为Key和Value,复杂度为 ;第三是前馈网络(MLP)层。总体复杂度变为 ,与上下文帧数 呈线性关系,而非二次关系。这使得CDiT可以高效地利用更长的历史上下文。
4.3 架构图解
4.4 条件嵌入机制
CDiT需要将多种条件信息(动作、时间偏移、扩散时间步)融入模型。NWM采用了AdaLN(Adaptive Layer Normalization)机制:首先将各种条件信息分别编码为向量,然后相加得到统一的条件embedding :
其中 是动作embedding, 是时间偏移embedding, 是扩散时间步embedding。每种条件都先通过正弦余弦位置编码转换为高维向量,再通过MLP映射到最终维度。这个统一的条件向量 随后被送入一个MLP,生成11个调制系数,用于调节各个归一化层和注意力层的输出。这种设计使得模型能够根据不同的条件灵活调整其内部计算。
4.5 动作嵌入的实现
动作嵌入器将三维动作向量 转换为高维表示。由于动作的三个分量具有不同的物理含义和取值范围,NWM为每个分量设计了独立的嵌入器。以下是来自NWM项目models.py的实际代码:
# 来源: nwm/models.py
class ActionEmbedder(nn.Module):
"""
Embeds action xy into vector representations.
将导航动作(x位移, y位移, 偏航角)嵌入为向量表示
"""
def __init__(self, hidden_size, frequency_embedding_size=256):
super().__init__()
hsize = hidden_size//3
self.x_emb = TimestepEmbedder(hsize, frequency_embedding_size)
self.y_emb = TimestepEmbedder(hsize, frequency_embedding_size)
self.angle_emb = TimestepEmbedder(hidden_size - 2*hsize, frequency_embedding_size)
def forward(self, xya):
return torch.cat([
self.x_emb(xya[..., 0:1]),
self.y_emb(xya[..., 1:2]),
self.angle_emb(xya[..., 2:3])
], dim=-1)
其中TimestepEmbedder使用正弦余弦位置编码将标量转换为高维向量:
# 来源: nwm/models.py
class TimestepEmbedder(nn.Module):
"""
Embeds scalar timesteps into vector representations.
"""
def __init__(self, hidden_size, frequency_embedding_size=256):
super().__init__()
self.mlp = nn.Sequential(
nn.Linear(frequency_embedding_size, hidden_size, bias=True),
nn.SiLU(),
nn.Linear(hidden_size, hidden_size, bias=True),
)
self.frequency_embedding_size = frequency_embedding_size
@staticmethod
def timestep_embedding(t, dim, max_period=10000):
"""
Create sinusoidal timestep embeddings.
"""
half = dim // 2
freqs = torch.exp(
-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half
).to(device=t.device)
args = t.float() * freqs[None]
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
if dim % 2:
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
return embedding
def forward(self, t):
t_freq = self.timestep_embedding(t, self.frequency_embedding_size)
t_emb = self.mlp(t_freq)
return t_emb
4.6 CDiT Block的完整实现
以下是来自NWM项目models.py的CDiT Block实际代码,展示了自注意力、交叉注意力和AdaLN调制的具体细节:
# 来源: nwm/models.py
def modulate(x, shift, scale):
"""AdaLN的调制操作"""
return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)
class CDiTBlock(nn.Module):
"""
A DiT block with adaptive layer norm zero (adaLN-Zero) conditioning.
条件扩散Transformer块,关键创新:通过交叉注意力实现线性复杂度的上下文建模
"""
def __init__(self, hidden_size, num_heads, mlp_ratio=4.0, **block_kwargs):
super().__init__()
self.norm1 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.attn = Attention(hidden_size, num_heads=num_heads, qkv_bias=True, **block_kwargs)
self.norm2 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.norm_cond = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.cttn = nn.MultiheadAttention(
hidden_size, num_heads=num_heads,
add_bias_kv=True, bias=True, batch_first=True, **block_kwargs
)
self.adaLN_modulation = nn.Sequential(
nn.SiLU(),
nn.Linear(hidden_size, 11 * hidden_size, bias=True)
)
self.norm3 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
mlp_hidden_dim = int(hidden_size * mlp_ratio)
approx_gelu = lambda: nn.GELU(approximate="tanh")
self.mlp = Mlp(in_features=hidden_size, hidden_features=mlp_hidden_dim, act_layer=approx_gelu, drop=0)
def forward(self, x, c, x_cond):
# 从条件c生成11个调制系数
shift_msa, scale_msa, gate_msa, \
shift_ca_xcond, scale_ca_xcond, \
shift_ca_x, scale_ca_x, gate_ca_x, \
shift_mlp, scale_mlp, gate_mlp = self.adaLN_modulation(c).chunk(11, dim=1)
# 自注意力(仅在目标帧tokens之间)
x = x + gate_msa.unsqueeze(1) * self.attn(modulate(self.norm1(x), shift_msa, scale_msa))
# 交叉注意力(目标帧tokens作为Query,上下文帧tokens作为Key/Value)
x_cond_norm = modulate(self.norm_cond(x_cond), shift_ca_xcond, scale_ca_xcond)
x = x + gate_ca_x.unsqueeze(1) * self.cttn(
query=modulate(self.norm2(x), shift_ca_x, scale_ca_x),
key=x_cond_norm,
value=x_cond_norm,
need_weights=False
)[0]
# 前馈网络MLP
x = x + gate_mlp.unsqueeze(1) * self.mlp(modulate(self.norm3(x), shift_mlp, scale_mlp))
return x
4.7 模型规模配置
NWM提供了从小到大四种规模的模型配置,以适应不同的计算资源和性能需求:
| 模型名称 | Transformer深度 | 隐藏维度 | 注意力头数 | 参数量 |
|---|---|---|---|---|
| CDiT-S/2 | 12层 | 384 | 6 | 约3300万 |
| CDiT-B/2 | 12层 | 768 | 12 | 约1.3亿 |
| CDiT-L/2 | 24层 | 1024 | 16 | 约4.6亿 |
| CDiT-XL/2 | 28层 | 1152 | 16 | 约10亿 |
论文的主要实验使用CDiT-XL/2配置,在8台H100机器(每台8块GPU)上进行分布式训练。"/2"表示patch大小为2,即每 的隐空间区域被编码为一个token。
5. 训练流程详解
5.1 数据集概览
NWM在多个来源的数据集上进行联合训练,涵盖了不同的机器人平台、环境类型和任务场景。SCAND数据集包含8.7小时的社交合规导航数据,使用轮式Jackal机器人和四足Spot机器人在室内外环境中采集。TartanDrive数据集包含5小时的越野驾驶数据,使用改装的全地形车在匹兹堡郊外采集。RECON数据集是最大的一个,包含40小时的开放世界导航数据,覆盖9种不同环境。HuRoN数据集包含75小时的室内社交交互数据,使用Roomba机器人采集。此外,NWM还利用了Ego4D数据集中的908小时无标签人类第一人称视频,用于提升对未知环境的泛化能力。
5.2 数据加载与预处理
训练数据的加载需要特别设计以支持时间偏移和多目标采样。每个训练样本包含:上下文帧序列(过去m帧)、目标帧(未来某一帧)、从上下文到目标的累积动作、以及时间偏移量。为了鼓励模型学习多样化的时间动态,每个上下文可以对应多个不同时间偏移的目标。以下是来自datasets.py的实际代码:
# 来源: nwm/datasets.py
class TrainingDataset(BaseDataset):
def __getitem__(self, i: int) -> Tuple[torch.Tensor]:
try:
f_curr, curr_time, min_goal_dist, max_goal_dist = self.index_to_data[i]
# 随机采样多个目标时间(关键设计:每个样本对应多个目标)
goal_offset = np.random.randint(min_goal_dist, max_goal_dist + 1, size=(self.goals_per_obs))
goal_time = (curr_time + goal_offset).astype('int')
# 计算相对时间(归一化,128为固定常数)
rel_time = (goal_offset).astype('float')/(128.)
# 构建上下文帧序列
context_times = list(range(curr_time - self.context_size + 1, curr_time + 1))
context = [(f_curr, t) for t in context_times] + [(f_curr, t) for t in goal_time]
# 加载图像:上下文帧 + 目标帧
obs_image = torch.stack([
self.transform(Image.open(get_data_path(self.data_folder, f, t)))
for f, t in context
])
# 加载轨迹数据并计算动作
curr_traj_data = self._get_trajectory(f_curr)
_, goal_pos = self._compute_actions(curr_traj_data, curr_time, goal_time)
goal_pos[:, :2] = normalize_data(goal_pos[:, :2], self.ACTION_STATS)
return (
torch.as_tensor(obs_image, dtype=torch.float32),
torch.as_tensor(goal_pos, dtype=torch.float32),
torch.as_tensor(rel_time, dtype=torch.float32),
)
except Exception as e:
print(f"Exception in {self.dataset_name}", e)
raise Exception(e)
5.3 训练循环核心逻辑
训练过程遵循标准的扩散模型训练范式,但针对多帧输入和多目标学习进行了适配。以下是来自train.py的实际训练循环代码:
# 来源: nwm/train.py (训练循环核心部分)
for x, y, rel_t in loader:
x = x.to(device, non_blocking=True)
y = y.to(device, non_blocking=True)
rel_t = rel_t.to(device, non_blocking=True)
with torch.amp.autocast('cuda', enabled=bfloat_enable, dtype=torch.bfloat16):
with torch.no_grad():
# VAE编码:将像素图像压缩到隐空间并归一化
B, T = x.shape[:2]
x = x.flatten(0, 1)
x = tokenizer.encode(x).latent_dist.sample().mul_(0.18215)
x = x.unflatten(0, (B, T))
# 分离上下文和目标
num_goals = T - num_cond
x_start = x[:, num_cond:].flatten(0, 1) # 目标帧隐表示
# 扩展上下文以匹配多目标
x_cond = x[:, :num_cond].unsqueeze(1).expand(
B, num_goals, num_cond, x.shape[2], x.shape[3], x.shape[4]
).flatten(0, 1)
y = y.flatten(0, 1)
rel_t = rel_t.flatten(0, 1)
# 随机采样扩散时间步
t = torch.randint(0, diffusion.num_timesteps, (x_start.shape[0],), device=device)
# 计算扩散损失
model_kwargs = dict(y=y, x_cond=x_cond, rel_t=rel_t)
loss_dict = diffusion.training_losses(model, x_start, t, model_kwargs)
loss = loss_dict["loss"].mean()
# 梯度更新
opt.zero_grad()
if not bfloat_enable:
loss.backward()
opt.step()
else:
scaler.scale(loss).backward()
if config.get('grad_clip_val', 0) > 0:
scaler.unscale_(opt)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=config['grad_clip_val'])
scaler.step(opt)
scaler.update()
# EMA更新
update_ema(ema, model.module)
5.4 EMA与分布式训练
为了提高生成质量的稳定性,NWM使用指数移动平均(EMA)来维护一个平滑的模型副本用于推理。同时,为了处理10亿参数规模的模型和大量数据,采用了多机多卡的分布式训练策略。以下是来自train.py的实际代码:
# 来源: nwm/train.py
@torch.no_grad()
def update_ema(ema_model, model, decay=0.9999):
"""
Step the EMA model towards the current model.
将EMA模型向当前模型靠近,decay越接近1更新越慢
"""
ema_params = OrderedDict(ema_model.named_parameters())
model_params = OrderedDict(model.named_parameters())
for name, param in model_params.items():
name = name.replace('_orig_mod.', '') # 处理torch.compile的命名
ema_params[name].mul_(decay).add_(param.data, alpha=1 - decay)
# 分布式训练配置 (来自train.py main函数)
# 初始化分布式环境
_, rank, device, _ = init_distributed()
seed = args.global_seed * dist.get_world_size() + rank
torch.manual_seed(seed)
# 创建模型并包装为DDP
model = CDiT_models[config['model']](context_size=num_cond, input_size=latent_size, in_channels=4).to(device)
ema = deepcopy(model).to(device)
requires_grad(ema, False)
model = DDP(model, device_ids=[device])
# 创建分布式数据采样器
sampler = DistributedSampler(
train_dataset,
num_replicas=dist.get_world_size(),
rank=rank,
shuffle=True,
seed=args.global_seed
)
loader = DataLoader(
train_dataset,
batch_size=config['batch_size'],
shuffle=False,
sampler=sampler,
num_workers=config['num_workers'],
pin_memory=True,
drop_last=True,
persistent_workers=True
)
6. 导航规划:在想象中寻找最优路径
6.1 规划问题的形式化
训练好的NWM可以用于导航规划。给定当前观测 和目标观测 ,我们需要找到一个动作序列 ,使得执行这些动作后能够到达目标。这可以形式化为一个优化问题:找到使"能量函数"最小的动作序列。能量函数定义为:
这个能量函数由三部分组成:第一项 衡量最终状态与目标的相似度(取负号因为我们要最大化相似度);第二项对违反动作约束的情况施加惩罚;第三项对进入不安全状态的情况施加惩罚。相似度 使用LPIPS(Learned Perceptual Image Patch Similarity)计算,这是一种基于深度学习的感知相似度度量,比简单的像素差异更符合人类对图像相似性的判断。
6.2 交叉熵方法(CEM)
NWM使用交叉熵方法(Cross-Entropy Method,CEM)来求解这个优化问题。CEM是一种无梯度的随机优化方法,特别适合处理目标函数不可微分或存在局部最优的情况。其基本思想是:维护一个动作的概率分布,从中采样多个候选动作序列,评估它们的效果,然后根据最优的那些样本更新分布参数。具体步骤如下:首先,初始化一个高斯分布 作为动作的先验分布;然后,从这个分布中采样N个候选动作序列;接着,使用NWM模拟每个动作序列,计算其能量值;最后,选取能量最低的前K个样本,用它们的均值和方差更新分布参数。
6.3 CEM规划的代码实现
以下是来自planning_eval.py的CEM规划核心实现代码,展示了如何将NWM与优化算法结合:
# 来源: nwm/planning_eval.py
class WM_Planning_Evaluator:
def __init__(self, args):
# ... 初始化代码省略 ...
self.loss_fn = lpips.LPIPS(net='alex').to(self.device)
self.mode = 'cem'
self.num_samples = self.args.num_samples
self.topk = self.args.topk
self.opt_steps = self.args.opt_steps
self.num_repeat_eval = self.args.num_repeat_eval
self.action_dim = 3 # (delta_x, delta_y, delta_yaw)
def init_mu_sigma(self, obs_0, traj_len):
"""初始化CEM的分布参数"""
n_evals = obs_0.shape[0]
mu = torch.zeros(n_evals, self.action_dim)
mu[:, ] = torch.tensor(data_hyperparams[self.args.datasets]['mu'])
sigma = torch.ones([n_evals, self.action_dim])
sigma[:, ] = torch.tensor(data_hyperparams[self.args.datasets]['var_scale'])
return mu, sigma
def generate_actions(self, dataset_save_output_dir, dataset_name, idxs,
obs_image, goal_image, gt_actions, len_traj_pred):
"""使用CEM生成最优动作序列"""
n_evals = obs_image.shape[0]
mu, sigma = self.init_mu_sigma(obs_image, len_traj_pred)
mu, sigma = mu.to(self.device), sigma.to(self.device)
for i in range(self.opt_steps):
losses = []
for traj in range(n_evals):
# 从当前分布采样候选动作
sample = (torch.randn(self.num_samples, self.action_dim).to(self.device)
* sigma[traj] + mu[traj])
# 构建完整轨迹的动作序列
single_delta = sample[:, :2]
deltas = single_delta.unsqueeze(1).repeat(1, len_traj_pred, 1)
unnorm_deltas = unnormalize_data(deltas, ACTION_STATS_TORCH)
delta_yaw = calculate_delta_yaw(unnorm_deltas)
deltas = torch.cat((deltas, delta_yaw.to(deltas.device)), dim=-1)
deltas[:, -1, -1] += sample[:, -1] * np.pi
cur_obs_image = obs_image[traj].unsqueeze(0).repeat(self.num_samples, 1, 1, 1, 1)
cur_goal_image = goal_image[traj].unsqueeze(0).repeat(
self.num_samples, 1, 1, 1, 1).squeeze(1)
# 使用NWM模拟轨迹并计算LPIPS损失
preds = self.autoregressive_rollout(cur_obs_image, deltas, self.args.rollout_stride)
preds = preds[:, -1] # 取最后预测帧
loss = self.loss_fn(preds.to(self.device), cur_goal_image.to(self.device)).flatten(0)
# 选择top-k最优样本更新分布
sorted_idx = torch.argsort(loss)
topk_idx = sorted_idx[:self.topk]
topk_action = deltas[topk_idx][:, -1]
losses.append(loss[topk_idx[0]].item())
mu[traj] = topk_action.mean(dim=0)
sigma[traj] = topk_action.std(dim=0)
return pred_actions, pred_yaw
def autoregressive_rollout(self, obs_image, deltas, rollout_stride):
"""自回归展开轨迹"""
deltas = deltas.unflatten(1, (-1, rollout_stride)).sum(2)
preds = []
curr_obs = obs_image.clone().to(self.device)
for i in range(deltas.shape[1]):
curr_delta = deltas[:, i:i+1]
all_models = self.model, self.diffusion, self.vae
x_pred_pixels = model_forward_wrapper(
all_models, curr_obs, curr_delta,
self.args.rollout_stride, self.latent_size,
num_cond=self.num_cond, device=self.device
)
x_pred_pixels = x_pred_pixels.unsqueeze(1)
# 滑动窗口更新观测
curr_obs = torch.cat((curr_obs, x_pred_pixels), dim=1)
curr_obs = curr_obs[:, 1:] # 移除最早的帧
preds.append(x_pred_pixels)
preds = torch.cat(preds, 1)
return preds
6.4 约束感知规划
NWM的一个重要优势是支持在规划时动态引入约束。例如,如果我们希望机器人"先直行,再转弯",可以通过修改CEM的采样过程来实现:
def apply_constraints(trajectory, constraint_type):
"""
对轨迹施加约束
Args:
trajectory: 动作序列 [B, T, 3]
constraint_type: 约束类型
Returns:
满足约束的动作序列
"""
if constraint_type == 'forward_first':
# 约束:前5步只能前进,后3步可以转向
trajectory[:, :5, 2] = 0 # 前5步禁止转向
trajectory[:, 5:, :2] = 0 # 后3步禁止平移
elif constraint_type == 'no_left_turn':
# 约束:不允许左转(假设正值为左转)
trajectory[:, :, 2] = trajectory[:, :, 2].clamp(max=0)
elif constraint_type == 'stay_in_lane':
# 约束:限制横向移动范围
trajectory[:, :, 1] = trajectory[:, :, 1].clamp(-0.5, 0.5)
return trajectory
6.5 与外部策略结合
除了独立规划,NWM还可以用于改进现有的导航策略。具体做法是:使用现有策略(如NoMaD)生成多个候选轨迹,然后用NWM模拟每个轨迹,选择模拟效果最好的那个。这种方式结合了现有策略的先验知识和NWM的评估能力:
def rank_policy_trajectories(policy, nwm, obs, goal, num_samples=32):
"""
使用NWM对策略生成的轨迹进行排序
Args:
policy: 外部导航策略(如NoMaD)
nwm: 训练好的NWM模型
obs: 当前观测
goal: 目标观测
num_samples: 采样轨迹数量
Returns:
最优轨迹
"""
# 从策略采样多个候选轨迹
candidate_trajectories = policy.sample(obs, goal, n=num_samples)
# 使用NWM模拟并评估每个轨迹
scores = []
for traj in candidate_trajectories:
# 自回归模拟
simulated_frames = nwm.simulate(obs, traj)
final_frame = simulated_frames[-1]
# 计算与目标的相似度
score = compute_lpips(final_frame, goal)
scores.append(score)
# 返回得分最低(最相似)的轨迹
best_idx = np.argmin(scores)
return candidate_trajectories[best_idx]
7. 总结
Navigation World Models (NWM)为机器人视觉导航提供了一种全新的范式。与传统的"感知-决策"流水线不同,NWM让机器人学会了"想象"——在行动之前,先在内部模型中模拟各种可能的行动方案,评估它们的效果,然后选择最优的一个。这种基于世界模型的方法带来了三个关键优势:灵活性——可以在运行时动态引入新的约束条件;可解释性——可以可视化模型"想象"的未来场景;可扩展性——通过投入更多计算资源来解决更困难的问题。
306