From 4d8cdec0dd67fc390815b1dbc5f71a216f7bc42c Mon Sep 17 00:00:00 2001 From: Zhenghao Zhang Date: Mon, 20 Jan 2025 21:04:11 +0800 Subject: [PATCH] Rename user neighbors to user-to-user --- master/rest.go | 6 +++--- master/rest_test.go | 2 +- master/tasks.go | 26 +++++++++++++------------- master/tasks_test.go | 18 +++++++++--------- server/rest.go | 15 +++++++++++++-- server/rest_test.go | 2 +- storage/cache/database.go | 14 +++----------- worker/worker.go | 4 ++-- worker/worker_test.go | 2 +- 9 files changed, 46 insertions(+), 43 deletions(-) diff --git a/master/rest.go b/master/rest.go index d2fde9719..3ad9fcdc7 100644 --- a/master/rest.go +++ b/master/rest.go @@ -180,7 +180,7 @@ func (m *Master) CreateWebService() { Param(ws.QueryParameter("offset", "offset of the list").DataType("int")). Returns(http.StatusOK, "OK", []ScoredItem{}). Writes([]ScoredItem{})) - ws.Route(ws.GET("/dashboard/user/{user-id}/neighbors").To(m.getUserNeighbors). + ws.Route(ws.GET("/dashboard/user-to-user/neighbors/{user-id}").To(m.getUserToUser). Doc("get neighbors of a user"). Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}). Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")). @@ -853,9 +853,9 @@ func (m *Master) getItemToItem(request *restful.Request, response *restful.Respo m.SearchDocuments(cache.ItemToItem, cache.Key(cache.Neighbors, itemId), nil, m.GetItem, request, response) } -func (m *Master) getUserNeighbors(request *restful.Request, response *restful.Response) { +func (m *Master) getUserToUser(request *restful.Request, response *restful.Response) { userId := request.PathParameter("user-id") - m.SearchDocuments(cache.UserNeighbors, userId, []string{""}, m.GetUser, request, response) + m.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), nil, m.GetUser, request, response) } func (m *Master) importExportUsers(response http.ResponseWriter, request *http.Request) { diff --git a/master/rest_test.go b/master/rest_test.go index e4affc4a0..83121f990 100644 --- a/master/rest_test.go +++ b/master/rest_test.go @@ -569,7 +569,7 @@ func TestServer_SearchDocumentsOfUsers(t *testing.T) { } ctx := context.Background() operators := []ListOperator{ - {cache.UserNeighbors, "0", "/api/dashboard/user/0/neighbors"}, + {cache.UserToUser, cache.Key(cache.Neighbors, "0"), "/api/dashboard/user-to-user/neighbors/0/"}, } for _, operator := range operators { t.Logf("test RESTful API: %v", operator.Get) diff --git a/master/tasks.go b/master/tasks.go index 53d540392..680da86c5 100644 --- a/master/tasks.go +++ b/master/tasks.go @@ -544,7 +544,7 @@ func (t *FindUserNeighborsTask) run(ctx context.Context, j *task.JobsAllocator) progress.Fail(newCtx, err) FindUserNeighborsTotalSeconds.Set(0) } else { - if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdateUserNeighborsTime), time.Now())); err != nil { + if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.UserToUserUpdateTime, cache.Neighbors), time.Now())); err != nil { log.Logger().Error("failed to set neighbors of users update time", zap.Error(err)) } log.Logger().Info("complete searching neighbors of users", @@ -613,19 +613,19 @@ func (m *Master) findUserNeighborsBruteForce(ctx context.Context, dataset *ranki } aggregator := cache.NewDocumentAggregator(startSearchTime) aggregator.Add("", recommends, scores) - if err := m.CacheClient.AddScores(ctx, cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil { + if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), aggregator.ToSlice()); err != nil { return errors.Trace(err) } - if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserNeighbors}, cache.ScoreCondition{ - Subset: proto.String(userId), + if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserToUser}, cache.ScoreCondition{ + Subset: proto.String(cache.Key(cache.Neighbors, userId)), Before: &aggregator.Timestamp, }); err != nil { return errors.Trace(err) } if err := m.CacheClient.Set( ctx, - cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, userId), time.Now()), - cache.String(cache.Key(cache.UserNeighborsDigest, userId), m.Config.UserNeighborDigest())); err != nil { + cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()), + cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest())); err != nil { return errors.Trace(err) } findNeighborSeconds.Add(time.Since(startTime).Seconds()) @@ -678,14 +678,14 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool { ) ctx := context.Background() // check cache - if items, err := m.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, -1); err != nil { + if items, err := m.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, -1); err != nil { log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err)) return true } else if len(items) == 0 { return true } // read digest - cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, userId)).String() + cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId))).String() if err != nil { if !errors.Is(err, errors.NotFound) { log.Logger().Error("failed to read user neighbors digest", zap.Error(err)) @@ -704,7 +704,7 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool { return true } // read update time - updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserNeighborsTime, userId)).Time() + updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId))).Time() if err != nil { if !errors.Is(err, errors.NotFound) { log.Logger().Error("failed to read last update user neighbors time", zap.Error(err)) @@ -1141,9 +1141,9 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca } scanCount++ switch splits[0] { - case cache.UserNeighbors, cache.UserNeighborsDigest, + case cache.UserToUser, cache.UserToUserDigest, cache.OfflineRecommend, cache.OfflineRecommendDigest, cache.CollaborativeRecommend, - cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime: + cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime: userId := splits[1] // check user in dataset if t.rankingTrainSet != nil && t.rankingTrainSet.UserIndex.ToNumber(userId) != base.NotId { @@ -1159,8 +1159,8 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca } // delete user cache switch splits[0] { - case cache.UserNeighborsDigest, cache.OfflineRecommendDigest, - cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime: + case cache.UserToUserDigest, cache.OfflineRecommendDigest, + cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime: err = t.CacheClient.Delete(ctx, s) } if err != nil { diff --git a/master/tasks_test.go b/master/tasks_test.go index eac3a7ef9..1ea3c49e6 100644 --- a/master/tasks_test.go +++ b/master/tasks_test.go @@ -326,7 +326,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated neighborTask := NewFindUserNeighborsTask(&s.Master) s.NoError(neighborTask.run(context.Background(), nil)) - similar, err := s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100) + similar, err := s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) @@ -336,7 +336,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar neighborTask = NewFindUserNeighborsTask(&s.Master) s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) @@ -348,10 +348,10 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() { s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto neighborTask = NewFindUserNeighborsTask(&s.Master) s.NoError(neighborTask.run(context.Background(), nil)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100) s.NoError(err) s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar)) - similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100) + similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100) s.NoError(err) s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar)) } @@ -742,7 +742,7 @@ func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() { // empty cache s.True(s.checkUserNeighborCacheTimeout("1")) - err := s.CacheClient.AddScores(ctx, cache.UserNeighbors, "1", []cache.Score{ + err := s.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "1"), []cache.Score{ {Id: "1", Score: 1, Categories: []string{""}}, {Id: "2", Score: 2, Categories: []string{""}}, {Id: "3", Score: 3, Categories: []string{""}}, @@ -750,23 +750,23 @@ func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() { s.NoError(err) // digest mismatch - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), "digest")) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), "digest")) s.NoError(err) s.True(s.checkUserNeighborCacheTimeout("1")) // staled cache - err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), s.Config.UserNeighborDigest())) + err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), s.Config.UserNeighborDigest())) s.NoError(err) s.True(s.checkUserNeighborCacheTimeout("1")) err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().Add(-time.Minute))) s.NoError(err) s.True(s.checkUserNeighborCacheTimeout("1")) - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now().Add(-time.Hour))) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now().Add(-time.Hour))) s.NoError(err) s.True(s.checkUserNeighborCacheTimeout("1")) // not staled cache - err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now())) + err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now())) s.NoError(err) s.False(s.checkUserNeighborCacheTimeout("1")) } diff --git a/server/rest.go b/server/rest.go index d593fe326..f262f5693 100644 --- a/server/rest.go +++ b/server/rest.go @@ -495,6 +495,17 @@ func (s *RestServer) CreateWebService() { Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")). Returns(http.StatusOK, "OK", []cache.Score{}). Writes([]cache.Score{})) + // Get user-to-user recommendation + ws.Route(ws.GET("/user-to-user/neighbors/{user-id}").To(s.getUserNeighbors). + Doc("Get user-to-user recommendation."). + Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}). + Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")). + Param(ws.PathParameter("name", "Name of the user-to-user recommendation").DataType("string")). + Param(ws.PathParameter("user-id", "ID of the user to get neighbors").DataType("string")). + Param(ws.QueryParameter("n", "Number of returned users").DataType("integer")). + Param(ws.QueryParameter("offset", "Offset of returned users").DataType("integer")). + Returns(http.StatusOK, "OK", []cache.Score{}). + Writes([]cache.Score{})) // Get neighbors ws.Route(ws.GET("/item/{item-id}/neighbors/").To(s.getItemNeighbors). Doc("Get neighbors of a item"). @@ -744,7 +755,7 @@ func (s *RestServer) getItemNeighbors(request *restful.Request, response *restfu func (s *RestServer) getUserNeighbors(request *restful.Request, response *restful.Response) { // Get item id userId := request.PathParameter("user-id") - s.SearchDocuments(cache.UserNeighbors, userId, []string{""}, nil, request, response) + s.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, nil, request, response) } // getCollaborative gets cached recommended items from database. @@ -894,7 +905,7 @@ func (s *RestServer) RecommendUserBased(ctx *recommendContext) error { start := time.Now() candidates := make(map[string]float64) // load similar users - similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserNeighbors, ctx.userId, []string{""}, 0, s.Config.Recommend.CacheSize) + similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserToUser, cache.Key(cache.Neighbors, ctx.userId), []string{""}, 0, s.Config.Recommend.CacheSize) if err != nil { return errors.Trace(err) } diff --git a/server/rest_test.go b/server/rest_test.go index 92b45fe36..68df3eb3e 100644 --- a/server/rest_test.go +++ b/server/rest_test.go @@ -1349,7 +1349,7 @@ func (suite *ServerTestSuite) TestGetRecommendsFallbackUserBasedSimilar() { Body(`{"RowAffected": 4}`). End() // insert similar users - err = suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{ + err = suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{ {Id: "1", Score: 2, Categories: []string{""}}, {Id: "2", Score: 1.5, Categories: []string{""}}, {Id: "3", Score: 1, Categories: []string{""}}, diff --git a/storage/cache/database.go b/storage/cache/database.go index c615ad25d..00dbece81 100644 --- a/storage/cache/database.go +++ b/storage/cache/database.go @@ -46,14 +46,6 @@ import ( ) const ( - // UserNeighbors is sorted set of neighbors for each user. - // User neighbors - user_neighbors/{user_id} - UserNeighbors = "user_neighbors" - - // UserNeighborsDigest is digest of user neighbors configuration - // User neighbors digest - user_neighbors_digest/{user_id} - UserNeighborsDigest = "user_neighbors_digest" - // CollaborativeRecommend is sorted set of collaborative filtering recommendations for each user. // Global recommendation - collaborative_recommend/{user_id} // Categorized recommendation - collaborative_recommend/{user_id}/{category} @@ -75,6 +67,9 @@ const ( ItemToItem = "item-to-item" ItemToItemDigest = "item-to-item_digest" ItemToItemUpdateTime = "item-to-time_update_time" + UserToUser = "user-to-user" + UserToUserDigest = "user-to-user_digest" + UserToUserUpdateTime = "user-to-user_update_time" Neighbors = "neighbors" // ItemCategories is the set of item categories. The format of key: @@ -84,7 +79,6 @@ const ( LastModifyItemTime = "last_modify_item_time" // the latest timestamp that a user related data was modified LastModifyUserTime = "last_modify_user_time" // the latest timestamp that an item related data was modified LastUpdateUserRecommendTime = "last_update_user_recommend_time" // the latest timestamp that a user's recommendation was updated - LastUpdateUserNeighborsTime = "last_update_user_neighbors_time" // the latest timestamp that a user's neighbors item was updated // GlobalMeta is global meta information GlobalMeta = "global_meta" @@ -100,8 +94,6 @@ const ( LastFitRankingModelTime = "last_fit_ranking_model_time" LastUpdateLatestItemsTime = "last_update_latest_items_time" // the latest timestamp that latest items were updated LastUpdatePopularItemsTime = "last_update_popular_items_time" // the latest timestamp that popular items were updated - UserNeighborIndexRecall = "user_neighbor_index_recall" - ItemNeighborIndexRecall = "item_neighbor_index_recall" MatchingIndexRecall = "matching_index_recall" ) diff --git a/worker/worker.go b/worker/worker.go index fd3609a67..36156b73c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -700,7 +700,7 @@ func (w *Worker) Recommend(users []data.User) { localStartTime := time.Now() scores := make(map[string]float64) // load similar users - similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, w.Config.Recommend.CacheSize) + similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, w.Config.Recommend.CacheSize) if err != nil { log.Logger().Error("failed to load similar users", zap.Error(err)) return errors.Trace(err) @@ -721,7 +721,7 @@ func (w *Worker) Recommend(users []data.User) { } } // load user neighbors digest - digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, user.Id)).String() + digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, user.Id))).String() if err != nil { if !errors.Is(err, errors.NotFound) { log.Logger().Error("failed to load user neighbors digest", zap.Error(err)) diff --git a/worker/worker_test.go b/worker/worker_test.go index 34d38a616..d44c302d7 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -383,7 +383,7 @@ func (suite *WorkerTestSuite) TestRecommendUserBased() { suite.Config.Recommend.Offline.EnableColRecommend = false suite.Config.Recommend.Offline.EnableUserBasedRecommend = true // insert similar users - err := suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{ + err := suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{ {Id: "1", Score: 2, Categories: []string{""}}, {Id: "2", Score: 1.5, Categories: []string{""}}, {Id: "3", Score: 1, Categories: []string{""}},