前回の続きで、計量学習の問題を実際にソルバーで解いてみる。
これは数理最適化 Advent Calendar 2023の4日目の記事です。
PICOS
ここではPythonでPICOSというライブラリを使って解いてみる。
PICOSは最適化問題をいい感じに表現できるライブラリで、ソルバーとしてCVXOPTがデフォルトでついてくるので、SDPも解けたりする。
データ
ここでは次のようなトイデータを用意してみた:
import numpy as np np.random.seed(0) C = np.random.multivariate_normal( mean=(0, 0), cov=((0.2, 0.1), (0.1, 0.3)), size=20, ) is_normal = (-0.7 <= C[:, 0]) & (C[:, 0] <= 0.7) & (-1 <= C[:, 1]) & (C[:, 1] <= 1) C1 = C[is_normal, :] C2 = C[~is_normal, :]
グラフにするとこんな感じ:
import matplotlib.pyplot as plt %matplotlib inline fig, ax = plt.subplots() ax.scatter(C1[:, 0], C1[:, 1], label="C1") ax.scatter(C2[:, 0], C2[:, 1], label="C2") ax.set_aspect("equal") ax.grid(True) ax.legend(loc="upper left", bbox_to_anchor=(1, 1));
定式化
まずは半正定値行列から距離の2乗の関数を得られるように、関数を定義しておく:
def get_dist_func(A): # Aは半正定値行列 def dist_func(x, y): d = x - y return d.T * A * d return dist_func
PICOSでは行列の積は*
で計算できることに注意(numpyだと@
)。
そして、これで得られる関数を使って、同じグループ内のデータとの距離の2乗の合計を求める関数を定義:
def calc_total_dist_in_same(dist_func, x, same): return sum(dist_func(x, y) for y in same)
また、違反度合いの下限を計算する関数を定義:
def calc_dist_violation_lower(dist_func, x, y_in_same, y_in_other): same_dist = dist_func(x, y_in_same) other_dist = dist_func(x, y_in_other) # other_dist >= same_dist + 1 であるべき(1はマージン) # 違反量は same_dist + 1 - other_dist (ただし0以下は0とする)となる return same_dist + 1 - other_dist
これらを使って問題を解く関数を定義:
import picos def solve_metric_problem(C1, C2): # C1とC2はそれぞれ(L, n), (M, n)のndarrayとする n_samples_C1, n_features_C1 = C1.shape n_samples_C2, n_features_C2 = C2.shape assert n_features_C1 == n_features_C2 n_features = n_features_C1 problem = picos.Problem("metric") A = picos.SymmetricVariable("A", (n_features, n_features)) problem.add_constraint(A >> 0) dist_func = get_dist_func(A) objective = 0 for i in range(n_samples_C1): total_dist_in_same = calc_total_dist_in_same(dist_func, C1[i], C1) objective += total_dist_in_same for j in range(n_samples_C1): for k in range(n_samples_C2): dist_violation_lower = calc_dist_violation_lower(dist_func, C1[i], C1[j], C2[k]) dist_violation = picos.RealVariable(f"dv_C1_{i}_{j}_{k}", lower=0) problem.add_constraint(dist_violation >= dist_violation_lower) objective += dist_violation for i in range(n_samples_C2): total_dist_in_same = calc_total_dist_in_same(dist_func, C2[i], C2) objective += total_dist_in_same for j in range(n_samples_C2): for k in range(n_samples_C1): dist_violation_lower = calc_dist_violation_lower(dist_func, C2[i], C2[j], C1[k]) dist_violation = picos.RealVariable(f"dv_C2_{i}_{j}_{k}", lower=0) problem.add_constraint(dist_violation >= dist_violation_lower) objective += dist_violation problem.set_objective("min", objective) solution = problem.solve() return np.array(A)
まずproblem = picos.Problem("metric")
として最適化問題のインスタンスを作っている。
変数は実対称行列なのでA = picos.SymmetricVariable("A", (n_features, n_features))
と用意。
これが半正定値であるという制約があるので、problem.add_constraint(A >> 0)
と問題に制約を追加している(>>
がを意味する関係演算子となっている)。
で、あとは目的関数の各項をループ回しながら作って足し込んでいる。
ここでちょっと注意が必要なのが、違反度合いは0以上ということ。 そこで、
dist_violation_lower = calc_dist_violation_lower(dist_func, C1[i], C1[j], C2[k]) dist_violation = picos.RealVariable(f"dv_C1_{i}_{j}_{k}", lower=0) problem.add_constraint(dist_violation >= dist_violation_lower) objective += dist_violation
のように、0以上となる補助変数dist_violation
を用意して、目的関数には補助変数を足し込み、dist_violation >= dist_violation_lower
という制約を問題に追加している。
このあたりは線形計画でよく使うテクニック。
あとはproblem.set_objective("min", objective)
で目的関数を最小化として問題にセットし、solution = problem.solve()
で解を得ている。
この解には主問題だけでなく双対問題の解とかも含まれてたりするんだけど、ここでは得られたA
の値を単にnumpyのデータにして返している。
解の分析
実際に解いてみるとこんな感じ:
A = solve_metric_problem(C1, C2) print(A) # => # [[ 0.90778979 -0.34794114] # [-0.34794114 0.4259602 ]]
ちなみにこれくらいの小さい問題でも手元のマシンで30秒弱かかってたりする。 なのでちょっと実用性はなさそう。。。
あとはこうして得られた半正定値行列の意味合いなんだけど、半正定値行列はのように分解できることが知られている。 なので、となることから、これは元の空間を行列で写像した空間で距離を求めていることになる。
そこで、行列を得ることを考えてみる。 これにはコレスキー分解を使えばいい:
def decomp_matrix(A): try: return np.linalg.cholesky(A).T except Exception as e: print(e) values, vectors = np.linalg.eig(A) values = np.clip(values, 0, None) if (values == 0).all(): return np.identity(len(A)) else: return np.diag(np.sqrt(values)) @ vectors.T
ただ、は半正定値行列なので固有値がほぼ0のことがあり、そうすると数値誤差で固有値がわずかに負になっていることもあった。 その場合は例外が発生する。 その対処をするために、上記の関数では例外が発生したときには固有値を0以上に無理やり切り上げ、それでを得るようにしている。
この関数を使ってを求めてみると、次のようになっていた:
P = decomp_matrix(A) print(P) # => # [[ 0.95278003 -0.36518517] # [ 0. 0.54092513]]
このでデータを写像してみてグラフにすると、次のようになる:
fig, ax = plt.subplots() ax.scatter((C1 @ P.T)[:, 0], (C1 @ P.T)[:, 1], label="C1") ax.scatter((C2 @ P.T)[:, 0], (C2 @ P.T)[:, 1], label="C2") ax.set_aspect("equal") ax.grid(True) ax.legend(loc="upper left", bbox_to_anchor=(1, 1));
潰れていた分布がちょうどいい感じに伸ばされているのが分かると思う。 これでデータ間の距離がいい感じに計算できるようになる。
まぁ、実務的には上記の通り計算が重すぎるので、実際にはSVMとか使う方がよさそうではあるけど(^^;
今日はここまで!