-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathtest_sparse_PyTorch_CP-APR.py
173 lines (124 loc) · 7.14 KB
/
test_sparse_PyTorch_CP-APR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
Tests the CP-APR PyTorch implementation using a sparse tensor.
Run with: python -m unittest test_sparse_PyTorch_CP-APR.py
"""
from pyCP_APR.pyCP_APR import CP_APR
import unittest
import scipy.io as spio
import numpy as np
import torch
class TestPyTorchCP_APR(unittest.TestCase):
    """Check the PyTorch CP-APR implementation on a sparse tensor.

    Every test fits a rank-2 model starting from stored initial factors and
    compares the resulting factors and weights with stored expected values.
    The comparison is done for the binary and for the count non-zero values,
    on CPU and, when CUDA is available, on GPU.
    """

    # Directory holding the .mat fixtures. It is relative to the current
    # working directory, so the tests must be launched from the tests folder.
    DATA_DIR = '../data/test_data/'

    # A comparison passes when norm(predicted - expected) is below this value.
    TOLERANCE = 0.001

    def setUp(self):
        """Load the fixtures and build the CP-APR models used by the tests."""
        # Sparse tensor coordinates and non-zero values.
        # NOTE(review): the "- 1" presumably converts 1-based MATLAB
        # subscripts to 0-based indices -- confirm against the fixture.
        coords = self._load_values('subs.mat') - 1
        nnz_binary = self._load_values('vals_binary.mat')
        nnz_count = self._load_values('vals_count.mat')

        # Initial factor values, keyed by their position ("0", "1", ...)
        # among the 'init_f' variables of the .mat file.
        M_init = {"Factors": {}}
        minit = spio.loadmat(self.DATA_DIR + 'minit.mat', squeeze_me=True)
        init_keys = [key for key in minit if 'init_f' in key]
        for dim, key in enumerate(init_keys):
            M_init["Factors"][str(dim)] = minit[key]

        # Everything the tests read is gathered in a single dictionary.
        self.sparse = {
            'coords': coords,
            'nnz_binary': nnz_binary,
            'nnz_count': nnz_count,
            'M_init': M_init,
            # Expected latent factors and lambda values
            'M_expected_binary': self._load_expected('m_expected_binary.mat'),
            'M_expected_count': self._load_expected('m_expected_count.mat'),
        }

        # Initialize CP-APR
        self.cp_apr_cpu = CP_APR(n_iters=1000, verbose=0, method='torch', device='cpu')

        # The GPU model is only created when a CUDA device is available; the
        # GPU tests are skipped otherwise and never read this attribute.
        if torch.cuda.is_available():
            self.cp_apr_gpu = CP_APR(n_iters=1000, verbose=0, device='gpu', method='torch')

    def _load_values(self, filename):
        """Return the 'values' variable stored in the fixture *filename*."""
        return spio.loadmat(self.DATA_DIR + filename, squeeze_me=True)['values']

    def _load_expected(self, filename):
        """Return the expected factors and weights stored in *filename*.

        Variables whose name contains 'm_' are stored under "0", "1", ... in
        the order they are found; a variable whose name contains 'lambd' is
        stored under 'lambda'.
        """
        expected = dict()
        dimension = 0
        content = spio.loadmat(self.DATA_DIR + filename, squeeze_me=True)
        for key, values in content.items():
            if 'm_' in key:
                expected[str(dimension)] = values
                dimension += 1
            # Deliberately not "elif": both checks are applied to every key.
            if 'lambd' in key:
                expected['lambda'] = values
        return expected

    def take_norm_diff_factor(self, decomposition, d, M_type):
        """Helper function to take norm difference between two factors.

        Compares ``decomposition['Factors'][str(d)]`` with the entry ``str(d)``
        of ``self.sparse[M_type]`` and returns the norm of their difference.
        """
        pred_di = decomposition['Factors'][str(d)]
        expected_di = self.sparse[M_type][str(d)]
        return np.linalg.norm(pred_di - expected_di)

    def take_norm_diff_weights(self, decomposition, M_type):
        """Helper function to take norm difference between two weight of factors.

        Compares ``decomposition['Weights']`` with the 'lambda' entry of
        ``self.sparse[M_type]`` and returns the norm of their difference.
        """
        pred_lambd = np.array(decomposition['Weights'])
        expected_lambd = np.array(self.sparse[M_type]['lambda'])
        return np.linalg.norm(pred_lambd - expected_lambd)

    def _check_decomposition(self, model, values_key, expected_key):
        """Fit *model* and assert that the result matches the expected one.

        Args:
            model: CP-APR object whose ``fit`` method is called.
            values_key: key of ``self.sparse`` holding the non-zero values.
            expected_key: key of ``self.sparse`` holding the expected result.
        """
        decomposition = model.fit(coords=self.sparse['coords'],
                                  values=self.sparse[values_key],
                                  rank=2, Minit=self.sparse['M_init'])

        # One latent factor is checked per entry of the first coordinate row.
        for d in range(len(self.sparse['coords'][0])):
            norm_diff_di = self.take_norm_diff_factor(decomposition, d, expected_key)
            # assertLess reports the offending norm, unlike assertEqual(True, ...).
            self.assertLess(norm_diff_di, self.TOLERANCE,
                            msg="latent factor %d differs from %s" % (d, expected_key))

        # Compare the weights
        norm_diff_lambd = self.take_norm_diff_weights(decomposition, expected_key)
        self.assertLess(norm_diff_lambd, self.TOLERANCE,
                        msg="weights differ from %s" % expected_key)

    def test_latent_factors_binary(self):
        """Make sure the resulting latent factors are as expected for binary tensor."""
        self._check_decomposition(self.cp_apr_cpu, 'nnz_binary', 'M_expected_binary')

    def test_latent_factors_count(self):
        """Make sure the resulting latent factors are as expected for count tensor."""
        self._check_decomposition(self.cp_apr_cpu, 'nnz_count', 'M_expected_count')

    # Without CUDA the GPU tests are reported as skipped instead of silently
    # passing without having checked anything.
    @unittest.skipUnless(torch.cuda.is_available(), "CUDA device is not available")
    def test_latent_factors_binary_GPU(self):
        """Make sure the resulting latent factors are as expected for binary tensor. Tests using GPU."""
        self._check_decomposition(self.cp_apr_gpu, 'nnz_binary', 'M_expected_binary')

    @unittest.skipUnless(torch.cuda.is_available(), "CUDA device is not available")
    def test_latent_factors_count_GPU(self):
        """Make sure the resulting latent factors are as expected for count tensor. Tests using GPU."""
        self._check_decomposition(self.cp_apr_gpu, 'nnz_count', 'M_expected_count')