From 8963ebc397e50800f577b679ebf2fb1d4e2ae37b Mon Sep 17 00:00:00 2001 From: Shahen Hovhannisyan Date: Thu, 27 Jun 2024 01:06:25 +0400 Subject: [PATCH 1/3] feat(jobs): Introducing new feature --- VERSION | 2 +- domain/command/job.go | 11 ++++ domain/command/reply.go | 9 +++ domain/command/store.go | 6 +- domain/command/types.go | 8 +++ domain/jobs/job.go | 8 +++ domain/jobs/manager.go | 8 +++ go.mod | 12 +++- go.sum | 10 ++++ pkg/actor/actor.go | 5 ++ pkg/actor/actor_impl.go | 24 ++++++++ pkg/actor/cancel_job.go | 16 +++++ pkg/actor/register_job.go | 19 ++++++ pkg/command/command.go | 23 ++++++-- pkg/command/generator.go | 4 ++ pkg/command/job_command.go | 97 +++++++++++++++++++++++++++++++ pkg/command/job_command_reply.go | 21 +++++++ pkg/jobs/job.go | 25 ++++++++ pkg/jobs/job_impl.go | 17 ++++++ pkg/jobs/job_test.go | 31 ++++++++++ pkg/jobs/manager.go | 15 +++++ pkg/jobs/manager_impl.go | 44 ++++++++++++++ pkg/jobs/manager_test.go | 35 +++++++++++ pkg/server/handler.go | 6 ++ pkg/server/job_command_handler.go | 17 ++++++ pkg/server/server_impl.go | 2 + 26 files changed, 461 insertions(+), 14 deletions(-) create mode 100644 domain/command/job.go create mode 100644 domain/command/reply.go create mode 100644 domain/jobs/job.go create mode 100644 domain/jobs/manager.go create mode 100644 pkg/actor/cancel_job.go create mode 100644 pkg/actor/register_job.go create mode 100644 pkg/command/job_command.go create mode 100644 pkg/command/job_command_reply.go create mode 100644 pkg/jobs/job.go create mode 100644 pkg/jobs/job_impl.go create mode 100644 pkg/jobs/job_test.go create mode 100644 pkg/jobs/manager.go create mode 100644 pkg/jobs/manager_impl.go create mode 100644 pkg/jobs/manager_test.go create mode 100644 pkg/server/job_command_handler.go diff --git a/VERSION b/VERSION index e2cac26..589268e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.2.3 \ No newline at end of file +1.3.0 \ No newline at end of file diff --git a/domain/command/job.go b/domain/command/job.go new file mode 100644 index 0000000..9b7d4c4 --- /dev/null +++ b/domain/command/job.go @@ -0,0 +1,11 @@ +package command + +type JobCommand interface { + ID() string + Type() CommandType + Delay() uint64 + IsPeriodic() bool + JobId() string + + Reply +} diff --git a/domain/command/reply.go b/domain/command/reply.go new file mode 100644 index 0000000..ea6698f --- /dev/null +++ b/domain/command/reply.go @@ -0,0 +1,9 @@ +package command + +import "net" + +type Reply interface { + ReplyError(conn net.Conn, response string) + ReplySuccess(conn net.Conn) + ReplyOK(conn net.Conn, response string) +} diff --git a/domain/command/store.go b/domain/command/store.go index 71205e7..3e6dcef 100644 --- a/domain/command/store.go +++ b/domain/command/store.go @@ -1,7 +1,5 @@ package command -import "net" - type StoreCommand interface { ID() string Type() CommandType @@ -9,7 +7,5 @@ type StoreCommand interface { Value() *string TTL() *uint64 - ReplyError(conn net.Conn, response string) - ReplySuccess(conn net.Conn) - ReplyOK(conn net.Conn, response string) + Reply } diff --git a/domain/command/types.go b/domain/command/types.go index 1f074e3..482fdd7 100644 --- a/domain/command/types.go +++ b/domain/command/types.go @@ -3,11 +3,19 @@ package command type CommandType = string const ( + // Store commands CommandGet CommandType = "GET" CommandSet CommandType = "SET" CommandDelete CommandType = "DELETE" + // Pubsub commands CommandPublish CommandType = "PUBLISH" CommandSubscribe CommandType = "SUBSCRIBE" CommandUnsubscribe CommandType = "UNSUBSCRIBE" + + // Job commands + CommandJobRegister CommandType = "JOB_REGISTER" + CommandJobStatus CommandType = "JOB_STATUS" + CommandJobCancel CommandType = "JOB_CANCEL" + CommandJobExecute CommandType = "JOB_EXECUTE" ) diff --git a/domain/jobs/job.go b/domain/jobs/job.go new file mode 100644 index 0000000..40ebb90 --- /dev/null +++ b/domain/jobs/job.go @@ -0,0 +1,8 @@ +package jobs + +type Job interface { + ID() string + Delay() uint64 + IsPeriodic() bool + PeerId() string +} diff --git a/domain/jobs/manager.go b/domain/jobs/manager.go new file mode 100644 index 0000000..c788723 --- /dev/null +++ b/domain/jobs/manager.go @@ -0,0 +1,8 @@ +package jobs + +type JobsManager interface { + Add(peerId string, delay uint64, isPeriodic bool) string + Get(id string) (Job, bool) + Cancel(id string) + Chan() <-chan Job +} diff --git a/go.mod b/go.mod index c0f5607..33d0e59 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,14 @@ module famcache go 1.21.6 -require github.com/pkg/errors v0.9.1 +require ( + github.com/pkg/errors v0.9.1 + github.com/stretchr/testify v1.9.0 +) -require github.com/google/uuid v1.6.0 // indirect +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 52a4c7a..e4a26a7 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,14 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/actor/actor.go b/pkg/actor/actor.go index 6808cf3..94554d8 100644 --- a/pkg/actor/actor.go +++ b/pkg/actor/actor.go @@ -3,9 +3,11 @@ package actor import ( "famcache/domain/cache" "famcache/domain/connection" + "famcache/domain/jobs" "famcache/domain/logger" "famcache/domain/pubsub" conn "famcache/pkg/connection" + job "famcache/pkg/jobs" queue "famcache/pkg/pubsub" "time" ) @@ -16,6 +18,7 @@ type Actor struct { messagingQueue pubsub.Queue peers connection.PeersManager queueTicker time.Ticker + jobs jobs.JobsManager } func (a *Actor) Peers() *connection.PeersManager { @@ -26,6 +29,7 @@ func NewActor(logger *logger.Logger, cache *cache.Cache) Actor { messagingQueue := queue.NewPubsubQueue() peers := conn.NewPeersManager() queueTicker := *time.NewTicker(1 * time.Second) + jobs := job.NewJobsManager() return Actor{ logger, @@ -33,5 +37,6 @@ func NewActor(logger *logger.Logger, cache *cache.Cache) Actor { messagingQueue, peers, queueTicker, + jobs, } } diff --git a/pkg/actor/actor_impl.go b/pkg/actor/actor_impl.go index e110a05..d1c5d25 100644 --- a/pkg/actor/actor_impl.go +++ b/pkg/actor/actor_impl.go @@ -1,5 +1,7 @@ package actor +import "famcache/pkg/command" + func (actor *Actor) ListenFailedMessages() { for { <-actor.queueTicker.C @@ -22,3 +24,25 @@ func (actor *Actor) ListenFailedMessages() { } } } + +func (actor *Actor) ListenJobs() { + for { + job := <-actor.jobs.Chan() + peerId := job.PeerId() + + peer := actor.peers.GetById(peerId) + + if peer == nil { + continue + } + + command := command.JobExecuteCommand(job.ID()) + + peer.Conn().Write([]byte(command)) + } +} + +func (actor *Actor) Start() { + go actor.ListenFailedMessages() + go actor.ListenJobs() +} diff --git a/pkg/actor/cancel_job.go b/pkg/actor/cancel_job.go new file mode 100644 index 0000000..c5f58c5 --- /dev/null +++ b/pkg/actor/cancel_job.go @@ -0,0 +1,16 @@ +package actor + +import ( + "famcache/domain/command" + "famcache/domain/connection" +) + +func (a *Actor) CancelJob(peer connection.Peer, query command.JobCommand) { + logger := *a.logger + + a.jobs.Cancel(query.JobId()) + + logger.Info("Job canceled", "id", query.JobId()) + + query.ReplyOK(peer.Conn(), "OK") +} diff --git a/pkg/actor/register_job.go b/pkg/actor/register_job.go new file mode 100644 index 0000000..16a29ba --- /dev/null +++ b/pkg/actor/register_job.go @@ -0,0 +1,19 @@ +package actor + +import ( + "famcache/domain/command" + "famcache/domain/connection" + "fmt" +) + +func (actor *Actor) RegisterJob(peer connection.Peer, query command.JobCommand) { + logger := *actor.logger + + id := actor.jobs.Add(peer.ID(), query.Delay(), query.IsPeriodic()) + + logger.Info(fmt.Sprintf("Job registered with id %s", id)) + + reply := fmt.Sprintf("%s %s", query.ID(), id) + + query.ReplyOK(peer.Conn(), reply) +} diff --git a/pkg/command/command.go b/pkg/command/command.go index 251d64e..2960277 100644 --- a/pkg/command/command.go +++ b/pkg/command/command.go @@ -7,12 +7,15 @@ import ( ) var strToCommandType = map[string]command.CommandType{ - "GET": command.CommandGet, - "SET": command.CommandSet, - "DELETE": command.CommandDelete, - "PUBLISH": command.CommandPublish, - "SUBSCRIBE": command.CommandSubscribe, - "UNSUBSCRIBE": command.CommandUnsubscribe, + "GET": command.CommandGet, + "SET": command.CommandSet, + "DELETE": command.CommandDelete, + "PUBLISH": command.CommandPublish, + "SUBSCRIBE": command.CommandSubscribe, + "UNSUBSCRIBE": command.CommandUnsubscribe, + "JOB_REGISTER": command.CommandJobRegister, + "JOB_STATUS": command.CommandJobStatus, + "JOB CANCEL": command.CommandJobCancel, } type AbstractCommand struct { @@ -28,6 +31,10 @@ func (c *AbstractCommand) ToPubsubCommand() command.MessagingCommand { return NewPubsubCommand(c.cType, c.query) } +func (c *AbstractCommand) ToJobCommand() command.JobCommand { + return NewJobCommand(c.cType, c.query) +} + func (c *AbstractCommand) IsStoreCommand() bool { return c.cType == command.CommandSet || c.cType == command.CommandGet || c.cType == command.CommandDelete } @@ -36,6 +43,10 @@ func (c *AbstractCommand) IsMessagingCommand() bool { return c.cType == command.CommandPublish || c.cType == command.CommandSubscribe || c.cType == command.CommandUnsubscribe } +func (c *AbstractCommand) IsJobCommand() bool { + return c.cType == command.CommandJobRegister || c.cType == command.CommandJobStatus || c.cType == command.CommandJobCancel +} + func determineCommandType(query string) (command.CommandType, bool) { parts := strings.Fields(strings.TrimSpace(query)) diff --git a/pkg/command/generator.go b/pkg/command/generator.go index 960d8cd..9c7d877 100644 --- a/pkg/command/generator.go +++ b/pkg/command/generator.go @@ -28,3 +28,7 @@ func SubscribeCommand(id, topic string) string { func UnsubscribeCommand(id, topic string) string { return fmt.Sprintf("%s UNSUBSCRIBE %s", id, topic) } + +func JobExecuteCommand(jobId string) string { + return fmt.Sprintf("JOB_EXECUTE %s\n", jobId) +} diff --git a/pkg/command/job_command.go b/pkg/command/job_command.go new file mode 100644 index 0000000..6c64fa8 --- /dev/null +++ b/pkg/command/job_command.go @@ -0,0 +1,97 @@ +package command + +import ( + "famcache/domain/command" + "strconv" + "strings" +) + +type JobCommand struct { + id string + command command.CommandType + isPeriodic bool + delay uint64 + jobId string +} + +func (c *JobCommand) ID() string { + return c.id +} + +func (c *JobCommand) Type() command.CommandType { + return c.command +} + +func (c *JobCommand) IsPeriodic() bool { + return c.isPeriodic +} + +func (c *JobCommand) Delay() uint64 { + return c.delay +} + +func (c *JobCommand) JobId() string { + return c.jobId +} + +// uuid JOB_REGISTER +func newRegisterJobCommand(parts []string) command.JobCommand { + if len(parts) < 3 { + return nil + } + + delayStr := parts[2] + + delay := uint64(0) + + if delayStr != "" { + err := error(nil) + delay, err = strconv.ParseUint(delayStr, 10, 64) + + if err != nil { + return nil + } + } + + isPeriodic := false + + if len(parts) > 3 { + isPeriodic = parts[3] == "true" + } + + return &JobCommand{ + id: parts[0], + command: command.CommandJobRegister, + delay: delay, + isPeriodic: isPeriodic, + jobId: "", + } +} + +// uuid JOB_CANCEL +func newCancelJobCommand(parts []string) command.JobCommand { + if len(parts) < 2 { + return nil + } + + return &JobCommand{ + id: parts[0], + command: command.CommandJobCancel, + delay: 0, + isPeriodic: false, + jobId: parts[1], + } +} + +func NewJobCommand(commandType command.CommandType, query string) command.JobCommand { + parts := strings.Fields(strings.TrimSpace(query)) + + switch commandType { + case command.CommandJobRegister: + return newRegisterJobCommand(parts) + case command.CommandJobCancel: + return newCancelJobCommand(parts) + default: + return nil + } +} diff --git a/pkg/command/job_command_reply.go b/pkg/command/job_command_reply.go new file mode 100644 index 0000000..872d171 --- /dev/null +++ b/pkg/command/job_command_reply.go @@ -0,0 +1,21 @@ +package command + +import "net" + +func (q *JobCommand) ReplyOK(conn net.Conn, response string) { + message := q.ID() + " OK " + response + "\n" + + conn.Write([]byte(message)) +} + +func (q *JobCommand) ReplySuccess(conn net.Conn) { + message := q.ID() + " OK" + "\n" + + conn.Write([]byte(message)) +} + +func (q *JobCommand) ReplyError(conn net.Conn, response string) { + message := q.ID() + " ERROR: " + response + "\n" + + conn.Write([]byte(message)) +} diff --git a/pkg/jobs/job.go b/pkg/jobs/job.go new file mode 100644 index 0000000..aa7ded4 --- /dev/null +++ b/pkg/jobs/job.go @@ -0,0 +1,25 @@ +package jobs + +import ( + job "famcache/domain/jobs" + + "github.com/google/uuid" +) + +type Job struct { + id string + delay uint64 + isPeriodic bool + peerId string +} + +func NewJob(peerId string, delay uint64, isPeriodic bool) job.Job { + id := uuid.New().String() + + return &Job{ + id: id, + delay: delay, + isPeriodic: isPeriodic, + peerId: peerId, + } +} diff --git a/pkg/jobs/job_impl.go b/pkg/jobs/job_impl.go new file mode 100644 index 0000000..330a1d3 --- /dev/null +++ b/pkg/jobs/job_impl.go @@ -0,0 +1,17 @@ +package jobs + +func (j *Job) ID() string { + return j.id +} + +func (j *Job) Delay() uint64 { + return j.delay +} + +func (j *Job) IsPeriodic() bool { + return j.isPeriodic +} + +func (j *Job) PeerId() string { + return j.peerId +} diff --git a/pkg/jobs/job_test.go b/pkg/jobs/job_test.go new file mode 100644 index 0000000..d50fcc9 --- /dev/null +++ b/pkg/jobs/job_test.go @@ -0,0 +1,31 @@ +package jobs_test + +import ( + "famcache/pkg/jobs" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewJob(t *testing.T) { + delay := uint64(5000) + isPeriodic := true + + job := jobs.NewJob("peer-id", delay, isPeriodic) + + assert.Equal(t, delay, job.Delay()) + assert.Equal(t, isPeriodic, job.IsPeriodic()) +} + +func TestJobMethods(t *testing.T) { + delay := uint64(5000) + isPeriodic := true + peerId := "peer-id" + + job := jobs.NewJob(peerId, delay, isPeriodic) + + assert.NotEmpty(t, job.ID()) + assert.Equal(t, delay, job.Delay()) + assert.Equal(t, isPeriodic, job.IsPeriodic()) + assert.Equal(t, peerId, job.PeerId()) +} diff --git a/pkg/jobs/manager.go b/pkg/jobs/manager.go new file mode 100644 index 0000000..751cdac --- /dev/null +++ b/pkg/jobs/manager.go @@ -0,0 +1,15 @@ +package jobs + +import "famcache/domain/jobs" + +type JobsManager struct { + jobs map[string]jobs.Job + triggerChan chan jobs.Job +} + +func NewJobsManager() jobs.JobsManager { + return &JobsManager{ + jobs: make(map[string]jobs.Job), + triggerChan: make(chan jobs.Job), + } +} diff --git a/pkg/jobs/manager_impl.go b/pkg/jobs/manager_impl.go new file mode 100644 index 0000000..611b722 --- /dev/null +++ b/pkg/jobs/manager_impl.go @@ -0,0 +1,44 @@ +package jobs + +import ( + job "famcache/domain/jobs" + mJob "famcache/domain/jobs" + "time" +) + +func (jm *JobsManager) StartJob(job mJob.Job) { + time.Sleep(time.Duration(job.Delay()) * time.Millisecond) + + jm.triggerChan <- job + + if !job.IsPeriodic() { + jm.Cancel(job.ID()) + return + } + + jm.StartJob(job) +} + +func (jm *JobsManager) Add(peerId string, delay uint64, isPeriodic bool) string { + job := NewJob(peerId, delay, isPeriodic) + + jm.jobs[job.ID()] = job + + go jm.StartJob(job) + + return job.ID() +} + +func (jm *JobsManager) Get(id string) (job.Job, bool) { + job, ok := jm.jobs[id] + + return job, ok +} + +func (jm *JobsManager) Cancel(id string) { + delete(jm.jobs, id) +} + +func (jm *JobsManager) Chan() <-chan job.Job { + return jm.triggerChan +} diff --git a/pkg/jobs/manager_test.go b/pkg/jobs/manager_test.go new file mode 100644 index 0000000..e7450d6 --- /dev/null +++ b/pkg/jobs/manager_test.go @@ -0,0 +1,35 @@ +package jobs_test + +import ( + "famcache/pkg/jobs" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewJobManager(t *testing.T) { + jm := jobs.NewJobsManager() + + assert.NotNil(t, jm) +} + +func TestAddJob(t *testing.T) { + jm := jobs.NewJobsManager() + + id := jm.Add("peer-id", 5000, true) + + assert.NotEmpty(t, id) +} + +func TestCancelJob(t *testing.T) { + jm := jobs.NewJobsManager() + peerId := "peer-id" + + id := jm.Add(peerId, 5000, true) + + jm.Cancel(id) + + _, ok := jm.Get(id) + + assert.False(t, ok) +} diff --git a/pkg/server/handler.go b/pkg/server/handler.go index 71d01f2..f39dcb1 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -46,5 +46,11 @@ func (s *Server) handle(peer connection.Peer) { s.handleMessagingCommand(peer, action) } + + if com.IsJobCommand() { + job := com.ToJobCommand() + + s.handleJobCommand(peer, job) + } } } diff --git a/pkg/server/job_command_handler.go b/pkg/server/job_command_handler.go new file mode 100644 index 0000000..2f88692 --- /dev/null +++ b/pkg/server/job_command_handler.go @@ -0,0 +1,17 @@ +package server + +import ( + "famcache/domain/command" + "famcache/domain/connection" +) + +func (s *Server) handleJobCommand(peer connection.Peer, com command.JobCommand) { + switch com.Type() { + case command.CommandJobRegister: + s.actor.RegisterJob(peer, com) + case command.CommandJobCancel: + s.actor.CancelJob(peer, com) + default: + println("Invalid command") + } +} diff --git a/pkg/server/server_impl.go b/pkg/server/server_impl.go index f230e31..cb8d47a 100644 --- a/pkg/server/server_impl.go +++ b/pkg/server/server_impl.go @@ -25,6 +25,8 @@ func (s *Server) Start() error { return domain.ErrCacheStart } + s.actor.Start() + for { conn, err := s.listener.Accept() From 1a28ff7b3d8b6e5d69a38e11cae378afacbc0cb5 Mon Sep 17 00:00:00 2001 From: Shahen Hovhannisyan Date: Thu, 27 Jun 2024 01:10:29 +0400 Subject: [PATCH 2/3] chore(ci): Fix labeler-config.yml --- .github/label-config.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/label-config.yml b/.github/label-config.yml index af7d62f..efa97eb 100644 --- a/.github/label-config.yml +++ b/.github/label-config.yml @@ -5,6 +5,16 @@ ci: - changed-files: - any-glob-to-any-file: [".github/**/*"] +jobs: + - changed-files: + - any-glob-to-any-file: + - pkg/jobs/*.go + - domain/jobs/*.go + - pkg/actor/register_job.go + - pkg/actor/cancel_job.go + - pkg/command/job_command.go + - pkg/command/job_command_reply.go + cache: - changed-files: - any-glob-to-any-file: From deedf5c629727c7c94b0f95edfd761a9a93ee405 Mon Sep 17 00:00:00 2001 From: Shahen Hovhannisyan Date: Thu, 27 Jun 2024 13:09:43 +0400 Subject: [PATCH 3/3] fix(Disconnect): Cleanup all jobs for that peer --- domain/jobs/manager.go | 1 + pkg/actor/actor_impl.go | 20 +++++++++++++++++++- pkg/jobs/manager_impl.go | 10 ++++++++++ pkg/server/handler.go | 2 +- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/domain/jobs/manager.go b/domain/jobs/manager.go index c788723..9d0e41b 100644 --- a/domain/jobs/manager.go +++ b/domain/jobs/manager.go @@ -4,5 +4,6 @@ type JobsManager interface { Add(peerId string, delay uint64, isPeriodic bool) string Get(id string) (Job, bool) Cancel(id string) + Jobs() []Job Chan() <-chan Job } diff --git a/pkg/actor/actor_impl.go b/pkg/actor/actor_impl.go index d1c5d25..366ec55 100644 --- a/pkg/actor/actor_impl.go +++ b/pkg/actor/actor_impl.go @@ -1,6 +1,9 @@ package actor -import "famcache/pkg/command" +import ( + "famcache/domain/connection" + "famcache/pkg/command" +) func (actor *Actor) ListenFailedMessages() { for { @@ -42,6 +45,21 @@ func (actor *Actor) ListenJobs() { } } +func (actor *Actor) DisconnectPeer(peer connection.Peer) { + logger := *actor.logger + + actor.peers.Remove(peer) + jobs := actor.jobs.Jobs() + + for _, job := range jobs { + if job.PeerId() == peer.ID() { + actor.jobs.Cancel(job.ID()) + } + } + + logger.Info("Client disconnected: " + peer.ID()) +} + func (actor *Actor) Start() { go actor.ListenFailedMessages() go actor.ListenJobs() diff --git a/pkg/jobs/manager_impl.go b/pkg/jobs/manager_impl.go index 611b722..7991aa2 100644 --- a/pkg/jobs/manager_impl.go +++ b/pkg/jobs/manager_impl.go @@ -42,3 +42,13 @@ func (jm *JobsManager) Cancel(id string) { func (jm *JobsManager) Chan() <-chan job.Job { return jm.triggerChan } + +func (jm *JobsManager) Jobs() []job.Job { + jobs := make([]job.Job, len(jm.jobs)) + + for _, job := range jm.jobs { + jobs = append(jobs, job) + } + + return jobs +} diff --git a/pkg/server/handler.go b/pkg/server/handler.go index f39dcb1..ff855a8 100644 --- a/pkg/server/handler.go +++ b/pkg/server/handler.go @@ -15,7 +15,7 @@ func (s *Server) handle(peer connection.Peer) { if err != nil { if err == io.EOF { - (*s.actor.Peers()).Remove(peer) + s.actor.DisconnectPeer(peer) return }