def generate_data(num_points=100, a=1, b=0.5, noise_factor=0.01):
    """Sample `num_points` noisy points from the quadratic y = a * x^2 + b.

    Returns a pair of tensors (data_x, data_y), each of shape (1, num_points).
    """
    # Uniformly distributed x values in [0, 1).
    data_x = torch.rand((1, num_points))
    # Gaussian noise, scaled down by noise_factor.
    noise = torch.randn((1, num_points)) * noise_factor
    # Evaluate the quadratic and perturb it with the noise.
    data_y = a * data_x ** 2 + b + noise
    return data_x, data_y
def generate_learning_data(num_points, num_models):
    """Generate one (x, y) data batch per model.

    Every batch shares the curvature a = 3 but uses a distinct offset:
    the original loop starts at b = 1 and adds 2 before each draw, so
    model i receives b = 2 * i + 3 (i.e. 3, 5, 7, ...).
    """
    a = 3
    return [generate_data(num_points, a, 2 * i + 3) for i in range(num_models)]
# Plot the learned functions: scatter each model's raw data, then overlay
# the quadratic y = a * x^2 + b_i recovered for that model.
fig, ax = plt.subplots()
for i in range(num_models):
    # Raw (x, y) samples for model i.
    ax.scatter(data_batches[i][0], data_batches[i][1])
    # Shared curvature (a Theseus variable); detach so matplotlib receives
    # a tensor with no autograd history.
    a_ = a.tensor.squeeze().detach()
    # Per-model offset recorded for this epoch — presumably already a plain
    # detached value; TODO(review): confirm what epoch_b holds.
    b = epoch_b[i]
    x = torch.linspace(0., 1., steps=100)
    y = a_ * x * x + b
    # Label only the first curve so the legend shows a single entry.
    ax.plot(x, y, color='k', lw=4, linestyle='--',
            label='Learned quadratics' if i == 0 else None)
    ax.legend()
# NOTE(review): fragment — these lines sit inside the per-epoch training loop
# of the optimize_and_learn_models_jointly routine; the final `return` belongs
# to the enclosing function, one indentation level up.
if epoch == 0:
    # Initialize the running minimum on the first epoch.
    min_loss = loss
if loss <= min_loss:
    # New best objective value: snapshot the current a and all b's as floats.
    min_loss = loss
    best_model_a = a.tensor.item()
    best_model_b = [b.tensor.item() for b in all_b]
print(f"Epoch: {epoch} 损失: {loss.item()}")
# Every 10th epoch, print the current solution.
if epoch % 10 == 9:
    print(f" ---------------- 第 {epoch:02d} 轮解 ---------------- ")
    print(" a 值:", a.tensor.item())
    print(" b 值: ", [b.tensor.item() for b in all_b])
    print(f" ----------------------------------------------------- ")
return best_model_a, best_model_b
步骤 3.1:示例版本 A
现在我们展示版本 A。该版本相对于原始代码片段做了如下修改:
为每个模型分别创建变量 x0…x9、y0…y9 和 b0…b9(x、y、b 各 10 个)。
创建 10 个 AutoDiffCostFunction,每个都使用同一个 a,但各自对应的 b_i、x_i、y_i。所有代价函数都被添加到目标函数中。
构建 theseus_inputs 字典,将 bi,xi,yi 映射到正确的数据批次。
注意,a 及其在 PyTorch 中的优化器设置保持不变。一旦优化问题设置完成,我们就调用 optimize_and_learn_models_jointly 子程序来计算 a 和 b 的值。
# Replace the single x/y pair with per-model variables x0..x9, y0..y9.
# Construction stays interleaved (x_i then y_i) as in the original.
xy_pairs = [
    (th.Variable(data_x, name=f"x{idx}"), th.Variable(data_y, name=f"y{idx}"))
    for idx in range(num_models)
]
all_x = [pair[0] for pair in xy_pairs]
all_y = [pair[1] for pair in xy_pairs]

# Replace the single b with one optimization variable per model: b0..b9.
all_b = [th.Vector(1, name=f"b{idx}") for idx in range(num_models)]

# The shared curvature a remains a single variable.
a = th.Vector(1, name="a")
# Build an objective with one cost function per model: cost i optimizes b_i
# while treating the shared a and the data x_i, y_i as auxiliary variables.
objective = th.Objective()
for idx, b_var in enumerate(all_b):
    cost = th.AutoDiffCostFunction(
        [b_var],
        quad_error_fn2,
        100,
        aux_vars=(a, all_x[idx], all_y[idx]),
        name=f"quadratic_cost_fn_{idx}",
    )
    objective.add(cost)
# optimizer, TheseusLayer and model optimizer remains the same optimizer = th.GaussNewton( objective, max_iterations=50, step_size=0.4, ) theseus_optim = th.TheseusLayer(optimizer) a_tensor = torch.nn.Parameter(torch.rand(1, 1)) model_optimizer = torch.optim.Adam([a_tensor], lr=0.15)
# TheseusLayer dictionary now needs to provide b0..b9, x0..x9, y0..y9.
def construct_theseus_layer_inputs():
    """Map every named variable to its input tensor for the TheseusLayer.

    x_i / y_i come from the i-th data batch, every b_i starts from ones,
    and "a" is the torch Parameter learned by the outer optimizer.
    """
    inputs = {}
    for idx in range(num_models):
        batch_x, batch_y = data_batches[idx]
        inputs[f"x{idx}"] = batch_x
        inputs[f"y{idx}"] = batch_y
        inputs[f"b{idx}"] = torch.ones((1, 1))
    inputs["a"] = a_tensor
    return inputs
# Run the joint inner (Theseus) / outer (PyTorch) optimization; returns the
# best (a, [b0..b9]) found across epochs.
best_model = optimize_and_learn_models_jointly(theseus_optim, model_optimizer)

print(f" ---------------- Final Solutions -------------- ")
print(" a value:", best_model[0])
print(" b values: ", best_model[1])
print(f" ----------------------------------------------- ")
# Plot the learned functions: each model's raw data plus the quadratic
# y = a * x^2 + b_i reconstructed from the best solution found.
fig, ax = plt.subplots()
for i in range(num_models):
    batch_x, batch_y = data_batches[i]
    ax.scatter(batch_x, batch_y)
    a = best_model[0]
    b = best_model[1][i]
    x = torch.linspace(0., 1., steps=100)
    y = a * x * x + b
    # Only the first curve carries the label, so the legend has one entry.
    ax.plot(x, y, color='k', lw=4, linestyle='--',
            label='Learned quadratics' if i == 0 else None)
    ax.legend()
# NOTE(review): this cell is an exact duplicate of the plotting cell above —
# likely a notebook copy/paste artifact; consider removing one copy.
# Plot the learned functions
fig, ax = plt.subplots()
for i in range(num_models):
    # Raw (x, y) samples for model i.
    ax.scatter(data_batches[i][0], data_batches[i][1])
    # Rebinds the module-level names a and b with plain numeric values.
    a = best_model[0]
    b = best_model[1][i]
    x = torch.linspace(0., 1., steps=100)
    y = a * x * x + b
    # Label only the first curve so the legend shows a single entry.
    ax.plot(x, y, color='k', lw=4, linestyle='--',
            label='Learned quadratics' if i == 0 else None)
    ax.legend()