• 正文
  • 相关推荐
申请入驻 产业图谱

读论文 | Navigation World Models: 构建机器人视觉导航的“想象力引擎“

04/15 11:12
306
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

转载自公众号:敢敢AUTOHUB

0. 摘要

本文深入解析由Meta AI、纽约大学和伯克利AI研究院联合提出的Navigation World Models (NWM)技术。NWM是一种可控视频生成模型,能够根据过去的视觉观测和导航动作预测未来的视觉状态。通过引入条件扩散Transformer架构(CDiT),NWM实现了10亿参数规模的训练,并在已知和未知环境中展现出强大的导航规划能力。本文将从技术原理、架构设计、代码实现到实验结果进行全面剖析,帮助读者理解这一前沿技术的核心思想与实现细节。

论文地址:https://arxiv.org/abs/2412.03572
GitHub项目地址:https://github.com/facebookresearch/nwm

1. 引言:什么是视觉导航?

1.1 导航的本质

导航是任何具有视觉和运动能力的智能体最基本的技能之一。无论是人类、动物还是机器人,要想在环境中自由移动,都需要解决一个核心问题:如何根据当前看到的画面,决定下一步应该往哪里走?对于人类来说,这似乎是一件非常自然的事情——我们看到前方有一扇门,就知道可以走过去;看到墙壁,就知道需要绕开。但对于机器人来说,这个看似简单的任务实际上涉及到复杂的感知、理解和决策过程。

目前主流的机器人视觉导航方法(如NoMaD、GNM等)采用的是监督学习的方式:收集大量人类或机器人的导航数据,然后训练一个神经网络来模仿这些行为。这种方法虽然在很多场景下效果不错,但存在两个根本性的问题。第一个问题是策略的"硬编码"特性——一旦模型训练完成,它的行为模式就固定了。如果我们希望机器人在某些情况下遵守新的规则(比如"不允许左转"或"必须绕开某个区域"),传统模型无法在运行时动态适应这些约束。第二个问题是计算资源无法灵活分配——面对简单任务和复杂任务,模型都使用相同的计算量,无法像人类那样"多想一会儿"来解决困难问题。

Navigation World Models (NWM)正是基于这一思想构建的导航系统。与之前的世界模型(如用于游戏的DIAMOND、GameNGen)不同,NWM面向的是真实世界的机器人导航场景,需要处理更加复杂和多变的环境。NWM的核心创新包括:提出了高效的条件扩散Transformer架构(CDiT),使得模型能够扩展到10亿参数规模;在多种机器人平台和环境上进行联合训练,实现了跨环境、跨机器人的泛化能力;支持约束感知的规划,可以在运行时动态引入新的导航约束;通过利用无标签的人类视频数据,提升了对未知环境的适应能力。

2. 预备知识:理解NWM的基础概念

2.1 什么是扩散模型?

在深入NWM之前,我们需要先理解扩散模型(Diffusion Model)这一关键技术。扩散模型是近年来在图像生成领域取得巨大成功的一类生成模型,其核心思想非常直观:想象你有一张清晰的照片,然后逐渐往上面加噪声,最终变成一团完全随机的雪花点;扩散模型要学习的就是这个过程的"逆过程"——如何从一团噪声中逐步恢复出清晰的图像。数学上,前向加噪过程可以表示为:给定原始数据 ,在时间步  的噪声版本为 ,其中  是标准高斯噪声, 是控制噪声程度的系数。模型的任务是学习一个去噪网络,能够从  预测出  或者噪声 。

2.2 什么是Transformer?

Transformer是另一个需要理解的关键概念。它最初是为自然语言处理设计的神经网络架构,后来被证明在图像、视频等多种数据类型上都非常有效。Transformer的核心机制是"注意力"(Attention) :它允许模型在处理序列中的每个元素时,能够"关注"到序列中的其他相关元素。例如,在处理一个句子时,模型可以学会将"它"这个代词与前面出现的"猫"联系起来。在图像处理中,我们通常将图像分割成若干小块(称为patch),然后将这些patch作为序列输入Transformer。注意力的计算复杂度与序列长度的平方成正比,即 ,这在处理长序列时会带来计算上的挑战。

2.3 什么是VAE?

变分自编码器(Variational Autoencoder,简称VAE)是NWM中用于图像压缩的工具。直接在原始像素空间处理高分辨率图像计算量巨大,VAE提供了一种解决方案:它学习将图像压缩到一个低维的"隐空间"(latent space),在这个空间中进行处理,然后再解压回像素空间。例如,一张  的RGB图像包含约20万个数值,但经过VAE编码后可能只剩下几千个数值,大大降低了后续处理的计算量。NWM使用的是Stable Diffusion项目中预训练的VAE,它将图像压缩为原尺寸的1/8,即  的图像变成  的隐表示。

2.4 导航动作的表示

在NWM中,导航动作被定义为一个三维向量,其中  表示平移(前后移动和左右移动), 表示偏航角(yaw)的变化,即机器人转向的角度。这是一个简化但实用的动作表示:它假设机器人在一个平面上移动,不考虑上下移动(如爬楼梯)和俯仰、翻滚等复杂运动。这种3自由度(3-DoF)的动作空间足以描述大多数地面机器人的基本运动,同时保持了问题的可处理性。在实际应用中,这些动作可以是精确测量的(如在仿真环境中),也可以是根据机器人位置变化近似计算的。

3. NWM的核心公式化

3.1 问题定义

现在我们正式定义NWM要解决的问题。假设我们有一个由第一人称视角视频和对应导航动作组成的数据集:

其中  是时刻  的RGB图像观测, 是时刻 执行的导航动作。我们的目标是学习一个世界模型,它能够根据过去的观测和当前的动作,预测未来的观测。用数学语言表达,就是学习一个条件概率分布 ,其中  是图像  经过VAE编码后的隐表示, 是过去  帧的隐表示序列。

3.2 时间偏移机制

NWM引入了一个重要的扩展:时间偏移(time shift)参数 。传统的世界模型只预测"下一帧",但NWM可以预测"未来第k帧"或者"过去第k帧"。扩展后的动作表示变为 ,其中  指定了要预测的时间偏移量。这带来了几个好处:首先,模型可以学习不同时间尺度的环境动态;其次,在规划时可以灵活选择预测的时间粒度;最后,通过设置负的  值,模型甚至可以"回忆"过去的场景。在实践中,NWM允许最多  秒的时间偏移。

3.3 动作累加

当时间偏移  时,需要将多个时间步的动作累加起来。具体来说,从时刻  到时刻  的累积动作计算如下:

平移动作直接相加,而旋转角度需要取模  以保持在有效范围内。这种累加方式的物理含义很直观:如果你连续向前走了3步,每步1米,那么总的前进距离就是3米;如果你连续右转了3次,每次30度,那么总的转向角度就是90度。

3.4 学习目标

NWM的训练采用标准的扩散模型损失函数。给定当前状态 、上下文状态序列 、动作  和目标状态 ,首先对目标状态加噪得到 ,然后训练模型预测原始状态:

这个损失函数的含义是:模型需要从被噪声污染的状态  中恢复出干净的目标状态 ,同时这个恢复过程需要以上下文  和动作  为条件。通过在不同噪声水平  上训练,模型学会了完整的去噪过程,从而能够从纯噪声生成符合条件的未来状态。

4. 条件扩散Transformer架构(CDiT)

4.1 设计动机

NWM的核心技术创新是条件扩散Transformer(Conditional Diffusion Transformer,简称CDiT)。为什么需要设计新的架构,而不是直接使用现有的Diffusion Transformer(DiT)?问题出在计算效率上。在导航任务中,模型需要根据多帧历史图像(上下文)来预测未来图像。如果使用标准DiT,需要将所有上下文帧的tokens和目标帧的tokens拼接在一起做自注意力计算。假设每帧有  个tokens,共有  帧,标准注意力的计算复杂度为 ,随着上下文长度二次增长。这意味着使用更多的历史帧会导致计算量急剧增加,限制了模型利用长期历史信息的能力。

4.2 CDiT的核心设计

CDiT通过一个巧妙的设计解决了这个问题:将自注意力限制在目标帧的tokens之间,然后通过交叉注意力(Cross-Attention)来整合上下文信息。具体来说,CDiT Block包含三个主要组件:第一是自注意力层,只在目标帧的tokens之间计算注意力,复杂度为 ;第二是交叉注意力层,目标帧的tokens作为Query,上下文帧的tokens作为Key和Value,复杂度为 ;第三是前馈网络(MLP)层。总体复杂度变为 ,与上下文帧数  呈线性关系,而非二次关系。这使得CDiT可以高效地利用更长的历史上下文。

4.3 架构图解

4.4 条件嵌入机制

CDiT需要将多种条件信息(动作、时间偏移、扩散时间步)融入模型。NWM采用了AdaLN(Adaptive Layer Normalization)机制:首先将各种条件信息分别编码为向量,然后相加得到统一的条件embedding :

其中  是动作embedding, 是时间偏移embedding, 是扩散时间步embedding。每种条件都先通过正弦余弦位置编码转换为高维向量,再通过MLP映射到最终维度。这个统一的条件向量  随后被送入一个MLP,生成11个调制系数,用于调节各个归一化层和注意力层的输出。这种设计使得模型能够根据不同的条件灵活调整其内部计算。

4.5 动作嵌入的实现

动作嵌入器将三维动作向量  转换为高维表示。由于动作的三个分量具有不同的物理含义和取值范围,NWM为每个分量设计了独立的嵌入器。以下是来自NWM项目models.py的实际代码:

# 来源: nwm/models.py
class ActionEmbedder(nn.Module):
    """
    Embeds action xy into vector representations.
    将导航动作(x位移, y位移, 偏航角)嵌入为向量表示
    """
    def __init__(self, hidden_size, frequency_embedding_size=256):
        super().__init__()
        hsize = hidden_size//3
        self.x_emb = TimestepEmbedder(hsize, frequency_embedding_size)
        self.y_emb = TimestepEmbedder(hsize, frequency_embedding_size)
        self.angle_emb = TimestepEmbedder(hidden_size - 2*hsize, frequency_embedding_size)

    def forward(self, xya):
        return torch.cat([
            self.x_emb(xya[..., 0:1]),
            self.y_emb(xya[..., 1:2]),
            self.angle_emb(xya[..., 2:3])
        ], dim=-1)

其中TimestepEmbedder使用正弦余弦位置编码将标量转换为高维向量:

# 来源: nwm/models.py
class TimestepEmbedder(nn.Module):
    """
    Embeds scalar timesteps into vector representations.
    """
    def __init__(self, hidden_size, frequency_embedding_size=256):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(frequency_embedding_size, hidden_size, bias=True),
            nn.SiLU(),
            nn.Linear(hidden_size, hidden_size, bias=True),
        )
        self.frequency_embedding_size = frequency_embedding_size

    @staticmethod
    def timestep_embedding(t, dim, max_period=10000):
        """
        Create sinusoidal timestep embeddings.
        """
        half = dim // 2
        freqs = torch.exp(
            -math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half
        ).to(device=t.device)
        args = t.float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
        return embedding

    def forward(self, t):
        t_freq = self.timestep_embedding(t, self.frequency_embedding_size)
        t_emb = self.mlp(t_freq)
        return t_emb

4.6 CDiT Block的完整实现

以下是来自NWM项目models.py的CDiT Block实际代码,展示了自注意力、交叉注意力和AdaLN调制的具体细节:

# 来源: nwm/models.py
def modulate(x, shift, scale):
    """AdaLN的调制操作"""
    return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)

class CDiTBlock(nn.Module):
    """
    A DiT block with adaptive layer norm zero (adaLN-Zero) conditioning.
    条件扩散Transformer块,关键创新:通过交叉注意力实现线性复杂度的上下文建模
    """
    def __init__(self, hidden_size, num_heads, mlp_ratio=4.0, **block_kwargs):
        super().__init__()
        self.norm1 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
        self.attn = Attention(hidden_size, num_heads=num_heads, qkv_bias=True, **block_kwargs)
        self.norm2 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
        self.norm_cond = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
        self.cttn = nn.MultiheadAttention(
            hidden_size, num_heads=num_heads,
            add_bias_kv=True, bias=True, batch_first=True, **block_kwargs
        )
        self.adaLN_modulation = nn.Sequential(
            nn.SiLU(),
            nn.Linear(hidden_size, 11 * hidden_size, bias=True)
        )

        self.norm3 = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
        mlp_hidden_dim = int(hidden_size * mlp_ratio)
        approx_gelu = lambda: nn.GELU(approximate="tanh")
        self.mlp = Mlp(in_features=hidden_size, hidden_features=mlp_hidden_dim, act_layer=approx_gelu, drop=0)

    def forward(self, x, c, x_cond):
        # 从条件c生成11个调制系数
        shift_msa, scale_msa, gate_msa, \
        shift_ca_xcond, scale_ca_xcond, \
        shift_ca_x, scale_ca_x, gate_ca_x, \
        shift_mlp, scale_mlp, gate_mlp = self.adaLN_modulation(c).chunk(11, dim=1)

        # 自注意力(仅在目标帧tokens之间)
        x = x + gate_msa.unsqueeze(1) * self.attn(modulate(self.norm1(x), shift_msa, scale_msa))

        # 交叉注意力(目标帧tokens作为Query,上下文帧tokens作为Key/Value)
        x_cond_norm = modulate(self.norm_cond(x_cond), shift_ca_xcond, scale_ca_xcond)
        x = x + gate_ca_x.unsqueeze(1) * self.cttn(
            query=modulate(self.norm2(x), shift_ca_x, scale_ca_x),
            key=x_cond_norm,
            value=x_cond_norm,
            need_weights=False
        )[0]

        # 前馈网络MLP
        x = x + gate_mlp.unsqueeze(1) * self.mlp(modulate(self.norm3(x), shift_mlp, scale_mlp))
        return x

4.7 模型规模配置

NWM提供了从小到大四种规模的模型配置,以适应不同的计算资源和性能需求:

模型名称 Transformer深度 隐藏维度 注意力头数 参数量
CDiT-S/2 12层 384 6 约3300万
CDiT-B/2 12层 768 12 约1.3亿
CDiT-L/2 24层 1024 16 约4.6亿
CDiT-XL/2 28层 1152 16 约10亿

论文的主要实验使用CDiT-XL/2配置,在8台H100机器(每台8块GPU)上进行分布式训练。"/2"表示patch大小为2,即每  的隐空间区域被编码为一个token。

5. 训练流程详解

5.1 数据集概览

NWM在多个来源的数据集上进行联合训练,涵盖了不同的机器人平台、环境类型和任务场景。SCAND数据集包含8.7小时的社交合规导航数据,使用轮式Jackal机器人和四足Spot机器人在室内外环境中采集。TartanDrive数据集包含5小时的越野驾驶数据,使用改装的全地形车在匹兹堡郊外采集。RECON数据集是最大的一个,包含40小时的开放世界导航数据,覆盖9种不同环境。HuRoN数据集包含75小时的室内社交交互数据,使用Roomba机器人采集。此外,NWM还利用了Ego4D数据集中的908小时无标签人类第一人称视频,用于提升对未知环境的泛化能力。

5.2 数据加载与预处理

训练数据的加载需要特别设计以支持时间偏移和多目标采样。每个训练样本包含:上下文帧序列(过去m帧)、目标帧(未来某一帧)、从上下文到目标的累积动作、以及时间偏移量。为了鼓励模型学习多样化的时间动态,每个上下文可以对应多个不同时间偏移的目标。以下是来自datasets.py的实际代码:

# 来源: nwm/datasets.py
class TrainingDataset(BaseDataset):
    def __getitem__(self, i: int) -> Tuple[torch.Tensor]:
        try:
            f_curr, curr_time, min_goal_dist, max_goal_dist = self.index_to_data[i]

            # 随机采样多个目标时间(关键设计:每个样本对应多个目标)
            goal_offset = np.random.randint(min_goal_dist, max_goal_dist + 1, size=(self.goals_per_obs))
            goal_time = (curr_time + goal_offset).astype('int')

            # 计算相对时间(归一化,128为固定常数)
            rel_time = (goal_offset).astype('float')/(128.)

            # 构建上下文帧序列
            context_times = list(range(curr_time - self.context_size + 1, curr_time + 1))
            context = [(f_curr, t) for t in context_times] + [(f_curr, t) for t in goal_time]

            # 加载图像:上下文帧 + 目标帧
            obs_image = torch.stack([
                self.transform(Image.open(get_data_path(self.data_folder, f, t)))
                for f, t in context
            ])

            # 加载轨迹数据并计算动作
            curr_traj_data = self._get_trajectory(f_curr)
            _, goal_pos = self._compute_actions(curr_traj_data, curr_time, goal_time)
            goal_pos[:, :2] = normalize_data(goal_pos[:, :2], self.ACTION_STATS)

            return (
                torch.as_tensor(obs_image, dtype=torch.float32),
                torch.as_tensor(goal_pos, dtype=torch.float32),
                torch.as_tensor(rel_time, dtype=torch.float32),
            )
        except Exception as e:
            print(f"Exception in {self.dataset_name}", e)
            raise Exception(e)

5.3 训练循环核心逻辑

训练过程遵循标准的扩散模型训练范式,但针对多帧输入和多目标学习进行了适配。以下是来自train.py的实际训练循环代码:

# 来源: nwm/train.py (训练循环核心部分)
for x, y, rel_t in loader:
    x = x.to(device, non_blocking=True)
    y = y.to(device, non_blocking=True)
    rel_t = rel_t.to(device, non_blocking=True)

    with torch.amp.autocast('cuda', enabled=bfloat_enable, dtype=torch.bfloat16):
        with torch.no_grad():
            # VAE编码:将像素图像压缩到隐空间并归一化
            B, T = x.shape[:2]
            x = x.flatten(0, 1)
            x = tokenizer.encode(x).latent_dist.sample().mul_(0.18215)
            x = x.unflatten(0, (B, T))

        # 分离上下文和目标
        num_goals = T - num_cond
        x_start = x[:, num_cond:].flatten(0, 1)  # 目标帧隐表示

        # 扩展上下文以匹配多目标
        x_cond = x[:, :num_cond].unsqueeze(1).expand(
            B, num_goals, num_cond, x.shape[2], x.shape[3], x.shape[4]
        ).flatten(0, 1)

        y = y.flatten(0, 1)
        rel_t = rel_t.flatten(0, 1)

        # 随机采样扩散时间步
        t = torch.randint(0, diffusion.num_timesteps, (x_start.shape[0],), device=device)

        # 计算扩散损失
        model_kwargs = dict(y=y, x_cond=x_cond, rel_t=rel_t)
        loss_dict = diffusion.training_losses(model, x_start, t, model_kwargs)
        loss = loss_dict["loss"].mean()

    # 梯度更新
    opt.zero_grad()
    if not bfloat_enable:
        loss.backward()
        opt.step()
    else:
        scaler.scale(loss).backward()
        if config.get('grad_clip_val', 0) > 0:
            scaler.unscale_(opt)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=config['grad_clip_val'])
        scaler.step(opt)
        scaler.update()

    # EMA更新
    update_ema(ema, model.module)

5.4 EMA与分布式训练

为了提高生成质量的稳定性,NWM使用指数移动平均(EMA)来维护一个平滑的模型副本用于推理。同时,为了处理10亿参数规模的模型和大量数据,采用了多机多卡的分布式训练策略。以下是来自train.py的实际代码:

# 来源: nwm/train.py
@torch.no_grad()
def update_ema(ema_model, model, decay=0.9999):
    """
    Step the EMA model towards the current model.
    将EMA模型向当前模型靠近,decay越接近1更新越慢
    """
    ema_params = OrderedDict(ema_model.named_parameters())
    model_params = OrderedDict(model.named_parameters())

    for name, param in model_params.items():
        name = name.replace('_orig_mod.', '')  # 处理torch.compile的命名
        ema_params[name].mul_(decay).add_(param.data, alpha=1 - decay)

# 分布式训练配置 (来自train.py main函数)
# 初始化分布式环境
_, rank, device, _ = init_distributed()
seed = args.global_seed * dist.get_world_size() + rank
torch.manual_seed(seed)

# 创建模型并包装为DDP
model = CDiT_models[config['model']](context_size=num_cond, input_size=latent_size, in_channels=4).to(device)
ema = deepcopy(model).to(device)
requires_grad(ema, False)

model = DDP(model, device_ids=[device])

# 创建分布式数据采样器
sampler = DistributedSampler(
    train_dataset,
    num_replicas=dist.get_world_size(),
    rank=rank,
    shuffle=True,
    seed=args.global_seed
)

loader = DataLoader(
    train_dataset,
    batch_size=config['batch_size'],
    shuffle=False,
    sampler=sampler,
    num_workers=config['num_workers'],
    pin_memory=True,
    drop_last=True,
    persistent_workers=True
)

6. 导航规划:在想象中寻找最优路径

6.1 规划问题的形式化

训练好的NWM可以用于导航规划。给定当前观测  和目标观测 ,我们需要找到一个动作序列 ,使得执行这些动作后能够到达目标。这可以形式化为一个优化问题:找到使"能量函数"最小的动作序列。能量函数定义为:

这个能量函数由三部分组成:第一项  衡量最终状态与目标的相似度(取负号因为我们要最大化相似度);第二项对违反动作约束的情况施加惩罚;第三项对进入不安全状态的情况施加惩罚。相似度  使用LPIPS(Learned Perceptual Image Patch Similarity)计算,这是一种基于深度学习的感知相似度度量,比简单的像素差异更符合人类对图像相似性的判断。

6.2 交叉熵方法(CEM)

NWM使用交叉熵方法(Cross-Entropy Method,CEM)来求解这个优化问题。CEM是一种无梯度的随机优化方法,特别适合处理目标函数不可微分或存在局部最优的情况。其基本思想是:维护一个动作的概率分布,从中采样多个候选动作序列,评估它们的效果,然后根据最优的那些样本更新分布参数。具体步骤如下:首先,初始化一个高斯分布  作为动作的先验分布;然后,从这个分布中采样N个候选动作序列;接着,使用NWM模拟每个动作序列,计算其能量值;最后,选取能量最低的前K个样本,用它们的均值和方差更新分布参数。

6.3 CEM规划的代码实现

以下是来自planning_eval.py的CEM规划核心实现代码,展示了如何将NWM与优化算法结合:

# 来源: nwm/planning_eval.py
class WM_Planning_Evaluator:
    def __init__(self, args):
        # ... 初始化代码省略 ...
        self.loss_fn = lpips.LPIPS(net='alex').to(self.device)
        self.mode = 'cem'
        self.num_samples = self.args.num_samples
        self.topk = self.args.topk
        self.opt_steps = self.args.opt_steps
        self.num_repeat_eval = self.args.num_repeat_eval
        self.action_dim = 3  # (delta_x, delta_y, delta_yaw)

    def init_mu_sigma(self, obs_0, traj_len):
        """初始化CEM的分布参数"""
        n_evals = obs_0.shape[0]
        mu = torch.zeros(n_evals, self.action_dim)
        mu[:, ] = torch.tensor(data_hyperparams[self.args.datasets]['mu'])
        sigma = torch.ones([n_evals, self.action_dim])
        sigma[:, ] = torch.tensor(data_hyperparams[self.args.datasets]['var_scale'])
        return mu, sigma

    def generate_actions(self, dataset_save_output_dir, dataset_name, idxs,
                         obs_image, goal_image, gt_actions, len_traj_pred):
        """使用CEM生成最优动作序列"""
        n_evals = obs_image.shape[0]
        mu, sigma = self.init_mu_sigma(obs_image, len_traj_pred)
        mu, sigma = mu.to(self.device), sigma.to(self.device)

        for i in range(self.opt_steps):
            losses = []
            for traj in range(n_evals):
                # 从当前分布采样候选动作
                sample = (torch.randn(self.num_samples, self.action_dim).to(self.device)
                          * sigma[traj] + mu[traj])

                # 构建完整轨迹的动作序列
                single_delta = sample[:, :2]
                deltas = single_delta.unsqueeze(1).repeat(1, len_traj_pred, 1)
                unnorm_deltas = unnormalize_data(deltas, ACTION_STATS_TORCH)
                delta_yaw = calculate_delta_yaw(unnorm_deltas)
                deltas = torch.cat((deltas, delta_yaw.to(deltas.device)), dim=-1)
                deltas[:, -1, -1] += sample[:, -1] * np.pi

                cur_obs_image = obs_image[traj].unsqueeze(0).repeat(self.num_samples, 1, 1, 1, 1)
                cur_goal_image = goal_image[traj].unsqueeze(0).repeat(
                    self.num_samples, 1, 1, 1, 1).squeeze(1)

                # 使用NWM模拟轨迹并计算LPIPS损失
                preds = self.autoregressive_rollout(cur_obs_image, deltas, self.args.rollout_stride)
                preds = preds[:, -1]  # 取最后预测帧
                loss = self.loss_fn(preds.to(self.device), cur_goal_image.to(self.device)).flatten(0)

                # 选择top-k最优样本更新分布
                sorted_idx = torch.argsort(loss)
                topk_idx = sorted_idx[:self.topk]
                topk_action = deltas[topk_idx][:, -1]
                losses.append(loss[topk_idx[0]].item())
                mu[traj] = topk_action.mean(dim=0)
                sigma[traj] = topk_action.std(dim=0)

        return pred_actions, pred_yaw

    def autoregressive_rollout(self, obs_image, deltas, rollout_stride):
        """自回归展开轨迹"""
        deltas = deltas.unflatten(1, (-1, rollout_stride)).sum(2)
        preds = []
        curr_obs = obs_image.clone().to(self.device)

        for i in range(deltas.shape[1]):
            curr_delta = deltas[:, i:i+1]
            all_models = self.model, self.diffusion, self.vae
            x_pred_pixels = model_forward_wrapper(
                all_models, curr_obs, curr_delta,
                self.args.rollout_stride, self.latent_size,
                num_cond=self.num_cond, device=self.device
            )
            x_pred_pixels = x_pred_pixels.unsqueeze(1)

            # 滑动窗口更新观测
            curr_obs = torch.cat((curr_obs, x_pred_pixels), dim=1)
            curr_obs = curr_obs[:, 1:]  # 移除最早的帧
            preds.append(x_pred_pixels)

        preds = torch.cat(preds, 1)
        return preds

6.4 约束感知规划

NWM的一个重要优势是支持在规划时动态引入约束。例如,如果我们希望机器人"先直行,再转弯",可以通过修改CEM的采样过程来实现:

def apply_constraints(trajectory, constraint_type):
    """
    对轨迹施加约束

    Args:
        trajectory: 动作序列 [B, T, 3]
        constraint_type: 约束类型

    Returns:
        满足约束的动作序列
    """
    if constraint_type == 'forward_first':
        # 约束:前5步只能前进,后3步可以转向
        trajectory[:, :5, 2] = 0  # 前5步禁止转向
        trajectory[:, 5:, :2] = 0  # 后3步禁止平移

    elif constraint_type == 'no_left_turn':
        # 约束:不允许左转(假设正值为左转)
        trajectory[:, :, 2] = trajectory[:, :, 2].clamp(max=0)

    elif constraint_type == 'stay_in_lane':
        # 约束:限制横向移动范围
        trajectory[:, :, 1] = trajectory[:, :, 1].clamp(-0.5, 0.5)

    return trajectory

6.5 与外部策略结合

除了独立规划,NWM还可以用于改进现有的导航策略。具体做法是:使用现有策略(如NoMaD)生成多个候选轨迹,然后用NWM模拟每个轨迹,选择模拟效果最好的那个。这种方式结合了现有策略的先验知识和NWM的评估能力:

def rank_policy_trajectories(policy, nwm, obs, goal, num_samples=32):
    """
    使用NWM对策略生成的轨迹进行排序

    Args:
        policy: 外部导航策略(如NoMaD)
        nwm: 训练好的NWM模型
        obs: 当前观测
        goal: 目标观测
        num_samples: 采样轨迹数量

    Returns:
        最优轨迹
    """
    # 从策略采样多个候选轨迹
    candidate_trajectories = policy.sample(obs, goal, n=num_samples)

    # 使用NWM模拟并评估每个轨迹
    scores = []
    for traj in candidate_trajectories:
        # 自回归模拟
        simulated_frames = nwm.simulate(obs, traj)
        final_frame = simulated_frames[-1]

        # 计算与目标的相似度
        score = compute_lpips(final_frame, goal)
        scores.append(score)

    # 返回得分最低(最相似)的轨迹
    best_idx = np.argmin(scores)
    return candidate_trajectories[best_idx]

7. 总结

Navigation World Models (NWM)为机器人视觉导航提供了一种全新的范式。与传统的"感知-决策"流水线不同,NWM让机器人学会了"想象"——在行动之前,先在内部模型中模拟各种可能的行动方案,评估它们的效果,然后选择最优的一个。这种基于世界模型的方法带来了三个关键优势:灵活性——可以在运行时动态引入新的约束条件;可解释性——可以可视化模型"想象"的未来场景;可扩展性——通过投入更多计算资源来解决更困难的问题。

相关推荐