Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jobs): Introducing new feature #6

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/label-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ ci:
- changed-files:
- any-glob-to-any-file: [".github/**/*"]

jobs:
- changed-files:
- any-glob-to-any-file:
- pkg/jobs/*.go
- domain/jobs/*.go
- pkg/actor/register_job.go
- pkg/actor/cancel_job.go
- pkg/command/job_command.go
- pkg/command/job_command_reply.go

cache:
- changed-files:
- any-glob-to-any-file:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.3
1.3.0
11 changes: 11 additions & 0 deletions domain/command/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package command

type JobCommand interface {
ID() string
Type() CommandType
Delay() uint64
IsPeriodic() bool
JobId() string

Reply
}
9 changes: 9 additions & 0 deletions domain/command/reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package command

import "net"

type Reply interface {
ReplyError(conn net.Conn, response string)
ReplySuccess(conn net.Conn)
ReplyOK(conn net.Conn, response string)
}
6 changes: 1 addition & 5 deletions domain/command/store.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package command

import "net"

type StoreCommand interface {
ID() string
Type() CommandType
Key() string
Value() *string
TTL() *uint64

ReplyError(conn net.Conn, response string)
ReplySuccess(conn net.Conn)
ReplyOK(conn net.Conn, response string)
Reply
}
8 changes: 8 additions & 0 deletions domain/command/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ package command
type CommandType = string

const (
// Store commands
CommandGet CommandType = "GET"
CommandSet CommandType = "SET"
CommandDelete CommandType = "DELETE"

// Pubsub commands
CommandPublish CommandType = "PUBLISH"
CommandSubscribe CommandType = "SUBSCRIBE"
CommandUnsubscribe CommandType = "UNSUBSCRIBE"

// Job commands
CommandJobRegister CommandType = "JOB_REGISTER"
CommandJobStatus CommandType = "JOB_STATUS"
CommandJobCancel CommandType = "JOB_CANCEL"
CommandJobExecute CommandType = "JOB_EXECUTE"
)
8 changes: 8 additions & 0 deletions domain/jobs/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package jobs

type Job interface {
ID() string
Delay() uint64
IsPeriodic() bool
PeerId() string
}
9 changes: 9 additions & 0 deletions domain/jobs/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package jobs

type JobsManager interface {
Add(peerId string, delay uint64, isPeriodic bool) string
Get(id string) (Job, bool)
Cancel(id string)
Jobs() []Job
Chan() <-chan Job
}
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ module famcache

go 1.21.6

require github.com/pkg/errors v0.9.1
require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
)

require github.com/google/uuid v1.6.0 // indirect
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.6.0
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 5 additions & 0 deletions pkg/actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package actor
import (
"famcache/domain/cache"
"famcache/domain/connection"
"famcache/domain/jobs"
"famcache/domain/logger"
"famcache/domain/pubsub"
conn "famcache/pkg/connection"
job "famcache/pkg/jobs"
queue "famcache/pkg/pubsub"
"time"
)
Expand All @@ -16,6 +18,7 @@ type Actor struct {
messagingQueue pubsub.Queue
peers connection.PeersManager
queueTicker time.Ticker
jobs jobs.JobsManager
}

func (a *Actor) Peers() *connection.PeersManager {
Expand All @@ -26,12 +29,14 @@ func NewActor(logger *logger.Logger, cache *cache.Cache) Actor {
messagingQueue := queue.NewPubsubQueue()
peers := conn.NewPeersManager()
queueTicker := *time.NewTicker(1 * time.Second)
jobs := job.NewJobsManager()

return Actor{
logger,
cache,
messagingQueue,
peers,
queueTicker,
jobs,
}
}
42 changes: 42 additions & 0 deletions pkg/actor/actor_impl.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package actor

import (
"famcache/domain/connection"
"famcache/pkg/command"
)

func (actor *Actor) ListenFailedMessages() {
for {
<-actor.queueTicker.C
Expand All @@ -22,3 +27,40 @@ func (actor *Actor) ListenFailedMessages() {
}
}
}

func (actor *Actor) ListenJobs() {
for {
job := <-actor.jobs.Chan()
peerId := job.PeerId()

peer := actor.peers.GetById(peerId)

if peer == nil {
continue
}

command := command.JobExecuteCommand(job.ID())

peer.Conn().Write([]byte(command))
}
}

func (actor *Actor) DisconnectPeer(peer connection.Peer) {
logger := *actor.logger

actor.peers.Remove(peer)
jobs := actor.jobs.Jobs()

for _, job := range jobs {
if job.PeerId() == peer.ID() {
actor.jobs.Cancel(job.ID())
}
}

logger.Info("Client disconnected: " + peer.ID())
}

func (actor *Actor) Start() {
go actor.ListenFailedMessages()
go actor.ListenJobs()
}
16 changes: 16 additions & 0 deletions pkg/actor/cancel_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
)

func (a *Actor) CancelJob(peer connection.Peer, query command.JobCommand) {
logger := *a.logger

a.jobs.Cancel(query.JobId())

logger.Info("Job canceled", "id", query.JobId())

query.ReplyOK(peer.Conn(), "OK")
}
19 changes: 19 additions & 0 deletions pkg/actor/register_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
"fmt"
)

func (actor *Actor) RegisterJob(peer connection.Peer, query command.JobCommand) {
logger := *actor.logger

id := actor.jobs.Add(peer.ID(), query.Delay(), query.IsPeriodic())

logger.Info(fmt.Sprintf("Job registered with id %s", id))

reply := fmt.Sprintf("%s %s", query.ID(), id)

query.ReplyOK(peer.Conn(), reply)
}
23 changes: 17 additions & 6 deletions pkg/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
)

var strToCommandType = map[string]command.CommandType{
"GET": command.CommandGet,
"SET": command.CommandSet,
"DELETE": command.CommandDelete,
"PUBLISH": command.CommandPublish,
"SUBSCRIBE": command.CommandSubscribe,
"UNSUBSCRIBE": command.CommandUnsubscribe,
"GET": command.CommandGet,
"SET": command.CommandSet,
"DELETE": command.CommandDelete,
"PUBLISH": command.CommandPublish,
"SUBSCRIBE": command.CommandSubscribe,
"UNSUBSCRIBE": command.CommandUnsubscribe,
"JOB_REGISTER": command.CommandJobRegister,
"JOB_STATUS": command.CommandJobStatus,
"JOB CANCEL": command.CommandJobCancel,
}

type AbstractCommand struct {
Expand All @@ -28,6 +31,10 @@ func (c *AbstractCommand) ToPubsubCommand() command.MessagingCommand {
return NewPubsubCommand(c.cType, c.query)
}

func (c *AbstractCommand) ToJobCommand() command.JobCommand {
return NewJobCommand(c.cType, c.query)
}

func (c *AbstractCommand) IsStoreCommand() bool {
return c.cType == command.CommandSet || c.cType == command.CommandGet || c.cType == command.CommandDelete
}
Expand All @@ -36,6 +43,10 @@ func (c *AbstractCommand) IsMessagingCommand() bool {
return c.cType == command.CommandPublish || c.cType == command.CommandSubscribe || c.cType == command.CommandUnsubscribe
}

func (c *AbstractCommand) IsJobCommand() bool {
return c.cType == command.CommandJobRegister || c.cType == command.CommandJobStatus || c.cType == command.CommandJobCancel
}

func determineCommandType(query string) (command.CommandType, bool) {
parts := strings.Fields(strings.TrimSpace(query))

Expand Down
4 changes: 4 additions & 0 deletions pkg/command/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func SubscribeCommand(id, topic string) string {
func UnsubscribeCommand(id, topic string) string {
return fmt.Sprintf("%s UNSUBSCRIBE %s", id, topic)
}

func JobExecuteCommand(jobId string) string {
return fmt.Sprintf("JOB_EXECUTE %s\n", jobId)
}
Loading
Loading