Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename user neighbors to user-to-user #930

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (m *Master) CreateWebService() {
Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
Returns(http.StatusOK, "OK", []ScoredItem{}).
Writes([]ScoredItem{}))
ws.Route(ws.GET("/dashboard/user/{user-id}/neighbors").To(m.getUserNeighbors).
ws.Route(ws.GET("/dashboard/user-to-user/neighbors/{user-id}").To(m.getUserToUser).
Doc("get neighbors of a user").
Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
Expand Down Expand Up @@ -853,9 +853,9 @@ func (m *Master) getItemToItem(request *restful.Request, response *restful.Respo
m.SearchDocuments(cache.ItemToItem, cache.Key(cache.Neighbors, itemId), nil, m.GetItem, request, response)
}

func (m *Master) getUserNeighbors(request *restful.Request, response *restful.Response) {
func (m *Master) getUserToUser(request *restful.Request, response *restful.Response) {
userId := request.PathParameter("user-id")
m.SearchDocuments(cache.UserNeighbors, userId, []string{""}, m.GetUser, request, response)
m.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), nil, m.GetUser, request, response)
}

func (m *Master) importExportUsers(response http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestServer_SearchDocumentsOfUsers(t *testing.T) {
}
ctx := context.Background()
operators := []ListOperator{
{cache.UserNeighbors, "0", "/api/dashboard/user/0/neighbors"},
{cache.UserToUser, cache.Key(cache.Neighbors, "0"), "/api/dashboard/user-to-user/neighbors/0/"},
}
for _, operator := range operators {
t.Logf("test RESTful API: %v", operator.Get)
Expand Down
26 changes: 13 additions & 13 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (t *FindUserNeighborsTask) run(ctx context.Context, j *task.JobsAllocator)
progress.Fail(newCtx, err)
FindUserNeighborsTotalSeconds.Set(0)
} else {
if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdateUserNeighborsTime), time.Now())); err != nil {
if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.UserToUserUpdateTime, cache.Neighbors), time.Now())); err != nil {
log.Logger().Error("failed to set neighbors of users update time", zap.Error(err))
}
log.Logger().Info("complete searching neighbors of users",
Expand Down Expand Up @@ -613,19 +613,19 @@ func (m *Master) findUserNeighborsBruteForce(ctx context.Context, dataset *ranki
}
aggregator := cache.NewDocumentAggregator(startSearchTime)
aggregator.Add("", recommends, scores)
if err := m.CacheClient.AddScores(ctx, cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserNeighbors}, cache.ScoreCondition{
Subset: proto.String(userId),
if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserToUser}, cache.ScoreCondition{
Subset: proto.String(cache.Key(cache.Neighbors, userId)),
Before: &aggregator.Timestamp,
}); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.Set(
ctx,
cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, userId), time.Now()),
cache.String(cache.Key(cache.UserNeighborsDigest, userId), m.Config.UserNeighborDigest())); err != nil {
cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()),
cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest())); err != nil {
return errors.Trace(err)
}
findNeighborSeconds.Add(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -678,14 +678,14 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool {
)
ctx := context.Background()
// check cache
if items, err := m.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, -1); err != nil {
if items, err := m.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, -1); err != nil {
log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err))
return true
} else if len(items) == 0 {
return true
}
// read digest
cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, userId)).String()
cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId))).String()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to read user neighbors digest", zap.Error(err))
Expand All @@ -704,7 +704,7 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool {
return true
}
// read update time
updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserNeighborsTime, userId)).Time()
updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId))).Time()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to read last update user neighbors time", zap.Error(err))
Expand Down Expand Up @@ -1141,9 +1141,9 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca
}
scanCount++
switch splits[0] {
case cache.UserNeighbors, cache.UserNeighborsDigest,
case cache.UserToUser, cache.UserToUserDigest,
cache.OfflineRecommend, cache.OfflineRecommendDigest, cache.CollaborativeRecommend,
cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime:
cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime:
userId := splits[1]
// check user in dataset
if t.rankingTrainSet != nil && t.rankingTrainSet.UserIndex.ToNumber(userId) != base.NotId {
Expand All @@ -1159,8 +1159,8 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca
}
// delete user cache
switch splits[0] {
case cache.UserNeighborsDigest, cache.OfflineRecommendDigest,
cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime:
case cache.UserToUserDigest, cache.OfflineRecommendDigest,
cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime:
err = t.CacheClient.Delete(ctx, s)
}
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions master/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated
neighborTask := NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err := s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
similar, err := s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))

Expand All @@ -336,7 +336,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar
neighborTask = NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))

Expand All @@ -348,10 +348,10 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto
neighborTask = NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
}
Expand Down Expand Up @@ -742,31 +742,31 @@ func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() {

// empty cache
s.True(s.checkUserNeighborCacheTimeout("1"))
err := s.CacheClient.AddScores(ctx, cache.UserNeighbors, "1", []cache.Score{
err := s.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "1"), []cache.Score{
{Id: "1", Score: 1, Categories: []string{""}},
{Id: "2", Score: 2, Categories: []string{""}},
{Id: "3", Score: 3, Categories: []string{""}},
})
s.NoError(err)

// digest mismatch
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), "digest"))
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), "digest"))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))

// staled cache
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), s.Config.UserNeighborDigest()))
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), s.Config.UserNeighborDigest()))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().Add(-time.Minute)))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now().Add(-time.Hour)))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now().Add(-time.Hour)))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))

// not staled cache
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now()))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now()))
s.NoError(err)
s.False(s.checkUserNeighborCacheTimeout("1"))
}
15 changes: 13 additions & 2 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ func (s *RestServer) CreateWebService() {
Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
Returns(http.StatusOK, "OK", []cache.Score{}).
Writes([]cache.Score{}))
// Get user-to-user recommendation
ws.Route(ws.GET("/user-to-user/neighbors/{user-id}").To(s.getUserNeighbors).
Doc("Get user-to-user recommendation.").
Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
Param(ws.PathParameter("name", "Name of the user-to-user recommendation").DataType("string")).
Param(ws.PathParameter("user-id", "ID of the user to get neighbors").DataType("string")).
Param(ws.QueryParameter("n", "Number of returned users").DataType("integer")).
Param(ws.QueryParameter("offset", "Offset of returned users").DataType("integer")).
Returns(http.StatusOK, "OK", []cache.Score{}).
Writes([]cache.Score{}))
// Get neighbors
ws.Route(ws.GET("/item/{item-id}/neighbors/").To(s.getItemNeighbors).
Doc("Get neighbors of a item").
Expand Down Expand Up @@ -744,7 +755,7 @@ func (s *RestServer) getItemNeighbors(request *restful.Request, response *restfu
func (s *RestServer) getUserNeighbors(request *restful.Request, response *restful.Response) {
// Get item id
userId := request.PathParameter("user-id")
s.SearchDocuments(cache.UserNeighbors, userId, []string{""}, nil, request, response)
s.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, nil, request, response)
}

// getCollaborative gets cached recommended items from database.
Expand Down Expand Up @@ -894,7 +905,7 @@ func (s *RestServer) RecommendUserBased(ctx *recommendContext) error {
start := time.Now()
candidates := make(map[string]float64)
// load similar users
similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserNeighbors, ctx.userId, []string{""}, 0, s.Config.Recommend.CacheSize)
similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserToUser, cache.Key(cache.Neighbors, ctx.userId), []string{""}, 0, s.Config.Recommend.CacheSize)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ func (suite *ServerTestSuite) TestGetRecommendsFallbackUserBasedSimilar() {
Body(`{"RowAffected": 4}`).
End()
// insert similar users
err = suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{
err = suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{
{Id: "1", Score: 2, Categories: []string{""}},
{Id: "2", Score: 1.5, Categories: []string{""}},
{Id: "3", Score: 1, Categories: []string{""}},
Expand Down
14 changes: 3 additions & 11 deletions storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ import (
)

const (
// UserNeighbors is sorted set of neighbors for each user.
// User neighbors - user_neighbors/{user_id}
UserNeighbors = "user_neighbors"

// UserNeighborsDigest is digest of user neighbors configuration
// User neighbors digest - user_neighbors_digest/{user_id}
UserNeighborsDigest = "user_neighbors_digest"

// CollaborativeRecommend is sorted set of collaborative filtering recommendations for each user.
// Global recommendation - collaborative_recommend/{user_id}
// Categorized recommendation - collaborative_recommend/{user_id}/{category}
Expand All @@ -75,6 +67,9 @@ const (
ItemToItem = "item-to-item"
ItemToItemDigest = "item-to-item_digest"
ItemToItemUpdateTime = "item-to-time_update_time"
UserToUser = "user-to-user"
UserToUserDigest = "user-to-user_digest"
UserToUserUpdateTime = "user-to-user_update_time"
Neighbors = "neighbors"

// ItemCategories is the set of item categories. The format of key:
Expand All @@ -84,7 +79,6 @@ const (
LastModifyItemTime = "last_modify_item_time" // the latest timestamp that a user related data was modified
LastModifyUserTime = "last_modify_user_time" // the latest timestamp that an item related data was modified
LastUpdateUserRecommendTime = "last_update_user_recommend_time" // the latest timestamp that a user's recommendation was updated
LastUpdateUserNeighborsTime = "last_update_user_neighbors_time" // the latest timestamp that a user's neighbors item was updated

// GlobalMeta is global meta information
GlobalMeta = "global_meta"
Expand All @@ -100,8 +94,6 @@ const (
LastFitRankingModelTime = "last_fit_ranking_model_time"
LastUpdateLatestItemsTime = "last_update_latest_items_time" // the latest timestamp that latest items were updated
LastUpdatePopularItemsTime = "last_update_popular_items_time" // the latest timestamp that popular items were updated
UserNeighborIndexRecall = "user_neighbor_index_recall"
ItemNeighborIndexRecall = "item_neighbor_index_recall"
MatchingIndexRecall = "matching_index_recall"
)

Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (w *Worker) Recommend(users []data.User) {
localStartTime := time.Now()
scores := make(map[string]float64)
// load similar users
similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, w.Config.Recommend.CacheSize)
similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, w.Config.Recommend.CacheSize)
if err != nil {
log.Logger().Error("failed to load similar users", zap.Error(err))
return errors.Trace(err)
Expand All @@ -721,7 +721,7 @@ func (w *Worker) Recommend(users []data.User) {
}
}
// load user neighbors digest
digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, user.Id)).String()
digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, user.Id))).String()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to load user neighbors digest", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (suite *WorkerTestSuite) TestRecommendUserBased() {
suite.Config.Recommend.Offline.EnableColRecommend = false
suite.Config.Recommend.Offline.EnableUserBasedRecommend = true
// insert similar users
err := suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{
err := suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{
{Id: "1", Score: 2, Categories: []string{""}},
{Id: "2", Score: 1.5, Categories: []string{""}},
{Id: "3", Score: 1, Categories: []string{""}},
Expand Down
Loading