diff --git a/domain/entity/session.go b/domain/entity/session.go index 7bfcd307..e6ebfbad 100644 --- a/domain/entity/session.go +++ b/domain/entity/session.go @@ -99,7 +99,7 @@ func (s *Session) IsCreator(userID string) bool { func (s *Session) GoNextTrack() error { s.SetProgressWhenPaused(0 * time.Second) if len(s.QueueTracks) <= s.QueueHead+1 { - s.QueueHead++ // https://github.com/camphor-/relaym-server/blob/master/docs/definition.md#%E7%8F%BE%E5%9C%A8%E5%AF%BE%E8%B1%A1%E3%81%AE%E6%9B%B2%E3%81%AE%E3%82%A4%E3%83%B3%E3%83%87%E3%83%83%E3%82%AF%E3%82%B9-head + s.QueueHead = len(s.QueueTracks) // https://github.com/camphor-/relaym-server/blob/master/docs/definition.md#%E7%8F%BE%E5%9C%A8%E5%AF%BE%E8%B1%A1%E3%81%AE%E6%9B%B2%E3%81%AE%E3%82%A4%E3%83%B3%E3%83%87%E3%83%83%E3%82%AF%E3%82%B9-head s.StateType = Stop return ErrSessionAllTracksFinished } diff --git a/domain/entity/sync_check_timer.go b/domain/entity/sync_check_timer.go index 976e6970..904cc597 100644 --- a/domain/entity/sync_check_timer.go +++ b/domain/entity/sync_check_timer.go @@ -49,7 +49,7 @@ func newSyncCheckTimer() *SyncCheckTimer { return &SyncCheckTimer{ stopCh: make(chan struct{}, 2), - nextCh: make(chan struct{}, 1), + nextCh: make(chan struct{}, 10), isTimerExpired: true, timer: timer, } @@ -65,6 +65,14 @@ func (s *SyncCheckTimer) SetDuration(d time.Duration) { s.timer.Reset(d) } +// sendToNextTrackNotToExceedCap は チャネルのキャパシティを超えないようにしながら、nextChに構造体を送ります。 +// キャパシティを超えるとチャネルにメッセージが送られないので、API Rate Limitの役割を果たしています。 +func (s *SyncCheckTimer) sendToNextTrackNotToExceedCap() { + if len(s.nextCh) < cap(s.nextCh) { + s.nextCh <- struct{}{} + } +} + // SyncCheckTimerManager はSpotifyとの同期チェック用のタイマーを一括して管理する構造体です。 type SyncCheckTimerManager struct { timers map[string]*SyncCheckTimer @@ -137,7 +145,7 @@ func (m *SyncCheckTimerManager) SendToNextCh(sessionID string) error { logger.Debugj(map[string]interface{}{"message": "call next ch", "sessionID": sessionID}) if timer, ok := m.timers[sessionID]; ok { - timer.nextCh <- struct{}{} + timer.sendToNextTrackNotToExceedCap() return nil } diff --git a/usecase/session_state.go b/usecase/session_state.go index 38d2e85c..5241ad30 100644 --- a/usecase/session_state.go +++ b/usecase/session_state.go @@ -28,7 +28,7 @@ func NewSessionStateUseCase(sessionRepo repository.Session, playerCli spotify.Pl // NextTrack は指定されたidのsessionを次の曲に進めます func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) error { - session, err := s.sessionRepo.FindByID(ctx, sessionID) + session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID) if err != nil { return fmt.Errorf("find session id=%s: %w", sessionID, err) } @@ -40,15 +40,15 @@ func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) e switch session.StateType { case entity.Play: - if err = s.nextTrackInPlay(ctx, session); err != nil { + if err = s.nextTrackInPlay(ctx, sessionID); err != nil { return fmt.Errorf("go next track in play session id=%s: %w", session.ID, err) } case entity.Pause: - if err = s.nextTrackInPause(ctx, session); err != nil { + if err = s.nextTrackInPause(ctx, sessionID); err != nil { return fmt.Errorf("go next track in pause session id=%s: %w", session.ID, err) } case entity.Stop: - if err = s.nextTrackInStop(ctx, session); err != nil { + if err = s.nextTrackInStop(ctx, sessionID); err != nil { return fmt.Errorf("go next track in stop session id=%s: %w", session.ID, err) } case entity.Archived: @@ -59,13 +59,9 @@ func (s *SessionStateUseCase) NextTrack(ctx context.Context, sessionID string) e } // nextTrackInPlay はsessionのstateがPLAYの時のnextTrackの処理を行います -func (s *SessionStateUseCase) nextTrackInPlay(ctx context.Context, session *entity.Session) error { - if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil { - return fmt.Errorf("GoNextTrack: %w", err) - } - +func (s *SessionStateUseCase) nextTrackInPlay(ctx context.Context, sessionID string) error { // NextChを通してstartTrackEndTriggerに次の曲への遷移を通知 - if err := s.timerUC.sendToNextCh(session.ID); err != nil { + if err := s.timerUC.sendToNextCh(sessionID); err != nil { return fmt.Errorf("send to next ch: %w", err) } @@ -73,68 +69,97 @@ func (s *SessionStateUseCase) nextTrackInPlay(ctx context.Context, session *enti } // nextTrackInPause はsessionのstateがPAUSEの時のnextTrackの処理を行います -func (s *SessionStateUseCase) nextTrackInPause(ctx context.Context, session *entity.Session) error { - if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil { - return fmt.Errorf("GoNextTrack: %w", err) +func (s *SessionStateUseCase) nextTrackInPause(ctx context.Context, sessionID string) error { + _, err := s.sessionRepo.DoInTx(ctx, s.nextTrackInPauseTx(sessionID)) + if err != nil { + return fmt.Errorf("nextTrackInPause transaction: %w", err) } - if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { - s.timerUC.handleAllTrackFinish(session) - if err := s.sessionRepo.Update(ctx, session); err != nil { - return fmt.Errorf("update session id=%s: %w", session.ID, err) + return nil +} + +func (s *SessionStateUseCase) nextTrackInPauseTx(sessionID string) func(ctx context.Context) (interface{}, error) { + return func(ctx context.Context) (interface{}, error) { + session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID) + if err != nil { + return nil, fmt.Errorf("find session: %w", err) + } + if err := s.playerCli.GoNextTrack(ctx, session.DeviceID); err != nil { + return nil, fmt.Errorf("GoNextTrack: %w", err) } - return nil - } - // GoNextTrackだけだと次の曲の再生が始まってしまう - if err := s.playerCli.Pause(ctx, session.DeviceID); err != nil { - return fmt.Errorf("call pause api: %w", err) - } + if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { + s.timerUC.handleAllTrackFinish(session) + if err := s.sessionRepo.Update(ctx, session); err != nil { + return nil, fmt.Errorf("update session id=%s: %w", session.ID, err) + } + return nil, nil + } - if err := s.sessionRepo.Update(ctx, session); err != nil { - return fmt.Errorf("update session id=%s: %w", session.ID, err) - } + // GoNextTrackだけだと次の曲の再生が始まってしまう + if err := s.playerCli.Pause(ctx, session.DeviceID); err != nil { + return nil, fmt.Errorf("call pause api: %w", err) + } - track := session.TrackURIShouldBeAddedWhenHandleTrackEnd() - if track != "" { - if err := s.playerCli.Enqueue(ctx, track, session.DeviceID); err != nil { - return fmt.Errorf("enqueue error session id=%s: %w", session.ID, err) + if err := s.sessionRepo.Update(ctx, session); err != nil { + return nil, fmt.Errorf("update session id=%s: %w", session.ID, err) } - } - s.pusher.Push(&event.PushMessage{ - SessionID: session.ID, - Msg: entity.NewEventNextTrack(session.QueueHead), - }) + track := session.TrackURIShouldBeAddedWhenHandleTrackEnd() + if track != "" { + if err := s.playerCli.Enqueue(ctx, track, session.DeviceID); err != nil { + return nil, fmt.Errorf("enqueue error session id=%s: %w", session.ID, err) + } + } - return nil + s.pusher.Push(&event.PushMessage{ + SessionID: session.ID, + Msg: entity.NewEventNextTrack(session.QueueHead), + }) + return nil, nil + } } // nextTrackInStop はsessionのstateがSTOPの時のnextTrackの処理を行います // stopToPlayで曲がResetされ、再度Spotifyのキューに積まれるため、Enqueueを行っていません -func (s *SessionStateUseCase) nextTrackInStop(ctx context.Context, session *entity.Session) error { - if !session.IsNextTrackExistWhenStateIsStop() { - return fmt.Errorf("nextTrackInStop: %w", entity.ErrNextQueueTrackNotFound) +func (s *SessionStateUseCase) nextTrackInStop(ctx context.Context, sessionID string) error { + _, err := s.sessionRepo.DoInTx(ctx, s.nextTrackInStopTx(sessionID)) + if err != nil { + return fmt.Errorf("nextTrackInStop transaction: %w", err) } - if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { - s.timerUC.handleAllTrackFinish(session) - if err := s.sessionRepo.Update(ctx, session); err != nil { - return fmt.Errorf("update session id=%s: %w", session.ID, err) + return nil +} + +func (s *SessionStateUseCase) nextTrackInStopTx(sessionID string) func(ctx context.Context) (interface{}, error) { + return func(ctx context.Context) (interface{}, error) { + session, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID) + if err != nil { + return nil, fmt.Errorf("find session :%w", err) + } + if !session.IsNextTrackExistWhenStateIsStop() { + return nil, fmt.Errorf("nextTrackInStop: %w", entity.ErrNextQueueTrackNotFound) } - return nil - } - if err := s.sessionRepo.Update(ctx, session); err != nil { - return fmt.Errorf("update session id=%s: %w", session.ID, err) - } + if err := session.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { + s.timerUC.handleAllTrackFinish(session) + if err := s.sessionRepo.Update(ctx, session); err != nil { + return nil, fmt.Errorf("update session id=%s: %w", session.ID, err) + } + return nil, nil + } - s.pusher.Push(&event.PushMessage{ - SessionID: session.ID, - Msg: entity.NewEventNextTrack(session.QueueHead), - }) + if err := s.sessionRepo.Update(ctx, session); err != nil { + return nil, fmt.Errorf("update session id=%s: %w", session.ID, err) + } - return nil + s.pusher.Push(&event.PushMessage{ + SessionID: sessionID, + Msg: entity.NewEventNextTrack(session.QueueHead), + }) + + return nil, nil + } } // ChangeSessionState は与えられたセッションのstateを操作します。 diff --git a/usecase/session_state_test.go b/usecase/session_state_test.go new file mode 100644 index 00000000..4b8aebcd --- /dev/null +++ b/usecase/session_state_test.go @@ -0,0 +1,381 @@ +package usecase + +import ( + "context" + "testing" + "time" + + "github.com/camphor-/relaym-server/domain/service" + + "github.com/camphor-/relaym-server/domain/entity" + "github.com/camphor-/relaym-server/domain/event" + "github.com/camphor-/relaym-server/domain/mock_event" + "github.com/camphor-/relaym-server/domain/mock_repository" + "github.com/camphor-/relaym-server/domain/mock_spotify" + "github.com/golang/mock/gomock" +) + +func TestSessionStateUseCase_nextTrackInPauseTx(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sessionID string + userID string + addToTimerSessionID string + prepareMockPlayerCliFn func(m *mock_spotify.MockPlayer) + prepareMockSessionRepoFn func(m *mock_repository.MockSession) + prepareMockPusherFn func(m *mock_event.MockPusher) + prepareMockTrackCliFn func(m *mock_spotify.MockTrackClient) + prepareMockUserRepoFn func(m *mock_repository.MockUser) + wantErr bool + }{ + { + name: "Pauseかつ次の曲が存在すると次の曲に遷移し、202", + sessionID: "sessionID", + userID: "userID", + addToTimerSessionID: "sessionID", + prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) { + m.EXPECT().GoNextTrack(gomock.Any(), "deviceID").Return(nil) + m.EXPECT().Pause(gomock.Any(), "deviceID").Return(nil) + }, + prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( + &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "PAUSE", + QueueHead: 0, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }, nil) + m.EXPECT().Update(gomock.Any(), &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "PAUSE", + QueueHead: 1, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }) + }, + prepareMockPusherFn: func(m *mock_event.MockPusher) { + m.EXPECT().Push(&event.PushMessage{ + SessionID: "sessionID", + Msg: entity.NewEventNextTrack(1), + }) + }, + prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, + prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, + wantErr: false, + }, + { + name: "Pauseかつ次の曲が3曲存在すると次の曲に遷移し、三曲先がEnqueueされ、202", + sessionID: "sessionID", + userID: "userID", + addToTimerSessionID: "sessionID", + prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) { + m.EXPECT().GoNextTrack(gomock.Any(), "deviceID").Return(nil) + m.EXPECT().Pause(gomock.Any(), "deviceID").Return(nil) + m.EXPECT().Enqueue(gomock.Any(), "spotify:track:track_uri4", "deviceID").Return(nil) + }, + prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( + &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "PAUSE", + QueueHead: 0, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + { + Index: 2, + URI: "spotify:track:track_uri3", + SessionID: "sessionID", + }, + { + Index: 3, + URI: "spotify:track:track_uri4", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }, nil) + m.EXPECT().Update(gomock.Any(), &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "PAUSE", + QueueHead: 1, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + { + Index: 2, + URI: "spotify:track:track_uri3", + SessionID: "sessionID", + }, + { + Index: 3, + URI: "spotify:track:track_uri4", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }) + }, + prepareMockPusherFn: func(m *mock_event.MockPusher) { + m.EXPECT().Push(&event.PushMessage{ + SessionID: "sessionID", + Msg: entity.NewEventNextTrack(1), + }) + }, + prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, + prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // モックの準備 + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + uc := newSessionStateUseCaseForTest(t, ctrl, tt.prepareMockPlayerCliFn, tt.prepareMockTrackCliFn, + tt.prepareMockPusherFn, tt.prepareMockUserRepoFn, tt.prepareMockSessionRepoFn, tt.addToTimerSessionID) + + ctx := context.Background() + ctx = service.SetUserIDToContext(ctx, tt.userID) + + _, err := uc.nextTrackInPauseTx(tt.sessionID)(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("NextTrack() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestSessionStateUseCase_nextTrackInStopTx(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sessionID string + userID string + addToTimerSessionID string + prepareMockPlayerCliFn func(m *mock_spotify.MockPlayer) + prepareMockSessionRepoFn func(m *mock_repository.MockSession) + prepareMockPusherFn func(m *mock_event.MockPusher) + prepareMockTrackCliFn func(m *mock_spotify.MockTrackClient) + prepareMockUserRepoFn func(m *mock_repository.MockUser) + wantErr bool + }{ + { + name: "STOPかつ次の曲が存在する時に次の曲にSTOPのまま遷移,202", + sessionID: "sessionID", + userID: "userID", + addToTimerSessionID: "sessionID", + prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, + prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( + &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "STOP", + QueueHead: 0, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }, nil) + m.EXPECT().Update(gomock.Any(), &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "STOP", + QueueHead: 1, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }).Return(nil) + }, + prepareMockPusherFn: func(m *mock_event.MockPusher) { + m.EXPECT().Push(&event.PushMessage{ + SessionID: "sessionID", + Msg: entity.NewEventNextTrack(1), + }) + }, + prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, + prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, + wantErr: false, + }, + { + name: "STOPかつ次の曲が存在しない時にErrNextQueueTrackNotFound,400", + sessionID: "sessionID", + userID: "userID", + addToTimerSessionID: "sessionID", + prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, + prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( + &entity.Session{ + ID: "sessionID", + Name: "name", + CreatorID: "creatorID", + DeviceID: "deviceID", + StateType: "STOP", + QueueHead: 2, + QueueTracks: []*entity.QueueTrack{ + { + Index: 0, + URI: "spotify:track:track_uri1", + SessionID: "sessionID", + }, + { + Index: 1, + URI: "spotify:track:track_uri2", + SessionID: "sessionID", + }, + }, + ExpiredAt: time.Time{}, + AllowToControlByOthers: true, + ProgressWhenPaused: 0, + }, nil) + }, + prepareMockPusherFn: func(m *mock_event.MockPusher) {}, + prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, + prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // モックの準備 + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + uc := newSessionStateUseCaseForTest(t, ctrl, tt.prepareMockPlayerCliFn, tt.prepareMockTrackCliFn, + tt.prepareMockPusherFn, tt.prepareMockUserRepoFn, tt.prepareMockSessionRepoFn, tt.addToTimerSessionID) + + ctx := context.Background() + ctx = service.SetUserIDToContext(ctx, tt.userID) + + _, err := uc.nextTrackInStopTx(tt.sessionID)(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("NextTrack() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +// モックの準備 +func newSessionStateUseCaseForTest( + t *testing.T, + ctrl *gomock.Controller, + prepareMockPlayerFn func(m *mock_spotify.MockPlayer), + prepareMockTrackFun func(m *mock_spotify.MockTrackClient), + prepareMockPusherFn func(m *mock_event.MockPusher), + prepareMockUserRepoFn func(m *mock_repository.MockUser), + prepareMockSessionRepoFn func(m *mock_repository.MockSession), + sessionID string) *SessionStateUseCase { + t.Helper() + + mockPlayer := mock_spotify.NewMockPlayer(ctrl) + prepareMockPlayerFn(mockPlayer) + mockTrackCli := mock_spotify.NewMockTrackClient(ctrl) + prepareMockTrackFun(mockTrackCli) + mockPusher := mock_event.NewMockPusher(ctrl) + prepareMockPusherFn(mockPusher) + mockUserRepo := mock_repository.NewMockUser(ctrl) + prepareMockUserRepoFn(mockUserRepo) + mockSessionRepo := mock_repository.NewMockSession(ctrl) + prepareMockSessionRepoFn(mockSessionRepo) + syncCheckTimerManager := entity.NewSyncCheckTimerManager() + if sessionID != "" { + timer := syncCheckTimerManager.CreateExpiredTimer(sessionID) + timer.SetDuration(5 * time.Minute) + } + timerUC := NewSessionTimerUseCase(mockSessionRepo, mockPlayer, mockPusher, syncCheckTimerManager) + return NewSessionStateUseCase(mockSessionRepo, mockPlayer, mockPusher, timerUC) + +} diff --git a/usecase/session_timer.go b/usecase/session_timer.go index aff3c581..1e27da07 100644 --- a/usecase/session_timer.go +++ b/usecase/session_timer.go @@ -52,7 +52,7 @@ func (s *SessionTimerUseCase) startTrackEndTrigger(ctx context.Context, sessionI case <-triggerAfterTrackEnd.NextCh(): logger.Debugj(map[string]interface{}{"message": "call to move next track", "sessionID": sessionID}) waitTimer.Stop() - nextTrack, err := s.handleTrackEnd(ctx, sessionID) + nextTrack, err := s.handleNext(ctx, sessionID) if err != nil { if errors.Is(err, entity.ErrSessionPlayingDifferentTrack) { logger.Infoj(map[string]interface{}{"message": "handleTrackEnd detects interrupt", "sessionID": sessionID, "error": err.Error()}) @@ -128,21 +128,6 @@ func (s *SessionTimerUseCase) handleWaitTimerExpired(ctx context.Context, sessio return fmt.Errorf("session interrupt") } - track := sess.TrackURIShouldBeAddedWhenHandleTrackEnd() - if track != "" { - if err := s.playerCli.Enqueue(ctx, track, sess.DeviceID); err != nil { - s.handleInterrupt(sess) - if err := s.sessionRepo.Update(ctx, sess); err != nil { - logger.Errorj(map[string]interface{}{ - "message": "handleWaitTimerExpired: failed to update session after Enqueue and handleInterrupt", - "sessionID": sessionID, - "error": err.Error(), - }) - return fmt.Errorf("failed to enqueue track") - } - } - } - switch currentOperation { case operationNextTrack: s.pusher.Push(&event.PushMessage{ @@ -182,6 +167,23 @@ func (s *SessionTimerUseCase) handleTrackEnd(ctx context.Context, sessionID stri return false, nil } +// handleNext は曲がスキップされたときの処理を行います。 +func (s *SessionTimerUseCase) handleNext(ctx context.Context, sessionID string) (bool, error) { + triggerAfterTrackEndResponse, err := s.sessionRepo.DoInTx(ctx, s.handleNextTx(sessionID)) + if v, ok := triggerAfterTrackEndResponse.(*handleTrackEndResponse); ok { + // これはトランザクションが失敗してRollbackしたとき + if err != nil { + return false, fmt.Errorf("handle track end in transaction: %w", err) + } + return v.nextTrack, v.err + } + // これはトランザクションが失敗してRollbackしたとき + if err != nil { + return false, fmt.Errorf("handle track end in transaction: %w", err) + } + return false, nil +} + // handleTrackEndTx はINTERRUPTになってerrorを帰す場合もトランザクションをコミットして欲しいので、 // アプリケーションエラーはhandleTrackEndResponseのフィールドで返すようにしてerrorの返り値はnilにしている func (s *SessionTimerUseCase) handleTrackEndTx(sessionID string) func(ctx context.Context) (interface{}, error) { @@ -202,20 +204,57 @@ func (s *SessionTimerUseCase) handleTrackEndTx(sessionID string) func(ctx contex } }() - // 曲の再生中にArchivedになった場合 if sess.StateType == entity.Archived { + return s.handleArchiveInTransaction(sessionID) + } - s.pusher.Push(&event.PushMessage{ - SessionID: sess.ID, - Msg: entity.EventArchived, - }) - + if err := sess.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { + s.handleAllTrackFinish(sess) return &handleTrackEndResponse{ nextTrack: false, err: nil, }, nil } + res, err := s.enqueueTrackInTransaction(ctx, sess) + if res != nil { + return res, err + } + + logger.Debugj(map[string]interface{}{"message": "next track", "sessionID": sess.ID, "queueHead": sess.QueueHead}) + + return &handleTrackEndResponse{nextTrack: true, err: nil}, nil + } +} + +// handleNextTx はINTERRUPTになってerrorを帰す場合もトランザクションをコミットして欲しいので、 +// アプリケーションエラーはhandleTrackEndResponseのフィールドで返すようにしてerrorの返り値はnilにしている +func (s *SessionTimerUseCase) handleNextTx(sessionID string) func(ctx context.Context) (interface{}, error) { + logger := log.New() + return func(ctx context.Context) (_ interface{}, returnErr error) { + sess, err := s.sessionRepo.FindByIDForUpdate(ctx, sessionID) + if err != nil { + return &handleTrackEndResponse{nextTrack: false}, fmt.Errorf("find session id=%s: %v", sessionID, err) + } + + defer func() { + if err := s.sessionRepo.Update(ctx, sess); err != nil { + if returnErr != nil { + returnErr = fmt.Errorf("update session id=%s: %v: %w", sess.ID, err, returnErr) + } else { + returnErr = fmt.Errorf("update session id=%s: %w", sess.ID, err) + } + } + }() + + if err := s.playerCli.GoNextTrack(ctx, sess.DeviceID); err != nil { + return &handleTrackEndResponse{nextTrack: false}, fmt.Errorf("GoNextTrack: %w", err) + } + + if sess.StateType == entity.Archived { + return s.handleArchiveInTransaction(sessionID) + } + if err := sess.GoNextTrack(); err != nil && errors.Is(err, entity.ErrSessionAllTracksFinished) { s.handleAllTrackFinish(sess) return &handleTrackEndResponse{ @@ -224,12 +263,50 @@ func (s *SessionTimerUseCase) handleTrackEndTx(sessionID string) func(ctx contex }, nil } + res, err := s.enqueueTrackInTransaction(ctx, sess) + if res != nil { + return res, err + } + logger.Debugj(map[string]interface{}{"message": "next track", "sessionID": sess.ID, "queueHead": sess.QueueHead}) return &handleTrackEndResponse{nextTrack: true, err: nil}, nil } } +func (s *SessionTimerUseCase) handleArchiveInTransaction(sessionID string) (*handleTrackEndResponse, error) { + s.pusher.Push(&event.PushMessage{ + SessionID: sessionID, + Msg: entity.EventArchived, + }) + + return &handleTrackEndResponse{ + nextTrack: false, + err: nil, + }, nil +} + +func (s *SessionTimerUseCase) enqueueTrackInTransaction(ctx context.Context, sess *entity.Session) (*handleTrackEndResponse, error) { + logger := log.New() + + track := sess.TrackURIShouldBeAddedWhenHandleTrackEnd() + if track != "" { + if err := s.playerCli.Enqueue(ctx, track, sess.DeviceID); err != nil { + s.handleInterrupt(sess) + if err := s.sessionRepo.Update(ctx, sess); err != nil { + logger.Errorj(map[string]interface{}{ + "message": "handleWaitTimerExpired: failed to update session after Enqueue and handleInterrupt", + "sessionID": sess.ID, + "error": err.Error(), + }) + return &handleTrackEndResponse{err: err}, fmt.Errorf("failed to enqueue track") + } + return &handleTrackEndResponse{nextTrack: false, err: nil}, nil + } + } + return nil, nil +} + // handleAllTrackFinish はキューの全ての曲の再生が終わったときの処理を行います。 func (s *SessionTimerUseCase) handleAllTrackFinish(sess *entity.Session) { logger := log.New() diff --git a/usecase/session_timer_test.go b/usecase/session_timer_test.go index b8446d11..1fecc823 100644 --- a/usecase/session_timer_test.go +++ b/usecase/session_timer_test.go @@ -434,63 +434,6 @@ func TestSessionTimerUseCase_handleWaitTimerExpired(t *testing.T) { }, wantErr: false, }, - { - name: "Spotifyとの同期が取れていることが確認されると、新しく追加すべき曲がSpotifyのキューに追加される", - sessionID: "sessionID", - currentOperation: "NextTrack", - prepareMockPlayerFn: func(m *mock_spotify.MockPlayer) { - m.EXPECT().CurrentlyPlaying(gomock.Any()).Return(&entity.CurrentPlayingInfo{ - Playing: true, - Progress: 10000000, - Track: &entity.Track{ - URI: "spotify:track:06QTSGUEgcmKwiEJ0IMPig", - ID: "06QTSGUEgcmKwiEJ0IMPig", - Name: "Borderland", - Duration: 213066000000, - Artists: []*entity.Artist{{Name: "MONOEYES"}}, - URL: "https://open.spotify.com/track/06QTSGUEgcmKwiEJ0IMPig", - Album: &entity.Album{ - Name: "Interstate 46 E.P.", - Images: []*entity.AlbumImage{ - { - URL: "https://i.scdn.co/image/ab67616d0000b273b48630d6efcebca2596120c4", - Height: 640, - Width: 640, - }, - }, - }, - }, - }, nil) - m.EXPECT().Enqueue(gomock.Any(), "spotify:track:track3", "deviceID").Return(nil) - }, - prepareMockPusherFn: func(m *mock_event.MockPusher) { - m.EXPECT().Push(&event.PushMessage{ - SessionID: "sessionID", - Msg: entity.NewEventNextTrack(1), - }) - }, - prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, - prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return(&entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "PLAY", - QueueHead: 1, - QueueTracks: []*entity.QueueTrack{ - {Index: 0, URI: "spotify:track:5uQ0vKy2973Y9IUCd1wMEF"}, - {Index: 1, URI: "spotify:track:06QTSGUEgcmKwiEJ0IMPig"}, - {Index: 2, URI: "spotify:track:track2"}, - {Index: 3, URI: "spotify:track:track3"}, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: false, - ProgressWhenPaused: 0, - }, nil) - }, - wantErr: false, - }, { name: "Spotifyとの同期が取れていないとhandleInterruptが呼び出されErrorが返る", sessionID: "sessionID", diff --git a/web/handler/session_test.go b/web/handler/session_test.go index 757612a0..dd010d7b 100644 --- a/web/handler/session_test.go +++ b/web/handler/session_test.go @@ -828,7 +828,8 @@ func TestUserHandler_GetActiveDevices(t *testing.T) { } } -func TestUserHandler_NextTrack(t *testing.T) { +// 正常系のテストケースはトランザクションの中を確かめる必要があるので、 usecase/session_state_test.go で行っている +func TestSessionHandler_NextTrack(t *testing.T) { t.Parallel() tests := []struct { @@ -844,72 +845,6 @@ func TestUserHandler_NextTrack(t *testing.T) { wantErr bool wantCode int }{ - { - name: "STOPかつ次の曲が存在する時に次の曲にSTOPのまま遷移,202", - sessionID: "sessionID", - userID: "userID", - addToTimerSessionID: "sessionID", - prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, - prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( - &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "STOP", - QueueHead: 0, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }, nil) - m.EXPECT().Update(gomock.Any(), &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "STOP", - QueueHead: 1, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }).Return(nil) - }, - prepareMockPusherFn: func(m *mock_event.MockPusher) { - m.EXPECT().Push(&event.PushMessage{ - SessionID: "sessionID", - Msg: entity.NewEventNextTrack(1), - }) - }, - prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, - prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, - wantErr: false, - wantCode: http.StatusAccepted, - }, { name: "STOPかつ次の曲が存在しない時にErrNextQueueTrackNotFound,400", sessionID: "sessionID", @@ -917,7 +852,7 @@ func TestUserHandler_NextTrack(t *testing.T) { addToTimerSessionID: "sessionID", prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( &entity.Session{ ID: "sessionID", Name: "name", @@ -941,6 +876,7 @@ func TestUserHandler_NextTrack(t *testing.T) { AllowToControlByOthers: true, ProgressWhenPaused: 0, }, nil) + m.EXPECT().DoInTx(gomock.Any(), gomock.Any()).Return(nil, entity.ErrNextQueueTrackNotFound) }, prepareMockPusherFn: func(m *mock_event.MockPusher) {}, prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, @@ -955,7 +891,7 @@ func TestUserHandler_NextTrack(t *testing.T) { addToTimerSessionID: "sessionID", prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( &entity.Session{ ID: "sessionID", Name: "name", @@ -981,10 +917,9 @@ func TestUserHandler_NextTrack(t *testing.T) { userID: "userID", addToTimerSessionID: "sessionID", prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) { - m.EXPECT().GoNextTrack(gomock.Any(), "deviceID").Return(nil) }, prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( &entity.Session{ ID: "sessionID", Name: "name", @@ -1011,7 +946,7 @@ func TestUserHandler_NextTrack(t *testing.T) { addToTimerSessionID: "sessionID", prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) {}, prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( + m.EXPECT().FindByIDForUpdate(gomock.Any(), "sessionID").Return( &entity.Session{ ID: "sessionID", Name: "name", @@ -1031,165 +966,6 @@ func TestUserHandler_NextTrack(t *testing.T) { wantErr: true, wantCode: http.StatusBadRequest, }, - { - name: "Pauseかつ次の曲が存在すると次の曲に遷移し、202", - sessionID: "sessionID", - userID: "userID", - addToTimerSessionID: "sessionID", - prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) { - m.EXPECT().GoNextTrack(gomock.Any(), "deviceID").Return(nil) - m.EXPECT().Pause(gomock.Any(), "deviceID").Return(nil) - }, - prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( - &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "PAUSE", - QueueHead: 0, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }, nil) - m.EXPECT().Update(gomock.Any(), &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "PAUSE", - QueueHead: 1, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }) - }, - prepareMockPusherFn: func(m *mock_event.MockPusher) { - m.EXPECT().Push(&event.PushMessage{ - SessionID: "sessionID", - Msg: entity.NewEventNextTrack(1), - }) - }, - prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, - prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, - wantErr: false, - wantCode: http.StatusAccepted, - }, - { - name: "Pauseかつ次の曲が3曲存在すると次の曲に遷移し、三曲先がEnqueueされ、202", - sessionID: "sessionID", - userID: "userID", - addToTimerSessionID: "sessionID", - prepareMockPlayerCliFn: func(m *mock_spotify.MockPlayer) { - m.EXPECT().GoNextTrack(gomock.Any(), "deviceID").Return(nil) - m.EXPECT().Pause(gomock.Any(), "deviceID").Return(nil) - m.EXPECT().Enqueue(gomock.Any(), "spotify:track:track_uri4", "deviceID").Return(nil) - }, - prepareMockSessionRepoFn: func(m *mock_repository.MockSession) { - m.EXPECT().FindByID(gomock.Any(), "sessionID").Return( - &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "PAUSE", - QueueHead: 0, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - { - Index: 2, - URI: "spotify:track:track_uri3", - SessionID: "sessionID", - }, - { - Index: 3, - URI: "spotify:track:track_uri4", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }, nil) - m.EXPECT().Update(gomock.Any(), &entity.Session{ - ID: "sessionID", - Name: "name", - CreatorID: "creatorID", - DeviceID: "deviceID", - StateType: "PAUSE", - QueueHead: 1, - QueueTracks: []*entity.QueueTrack{ - { - Index: 0, - URI: "spotify:track:track_uri1", - SessionID: "sessionID", - }, - { - Index: 1, - URI: "spotify:track:track_uri2", - SessionID: "sessionID", - }, - { - Index: 2, - URI: "spotify:track:track_uri3", - SessionID: "sessionID", - }, - { - Index: 3, - URI: "spotify:track:track_uri4", - SessionID: "sessionID", - }, - }, - ExpiredAt: time.Time{}, - AllowToControlByOthers: true, - ProgressWhenPaused: 0, - }) - }, - prepareMockPusherFn: func(m *mock_event.MockPusher) { - m.EXPECT().Push(&event.PushMessage{ - SessionID: "sessionID", - Msg: entity.NewEventNextTrack(1), - }) - }, - prepareMockTrackCliFn: func(m *mock_spotify.MockTrackClient) {}, - prepareMockUserRepoFn: func(m *mock_repository.MockUser) {}, - wantErr: false, - wantCode: http.StatusAccepted, - }, } for _, tt := range tests { @@ -1198,7 +974,7 @@ func TestUserHandler_NextTrack(t *testing.T) { req := httptest.NewRequest(http.MethodPut, "/", nil) rec := httptest.NewRecorder() c := e.NewContext(req, rec) - c.SetPath("/sessions/:id/devices") + c.SetPath("/sessions/:id/next") c.SetParamNames("id") c.SetParamValues(tt.sessionID) c = setToContext(c, tt.userID, nil)