Skip to content

Commit

Permalink
Rename user neighbors to user-to-user
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Jan 20, 2025
1 parent 010865b commit 4d8cdec
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 43 deletions.
6 changes: 3 additions & 3 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (m *Master) CreateWebService() {
Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
Returns(http.StatusOK, "OK", []ScoredItem{}).
Writes([]ScoredItem{}))
ws.Route(ws.GET("/dashboard/user/{user-id}/neighbors").To(m.getUserNeighbors).
ws.Route(ws.GET("/dashboard/user-to-user/neighbors/{user-id}").To(m.getUserToUser).
Doc("get neighbors of a user").
Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
Expand Down Expand Up @@ -853,9 +853,9 @@ func (m *Master) getItemToItem(request *restful.Request, response *restful.Respo
m.SearchDocuments(cache.ItemToItem, cache.Key(cache.Neighbors, itemId), nil, m.GetItem, request, response)
}

func (m *Master) getUserNeighbors(request *restful.Request, response *restful.Response) {
func (m *Master) getUserToUser(request *restful.Request, response *restful.Response) {
userId := request.PathParameter("user-id")
m.SearchDocuments(cache.UserNeighbors, userId, []string{""}, m.GetUser, request, response)
m.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), nil, m.GetUser, request, response)
}

func (m *Master) importExportUsers(response http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestServer_SearchDocumentsOfUsers(t *testing.T) {
}
ctx := context.Background()
operators := []ListOperator{
{cache.UserNeighbors, "0", "/api/dashboard/user/0/neighbors"},
{cache.UserToUser, cache.Key(cache.Neighbors, "0"), "/api/dashboard/user-to-user/neighbors/0/"},
}
for _, operator := range operators {
t.Logf("test RESTful API: %v", operator.Get)
Expand Down
26 changes: 13 additions & 13 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (t *FindUserNeighborsTask) run(ctx context.Context, j *task.JobsAllocator)
progress.Fail(newCtx, err)
FindUserNeighborsTotalSeconds.Set(0)
} else {
if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdateUserNeighborsTime), time.Now())); err != nil {
if err := t.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.UserToUserUpdateTime, cache.Neighbors), time.Now())); err != nil {
log.Logger().Error("failed to set neighbors of users update time", zap.Error(err))
}
log.Logger().Info("complete searching neighbors of users",
Expand Down Expand Up @@ -613,19 +613,19 @@ func (m *Master) findUserNeighborsBruteForce(ctx context.Context, dataset *ranki
}
aggregator := cache.NewDocumentAggregator(startSearchTime)
aggregator.Add("", recommends, scores)
if err := m.CacheClient.AddScores(ctx, cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserNeighbors}, cache.ScoreCondition{
Subset: proto.String(userId),
if err := m.CacheClient.DeleteScores(ctx, []string{cache.UserToUser}, cache.ScoreCondition{
Subset: proto.String(cache.Key(cache.Neighbors, userId)),
Before: &aggregator.Timestamp,
}); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.Set(
ctx,
cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, userId), time.Now()),
cache.String(cache.Key(cache.UserNeighborsDigest, userId), m.Config.UserNeighborDigest())); err != nil {
cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId)), time.Now()),
cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId)), m.Config.UserNeighborDigest())); err != nil {
return errors.Trace(err)
}
findNeighborSeconds.Add(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -678,14 +678,14 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool {
)
ctx := context.Background()
// check cache
if items, err := m.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, -1); err != nil {
if items, err := m.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, -1); err != nil {
log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err))
return true
} else if len(items) == 0 {
return true
}
// read digest
cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, userId)).String()
cacheDigest, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, userId))).String()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to read user neighbors digest", zap.Error(err))
Expand All @@ -704,7 +704,7 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool {
return true
}
// read update time
updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserNeighborsTime, userId)).Time()
updateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, userId))).Time()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to read last update user neighbors time", zap.Error(err))
Expand Down Expand Up @@ -1141,9 +1141,9 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca
}
scanCount++
switch splits[0] {
case cache.UserNeighbors, cache.UserNeighborsDigest,
case cache.UserToUser, cache.UserToUserDigest,
cache.OfflineRecommend, cache.OfflineRecommendDigest, cache.CollaborativeRecommend,
cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime:
cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime:
userId := splits[1]
// check user in dataset
if t.rankingTrainSet != nil && t.rankingTrainSet.UserIndex.ToNumber(userId) != base.NotId {
Expand All @@ -1159,8 +1159,8 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca
}
// delete user cache
switch splits[0] {
case cache.UserNeighborsDigest, cache.OfflineRecommendDigest,
cache.LastModifyUserTime, cache.LastUpdateUserNeighborsTime, cache.LastUpdateUserRecommendTime:
case cache.UserToUserDigest, cache.OfflineRecommendDigest,
cache.LastModifyUserTime, cache.UserToUserUpdateTime, cache.LastUpdateUserRecommendTime:
err = t.CacheClient.Delete(ctx, s)
}
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions master/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated
neighborTask := NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err := s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
similar, err := s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))

Expand All @@ -336,7 +336,7 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar
neighborTask = NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))

Expand All @@ -348,10 +348,10 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto
neighborTask = NewFindUserNeighborsTask(&s.Master)
s.NoError(neighborTask.run(context.Background(), nil))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "8"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
similar, err = s.CacheClient.SearchScores(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
similar, err = s.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "9"), []string{""}, 0, 100)
s.NoError(err)
s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
}
Expand Down Expand Up @@ -742,31 +742,31 @@ func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() {

// empty cache
s.True(s.checkUserNeighborCacheTimeout("1"))
err := s.CacheClient.AddScores(ctx, cache.UserNeighbors, "1", []cache.Score{
err := s.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "1"), []cache.Score{
{Id: "1", Score: 1, Categories: []string{""}},
{Id: "2", Score: 2, Categories: []string{""}},
{Id: "3", Score: 3, Categories: []string{""}},
})
s.NoError(err)

// digest mismatch
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), "digest"))
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), "digest"))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))

// staled cache
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), s.Config.UserNeighborDigest()))
err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, "1")), s.Config.UserNeighborDigest()))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().Add(-time.Minute)))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now().Add(-time.Hour)))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now().Add(-time.Hour)))
s.NoError(err)
s.True(s.checkUserNeighborCacheTimeout("1"))

// not staled cache
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now()))
err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.UserToUserUpdateTime, cache.Key(cache.Neighbors, "1")), time.Now()))
s.NoError(err)
s.False(s.checkUserNeighborCacheTimeout("1"))
}
15 changes: 13 additions & 2 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ func (s *RestServer) CreateWebService() {
Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
Returns(http.StatusOK, "OK", []cache.Score{}).
Writes([]cache.Score{}))
// Get user-to-user recommendation
ws.Route(ws.GET("/user-to-user/neighbors/{user-id}").To(s.getUserNeighbors).
Doc("Get user-to-user recommendation.").
Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
Param(ws.PathParameter("name", "Name of the user-to-user recommendation").DataType("string")).
Param(ws.PathParameter("user-id", "ID of the user to get neighbors").DataType("string")).
Param(ws.QueryParameter("n", "Number of returned users").DataType("integer")).
Param(ws.QueryParameter("offset", "Offset of returned users").DataType("integer")).
Returns(http.StatusOK, "OK", []cache.Score{}).
Writes([]cache.Score{}))
// Get neighbors
ws.Route(ws.GET("/item/{item-id}/neighbors/").To(s.getItemNeighbors).
Doc("Get neighbors of a item").
Expand Down Expand Up @@ -744,7 +755,7 @@ func (s *RestServer) getItemNeighbors(request *restful.Request, response *restfu
func (s *RestServer) getUserNeighbors(request *restful.Request, response *restful.Response) {
// Get item id
userId := request.PathParameter("user-id")
s.SearchDocuments(cache.UserNeighbors, userId, []string{""}, nil, request, response)
s.SearchDocuments(cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, nil, request, response)
}

// getCollaborative gets cached recommended items from database.
Expand Down Expand Up @@ -894,7 +905,7 @@ func (s *RestServer) RecommendUserBased(ctx *recommendContext) error {
start := time.Now()
candidates := make(map[string]float64)
// load similar users
similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserNeighbors, ctx.userId, []string{""}, 0, s.Config.Recommend.CacheSize)
similarUsers, err := s.CacheClient.SearchScores(ctx.context, cache.UserToUser, cache.Key(cache.Neighbors, ctx.userId), []string{""}, 0, s.Config.Recommend.CacheSize)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ func (suite *ServerTestSuite) TestGetRecommendsFallbackUserBasedSimilar() {
Body(`{"RowAffected": 4}`).
End()
// insert similar users
err = suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{
err = suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{
{Id: "1", Score: 2, Categories: []string{""}},
{Id: "2", Score: 1.5, Categories: []string{""}},
{Id: "3", Score: 1, Categories: []string{""}},
Expand Down
14 changes: 3 additions & 11 deletions storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ import (
)

const (
// UserNeighbors is sorted set of neighbors for each user.
// User neighbors - user_neighbors/{user_id}
UserNeighbors = "user_neighbors"

// UserNeighborsDigest is digest of user neighbors configuration
// User neighbors digest - user_neighbors_digest/{user_id}
UserNeighborsDigest = "user_neighbors_digest"

// CollaborativeRecommend is sorted set of collaborative filtering recommendations for each user.
// Global recommendation - collaborative_recommend/{user_id}
// Categorized recommendation - collaborative_recommend/{user_id}/{category}
Expand All @@ -75,6 +67,9 @@ const (
ItemToItem = "item-to-item"
ItemToItemDigest = "item-to-item_digest"
ItemToItemUpdateTime = "item-to-time_update_time"
UserToUser = "user-to-user"
UserToUserDigest = "user-to-user_digest"
UserToUserUpdateTime = "user-to-user_update_time"
Neighbors = "neighbors"

// ItemCategories is the set of item categories. The format of key:
Expand All @@ -84,7 +79,6 @@ const (
LastModifyItemTime = "last_modify_item_time" // the latest timestamp that a user related data was modified
LastModifyUserTime = "last_modify_user_time" // the latest timestamp that an item related data was modified
LastUpdateUserRecommendTime = "last_update_user_recommend_time" // the latest timestamp that a user's recommendation was updated
LastUpdateUserNeighborsTime = "last_update_user_neighbors_time" // the latest timestamp that a user's neighbors item was updated

// GlobalMeta is global meta information
GlobalMeta = "global_meta"
Expand All @@ -100,8 +94,6 @@ const (
LastFitRankingModelTime = "last_fit_ranking_model_time"
LastUpdateLatestItemsTime = "last_update_latest_items_time" // the latest timestamp that latest items were updated
LastUpdatePopularItemsTime = "last_update_popular_items_time" // the latest timestamp that popular items were updated
UserNeighborIndexRecall = "user_neighbor_index_recall"
ItemNeighborIndexRecall = "item_neighbor_index_recall"
MatchingIndexRecall = "matching_index_recall"
)

Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (w *Worker) Recommend(users []data.User) {
localStartTime := time.Now()
scores := make(map[string]float64)
// load similar users
similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserNeighbors, userId, []string{""}, 0, w.Config.Recommend.CacheSize)
similarUsers, err := w.CacheClient.SearchScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, userId), []string{""}, 0, w.Config.Recommend.CacheSize)
if err != nil {
log.Logger().Error("failed to load similar users", zap.Error(err))
return errors.Trace(err)
Expand All @@ -721,7 +721,7 @@ func (w *Worker) Recommend(users []data.User) {
}
}
// load user neighbors digest
digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserNeighborsDigest, user.Id)).String()
digest, err := w.CacheClient.Get(ctx, cache.Key(cache.UserToUserDigest, cache.Key(cache.Neighbors, user.Id))).String()
if err != nil {
if !errors.Is(err, errors.NotFound) {
log.Logger().Error("failed to load user neighbors digest", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (suite *WorkerTestSuite) TestRecommendUserBased() {
suite.Config.Recommend.Offline.EnableColRecommend = false
suite.Config.Recommend.Offline.EnableUserBasedRecommend = true
// insert similar users
err := suite.CacheClient.AddScores(ctx, cache.UserNeighbors, "0", []cache.Score{
err := suite.CacheClient.AddScores(ctx, cache.UserToUser, cache.Key(cache.Neighbors, "0"), []cache.Score{
{Id: "1", Score: 2, Categories: []string{""}},
{Id: "2", Score: 1.5, Categories: []string{""}},
{Id: "3", Score: 1, Categories: []string{""}},
Expand Down

0 comments on commit 4d8cdec

Please sign in to comment.