Back to notes
Field notebook
/
23:18:55
/
Note

ReKep 深度学习笔记与二次开发指南

ReKep 深度学习笔记与二次开发指南 Relational Keypoint Constraints: 用"关键点约束"让机器人理解自然语言指令 论文: ReKep: Spatio Temporal Reasoning of Relational Keypoint Constraints for Robotic Manipulation CoRL 2024 作者团队:Wenlong Huang, Chen Wang, Yunzhu L…

Reading signalRoute: 机器人33 sections3 notes nearby
Follow 机器人 route

At a glance

Reading effort and structure before you settle in.

Reading time
10 min
Images
8
Views
403

Reader briefing

Primary route: 机器人33 sections
Section 1 of 4

Reading deck

Quiet body10 min33 sections机器人 route

ReKep 深度学习笔记与二次开发指南

Relational Keypoint Constraints: 用"关键点约束"让机器人理解自然语言指令

论文:ReKep: Spatio-Temporal Reasoning of Relational Keypoint Constraints for Robotic Manipulation (CoRL 2024)

作者团队:Wenlong Huang, Chen Wang, Yunzhu Li, Ruohan Zhang, Li Fei-Fei (Stanford Vision & Learning Lab)

本笔记面向机器人工程本科生,兼具学术严谨性与技术博客的易读性。


目录


1. 算法核心原理解析

1.1 为什么输出"关键点约束"而非直接输出动作?

传统端到端方法(如 BC、Diffusion Policy)直接从图像映射到机器人关节角度或末端执行器速度。这看似简洁,但存在三个根本性问题:

维度端到端方法ReKep
泛化性杯子换了形状就要重新收集数据、重新训练只要关键点能识别,约束自动适配新物体
可解释性黑盒网络,出错后无法定位原因约束函数是 Python 代码,可读、可调试、可手动修改
组合性每个任务需要独立的数据和训练约束可自由组合,零样本完成全新任务
数据需求需要大量 demonstration 数据零样本——只需一句自然语言指令

ReKep 的核心洞见:与其让模型学习"怎么动"(How to move),不如让模型定义"什么是好的状态"(What is a good state)。

类比:你告诉一个人"把笔竖直插到笔筒里"。你不需要告诉他每个关节怎么转(动作空间);你只需要告诉他两个条件:(1) 笔要竖直;(2) 笔尖要在笔筒正上方。这两个条件就是约束。具体怎么移动手臂,由"优化器"自动求解。

用一条公式概括 ReKep 的方法论:

自然语言指令GPT-4o关键点约束函数(Python 代码)数值优化机器人动作\text{自然语言指令} \xrightarrow{\text{GPT-4o}} \text{关键点约束函数(Python 代码)} \xrightarrow{\text{数值优化}} \text{机器人动作}

1.2 三大核心阶段总览

ReKep 的完整流程分为三个阶段——"看 → 想 → 做"

阶段模块输入输出核心技术
(Keypoint Proposal)keypoint_proposal.pyRGB-D + 分割 MaskK 个 3D 关键点DINOv2 + K-Means
(Constraint Generation)constraint_generation.py标注图 + 指令Python 约束函数GPT-4o Vision
(Optimization Control)subgoal_solver.py + path_solver.py约束 + 状态机器人轨迹Dual Annealing + SLSQP

下面逐一深入。


1.3 Stage 1: 关键点提取 (Keypoint Proposal)

目标:从 RGB-D 图像中提取一组语义关键点——这些点不是随机的几何角点,而是物体上语义有意义的位置(如笔的尖端、茶壶的壶嘴、杯子的把手)。

1.3.1 为什么用 DINOv2?

DINOv2 是 Meta 推出的自监督 Vision Transformer。它的特殊之处在于:即使没有标注数据,它也能学到"同一物体的不同部件应该有不同的特征表示"。例如:

  • 茶壶的壶嘴和壶把虽然都属于"茶壶",但在 DINOv2 的特征空间中被映射到不同的区域
  • 这使得后续的聚类能自然地找到语义上有意义的代表点

1.3.2 逐行代码解析

Step 1: 预处理 (keypoint_proposal.py:48-66)

def _preprocess(self, rgb, points, masks):
    # 将分割图转为 binary masks 列表
    # 例如 masks 中有 3 个物体 (id=0,1,2),则生成 3 个 binary mask
    masks = [masks == uid for uid in np.unique(masks)]
    
    # DINOv2 要求图像尺寸能被 patch_size=14 整除
    # 480 ÷ 14 = 34.28,取整为 34,所以 new_H = 34 × 14 = 476
    H, W, _ = rgb.shape
    patch_h = int(H // self.patch_size)  # = 34
    patch_w = int(W // self.patch_size)  # = 34
    new_H = patch_h * self.patch_size     # = 476
    new_W = patch_w * self.patch_size     # = 476
    transformed_rgb = cv2.resize(rgb, (new_W, new_H))
    transformed_rgb = transformed_rgb.astype(np.float32) / 255.0  # 归一化到 [0,1]

关键细节:DINOv2 使用 14×14 的 patch embedding,所以输入图像的高和宽必须是 14 的整数倍。这一步裁剪掉了边缘几个像素。

Step 2: 特征提取 (keypoint_proposal.py:86-104)

@torch.inference_mode()       # 禁用梯度计算,加速推理
@torch.amp.autocast('cuda')   # 使用半精度 FP16,节省显存
def _get_features(self, transformed_rgb, shape_info):
    # 将 numpy 图像转为 PyTorch tensor: [H,W,3] → [1,3,H,W]
    img_tensors = torch.from_numpy(transformed_rgb).permute(2, 0, 1).unsqueeze(0).to(self.device)
    
    # DINOv2 前向传播,提取 patch tokens
    features_dict = self.dinov2.forward_features(img_tensors)
    # 输出形状: [1, 34*34, 384] = [1, 1156, 384]
    # 每个 14×14 patch 得到一个 384 维的特征向量
    raw_feature_grid = features_dict['x_norm_patchtokens']
    raw_feature_grid = raw_feature_grid.reshape(1, patch_h, patch_w, -1)  # [1, 34, 34, 384]
    
    # 双线性插值:将 34×34 的 patch 级特征上采样到 480×480 的像素级
    interpolated_feature_grid = interpolate(
        raw_feature_grid.permute(0, 3, 1, 2),   # [1, 384, 34, 34]
        size=(img_h, img_w),                      # 上采样到 (480, 480)
        mode='bilinear'
    ).permute(0, 2, 3, 1).squeeze(0)              # [480, 480, 384]
    
    features_flat = interpolated_feature_grid.reshape(-1, 384)  # [230400, 384]
    return features_flat

数学原理:双线性插值将离散的 patch 特征"铺"到每个像素上。对于像素 (u,v)(u, v),它的特征是周围 4 个 patch 特征的加权平均:

f(u,v)=(1α)(1β)f00+α(1β)f10+(1α)βf01+αβf11\mathbf{f}(u,v) = (1-\alpha)(1-\beta)\mathbf{f}_{00} + \alpha(1-\beta)\mathbf{f}_{10} + (1-\alpha)\beta\mathbf{f}_{01} + \alpha\beta\mathbf{f}_{11}

其中 α,β[0,1)\alpha, \beta \in [0,1) 是小数偏移量。

Step 3: 特征聚类 (keypoint_proposal.py:106-152)

def _cluster_features(self, points, features_flat, masks):
    candidate_keypoints = []
    
    for rigid_group_id, binary_mask in enumerate(masks):
        # 跳过面积过大的 mask(通常是桌面等背景)
        if np.mean(binary_mask) > self.config['max_mask_ratio']:  # > 50%
            continue
        
        # 只取该物体 mask 区域内的特征
        obj_features_flat = features_flat[binary_mask.reshape(-1)]  # [N_obj, 384]
        feature_points = points[binary_mask]  # 对应的 3D 坐标 [N_obj, 3]
        
        # ============ PCA 降维 ============
        # 384D → 3D:去掉噪声,保留最重要的 3 个主成分
        obj_features_flat = obj_features_flat.double()
        (u, s, v) = torch.pca_lowrank(obj_features_flat, center=False)
        features_pca = torch.mm(obj_features_flat, v[:, :3])  # [N_obj, 3]
        # 归一化到 [0, 1]
        features_pca = (features_pca - features_pca.min(0)[0]) / \
                        (features_pca.max(0)[0] - features_pca.min(0)[0])
        
        # ============ 拼接空间坐标 ============
        # 将 3D PCA 特征和归一化的 3D 坐标拼接,形成 6D 特征向量
        # 这让聚类同时考虑"长什么样"和"在哪里"
        X = features_pca  # [N_obj, 3] 语义特征
        feature_points_torch = torch.tensor(feature_points, ...)
        feature_points_torch = (feature_points_torch - feature_points_torch.min(0)[0]) / \
                                (feature_points_torch.max(0)[0] - feature_points_torch.min(0)[0])
        X = torch.cat([X, feature_points_torch], dim=-1)  # [N_obj, 6]
        
        # ============ GPU K-Means 聚类 ============
        # 在 6D 空间中聚类,每个物体找 5 个簇
        cluster_ids_x, cluster_centers = kmeans(
            X=X,
            num_clusters=self.config['num_candidates_per_mask'],  # k=5
            distance='euclidean',
            device=self.device,
        )
        
        # 对每个簇,选择"最接近簇中心的真实点"作为关键点
        for cluster_id in range(5):
            cluster_center = cluster_centers[cluster_id][:3]   # 只取特征维度
            member_features = features_pca[cluster_ids_x == cluster_id]
            dist = torch.norm(member_features - cluster_center, dim=-1)
            closest_idx = torch.argmin(dist)
            # 记录该点的 3D 坐标和像素位置
            candidate_keypoints.append(member_points[closest_idx])

为什么先 PCA 再 K-Means?

DINOv2 的 384 维特征中,很多维度编码的是纹理、光照等对我们无用的信息。PCA 降维到 3D 相当于只保留"最大方差方向",即最能区分物体不同部件的维度。这让后续的 K-Means 聚类更鲁棒。

为什么拼接 3D 空间坐标?

仅凭语义特征聚类,可能把"颜色相似但位置不同的点"归为一簇。拼接坐标后,聚类同时考虑"外观"和"位置",确保每个簇对应一个空间上紧凑的区域。

Step 4: 空间合并 (keypoint_proposal.py:154-160)

def _merge_clusters(self, candidate_keypoints):
    # MeanShift 聚类:合并 3D 空间中距离 < 0.06m 的关键点
    self.mean_shift.fit(candidate_keypoints)  # bandwidth=0.06m
    cluster_centers = self.mean_shift.cluster_centers_
    # 对每个簇中心,找最近的真实候选点
    merged_indices = []
    for center in cluster_centers:
        dist = np.linalg.norm(candidate_keypoints - center, axis=-1)
        merged_indices.append(np.argmin(dist))
    return merged_indices

MeanShift 的作用:如果两个来自不同物体的候选点在 3D 空间中非常接近(< 6cm),说明它们实际上指向同一个位置,合并为一个关键点即可。这避免了冗余。

1.3.3 数学总结

设图像为 IRH×W×3I \in \mathbb{R}^{H \times W \times 3},DINOv2 编码为:

F=DINOv2(I)RH14×W14×384\mathbf{F} = \text{DINOv2}(I) \in \mathbb{R}^{\frac{H}{14} \times \frac{W}{14} \times 384}

经过双线性插值到 F^RH×W×384\hat{\mathbf{F}} \in \mathbb{R}^{H \times W \times 384},对物体 oo 的 mask 区域做 PCA 降维:

F^oPCA=F^oV:,:3RNo×3\hat{\mathbf{F}}_o^{PCA} = \hat{\mathbf{F}}_o \cdot \mathbf{V}_{:,:3} \in \mathbb{R}^{N_o \times 3}

拼接归一化的 3D 坐标后在 6D 空间做 K-Means:

{c1,...,ck}=K-Means([F^oPCA;Pˉo],k=5)\{\mathbf{c}_1, ..., \mathbf{c}_k\} = \text{K-Means}([\hat{\mathbf{F}}_o^{PCA}; \bar{\mathbf{P}}_o], k=5)

最终输出 KK 个关键点 K={k1,...,kK}RK×3\mathbf{K} = \{\mathbf{k}_1, ..., \mathbf{k}_K\} \in \mathbb{R}^{K \times 3}


1.4 Stage 2: 约束生成 (Constraint Generation)

目标:将自然语言指令转化为可执行的 Python 约束函数。

这是 ReKep 最精妙的部分——它把 GPT-4o 当作一个**"需求分析师 + 程序员"**:

"把笔竖直插入笔筒"+标注图像GPT-4o{f1:xeek10(对准笔)f2:cross(vpen,z^)0(竖直)f3:k1kˉholder[0,0,0.2]0(对准笔筒)\text{"把笔竖直插入笔筒"} + \text{标注图像} \xrightarrow{\text{GPT-4o}} \begin{cases} f_1: \|\mathbf{x}_{ee} - \mathbf{k}_1\| \leq 0 & \text{(对准笔)} \\ f_2: \|\text{cross}(\vec{v}_{pen}, \hat{z})\| \leq 0 & \text{(竖直)} \\ f_3: \|\mathbf{k}_1 - \bar{\mathbf{k}}_{holder} - [0,0,0.2]\| \leq 0 & \text{(对准笔筒)} \end{cases}

1.4.1 Prompt 模板结构

Prompt 模板位于 vlm_query/prompt_template.txt(共 109 行),包含三个关键部分:

Part 1: 任务分解规则 (L1-38)

告诉 GPT-4o 如何将任务分解为多个阶段,以及"抓取必须是独立阶段"等硬规则:

- Determine how many stages are involved in the task.
  Grasping must be an independent stage. Some examples:
  - "pouring tea from teapot":
    - 3 stages: "grasp teapot", "align teapot with cup opening", "pour liquid"
  - "put red block on top of blue block":
    - 3 stages: "grasp red block", "drop the red block on top of blue block"

Part 2: 约束编写规范 (L40-61)

定义了约束函数的精确接口和编写规则:

- Each constraint takes a dummy end-effector point and a set of keypoints 
  as input and returns a numerical cost, where the constraint is satisfied 
  if the cost is smaller than or equal to zero.
- Inputs:
  - end_effector: np.array of shape (3,)    ← 末端执行器位置
  - keypoints: np.array of shape (K, 3)     ← 关键点位置矩阵
- Avoid using "if" statements in your constraints.  ← 必须可微!
- For grasping constraint, use: get_grasping_cost_by_keypoint_idx(i)

Part 3: 输出格式模板 (L62-105)

严格规定了输出的代码结构:

num_stages = ?
def stage1_subgoal_constraint1(end_effector, keypoints): ...
def stage1_path_constraint1(end_effector, keypoints): ...
grasp_keypoints = [?, ..., ?]      # 每个阶段抓哪个关键点,-1 表示不抓
release_keypoints = [?, ..., ?]    # 每个阶段末释放哪个关键点,-1 表示不释放

1.4.2 代码逐行解析

GPT-4o 调用 (constraint_generation.py:120-158)

def generate(self, img, instruction, metadata):
    # 创建任务目录,按时间戳命名
    fname = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + "_" + instruction.lower().replace(" ", "_")
    self.task_dir = os.path.join(self.base_dir, fname)
    os.makedirs(self.task_dir, exist_ok=True)
    
    # 保存标注图像
    image_path = os.path.join(self.task_dir, 'query_img.png')
    cv2.imwrite(image_path, img[..., ::-1])   # RGB → BGR for cv2
    
    # 构建 OpenAI Vision API 请求
    messages = self._build_prompt(image_path, instruction)
    
    # 流式调用 GPT-4o
    stream = self.client.chat.completions.create(
        model=self.config['model'],       # "chatgpt-4o-latest"
        messages=messages,
        temperature=self.config['temperature'],  # 0.0 → 确定性输出
        max_tokens=self.config['max_tokens'],    # 2048
        stream=True
    )
    output = ""
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            output += chunk.choices[0].delta.content

Prompt 构建 (constraint_generation.py:24-47)

def _build_prompt(self, image_path, instruction):
    img_base64 = encode_image(image_path)  # 图像编码为 base64
    prompt_text = self.prompt_template.format(instruction=instruction)
    
    # OpenAI Vision API 的多模态消息格式
    messages = [{
        "role": "user",
        "content": [
            {"type": "text", "text": prompt_text},         # 文字 prompt
            {"type": "image_url", "image_url": {            # 标注图像
                "url": f"data:image/png;base64,{img_base64}"
            }}
        ]
    }]
    return messages

约束解析与保存 (constraint_generation.py:49-73)

def _parse_and_save_constraints(self, output, save_dir):
    lines = output.split("\n")
    functions = dict()
    
    # 逐行扫描,找到所有 "def ... return" 函数块
    for i, line in enumerate(lines):
        if line.startswith("def "):
            start = i
            name = line.split("(")[0].split("def ")[1]  # 提取函数名
        if line.startswith("    return "):
            end = i
            functions[name] = lines[start:end+1]
    
    # 按函数名分组:stage1_subgoal → 一个文件,stage1_path → 一个文件
    groupings = dict()
    for name in functions:
        parts = name.split("_")[:-1]  # "stage1_subgoal_constraint1" → "stage1_subgoal"
        key = "_".join(parts)
        if key not in groupings:
            groupings[key] = []
        groupings[key].append(name)
    
    # 保存为 .txt 文件
    for key in groupings:
        with open(os.path.join(save_dir, f"{key}_constraints.txt"), "w") as f:
            for name in groupings[key]:
                f.write("\n".join(functions[name]) + "\n\n")

1.4.3 实际约束示例:Pen-in-Holder 任务

以项目自带的 vlm_query/pen/ 缓存为例,GPT-4o 为指令 "reorient the white pen and drop it upright into the black pen holder" 生成了以下约束:

Stage 1: 抓取 (stage1_subgoal_constraints.txt)

def stage1_subgoal_constraint1(end_effector, keypoints):
    """Align the end-effector with the white pen's grasping point (keypoint 1)."""
    grasp_point = keypoints[1]                        # 笔的中间部位
    cost = np.linalg.norm(end_effector - grasp_point) # 欧氏距离
    return cost                                        # ≤ 0 时满足

数学解读f1(xee)=xeek12f_1(\mathbf{x}_{ee}) = \|\mathbf{x}_{ee} - \mathbf{k}_1\|_2。这是一个以关键点 k1\mathbf{k}_1 为中心的球形代价场。当 xee\mathbf{x}_{ee} 精确位于 k1\mathbf{k}_1 时,f1=0f_1 = 0(约束满足)。

Stage 2: 翻转使笔竖直 (stage2_subgoal_constraints.txt)

def stage2_subgoal_constraint1(end_effector, keypoints):
    """Ensure the white pen is upright by aligning the vector 
    from keypoint 0 to keypoint 1 with the z-axis."""
    pen_vector = keypoints[1] - keypoints[0]          # 笔的方向向量
    z_axis = np.array([0, 0, 1])                      # 世界坐标系 z 轴
    cost = np.linalg.norm(np.cross(pen_vector, z_axis))  # 叉积的模
    return cost

数学解读:令 v=k1k0\vec{v} = \mathbf{k}_1 - \mathbf{k}_0(笔的方向向量),则:

f2=v×z^2=vsinθf_2 = \|\vec{v} \times \hat{z}\|_2 = \|\vec{v}\| \cdot |\sin\theta|

其中 θ\thetav\vec{v}zz 轴的夹角。当笔完全竖直时(θ=0\theta = 0π\pi),sinθ=0\sin\theta = 0,约束满足。

为什么用叉积而不是点积? 叉积的模 =vsinθ= \|\vec{v}\| |\sin\theta|,在 θ=0\theta = 0 附近是平滑递减的。而 1cosθ1 - \cos\thetaθ=0\theta = 0 附近斜率为 0(平坦),不利于梯度优化。

Stage 2: 路径约束——保持抓取 (stage2_path_constraints.txt)

def stage2_path_constraint1(end_effector, keypoints):
    """The robot must still be grasping the white pen (keypoint 1)."""
    return get_grasping_cost_by_keypoint_idx(1)
    # 内部逻辑:return -env.is_grasping(keypoint_obj) + 1
    # 抓着 → 返回 0(满足),没抓 → 返回 1(违反)

Subgoal vs Path 约束的区别

  • Subgoal 约束:只在阶段结束时检查("最终状态"约束)
  • Path 约束:在阶段整个过程中持续检查("过程中"约束)

例如 Stage 2 中,"笔竖直"是 subgoal(只需最终竖直),"保持抓取"是 path(全程都要抓着)。

Stage 3: 插入笔筒 (stage3_subgoal_constraints.txt)

def stage3_subgoal_constraint1(end_effector, keypoints):
    """Ensure the white pen is above the black pen holder opening."""
    holder_opening = np.mean(keypoints[3:7], axis=0)  # 4 个笔筒关键点的质心
    above_holder = holder_opening + np.array([0, 0, 0.2])  # 质心上方 20cm
    cost = np.linalg.norm(keypoints[1] - above_holder)
    return cost

def stage3_subgoal_constraint2(end_effector, keypoints):
    """Ensure the white pen is upright."""
    pen_vector = keypoints[1] - keypoints[0]
    z_axis = np.array([0, 0, 1])
    cost = np.linalg.norm(np.cross(pen_vector, z_axis))
    return cost

数学解读f3=k1(kˉholder+Δz)2f_3 = \|\mathbf{k}_1 - (\bar{\mathbf{k}}_{holder} + \Delta z)\|_2,其中 kˉholder=14i=36ki\bar{\mathbf{k}}_{holder} = \frac{1}{4}\sum_{i=3}^{6}\mathbf{k}_i 是笔筒开口的质心,Δz=[0,0,0.2]T\Delta z = [0,0,0.2]^T 是竖直偏移。

元数据 (metadata.json)

{
  "num_stages": 3,
  "grasp_keypoints": [1, -1, -1],     // Stage 1 抓关键点 1,后续不抓
  "release_keypoints": [-1, -1, 1],   // Stage 3 末尾释放关键点 1
  "num_keypoints": 7,
  "init_keypoint_positions": [[-0.258, -0.236, 0.691], ...]  // 7 个关键点初始 3D 坐标
}

1.4.4 安全执行机制

GPT-4o 生成的代码不能直接 exec()——这有安全风险。项目使用 exec_safe() 做了沙箱化处理:

# utils.py:214-232
def exec_safe(code_str, gvars=None, lvars=None):
    banned_phrases = ['import', '__']    # 禁止 import 和双下划线(如 __builtins__)
    for phrase in banned_phrases:
        assert phrase not in code_str    # 如果包含则直接报错
    
    # 覆盖危险的内建函数
    custom_gvars = merge_dicts([
        gvars,
        {'exec': empty_fn, 'eval': empty_fn}  # 禁用 exec/eval 嵌套调用
    ])
    exec(code_str, custom_gvars, lvars)

加载约束时,只注入 np(NumPy)和 get_grasping_cost_by_keypoint_idx 两个外部符号:

# utils.py:234-247
def load_functions_from_txt(txt_path, get_grasping_cost_fn):
    with open(txt_path, 'r') as f:
        functions_text = f.read()
    gvars_dict = {
        'np': np,
        'get_grasping_cost_by_keypoint_idx': get_grasping_cost_fn,
    }
    lvars_dict = dict()
    exec_safe(functions_text, gvars=gvars_dict, lvars=lvars_dict)
    return list(lvars_dict.values())  # 返回所有定义的函数

1.5 Stage 3: 优化求解 (Optimization-based Control)

这是 ReKep 最复杂也最核心的部分。约束函数只定义了"什么是好状态",而如何到达那个状态需要通过数值优化来求解。

系统采用分层优化架构:先用 SubgoalSolver 确定"去哪",再用 PathSolver 规划"怎么去"。

1.5.1 SubgoalSolver 详解

核心问题:给定当前状态和约束函数,求解一个最优的末端执行器目标位姿。

优化变量p=(x,y,z,α,β,γ)R6\mathbf{p} = (x, y, z, \alpha, \beta, \gamma) \in \mathbb{R}^6(位置 + 欧拉角),归一化到 [1,1]6[-1, 1]^6

完整目标函数 (subgoal_solver.py:15-112):

Etotal(p)=0.8Ccol(p)碰撞避免+1.0Cinit(p)平滑性+20.0Cik(p)可达性+0.2Creg(p)正则化+10.0Cgrasp(p)抓取偏好+200.0i[fi(p)]+约束代价E_{total}(\mathbf{p}) = \underbrace{0.8 \cdot C_{col}(\mathbf{p})}_{\text{碰撞避免}} + \underbrace{1.0 \cdot C_{init}(\mathbf{p})}_{\text{平滑性}} + \underbrace{20.0 \cdot C_{ik}(\mathbf{p})}_{\text{可达性}} + \underbrace{0.2 \cdot C_{reg}(\mathbf{p})}_{\text{正则化}} + \underbrace{10.0 \cdot C_{grasp}(\mathbf{p})}_{\text{抓取偏好}} + \underbrace{200.0 \cdot \sum_i [f_i(\mathbf{p})]_+}_{\text{约束代价}}

其中 [x]+=max(0,x)[x]_+ = \max(0, x) 是 ReLU 函数(只惩罚违反的约束)。

逐项解析:

1. 碰撞代价 CcolC_{col}(权重 0.8)

# subgoal_solver.py:37-39 → utils.py:34-42
collision_cost = 0.8 * calculate_collision_cost(
    opt_pose_homo[None],       # 候选位姿 [1, 4, 4]
    sdf_func,                  # SDF 插值函数
    collision_points_centered, # 夹持器点云 [N, 3]
    0.10                       # 安全阈值(10cm)
)

# utils.py:34-42 的实现:
def calculate_collision_cost(poses, sdf_func, collision_points, threshold):
    # 将夹持器点云变换到候选位姿下
    transformed_pcs = batch_transform_points(collision_points, poses)  # [M, N, 3]
    transformed_pcs_flatten = transformed_pcs.reshape(-1, 3)           # [M*N, 3]
    
    # 查询 SDF 值(正值=在物体外部,负值=在物体内部)
    # 加上 threshold:即使不接触,太近也要惩罚
    signed_distance = sdf_func(transformed_pcs_flatten) + threshold    # [M*N]
    
    # 只惩罚 SDF > 0 的情况(即"离物体太近"或"在物体内")
    non_zero_mask = signed_distance > 0
    collision_cost = np.sum(signed_distance[non_zero_mask])
    return collision_cost

SDF (Signed Distance Field) 原理

SDF 是一个标量场 ϕ:R3R\phi: \mathbb{R}^3 \to \mathbb{R},定义为每个点到最近表面的有符号距离

  • ϕ(x)>0\phi(\mathbf{x}) > 0:点在物体外部,距离 ϕ|\phi|
  • ϕ(x)=0\phi(\mathbf{x}) = 0:点在物体表面
  • ϕ(x)<0\phi(\mathbf{x}) < 0:点在物体内部,距离 ϕ|\phi|

碰撞代价 = imax(0,ϕ(pi)+ϵ)\sum_{i} \max(0, -\phi(\mathbf{p}_i) + \epsilon),其中 ϵ=0.10\epsilon = 0.10m 是安全距离。注意:项目中 SDF 的符号约定与 Open3D 相反(sdf_voxels = -sdf_voxels,见 environment.py:135),所以代码中是 sdf + threshold 而非 -sdf + threshold

2. 初始位姿代价 CinitC_{init}(权重 1.0)

# subgoal_solver.py:43-45 → utils.py:44-58
init_pose_cost = 1.0 * consistency(opt_pose_homo[None], init_pose_homo[None], rot_weight=1.5)

数学公式

Cinit=topttcurr2+1.5arccos(tr(RoptTRcurr)12)C_{init} = \|\mathbf{t}_{opt} - \mathbf{t}_{curr}\|_2 + 1.5 \cdot \arccos\left(\frac{\text{tr}(\mathbf{R}_{opt}^T \mathbf{R}_{curr}) - 1}{2}\right)

这鼓励优化结果不要偏离当前位姿太远,避免突然大幅运动。

3. IK 可达性代价 CikC_{ik}(权重 20.0)

# subgoal_solver.py:48-58
max_iterations = 20
ik_result = ik_solver.solve(opt_pose_homo, max_iterations=max_iterations, ...)
ik_cost = 20.0 * (ik_result.num_descents / max_iterations)

数学原理:使用 CCD(Cyclic Coordinate Descent,循环坐标下降法)求解逆运动学。CCD 迭代次数越多,说明目标位姿越难达到。用迭代次数的归一化值作为可达性的代理指标:

Cik=20×ndescentsnmaxC_{ik} = 20 \times \frac{n_{descents}}{n_{max}}

如果 IK 求解完全失败(success=False),正则化代价也取最大值。

4. 抓取偏好代价 CgraspC_{grasp}(权重 10.0,仅抓取阶段)

# subgoal_solver.py:69-74
if is_grasp_stage:
    preferred_dir = np.array([0, 0, -1])  # 从上方抓取
    # opt_pose_homo[:3, 0] 是末端执行器 x 轴方向(即"接近方向")
    grasp_cost = -np.dot(opt_pose_homo[:3, 0], preferred_dir) + 1  # [0, 2]
    grasp_cost = 10.0 * grasp_cost

含义:鼓励机器人从正上方向下接近物体。x^eez^down=cosα\hat{\mathbf{x}}_{ee} \cdot \hat{\mathbf{z}}_{down} = -\cos\alpha,其中 α\alpha 是接近方向与竖直方向的夹角。α=0\alpha = 0(竖直向下)时代价最小。

5. 约束违反代价(权重 200.0)

# subgoal_solver.py:79-105
subgoal_constraint_cost = 0
transformed_keypoints = transform_keypoints(opt_pose_homo, keypoints_centered, keypoint_movable_mask)
for constraint in goal_constraints:
    violation = constraint(transformed_keypoints[0], transformed_keypoints[1:])
    subgoal_constraint_cost += np.clip(violation, 0, np.inf)  # ReLU: 只惩罚正值
subgoal_constraint_cost = 200.0 * subgoal_constraint_cost

关键操作transform_keypoints 的作用

当物体被抓在手中时,移动末端执行器会带动物体一起移动。transform_keypoints 根据末端执行器的位姿变化,更新所有"可移动"关键点的位置:

# utils.py:60-65
def transform_keypoints(transform, keypoints, movable_mask):
    transformed_keypoints = keypoints.copy()
    if movable_mask.sum() > 0:
        # 只变换 movable=True 的关键点
        transformed_keypoints[movable_mask] = \
            np.dot(keypoints[movable_mask], transform[:3, :3].T) + transform[:3, 3]
    return transformed_keypoints

这是 ReKep 能做闭链操作(如抓着茶壶倒水)的关键——约束中引用的关键点位置会随末端执行器变化而自动更新。

1.5.2 两阶段优化策略

# subgoal_solver.py:248-272
if from_scratch:
    # Phase 1: 全局搜索(首次求解)
    opt_result = dual_annealing(
        func=objective,
        bounds=bounds,           # 每个维度 [-1, 1]
        args=aux_args,
        maxfun=5000,             # 最多评估 5000 次目标函数
        x0=init_sol,             # 初始猜测(当前位姿)
        no_local_search=False,   # 允许局部搜索
        minimizer_kwargs={
            'method': 'SLSQP',
            'options': {'maxiter': 200},
        },
    )
else:
    # Phase 2: 局部精化(后续迭代)
    opt_result = minimize(
        fun=objective,
        x0=init_sol,             # 热启动:使用上次求解结果
        args=aux_args,
        bounds=bounds,
        method='SLSQP',
        options={'maxiter': 200},
    )

为什么需要两阶段?

  • Dual Annealing(模拟退火的改进版):通过随机扰动探索整个搜索空间,能跳出局部最优。适合首次求解时的全局搜索,但计算开销大(~5000 次函数评估,每次包含 IK 求解)。
  • SLSQP(Sequential Least Squares Programming):基于梯度的局部优化,从上次解出发快速收敛。适合场景微小变化后的增量更新(~200 次迭代)。

类比:Dual Annealing 像是"在整个地图上找餐厅"(全局搜索),SLSQP 像是"从上次吃饭的地方沿着街走到最近的新餐厅"(局部搜索)。

1.5.3 变量归一化

优化变量被归一化到 [1,1][-1, 1] 范围,这对优化器的稳定性至关重要:

# utils.py:16-32
def normalize_vars(vars, og_bounds):
    """将原始变量 [b_min, b_max] 映射到 [-1, 1]"""
    normalized_vars = np.empty_like(vars)
    for i, (b_min, b_max) in enumerate(og_bounds):
        normalized_vars[i] = (vars[i] - b_min) / (b_max - b_min) * 2 - 1
    return normalized_vars

def unnormalize_vars(normalized_vars, og_bounds):
    """将 [-1, 1] 反映射回 [b_min, b_max]"""
    vars = np.empty_like(normalized_vars)
    for i, (b_min, b_max) in enumerate(og_bounds):
        vars[i] = (normalized_vars[i] + 1) / 2 * (b_max - b_min) + b_min
    return vars

原始范围

  • 位置:x[0.45,0.10]x \in [-0.45, 0.10]y[0.75,0.60]y \in [-0.75, 0.60]z[0.698,1.2]z \in [0.698, 1.2](工作空间边界)
  • 旋转:α,β,γ[π,π]\alpha, \beta, \gamma \in [-\pi, \pi](欧拉角全范围)

归一化后,优化器可以用统一的步长处理位置和旋转,不会因量纲不同导致某些维度被忽略。

1.5.4 PathSolver 详解

核心问题:给定起点(当前位姿)和终点(SubgoalSolver 的输出),规划一条无碰撞、满足约束的路径。

优化变量Nctrl2N_{ctrl} - 2 个中间控制点的 6D 位姿(起止点固定)。

控制点数量自适应计算

# path_solver.py:210-211
num_control_points = get_linear_interpolation_steps(
    start_pose, end_pose,
    self.config['opt_pos_step_size'],   # 0.20m
    self.config['opt_rot_step_size']    # 0.78 rad ≈ 45°
)
num_control_points = np.clip(num_control_points, 3, 6)  # 限制在 3~6 个

数学Nctrl=clip(max(Δp0.20,Δθ0.78),3,6)N_{ctrl} = \text{clip}\left(\max\left(\lceil\frac{\Delta p}{0.20}\rceil, \lceil\frac{\Delta\theta}{0.78}\rceil\right), 3, 6\right)

直觉:起止位姿差距越大,需要越多的中间点来描述复杂路径。

路径代价函数

Epath=0.5Ccol碰撞+4.0Lpath路径长度+20.0Cik(k)每个控制点的可达性+200.0j,t[gj(pt)]+路径约束E_{path} = \underbrace{0.5 \cdot C_{col}}_{\text{碰撞}} + \underbrace{4.0 \cdot L_{path}}_{\text{路径长度}} + \underbrace{20.0 \cdot \sum C_{ik}^{(k)}}_{\text{每个控制点的可达性}} + \underbrace{200.0 \cdot \sum_{j,t} [g_j(\mathbf{p}_t)]_+}_{\text{路径约束}}

路径长度代价 LpathL_{path}(权重 4.0):

# utils.py:123-131
@njit(cache=True, fastmath=True)
def path_length(samples_homo):
    pos_length = 0
    rot_length = 0
    for i in range(len(samples_homo) - 1):
        pos_length += np.linalg.norm(
            samples_homo[i, :3, 3] - samples_homo[i+1, :3, 3])  # 位置距离
        rot_length += angle_between_rotmat(
            samples_homo[i, :3, :3], samples_homo[i+1, :3, :3])  # 旋转距离
    return pos_length, rot_length

Lpath=i=0T2ti+1ti2+1.0i=0T2arccos(tr(Ri+1TRi)12)L_{path} = \sum_{i=0}^{T-2} \|\mathbf{t}_{i+1} - \mathbf{t}_i\|_2 + 1.0 \cdot \sum_{i=0}^{T-2} \arccos\left(\frac{\text{tr}(\mathbf{R}_{i+1}^T \mathbf{R}_i) - 1}{2}\right)

稠密采样与碰撞检测 (utils.py:84-121):

控制点通过 SLERP 插值生成稠密路径点,碰撞检测在这些稠密点上进行:

# utils.py:84-121 (JIT 编译加速)
@njit(cache=True, fastmath=True)
def get_samples_jitted(control_points_homo, control_points_quat, 
                       opt_interpolate_pos_step_size, opt_interpolate_rot_step_size):
    # 对每一段,计算需要多少个插值点
    for i in range(len(control_points_homo) - 1):
        pos_diff = np.linalg.norm(start_pos - end_pos)
        rot_diff = angle_between_rotmat(start_rotmat, end_rotmat)
        pos_num_steps = np.ceil(pos_diff / 0.02)   # 每 2cm 一个检测点
        rot_num_steps = np.ceil(rot_diff / 0.10)    # 每 ~5.7° 一个检测点
        num_path_poses = int(max(pos_num_steps, rot_num_steps))
    
    # SLERP 插值生成中间位姿
    for j in range(num_samples):
        alpha = j / (num_samples - 1)
        pos = start_pos * (1 - alpha) + end_pos * alpha          # 线性插值位置
        blended_xyzw = T.quat_slerp_jitted(start_xyzw, end_xyzw, alpha)  # 球面插值旋转

SLERP (Spherical Linear Interpolation) 公式:

q(α)=sin((1α)Ω)sinΩq0+sin(αΩ)sinΩq1\mathbf{q}(\alpha) = \frac{\sin((1-\alpha)\Omega)}{\sin\Omega}\mathbf{q}_0 + \frac{\sin(\alpha\Omega)}{\sin\Omega}\mathbf{q}_1

其中 Ω=arccos(q0q1)\Omega = \arccos(\mathbf{q}_0 \cdot \mathbf{q}_1)。SLERP 保证旋转插值在四元数球面上均匀进行,避免了欧拉角插值的万向锁和不均匀性问题。

最终路径的 B-spline 平滑 (utils.py:318-385):

def spline_interpolate_poses(control_points, num_steps):
    # 1. 对位置分量拟合 B-spline 曲线
    pos_spline = fit_b_spline(control_points_pos)   # 三次 B-spline
    
    # 2. 对旋转分量拟合 RotationSpline(基于 scipy 的旋转样条)
    rotations = R.from_matrix(control_points_rotmat)
    rot_spline = RotationSpline(times, rotations)
    
    # 3. 在样条上均匀采样 num_steps 个点
    pos_samples = sample_from_spline(pos_spline, num_steps)  # [num_steps, 3]
    rot_samples = sample_from_spline(rot_spline, num_steps)  # [num_steps, 3, 3]

B-spline vs 线性插值

B-spline 生成的路径是光滑的(连续可微),避免了控制点处的速度突变。这对实物机器人非常重要——突变的速度指令会导致机械振动和电机过载。


1.6 闭环执行与回溯机制

ReKep 的执行不是"规划一次,执行到底",而是每几步就重新感知、重新优化。这是它面对扰动依然鲁棒的关键。

主循环 (main.py:110-182):

while True:
    # ========= 1. 感知 =========
    scene_keypoints = self.env.get_keypoint_positions()  # 实时跟踪关键点
    self.keypoints = np.concatenate([
        [self.env.get_ee_pos()],  # 第 0 个"关键点"是末端执行器自身
        scene_keypoints
    ], axis=0)
    self.curr_ee_pose = self.env.get_ee_pose()
    self.sdf_voxels = self.env.get_sdf_voxels(0.01)     # 更新碰撞场
    
    # ========= 2. 检查约束 → 决定是否回溯 =========
    backtrack = False
    if self.stage > 1:
        for constraint in self.constraint_fns[self.stage]['path']:
            violation = constraint(self.keypoints[0], self.keypoints[1:])
            if violation > self.config['constraint_tolerance']:  # > 0.10
                backtrack = True
                break
    
    if backtrack:
        # 逆序检查更早的阶段,找到第一个"安全"的阶段
        for new_stage in range(self.stage - 1, 0, -1):
            path_constraints = self.constraint_fns[new_stage]['path']
            if len(path_constraints) == 0:
                break  # 无约束 → 安全
            all_satisfied = all(
                c(self.keypoints[0], self.keypoints[1:]) <= tolerance
                for c in path_constraints
            )
            if all_satisfied:
                break
        self._update_stage(new_stage)  # 回溯到那个阶段
    else:
        # ========= 3. 正常流程:优化 + 执行 =========
        next_subgoal = self._get_next_subgoal(from_scratch=self.first_iter)
        next_path = self._get_next_path(next_subgoal, from_scratch=self.first_iter)
        self.first_iter = False
        self.action_queue = next_path.tolist()
        
        # ========= 4. 执行有限步动作 =========
        count = 0
        while len(self.action_queue) > 0 and count < 5:  # action_steps_per_iter=5
            next_action = self.action_queue.pop(0)
            precise = len(self.action_queue) == 0  # 最后一步用精确模式
            self.env.execute_action(next_action, precise=precise)
            count += 1
        
        # ========= 5. 阶段转换 =========
        if len(self.action_queue) == 0:
            if self.is_grasp_stage:
                self._execute_grasp_action()  # 执行抓取
            elif self.is_release_stage:
                self._execute_release_action()  # 执行释放
            if self.stage == self.program_info['num_stages']:
                return  # 所有阶段完成
            self._update_stage(self.stage + 1)  # 前进到下一阶段

回溯机制的工程细节

时间线:
t=0     t=5     t=10    t=15    t=20
|-------|-------|-------|-------|
 Stage1   Stage2  ← 检测到约束违反!
 (抓取)   (翻转)    笔掉了
                    │
                    ▼ 回溯到 Stage1
                    Stage1 → Stage2 → Stage3
                    (重新抓取)  (重新翻转)

回溯的触发条件:当前阶段的 path constraint 违反度 > constraint_tolerance = 0.10

回溯的目标:找到最近的一个"路径约束全部满足"或"没有路径约束"的阶段,从那里重新开始。

抓取动作的实现 (main.py:260-268):

def _execute_grasp_action(self):
    pregrasp_pose = self.env.get_ee_pose()
    grasp_pose = pregrasp_pose.copy()
    # 沿末端执行器 x 轴(接近方向)前进 grasp_depth
    grasp_pose[:3] += T.quat2mat(pregrasp_pose[3:]) @ np.array([0.10, 0, 0])
    grasp_action = np.concatenate([grasp_pose, [self.env.get_gripper_close_action()]])
    self.env.execute_action(grasp_action, precise=True)

注意:SubgoalSolver 输出的是预抓取位姿(退后了 grasp_depth/2 = 5cm),实际抓取时再前进 grasp_depth = 10cm 并合上夹爪。这个两步策略避免了在优化阶段就要考虑接触物理。


Section 1 of 4

Route control

After Reading

Choose the next trail: follow the same topic route, open the research shelf, or continue through nearby notes.

机器人3 nearby notes