Theseus

Theseus 是一个基于 PyTorch 的可微分非线性优化库。

Theseus 的动机来源于机器人学和计算机视觉中的一些问题,这些问题可以被表述为可微分的非线性最小二乘优化问题,例如同时定位与建图(SLAM)、运动规划以及捆绑调整(bundle adjustment)。这些问题可以被广泛归类为结构化学习(structured learning):在这种学习方式中,神经网络组件可以与已知的先验以模块化的方式结合,从而在传统方法的基础上发挥深度学习的优势。虽然这一领域的兴趣正在迅速增加,但现有的工作仍然是零散的,并且大多基于特定应用的代码库。Theseus 通过提供一个与问题无关的结构化学习平台来填补这一空白,使用户能够轻松地将神经网络与表示为可微分模块的非线性优化先验相结合,并对其进行端到端训练。

本教程将介绍在 Theseus 中解决此类优化问题的基本构建模块;在接下来的教程中,我们将展示如何将这些构建模块组合起来,逐步解决不同方面和复杂度的优化问题。本教程涵盖六个核心概念模块:

  1. 变量(Variables):对 torch 张量的命名封装,是在 Theseus 中定义优化问题的基本数据类型。(第1节)
  2. 代价函数(Cost functions):根据一个或多个变量计算误差项,即 Theseus 优化器要最小化的函数。(第2节)
  3. 代价权重(Cost weights):计算权重,用于调整一个或多个代价函数在整体目标中的贡献。(第3节)
  4. 目标(Objective):将多个代价函数和权重组合在一起,定义一个优化问题的结构。(第4节)
  5. 优化器(Optimizer):实现具体的优化算法(如 Gauss-Newton、Levenberg-Marquardt),用于最小化目标函数。(第5节)
  6. TheseusLayer:将目标和优化器组合在一起,作为 上游/下游 torch 模块与可微分优化问题之间的接口。(第6节)

1. 变量 Variables

Theseus 中,优化目标是 th.Variable 对象 的函数。这些对象是对 torch.tensor 的封装,可以有不同的类型(例如 二维点旋转群 等),并且可以选择性地关联一个名称。在 Theseus 中,我们要求所有变量的第一维必须是批处理维度(这与 PyTorch 模块中的约定类似)。

这里介绍所有 Variable 的两个主要操作:

  1. 创建变量(Creating variables)
  2. 更新变量(Updating Variables)

1.1 创建变量

变量(Variables)可以通过通用的 th.Variable 接口创建,或者通过带有自定义功能的子类创建。
在 Theseus 的应用中,许多变量都是流形(manifolds);因此,Theseus 提供了若干 Variable 子类,用来支持常见的流形类型,例如:

  • 向量(vectors)
  • 二维/三维点(2-D/3-D points)
  • 二维旋转(2-D rotations)
  • 二维刚体变换(2-D rigid transformations)

下面展示了一些示例用法:

1
2
import torch
import theseus as th
1
2
3
4
5
6
7
8
9
10
11
12
# 创建一个批大小为 2、包含 3 维随机数据的变量,并命名为 "x"
x = th.Variable(torch.randn(2, 3), name="x")
print(f"x: 具有批大小 2 的三维命名变量:\n {x}\n")

# 创建一个未命名的变量。系统会为其生成一个默认名称
y = th.Variable(torch.zeros(1, 1))
print(f"y: 未命名变量:\n {y}\n")

# 创建一个命名的 SE2(二维刚体变换)变量,并指定数据(批大小 = 2)
z = th.SE2(x_y_theta=torch.zeros(2, 3).double(), name="se2_1")
print(f"z: 命名的 SE2 变量:\n {z}")

得到的输出结果为:

1
2
3
4
5
6
7
8
9
10
x: Named variable with 3-D data of batch size 2:
Variable(tensor=tensor([[-0.5966, 0.7318, 2.2279],
[ 0.6040, 0.3843, -2.0580]]), name=x)

y: Un-named variable:
Variable(tensor=tensor([[0.]]), name=Variable__1)

z: Named SE2 variable:
SE2(xytheta=tensor([[0., 0., 0.],
[0., 0., 0.]], dtype=torch.float64), name=se2_1)

1.2 更新变量

在创建变量之后,可以通过 update() 方法 来更新其值。
下面我们展示了一些示例,以及在更新变量时需要避免的潜在错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# `update` 的示例用法
print("`update` 的示例用法: ")
print(f" 原始变量: {x}")
x.update(torch.ones(2, 3))
print(f" 更新后的变量: {x}\n")

# 以下输入方式是错误的
print("Variable `update` 的错误输入示例:")
try:
# `update` 要求输入张量必须符合变量的内部数据格式
x.update(torch.zeros(2, 4))
except ValueError as e:
print(f" 内部数据格式不匹配:")
print(f" {e}")
try:
# `update` 要求输入必须包含批处理维度
x.update(torch.zeros(3))
except ValueError as e:
print(f" 缺少批处理维度: ")
print(f" {e}\n")

# 不过,`update` 可以用来改变批大小
print("通过 `update` 改变变量的批大小:")
x.update(torch.ones(4, 3))
print(f" 新形状: {x.shape}")

输出结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
`update` 的示例用法: 
原始变量: Variable(tensor=tensor([[-0.5966, 0.7318, 2.2279],
[ 0.6040, 0.3843, -2.0580]]), name=x)
更新后的变量: Variable(tensor=tensor([[1., 1., 1.],
[1., 1., 1.]]), name=x)

Variable `update` 的错误输入示例:
内部数据格式不匹配:
尝试用与原始张量形状不兼容的数据更新张量 x。给定 torch.Size([4])。期望: torch.Size([3])
缺少批处理维度:
尝试用与原始张量形状不兼容的数据更新张量 x。给定 torch.Size([])。期望: torch.Size([3])

通过 `update` 改变变量的批大小:
新形状: torch.Size([4, 3])

在接下来的几个章节中,我们将看到 Variables 在 Theseus 中的优化问题里被使用的不同方式。

2. 代价函数 Cost functions

Theseus 中,代价函数(cost function) 表示一个或多个 Theseus 变量 的误差函数。
因此,代价函数刻画了 Theseus 中被优化的核心量。

基于这一点,代价函数需要区分:哪些变量是可以被优化的,哪些变量是不允许优化的。
在 Theseus 中,这一概念通过两类变量来表示:

  • 优化变量(optimization variables):可以被 Theseus 优化器修改,用于最小化目标函数。
  • 辅助变量(auxiliary variables):在计算目标时需要,但在 Theseus 优化器看来保持不变。

在 Theseus 中,如果一个 Variable 在创建代价函数时被声明为优化变量,它就会成为优化变量。
所有优化变量必须是 th.Manifold 的子类。

因此,创建代价函数时必须显式声明其优化变量(必需)和辅助变量(可选)。
代价函数提供的核心操作包括:

  • 误差的计算(error computation)
  • 基于变量最新取值的误差雅可比矩阵(Jacobian)计算

th.CostFunction 是一个抽象类(abstract class)。要实例化它,必须实现误差和雅可比矩阵的计算方法。
代价函数必须返回一个 torch 张量 作为误差。

作为一个简单示例,我们将展示如何使用 th.Difference 代价函数。
它是 th.CostFunction 的一个具体子类。下面我们通过两个 Vector 变量 来实例化该代价函数,其中一个是优化变量,另一个是辅助变量。

接下来我们将展示一些在代价函数上的常用操作:

  • 代价函数如何访问其优化变量和辅助变量;
  • 如何计算误差(在 th.Difference 中定义为 optim_var - target);
  • 当底层变量更新时,误差如何发生变化;
  • 最后,展示其雅可比矩阵的计算结果:它会返回一个列表,其中每个优化变量对应一个雅可比矩阵。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 注意:CostWeight(代价权重)是在构建代价函数时必须的一个权重量。
# 我们会在第 3 节中详细解释;在这个示例里,我们只是简单地创建它,但不使用它。
w1 = th.ScaleCostWeight(2.0)

# 创建一个 Difference 代价函数
optim_var = th.Vector(tensor=torch.ones(1, 2), name="x1")
target = th.Vector(tensor=torch.zeros(1, 2), name="target")
cf = th.Difference(optim_var, target, w1)

# 代价函数可以获取其优化变量和辅助变量
print("从代价函数中获取优化变量和辅助变量:")
print(" 优化变量: ", list(cf.optim_vars))
print(" 辅助变量: ", list(cf.aux_vars))
print("")

# 代价函数会使用变量的值来计算误差
error = cf.error()
print(f"原始代价函数(未加权)的误差:\n {error},其形状为 {error.shape}\n")

# 代价函数会使用变量的【最新】值,
# 这可以通过变量更新后误差的变化来体现。
print("将优化变量更新为原来的 2 倍: ")
optim_var.update(2 * torch.ones(1, 2))
print(f" 更新后的变量: {optim_var}")
# 此时误差会变成上面输出的 2 倍
print(f" 更新后的(未加权)误差: {cf.error()}\n")

# 计算(未加权的)雅可比矩阵和误差
# 返回的结果是一个雅可比矩阵列表,每个优化变量对应一个条目。
print("计算代价函数的(未加权)雅可比矩阵:")
jacobians, error = cf.jacobians() # 注意 cf.jacobians 也会返回误差
print(f" 雅可比矩阵: 类型 {type(jacobians)},长度 {len(jacobians)}")
print(f" {jacobians[0]}")
# 第 i 个雅可比矩阵的形状为 (batch_size, cf.dim(), 第 i 个优化变量的自由度)
print(f" 第 0 个雅可比矩阵的形状: {jacobians[0].shape}")

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
从代价函数中获取优化变量和辅助变量:
优化变量: [Vector(dof=2, tensor=tensor([[1., 1.]]), name=x1)]
辅助变量: [Vector(dof=2, tensor=tensor([[0., 0.]]), name=target)]

原始代价函数(未加权)的误差:
tensor([[1., 1.]]),形状为 torch.Size([1, 2])

将优化变量更新为原来的 2 倍:
更新后的变量: Vector(dof=2, tensor=tensor([[2., 2.]]), name=x1)
更新后的(未加权)误差: tensor([[2., 2.]])

计算代价函数的(未加权)雅可比矩阵:
雅可比矩阵: <class 'list'>,长度为 1
tensor([[[1., 0.],
[0., 1.]]])
第 0 个雅可比矩阵的形状: torch.Size([1, 2, 2])

在第 3 章教程中,我们将深入探讨代价函数的内部机制,并展示如何构建自定义代价函数。

3. 代价权重 Cost weights

Theseus 代价权重(cost weight) 是应用于代价函数的一种权重函数:它根据一个或多个变量计算权重,并将其应用到一个或多个代价函数的误差上。
因此,代价权重是在优化问题中修改代价函数误差的一种方式。代价权重提供了另一层抽象,有助于在一个目标函数中不同的代价函数之间进行权衡。

th.CostWeight 是一个抽象类,因为任何关于变量的函数都可以用来创建 CostWeight。
Theseus 目前提供了若干具体的 CostWeight 子类

  • ScaleCostWeight:权重函数是一个标量实数;
  • DiagonalCostWeight:权重函数是一个对角矩阵;
  • th.eb.GPCostWeight:权重函数表示一个严格稀疏高斯过程的逆协方差函数。

代价权重的主要用途是支持代价函数的 weighted_errorweighted_jacobians_and_error 函数;因此这些子类会实现它们定义好的权重函数。

在 CostWeight 中使用的变量可以是命名的或未命名的;然而,使用命名变量可以让我们直接更新 CostWeight 的值;这在代价权重由外部函数(例如 torch.nn.Module)计算时,更新目标函数(Objective)或 TheseusLayer 时尤其有用。

下面我们将通过 ScaleCostWeight 类展示一个 CostWeight 的使用示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
print("创建 ScaleCostWeight:")
# 从一个浮点数创建 ScaleCostWeight
w1 = th.ScaleCostWeight(10.0)
# 这个权重会被封装成一个默认变量
print(f" w1(默认变量): {w1.scale}")

# 也可以直接传入一个 Theseus 变量
w2 = th.ScaleCostWeight(th.Variable(2 * torch.ones(1, 1), name="scale"))
print(f" w2(命名变量): {w2.scale}\n")

# 使用 ScaleCostWeight 对误差和雅可比矩阵加权
print("使用 ScaleCostWeight 直接加权误差/雅可比矩阵:")
weighted_jacobians, weighted_error = w1.weight_jacobians_and_error(jacobians, error)
print(f" 雅可比矩阵:\n 加权后: {weighted_jacobians}\n 原始: {jacobians}")
print(f" 误差:\n 加权后: {weighted_error}\n 原始: {error}\n")

# 如果 ScaleCostWeight 已包含在代价函数中,我们可以直接
# 使用代价函数的 `weighted_error` 和 `weighted_jacobians_and_error` 方法
print("使用之前代价函数的 `weighted_error` 方法:")
print(f" 加权后的代价函数误差: {cf.weighted_error()} vs 未加权误差: {cf.error()}")

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
创建 ScaleCostWeight:
w1(默认变量): Variable(tensor=tensor([[10.]]), name=Variable__17)
w2(命名变量): Variable(tensor=tensor([[2.]]), name=scale)

使用 ScaleCostWeight 直接加权误差/雅可比矩阵:
雅可比矩阵:
加权后: [tensor([[[10., 0.],
[ 0., 10.]]])]
原始: [tensor([[[1., 0.],
[0., 1.]]])]
误差:
加权后: tensor([[20., 20.]])
原始: tensor([[2., 2.]])

使用之前代价函数的 `weighted_error` 方法:
加权后的代价函数误差: tensor([[4., 4.]]) vs 未加权误差: tensor([[2., 2.]])

4. 目标 Objective

th.Objective 定义了一个优化问题的结构,通过向其中添加一个或多个代价函数,每个代价函数都可以关联权重和变量。
th.Objective 会将这些代价函数组合成一个全局误差函数,其内部结构可以被 Theseus 优化器利用,通过修改优化变量来最小化全局误差。

目前,th.Objective 支持非线性最小二乘目标(nonlinear sum of squares objectives),即全局误差由每个代价函数误差的平方和组成,并通过对应的代价权重加权。
我们计划在未来扩展支持其他类型的优化结构。

在创建 Objective 时有一个关键点:Theseus 假设提供的代价权重在最终目标函数中也会被平方。
形式上,我们目前支持的目标函数形式为:

(原文图片丢失)

其中,v 表示变量集合,fᵢ 是代价函数的误差,wᵢ 是其对应的代价权重。

下面我们展示一个创建 Objective 的简单示例。
我们希望最小化如下函数:

(xa)2+4(yb)2(x - a)^2 + 4(y - b)^2

其中 ab 是常数,xy 是变量。

下面我们依次创建:

  1. 优化变量和辅助变量
  2. 代价权重
  3. 代价函数
  4. 目标函数(Objective)

然后,为了评估 Objective,我们将使用它的 error_metric 函数,该函数计算误差向量的平方范数并除以 2。
然而,在评估之前,我们必须至少调用一次 Objective.update 函数(以确保内部数据结构正确初始化)。
通常,update 函数用于方便地修改 Objective 注册的所有变量的值。
该函数接收一个字典,将变量名称映射到对应的 torch 张量,用于更新相应变量的值。

最后,我们展示当前 Objective 对该函数的计算结果是正确的。(在下一节中,我们将优化该目标函数以达到最小值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 步骤 1:构建优化变量和辅助变量
# 构建函数的变量:这些是代价函数的优化变量
x = th.Vector(1, name="x")
y = th.Vector(1, name="y")

# 构建函数常数的辅助变量
a = th.Vector(tensor=torch.randn(1,1), name="a")
b = th.Vector(tensor=torch.randn(1,1), name="b")

# 步骤 2:构建代价权重
# 对于 w1,使用命名变量
w1 = th.ScaleCostWeight(th.Variable(tensor=torch.ones(1, 1), name="w1_sqrt"))
w2 = th.ScaleCostWeight(2.0) # 为 (y-b)^2 项提供 2,作为 sqrt(4)

# 步骤 3:构建表示各误差项的代价函数
# 第一个误差项
cf1 = th.Difference(x, a, w1, name="term_1")
# 第二个误差项
cf2 = th.Difference(y, b, w2, name="term_2")

# 步骤 4:创建目标函数,并添加误差项
objective = th.Objective()
objective.add(cf1)
objective.add(cf2)

# 步骤 5:在当前变量值下评估目标函数
# 注意:评估前需要调用 `objective.update`
# 这里使用 update 函数设置所有变量的值
objective.update({"a": torch.ones(1,1), "b": 2 * torch.ones(1, 1),
"x": 0.5 * torch.ones(1,1), "y": 3 * torch.ones(1, 1)})
# 加权误差应为:cost_weight * weighted_error
print(f"误差项 1: 未加权: {cf1.error()} 加权: {cf1.weighted_error()}")
print(f"误差项 2: 未加权: {cf2.error()} 加权: {cf2.weighted_error()}")
# 目标函数值应为:(error1)^2 + (error2)^2
print(f"目标函数值: {objective.error_metric()}")

输出结果为:

1
2
3
误差项 1: 未加权: tensor([[-0.5000]]) 加权: tensor([[-0.5000]])
误差项 2: 未加权: tensor([[1.]]) 加权: tensor([[2.]])
目标函数值: tensor([4.2500])

将代价函数添加到目标函数(Objective)时,会注册该代价函数的所有优化变量和辅助变量(以及其代价权重中的变量,如果存在的话)。同时,th.Objective 会检查,确保不同的变量或代价函数对象不会使用重复的名称。

1
2
3
4
5
6
7
8
9
10
11
12
try:
objective.add(th.Difference(y, b, w2, name="term_1"))
except ValueError as e:
print(e)

try:
obj2 = th.Objective()
obj2.add(th.Difference(x, a, w1, name="term_1"))
fake_x1 = th.Vector(1, name="x")
obj2.add(th.Difference(fake_x1, b, w2, name="fake_term"))
except ValueError as e:
print(e)

在同一个目标函数中,不允许出现两个名称相同(如 term_1)的不同代价函数对象。
在同一个目标函数中,不允许出现两个名称相同(如 x)的不同变量对象。

5. 优化器 Optimizer

Theseus 提供了一套线性和非线性优化器,用于最小化以 th.Objective 描述的问题。
可以通过调用 optimizer.optimize() 来求解目标函数,该方法会修改优化变量的值,以最小化对应的目标函数。
optimize 会将优化变量保持在找到的最终值,并返回一个 info 对象,其中包含最优解和优化统计信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 回顾我们的目标函数: (x - a)^2 + 4 (y - b)^2
# 最小值在 x = a 和 y = b 处
# 先给它们赋随机值
objective.update({
"x": torch.randn(1, 1),
"y": torch.randn(1, 1)
})

# 现在使用优化器。因为这是一个二次型最小化问题,
# 线性优化器就可以求得最优解
optimizer = th.LinearOptimizer(objective, th.CholeskyDenseSolver)
info = optimizer.optimize()

# 检查 x 和 y 的值
# 这里只打印 Vector 的 tensor 属性以便理解
print(f"x: {x.tensor} vs a: {a.tensor}") # 应该匹配 a = 1
print(f"y: {y.tensor} vs b: {b.tensor}") # 应该匹配 b = 2
print(f"优化后的目标函数值: {objective.error_metric()}")

得到结果:

1
2
3
4
5
6
x: tensor([[1.]]) vs a: tensor([[1.]])
y: tensor([[2.]]) vs b: tensor([[2.]])
优化后的目标函数值: tensor([0.])

警告: /private/home/lep/code/theseus/theseus/optimizer/optimizer.py:42: UserWarning: Vectorization is off by default when not running from TheseusLayer. Using TheseusLayer is the recommended way to run our optimizers.
warnings.warn(

6. TheseusLayer

如上面的警告所示,推荐的优化器使用方式是通过 TheseusLayer
TheseusLayer 提供了 上游/下游 torch 代码Theseus 目标函数和优化器 之间的接口。

forward() 方法将 Objective.update()Optimizer.optimize() 的功能合并为一次调用。
它接收一个更新字典(update dictionary)作为输入,并返回一个字典,其中包含优化变量优化后的 torch 数据 以及优化器的输出信息(info)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建一个 TheseusLayer,传入之前定义的优化器
layer = th.TheseusLayer(optimizer)

# 使用 forward() 方法运行优化器,同时更新变量的初始值
values, info = layer.forward({
"x": torch.randn(1, 1),
"y": torch.randn(1, 1),
"a": torch.ones(1, 1),
"b": 2 * torch.ones(1, 1),
"w1_sqrt": torch.ones(1, 1)
})

# 打印结果
print(f"调用 TheseusLayer 的 forward() 后:")
print(f" 优化变量的值: {values}")
print(f" 优化器信息: {info}")
print(f" 优化后的目标函数值: {objective.error_metric()}")

输出结果如下:

1
2
3
4
调用 TheseusLayer 的 forward() 后:
优化变量的值: {'x': tensor([[1.]]), 'y': tensor([[2.]])}
优化器信息: OptimizerInfo(best_solution={'x': tensor([[1.]]), 'y': tensor([[2.]])}, status=array([<LinearOptimizerStatus.CONVERGED: 1>], dtype=object))
优化后的目标函数值: tensor([0.])

TheseusLayer 支持反向传播,其语义上类似于 PyTorch 神经网络中的一层。
通过 TheseusLayer 进行反向传播,可以学习问题中的各种必要量,例如代价权重、优化变量的初始值,以及优化的其他参数。
接下来的教程将展示使用 TheseusLayer 进行学习的多个应用示例。

为了区分 Theseus 优化器内部进行的优化Theseus 优化器外部进行的优化(例如 PyTorch 的 autograd 在学习过程中执行的优化),我们分别称之为 内循环优化(inner loop optimization)外循环优化(outer loop optimization)
需要注意的是,内循环优化仅优化 优化变量,而外循环优化可以优化与 PyTorch autograd 优化器关联的指定变量的张量。
调用 TheseusLayer.forward() 只执行内循环优化;通常,PyTorch 的 autograd 学习步骤会执行外循环优化。接下来的教程中会有具体示例。

在外循环过程中,我们通常希望在执行内循环优化之前更新 Theseus 变量;例如,为优化变量设置初始值,或使用外循环学习得到的张量更新辅助变量。
我们建议通过 TheseusLayer.forward() 来完成这些更新。虽然变量和目标函数可以独立更新,而不通过 TheseusLayer.forward(),但遵循此规范可以明确最新输入给 TheseusLayer 的内容,有助于避免隐藏错误和不期望的行为。
因此,我们推荐在学习过程中对变量的任何更新都通过 TheseusLayer 来完成。