Skip to content

Commit

Permalink
Merge pull request #6 from Famcache/feature/1.3.0-jobs
Browse files Browse the repository at this point in the history
feat(jobs): Introducing new feature
  • Loading branch information
shahen94 authored Jun 27, 2024
2 parents 810f2d2 + deedf5c commit 9dc68a8
Show file tree
Hide file tree
Showing 27 changed files with 501 additions and 15 deletions.
10 changes: 10 additions & 0 deletions .github/label-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ ci:
- changed-files:
- any-glob-to-any-file: [".github/**/*"]

jobs:
- changed-files:
- any-glob-to-any-file:
- pkg/jobs/*.go
- domain/jobs/*.go
- pkg/actor/register_job.go
- pkg/actor/cancel_job.go
- pkg/command/job_command.go
- pkg/command/job_command_reply.go

cache:
- changed-files:
- any-glob-to-any-file:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.3
1.3.0
11 changes: 11 additions & 0 deletions domain/command/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package command

type JobCommand interface {
ID() string
Type() CommandType
Delay() uint64
IsPeriodic() bool
JobId() string

Reply
}
9 changes: 9 additions & 0 deletions domain/command/reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package command

import "net"

type Reply interface {
ReplyError(conn net.Conn, response string)
ReplySuccess(conn net.Conn)
ReplyOK(conn net.Conn, response string)
}
6 changes: 1 addition & 5 deletions domain/command/store.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package command

import "net"

type StoreCommand interface {
ID() string
Type() CommandType
Key() string
Value() *string
TTL() *uint64

ReplyError(conn net.Conn, response string)
ReplySuccess(conn net.Conn)
ReplyOK(conn net.Conn, response string)
Reply
}
8 changes: 8 additions & 0 deletions domain/command/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ package command
type CommandType = string

const (
// Store commands
CommandGet CommandType = "GET"
CommandSet CommandType = "SET"
CommandDelete CommandType = "DELETE"

// Pubsub commands
CommandPublish CommandType = "PUBLISH"
CommandSubscribe CommandType = "SUBSCRIBE"
CommandUnsubscribe CommandType = "UNSUBSCRIBE"

// Job commands
CommandJobRegister CommandType = "JOB_REGISTER"
CommandJobStatus CommandType = "JOB_STATUS"
CommandJobCancel CommandType = "JOB_CANCEL"
CommandJobExecute CommandType = "JOB_EXECUTE"
)
8 changes: 8 additions & 0 deletions domain/jobs/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package jobs

type Job interface {
ID() string
Delay() uint64
IsPeriodic() bool
PeerId() string
}
9 changes: 9 additions & 0 deletions domain/jobs/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package jobs

type JobsManager interface {
Add(peerId string, delay uint64, isPeriodic bool) string
Get(id string) (Job, bool)
Cancel(id string)
Jobs() []Job
Chan() <-chan Job
}
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ module famcache

go 1.21.6

require github.com/pkg/errors v0.9.1
require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
)

require github.com/google/uuid v1.6.0 // indirect
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.6.0
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
5 changes: 5 additions & 0 deletions pkg/actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package actor
import (
"famcache/domain/cache"
"famcache/domain/connection"
"famcache/domain/jobs"
"famcache/domain/logger"
"famcache/domain/pubsub"
conn "famcache/pkg/connection"
job "famcache/pkg/jobs"
queue "famcache/pkg/pubsub"
"time"
)
Expand All @@ -16,6 +18,7 @@ type Actor struct {
messagingQueue pubsub.Queue
peers connection.PeersManager
queueTicker time.Ticker
jobs jobs.JobsManager
}

func (a *Actor) Peers() *connection.PeersManager {
Expand All @@ -26,12 +29,14 @@ func NewActor(logger *logger.Logger, cache *cache.Cache) Actor {
messagingQueue := queue.NewPubsubQueue()
peers := conn.NewPeersManager()
queueTicker := *time.NewTicker(1 * time.Second)
jobs := job.NewJobsManager()

return Actor{
logger,
cache,
messagingQueue,
peers,
queueTicker,
jobs,
}
}
42 changes: 42 additions & 0 deletions pkg/actor/actor_impl.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package actor

import (
"famcache/domain/connection"
"famcache/pkg/command"
)

func (actor *Actor) ListenFailedMessages() {
for {
<-actor.queueTicker.C
Expand All @@ -22,3 +27,40 @@ func (actor *Actor) ListenFailedMessages() {
}
}
}

func (actor *Actor) ListenJobs() {
for {
job := <-actor.jobs.Chan()
peerId := job.PeerId()

peer := actor.peers.GetById(peerId)

if peer == nil {
continue
}

command := command.JobExecuteCommand(job.ID())

peer.Conn().Write([]byte(command))
}
}

func (actor *Actor) DisconnectPeer(peer connection.Peer) {
logger := *actor.logger

actor.peers.Remove(peer)
jobs := actor.jobs.Jobs()

for _, job := range jobs {
if job.PeerId() == peer.ID() {
actor.jobs.Cancel(job.ID())
}
}

logger.Info("Client disconnected: " + peer.ID())
}

func (actor *Actor) Start() {
go actor.ListenFailedMessages()
go actor.ListenJobs()
}
16 changes: 16 additions & 0 deletions pkg/actor/cancel_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
)

func (a *Actor) CancelJob(peer connection.Peer, query command.JobCommand) {
logger := *a.logger

a.jobs.Cancel(query.JobId())

logger.Info("Job canceled", "id", query.JobId())

query.ReplyOK(peer.Conn(), "OK")
}
19 changes: 19 additions & 0 deletions pkg/actor/register_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package actor

import (
"famcache/domain/command"
"famcache/domain/connection"
"fmt"
)

func (actor *Actor) RegisterJob(peer connection.Peer, query command.JobCommand) {
logger := *actor.logger

id := actor.jobs.Add(peer.ID(), query.Delay(), query.IsPeriodic())

logger.Info(fmt.Sprintf("Job registered with id %s", id))

reply := fmt.Sprintf("%s %s", query.ID(), id)

query.ReplyOK(peer.Conn(), reply)
}
23 changes: 17 additions & 6 deletions pkg/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
)

var strToCommandType = map[string]command.CommandType{
"GET": command.CommandGet,
"SET": command.CommandSet,
"DELETE": command.CommandDelete,
"PUBLISH": command.CommandPublish,
"SUBSCRIBE": command.CommandSubscribe,
"UNSUBSCRIBE": command.CommandUnsubscribe,
"GET": command.CommandGet,
"SET": command.CommandSet,
"DELETE": command.CommandDelete,
"PUBLISH": command.CommandPublish,
"SUBSCRIBE": command.CommandSubscribe,
"UNSUBSCRIBE": command.CommandUnsubscribe,
"JOB_REGISTER": command.CommandJobRegister,
"JOB_STATUS": command.CommandJobStatus,
"JOB CANCEL": command.CommandJobCancel,
}

type AbstractCommand struct {
Expand All @@ -28,6 +31,10 @@ func (c *AbstractCommand) ToPubsubCommand() command.MessagingCommand {
return NewPubsubCommand(c.cType, c.query)
}

func (c *AbstractCommand) ToJobCommand() command.JobCommand {
return NewJobCommand(c.cType, c.query)
}

func (c *AbstractCommand) IsStoreCommand() bool {
return c.cType == command.CommandSet || c.cType == command.CommandGet || c.cType == command.CommandDelete
}
Expand All @@ -36,6 +43,10 @@ func (c *AbstractCommand) IsMessagingCommand() bool {
return c.cType == command.CommandPublish || c.cType == command.CommandSubscribe || c.cType == command.CommandUnsubscribe
}

func (c *AbstractCommand) IsJobCommand() bool {
return c.cType == command.CommandJobRegister || c.cType == command.CommandJobStatus || c.cType == command.CommandJobCancel
}

func determineCommandType(query string) (command.CommandType, bool) {
parts := strings.Fields(strings.TrimSpace(query))

Expand Down
4 changes: 4 additions & 0 deletions pkg/command/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func SubscribeCommand(id, topic string) string {
func UnsubscribeCommand(id, topic string) string {
return fmt.Sprintf("%s UNSUBSCRIBE %s", id, topic)
}

func JobExecuteCommand(jobId string) string {
return fmt.Sprintf("JOB_EXECUTE %s\n", jobId)
}
Loading

0 comments on commit 9dc68a8

Please sign in to comment.