いものやま。

雑多な知識の寄せ集め

半正定値計画問題と計量学習の話。(その2)

前回の続きで、計量学習の問題を実際にソルバーで解いてみる。

これは数理最適化 Advent Calendar 2023の4日目の記事です。

PICOS

ここではPythonでPICOSというライブラリを使って解いてみる。

PICOSは最適化問題をいい感じに表現できるライブラリで、ソルバーとしてCVXOPTがデフォルトでついてくるので、SDPも解けたりする。

データ

ここでは次のようなトイデータを用意してみた:

import numpy as np

np.random.seed(0)
C = np.random.multivariate_normal(
    mean=(0, 0),
    cov=((0.2, 0.1), (0.1, 0.3)),
    size=20,
)
is_normal = (-0.7 <= C[:, 0]) & (C[:, 0] <= 0.7) & (-1 <= C[:, 1]) & (C[:, 1] <= 1)
C1 = C[is_normal, :]
C2 = C[~is_normal, :]

グラフにするとこんな感じ:

import matplotlib.pyplot as plt
%matplotlib inline

fig, ax = plt.subplots()
ax.scatter(C1[:, 0], C1[:, 1], label="C1")
ax.scatter(C2[:, 0], C2[:, 1], label="C2")
ax.set_aspect("equal")
ax.grid(True)
ax.legend(loc="upper left", bbox_to_anchor=(1, 1));

データの分布の様子

定式化

まずは半正定値行列 Aから距離の2乗の関数を得られるように、関数を定義しておく:

def get_dist_func(A):  # Aは半正定値行列
    def dist_func(x, y):
        d = x - y
        return d.T * A * d
    return dist_func

PICOSでは行列の積は*で計算できることに注意(numpyだと@)。

そして、これで得られる関数を使って、同じグループ内のデータとの距離の2乗の合計を求める関数を定義:

def calc_total_dist_in_same(dist_func, x, same):
    return sum(dist_func(x, y) for y in same)

また、違反度合いの下限を計算する関数を定義:

def calc_dist_violation_lower(dist_func, x, y_in_same, y_in_other):
    same_dist = dist_func(x, y_in_same)
    other_dist = dist_func(x, y_in_other)
    # other_dist >= same_dist + 1 であるべき(1はマージン)
    # 違反量は same_dist + 1 - other_dist (ただし0以下は0とする)となる
    return same_dist + 1 - other_dist

これらを使って問題を解く関数を定義:

import picos

def solve_metric_problem(C1, C2):  # C1とC2はそれぞれ(L, n), (M, n)のndarrayとする
    n_samples_C1, n_features_C1 = C1.shape
    n_samples_C2, n_features_C2 = C2.shape
    assert n_features_C1 == n_features_C2
    n_features = n_features_C1

    problem = picos.Problem("metric")
    
    A = picos.SymmetricVariable("A", (n_features, n_features))
    problem.add_constraint(A >> 0)
    dist_func = get_dist_func(A)

    objective = 0

    for i in range(n_samples_C1):
        total_dist_in_same = calc_total_dist_in_same(dist_func, C1[i], C1)
        objective += total_dist_in_same

        for j in range(n_samples_C1):
            for k in range(n_samples_C2):
                dist_violation_lower = calc_dist_violation_lower(dist_func, C1[i], C1[j], C2[k])
                dist_violation = picos.RealVariable(f"dv_C1_{i}_{j}_{k}", lower=0)
                problem.add_constraint(dist_violation >= dist_violation_lower)
                objective += dist_violation

    for i in range(n_samples_C2):
        total_dist_in_same = calc_total_dist_in_same(dist_func, C2[i], C2)
        objective += total_dist_in_same

        for j in range(n_samples_C2):
            for k in range(n_samples_C1):
                dist_violation_lower = calc_dist_violation_lower(dist_func, C2[i], C2[j], C1[k])
                dist_violation = picos.RealVariable(f"dv_C2_{i}_{j}_{k}", lower=0)
                problem.add_constraint(dist_violation >= dist_violation_lower)
                objective += dist_violation

    problem.set_objective("min", objective)
    solution = problem.solve()

    return np.array(A)

まずproblem = picos.Problem("metric")として最適化問題インスタンスを作っている。

変数は実対称行列なのでA = picos.SymmetricVariable("A", (n_features, n_features))と用意。 これが半正定値であるという制約があるので、problem.add_constraint(A >> 0)と問題に制約を追加している(>> \succeqを意味する関係演算子となっている)。

で、あとは目的関数の各項をループ回しながら作って足し込んでいる。

ここでちょっと注意が必要なのが、違反度合いは0以上ということ。 そこで、

dist_violation_lower = calc_dist_violation_lower(dist_func, C1[i], C1[j], C2[k])
dist_violation = picos.RealVariable(f"dv_C1_{i}_{j}_{k}", lower=0)
problem.add_constraint(dist_violation >= dist_violation_lower)
objective += dist_violation

のように、0以上となる補助変数dist_violationを用意して、目的関数には補助変数を足し込み、dist_violation >= dist_violation_lowerという制約を問題に追加している。 このあたりは線形計画でよく使うテクニック。

あとはproblem.set_objective("min", objective)で目的関数を最小化として問題にセットし、solution = problem.solve()で解を得ている。 この解には主問題だけでなく双対問題の解とかも含まれてたりするんだけど、ここでは得られたAの値を単にnumpyのデータにして返している。

解の分析

実際に解いてみるとこんな感じ:

A = solve_metric_problem(C1, C2)
print(A)
# =>
# [[ 0.90778979 -0.34794114]
#  [-0.34794114  0.4259602 ]]

ちなみにこれくらいの小さい問題でも手元のマシンで30秒弱かかってたりする。 なのでちょっと実用性はなさそう。。。

あとはこうして得られた半正定値行列 Aの意味合いなんだけど、半正定値行列は A = P\top Pのように分解できることが知られている。 なので、 \boldsymbol{x}^\top A\boldsymbol{x} = x^\top P^\top P\boldsymbol{x} = (P\boldsymbol{x})^\top (P\boldsymbol{x})となることから、これは元の空間を行列 P写像した空間で距離を求めていることになる。

そこで、行列 Pを得ることを考えてみる。 これにはコレスキー分解を使えばいい:

def decomp_matrix(A):
    try:
        return np.linalg.cholesky(A).T
    except Exception as e:
        print(e)
        values, vectors = np.linalg.eig(A)
        values = np.clip(values, 0, None)
        if (values == 0).all():
            return np.identity(len(A))
        else:
            return np.diag(np.sqrt(values)) @ vectors.T

ただ、 Aは半正定値行列なので固有値がほぼ0のことがあり、そうすると数値誤差で固有値がわずかに負になっていることもあった。 その場合は例外が発生する。 その対処をするために、上記の関数では例外が発生したときには固有値を0以上に無理やり切り上げ、それで Pを得るようにしている。

この関数を使って Pを求めてみると、次のようになっていた:

P = decomp_matrix(A)
print(P)
# =>
# [[ 0.95278003 -0.36518517]
#  [ 0.          0.54092513]]

この Pでデータを写像してみてグラフにすると、次のようになる:

fig, ax = plt.subplots()
ax.scatter((C1 @ P.T)[:, 0], (C1 @ P.T)[:, 1], label="C1")
ax.scatter((C2 @ P.T)[:, 0], (C2 @ P.T)[:, 1], label="C2")
ax.set_aspect("equal")
ax.grid(True)
ax.legend(loc="upper left", bbox_to_anchor=(1, 1));

写像後のデータの分布の様子

潰れていた分布がちょうどいい感じに伸ばされているのが分かると思う。 これでデータ間の距離がいい感じに計算できるようになる。

まぁ、実務的には上記の通り計算が重すぎるので、実際にはSVMとか使う方がよさそうではあるけど(^^;

今日はここまで!