From a03d1166c41d0b33302b586a76b26b28c01bf0e0 Mon Sep 17 00:00:00 2001 From: Hiep Date: Wed, 5 Aug 2020 00:26:08 +0800 Subject: [PATCH] Implement all repository functions and move away from mongoClient Part of #34 --- odin-engine/api/jobs.go | 131 +-- odin-engine/api/links.go | 24 +- odin-engine/api/main.go | 32 +- odin-engine/api/stats.go | 48 +- odin-engine/go.mod | 2 + odin-engine/go.sum | 4 + odin-engine/pkg/jobs/client.go | 185 ++++ odin-engine/pkg/jobs/mongoClient.go | 316 ------- odin-engine/pkg/repository/nosql/nosql.go | 312 +++++- .../pkg/repository/nosql/nosql_test.go | 887 +++++++++++++++++- odin-engine/pkg/repository/repository.go | 50 +- 11 files changed, 1522 insertions(+), 469 deletions(-) create mode 100644 odin-engine/pkg/jobs/client.go delete mode 100644 odin-engine/pkg/jobs/mongoClient.go diff --git a/odin-engine/api/jobs.go b/odin-engine/api/jobs.go index 47d70a6..d10e075 100644 --- a/odin-engine/api/jobs.go +++ b/odin-engine/api/jobs.go @@ -2,124 +2,125 @@ package api import ( "bytes" - "fmt" + "fmt" "io/ioutil" "os" "strconv" "strings" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/theycallmemac/odin/odin-engine/pkg/resources" - "github.com/valyala/fasthttp" - - "go.mongodb.org/mongo-driver/bson" + "github.com/valyala/fasthttp" ) // AddJob is used to create a new job -func AddJob(ctx *fasthttp.RequestCtx) { - body := ctx.PostBody() +func AddJob(client *jobs.Client, ctx *fasthttp.RequestCtx) { + body := ctx.PostBody() path := jobs.SetupEnvironment(body) - client, err := jobs.SetupClient() + id, err := client.Repo.CreateJob(ctx, body, path, "") if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprintf(ctx, "[FAILED] Job failed to deploy: %v\n", err) } else { - status := jobs.InsertIntoMongo(client, body, path, "") - fmt.Fprintf(ctx, status) + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) deployed successfully\n", id) } } // DeleteJob is used to delete a job -func DeleteJob(ctx *fasthttp.RequestCtx) { +func DeleteJob(client *jobs.Client, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), " ") id, uid := args[0], args[1] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + os.RemoveAll("/etc/odin/jobs/" + id) + os.RemoveAll("/etc/odin/logs/" + id) + if err := client.Repo.DeleteJob(ctx, id, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to remove job (%s): %v\n", id, err) } else { - os.RemoveAll("/etc/odin/jobs/" + id) - os.RemoveAll("/etc/odin/logs/" + id) - if jobs.DeleteJobByValue(client, bson.M{"id": id}, uid) { - fmt.Fprintf(ctx, "Job removed!\n") - } else { - fmt.Fprintf(ctx, "Job with that ID does not exist!\n") - } + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) removed\n", id) } } // UpdateJob is used to update a job -func UpdateJob(ctx *fasthttp.RequestCtx) { +func UpdateJob(client *jobs.Client, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), "_") id, name, description, schedule, uid := args[0], args[1], args[2], args[3], args[4] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + job := &repository.Job{ + ID: id, + UID: uid, + } + if resources.NotEmpty(name) { + job.Name = name + } + if resources.NotEmpty(description) { + job.Description = description + } + if resources.NotEmpty(schedule) { + ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654) + resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml"))) + os.Remove(".tmp.yml") + job.Schedule = resp + } + if err := client.Repo.UpdateJob(ctx, job); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - if resources.NotEmpty(name) { - job.Name = name - } - if resources.NotEmpty(description) { - job.Description = description - } - if resources.NotEmpty(schedule) { - ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654) - resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml"))) - os.Remove(".tmp.yml") - job.Schedule = resp - } - _ = jobs.UpdateJobByValue(client, job) - fmt.Fprintf(ctx, "Updated job " + id + " successfully\n") + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id) } } // GetJobDescription is used to show a job's description -func GetJobDescription(ctx *fasthttp.RequestCtx) { +func GetJobDescription(client *jobs.Client, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), "_") id, uid := args[0], args[1] - client, err := jobs.SetupClient() + job, err := client.Repo.GetJobById(ctx, id, uid) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprintf(ctx, "[FAILED] Failed to get job (%s): %v", id, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - fmt.Fprintf(ctx, job.Name + " - " + job.Description + "\n") + fmt.Fprintf(ctx, job.Name+" - "+job.Description+"\n") } } // UpdateJobRuns is used to update a job's run number -func UpdateJobRuns(ctx *fasthttp.RequestCtx) { +// TODO: Get and update job should be done as a transaction +func UpdateJobRuns(client *jobs.Client, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), " ") id, runs, uid := args[0], args[1], args[2] - client, err := jobs.SetupClient() + job, err := client.Repo.GetJobById(ctx, id, uid) + if err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", id, err) + return + } + inc, err := strconv.Atoi(runs) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprint(ctx, "Invalid run") + return + } + job.Runs = job.Runs + inc + if err := client.Repo.UpdateJob(ctx, job); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - inc, _ := strconv.Atoi(runs) - job.Runs = job.Runs + inc - _ = jobs.UpdateJobByValue(client, job) + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id) } } // ListJobs is used to list the current jobs running -func ListJobs(ctx *fasthttp.RequestCtx) { - client, err := jobs.SetupClient() +func ListJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) { + uid := string(ctx.PostBody()) + jobList, err := client.Repo.GetUserJobs(ctx, uid) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") - } else { - jobList := jobs.GetUserJobs(client, string(ctx.PostBody())) - fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE")) - for _, job := range jobList { - linkLen := len(job.Links) - 1 - if linkLen < 0 { - linkLen = 0 - } - fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1])) + fmt.Fprintf(ctx, "[FAILED] Failed to get jobs for user %s\n", uid) + return + } + fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE")) + for _, job := range jobList { + linkLen := len(job.Links) - 1 + if linkLen < 0 { + linkLen = 0 } + fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1])) } } // GetJobLogs is used to retrieve the logs for a job func GetJobLogs(ctx *fasthttp.RequestCtx) { log, _ := ioutil.ReadFile("/etc/odin/logs/" + string(ctx.PostBody())) - fmt.Fprintf(ctx, "\n" + string(log) + "\n") + fmt.Fprintf(ctx, "\n"+string(log)+"\n") } diff --git a/odin-engine/api/links.go b/odin-engine/api/links.go index 6ef356f..5900945 100644 --- a/odin-engine/api/links.go +++ b/odin-engine/api/links.go @@ -1,35 +1,31 @@ package api import ( - "fmt" + "fmt" "strings" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" - "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp" ) // LinkJobs is used to link two jobs together -func LinkJobs(ctx *fasthttp.RequestCtx) { +func LinkJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) { split := strings.Split(string(ctx.PostBody()), "_") from, to, uid := split[0], split[1], split[2] - client, _ := jobs.SetupClient() - updated := jobs.AddJobLink(client, from, to, uid) - if updated == 1 { - fmt.Fprintf(ctx, "Job " + from + " linked to " + to + "!\n") + if err := client.Repo.AddJobLink(ctx, from, to, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Job %s could not be linked to %s: %v\n", from, to, err) } else { - fmt.Fprintf(ctx, "Job " + from + " could not be linked to " + to + ".\n") + fmt.Fprintf(ctx, "[SUCCESS] Job %s linked to %s\n", from, to) } } // UnlinkJobs is used to delete a job link -func UnlinkJobs(ctx *fasthttp.RequestCtx) { +func UnlinkJobs(client *jobs.Client, ctx *fasthttp.RequestCtx) { split := strings.Split(string(ctx.PostBody()), "_") from, to, uid := split[0], split[1], split[2] - client, _ := jobs.SetupClient() - updated := jobs.DeleteJobLink(client, from, to, uid) - if updated == 1 { - fmt.Fprintf(ctx, "Job " + to + " unlinked from " + from + "!\n") + if err := client.Repo.DeleteJobLink(ctx, from, to, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Link %s could not be unlinked from %s: %v\n", to, from, err) } else { - fmt.Fprintf(ctx, "Job " + to + " has no links!\n") + fmt.Fprintf(ctx, "[SUCCESS] Link %s unlinked from %s\n", to, from) } } diff --git a/odin-engine/api/main.go b/odin-engine/api/main.go index a087460..ba96404 100644 --- a/odin-engine/api/main.go +++ b/odin-engine/api/main.go @@ -29,9 +29,9 @@ func SetOdinEnv(mongoURL string) { // Service is a type to be used by the raft consensus protocol // consists of a base http address and a store in the finite state machine type Service struct { - addr string - store fsm.Store - repo repository.Repository + addr string + store fsm.Store + client *jobs.Client } // NewService is used to initialize a new service struct @@ -39,9 +39,9 @@ type Service struct { // returns: *Service (a newly initialized service struct) func NewService(addr string, store fsm.Store, repo repository.Repository) *Service { return &Service{ - addr: addr, - store: store, - repo: repo, + addr: addr, + store: store, + client: jobs.NewClient(repo), } } @@ -60,29 +60,29 @@ func (service *Service) Start() { case "/execute/yaml": ExecuteYaml(ctx) case "/jobs/add": - AddJob(ctx) + AddJob(service.client, ctx) case "/jobs/delete": - DeleteJob(ctx) + DeleteJob(service.client, ctx) case "/jobs/info/update": - UpdateJob(ctx) + UpdateJob(service.client, ctx) case "/jobs/info/description": - GetJobDescription(ctx) + GetJobDescription(service.client, ctx) case "/jobs/info/runs": - UpdateJobRuns(ctx) + UpdateJobRuns(service.client, ctx) case "/jobs/list": - ListJobs(ctx) + ListJobs(service.client, ctx) case "/jobs/logs": GetJobLogs(ctx) case "/links/add": - LinkJobs(ctx) + LinkJobs(service.client, ctx) case "/links/delete": - UnlinkJobs(ctx) + UnlinkJobs(service.client, ctx) case "/schedule": GetJobSchedule(ctx) case "/stats/add": - AddJobStats(ctx) + AddJobStats(service.client, ctx) case "/stats/get": - GetJobStats(service.repo, ctx) + GetJobStats(service.client, ctx) } } diff --git a/odin-engine/api/stats.go b/odin-engine/api/stats.go index 2c7c1b1..92821d6 100644 --- a/odin-engine/api/stats.go +++ b/odin-engine/api/stats.go @@ -1,15 +1,12 @@ package api import ( - "context" "fmt" "strings" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/valyala/fasthttp" - - "go.mongodb.org/mongo-driver/mongo" ) // JobStats is a type to be used for accessing and storing job stats information @@ -22,45 +19,28 @@ type JobStats struct { } // AddJobStats is used to parse collected metrics -func AddJobStats(ctx *fasthttp.RequestCtx) { +func AddJobStats(client *jobs.Client, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), ",") typeOfValue, desc, value, id, timestamp := args[0], args[1], args[2], args[3], args[4] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") - } else { - if InsertIntoMongo(client, typeOfValue, desc, value, id, timestamp) { - fmt.Fprintf(ctx, "200") - } else { - fmt.Fprintf(ctx, "500") - } + js := &repository.JobStats{ + ID: id, + Description: desc, + Type: typeOfValue, + Value: value, + Timestamp: timestamp, } -} - -// InsertIntoMongo is used to add collected metrics to the observability collection -// parameters: client (a *mongo.Client), typeOfValue (a string of the type of value being stored), desc (a string describing the value being stored), value (a string of the value being stored), id (a string of the associated Job ID), timestamp (a string of the unix time at which the operation took place) -// returns: bool (true is successful, false if otherwise) -func InsertIntoMongo(client *mongo.Client, typeOfValue string, desc string, value string, id string, timestamp string) bool { - var js JobStats - js.ID = id - js.Description = desc - js.Type = typeOfValue - js.Value = value - js.Timestamp = timestamp - collection := client.Database("odin").Collection("observability") - _, err := collection.InsertOne(context.TODO(), js) - client.Disconnect(context.TODO()) - if err != nil { - return false + if err := client.Repo.CreateJobStats(ctx, js); err != nil { + fmt.Fprintf(ctx, "500") + } else { + fmt.Fprintf(ctx, "200") } - return true } // GetJobStats is used to show stats collected by a specified job -func GetJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) { - statsList, err := repo.GetJobStats(ctx, string(ctx.PostBody())) +func GetJobStats(client *jobs.Client, ctx *fasthttp.RequestCtx) { + statsList, err := client.Repo.GetJobStats(ctx, string(ctx.PostBody())) if err != nil { - fmt.Fprintf(ctx, "[ERROR] Cannot get job stats: %v", err) + fmt.Fprintf(ctx, "[FAILED] Cannot get job stats: %v\n", err) } else { for _, stat := range statsList { fmt.Fprintf(ctx, jobs.Format(stat.ID, stat.Description, stat.Type, stat.Value)) diff --git a/odin-engine/go.mod b/odin-engine/go.mod index 9f94482..5e768c4 100644 --- a/odin-engine/go.mod +++ b/odin-engine/go.mod @@ -2,6 +2,8 @@ module github.com/theycallmemac/odin/odin-engine go 1.13 +replace github.com/hidal-go/hidalgo => github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 + require ( github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/hashicorp/raft v1.1.2 diff --git a/odin-engine/go.sum b/odin-engine/go.sum index a4af58a..f24845f 100644 --- a/odin-engine/go.sum +++ b/odin-engine/go.sum @@ -116,6 +116,10 @@ github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:hBE4LGxApbZiV/3YoEPv7uYlUMWOogG1hwtkpiU87zQ= github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= +github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:PmwFwRyxsWbp6bGa+cNbfDdJm3cyz41ut83kblKc6o8= +github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= +github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 h1:ZKIptNmc3Mfb8pNDMz3kM2dZAWliF53qZFno8XcqbRw= +github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/odin-engine/pkg/jobs/client.go b/odin-engine/pkg/jobs/client.go new file mode 100644 index 0000000..a27afcc --- /dev/null +++ b/odin-engine/pkg/jobs/client.go @@ -0,0 +1,185 @@ +package jobs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "os/user" + "strconv" + "strings" + "time" + + "github.com/lnquy/cron" + "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" + "github.com/theycallmemac/odin/odin-engine/pkg/resources" + "github.com/theycallmemac/odin/odin-engine/pkg/types" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + + "gopkg.in/yaml.v2" +) + +// URI is used to store the address to the MongoDB instance used by the Odin Engine +var URI = resources.UnmarsharlYaml(resources.ReadFileBytes(getHome() + "/odin-config.yml")).Mongo.Address + +// NewJob is a type to be used for accessing and storing job information +type NewJob struct { + ID string `yaml:"id"` + UID string `yaml:"uid"` + GID string `yaml:"gid"` + Name string `yaml:"name"` + Description string `yaml:"description"` + Language string `yaml:"language"` + File string `yaml:"file"` + Stats string `yaml:"stats"` + Schedule string `yaml:"schedule"` + Runs int + Links string +} + +// JobStats is a type to be used for accessing and storing job stats information +type JobStats struct { + ID string + Description string + Type string + Value string +} + +type Client struct { + Repo repository.Repository +} + +func NewClient(repo repository.Repository) *Client { + return &Client{ + Repo: repo, + } +} + +// getHome is used to get the path to the user's home directory +// parameters: nil +// return string (the path to the user's home) +func getHome() string { + usr, _ := user.Current() + return usr.HomeDir +} + +// unmarsharlYaml is used to unmarshal YAML +// parameters: byteArray (an array of bytes representing the contents of a file) +// returns: Config (a struct form of the YAML) +func unmarsharlYaml(byteArray []byte) types.EngineConfig { + var cfg types.EngineConfig + err := yaml.Unmarshal([]byte(byteArray), &cfg) + if err != nil { + log.Fatalf("error: %v", err) + } + return cfg +} + +// SetupClient is used to set up a MongoDB client and test it with a ping command +// parameters: nil +// returns: *mogno.Client (a client) +func SetupClient() (*mongo.Client, error) { + c := getMongoClient() + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + err := c.Ping(ctx, readpref.Primary()) + if err != nil { + fmt.Println("Cannot connect to MongoDB: ", err) + c.Disconnect(context.TODO()) + } + return c, err +} + +// getMongoClient is used to get a MongoDB Client and set it's options +// parameters: none +// returns: *mogno.Client (a client) +func getMongoClient() *mongo.Client { + clientOptions := options.Client().ApplyURI(URI) + client, err := mongo.NewClient(clientOptions) + if err != nil { + log.Fatal(err) + } + err = client.Connect(context.Background()) + if err != nil { + log.Fatal(err) + } + return client +} + +// GetAll is used to return all jobs in MongoDB +// parameters: client (a *mongo.Client) +// returns: []NewJob (all jobs in the Mongo instance) +func GetAll(client *mongo.Client) []NewJob { + var jobs []NewJob + collection := client.Database("odin").Collection("jobs") + documents, _ := collection.Find(context.TODO(), bson.D{}) + client.Disconnect(context.TODO()) + for documents.Next(context.TODO()) { + var job NewJob + documents.Decode(&job) + jobs = append(jobs, job) + } + return jobs +} + +// Format is used to format the output of MongoDB stat contents +// parameters: id, description, valType, value (four strings corresponding to individual job stats) +// returns: string (a space formatted string used for display) +func Format(id string, description string, valType string, value string) string { + return fmt.Sprintf("%-20s%-20s%-20s%-20s\n", id, description, valType, value) +} + +// SchFormat is used to parse and format the output of the MongoDB schedule contents +// parameters: id, name, description, schedule (four strings corresponding to individual job data) +// returns: string (a space formatted string used for display) +func SchFormat(id string, name, string, description string, links string, schedule string) string { + var finalSchedule = "" + var tmpSchedule = "" + if schedule == "0 5 31 2 *" { + finalSchedule = "never" + } else if schedule != "SCHEDULE" { + scheduleArray := strings.Split(schedule, ",") + for i, item := range scheduleArray { + descriptor, _ := cron.NewDescriptor() + tmpSchedule, _ = descriptor.ToDescription(item, cron.Locale_en) + if i+1 == len(scheduleArray) { + finalSchedule += tmpSchedule + } else { + finalSchedule += tmpSchedule + " & " + } + } + } else { + finalSchedule = schedule + } + return fmt.Sprintf("%-20s%-20s%-20s%-20s%-20s\n", id, name, description, links, finalSchedule) +} + +// RunLinks is used to run jobs linked to a job which has just been executed +// parameters: links (a string array of Job ID's to execute), uid (a uint32 of that user's id), httpAddr (a string port of the master node), store (a fsm.Store containing information about other nodes) +// returns: nil +func RunLinks(links []string, uid uint32, httpAddr string, store fsm.Store) { + client, _ := SetupClient() + var jobs []Node + var node Node + for _, link := range links { + job := GetJobByValue(client, bson.M{"id": string(link)}, fmt.Sprint(uid)) + node.ID, node.Lang, node.File, node.Links = job.ID, job.Language, job.File, job.Links + uid, _ := strconv.ParseUint(job.UID, 10, 32) + gid, _ := strconv.ParseUint(job.GID, 10, 32) + node.UID = uint32(uid) + node.GID = uint32(gid) + jobs = append(jobs, node) + } + client.Disconnect(context.TODO()) + var en ExecNode + jobsArray, _ := json.Marshal(jobs) + en.Items = jobsArray + en.Store = store + buffer, _ := json.Marshal(en) + go MakePostRequest("http://localhost"+httpAddr+"/execute", bytes.NewBuffer(buffer)) +} diff --git a/odin-engine/pkg/jobs/mongoClient.go b/odin-engine/pkg/jobs/mongoClient.go deleted file mode 100644 index a8a75fa..0000000 --- a/odin-engine/pkg/jobs/mongoClient.go +++ /dev/null @@ -1,316 +0,0 @@ -package jobs - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "log" - "os/user" - "strconv" - "strings" - "time" - - "github.com/lnquy/cron" - "github.com/theycallmemac/odin/odin-engine/pkg/fsm" - "github.com/theycallmemac/odin/odin-engine/pkg/resources" - "github.com/theycallmemac/odin/odin-engine/pkg/types" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" - - "gopkg.in/yaml.v2" -) - -// URI is used to store the address to the MongoDB instance used by the Odin Engine -var URI = resources.UnmarsharlYaml(resources.ReadFileBytes(getHome() + "/odin-config.yml")).Mongo.Address - -// NewJob is a type to be used for accessing and storing job information -type NewJob struct { - ID string `yaml:"id"` - UID string `yaml:"uid"` - GID string `yaml:"gid"` - Name string `yaml:"name"` - Description string `yaml:"description"` - Language string `yaml:"language"` - File string `yaml:"file"` - Stats string `yaml:"stats"` - Schedule string `yaml:"schedule"` - Runs int - Links string -} - -// JobStats is a type to be used for accessing and storing job stats information -type JobStats struct { - ID string - Description string - Type string - Value string -} - - -// getHome is used to get the path to the user's home directory -// parameters: nil -// return string (the path to the user's home) -func getHome() string { - usr, _ := user.Current() - return usr.HomeDir -} - -// unmarsharlYaml is used to unmarshal YAML -// parameters: byteArray (an array of bytes representing the contents of a file) -// returns: Config (a struct form of the YAML) -func unmarsharlYaml(byteArray []byte) types.EngineConfig { - var cfg types.EngineConfig - err := yaml.Unmarshal([]byte(byteArray), &cfg) - if err != nil { - log.Fatalf("error: %v", err) - } - return cfg -} - -// SetupClient is used to set up a MongoDB client and test it with a ping command -// parameters: nil -// returns: *mogno.Client (a client) -func SetupClient() (*mongo.Client, error) { - c := getMongoClient() - ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) - err := c.Ping(ctx, readpref.Primary()) - if err != nil { - fmt.Println("Cannot connect to MongoDB: ", err) - c.Disconnect(context.TODO()) - } - return c, err -} - -// getMongoClient is used to get a MongoDB Client and set it's options -// parameters: none -// returns: *mogno.Client (a client) -func getMongoClient() *mongo.Client { - clientOptions := options.Client().ApplyURI(URI) - client, err := mongo.NewClient(clientOptions) - if err != nil { - log.Fatal(err) - } - err = client.Connect(context.Background()) - if err != nil { - log.Fatal(err) - } - return client -} - -// InsertIntoMongo is used to add information to the MongoDB instance -// parameters: client (a *mongo.Client), d (a byte array containing marshaled JSON), and path (a string to set as the new job.File) -// returns: interface{} (an interface on the insertion results) -func InsertIntoMongo(client *mongo.Client, d []byte, path string, uid string) string { - var job NewJob - json.Unmarshal(d, &job) - job.File = path - job.Runs = 0 - if string(GetJobByValue(client, bson.M{"id": string(job.ID)}, uid).ID) == string(job.ID) { - return "Job with ID: " + job.ID + " already exists\n" - } - collection := client.Database("odin").Collection("jobs") - _, err := collection.InsertOne(context.TODO(), job) - client.Disconnect(context.TODO()) - if err != nil { - log.Fatalln("Error on inserting new job", err) - } - return "Job: " + job.ID + " deployed successfully\n" -} - -// GetJobStats is used to retrieve the stats associated with each job from the MongoDB instance -// parameters: client (a *mongo.Client), id (a string representation of a job's id) -// returns: []JobStats (the collection of fetched job stats) -func GetJobStats(client *mongo.Client, id string) []JobStats { - var statMap map[string]string - var jobStats JobStats - var statsList []JobStats - collection := client.Database("odin").Collection("observability") - documents, _ := collection.Find(context.TODO(), bson.M{"id": id}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - documents.Decode(&statMap) - jobStats.ID = statMap["id"] - jobStats.Description = statMap["desc"] - jobStats.Type = statMap["type"] - jobStats.Value = statMap["value"] - statsList = append(statsList, jobStats) - } - return statsList -} - -// GetJobByValue is used to return a job in MongoDB by filtering on a certain value pertaining to that job -// parameters: client (a *mongo.Client), filter (a bson encoding of a job id), uid (a string of the user's ID) -// returns: NewJob (the fetched job) -func GetJobByValue(client *mongo.Client, filter bson.M, uid string) NewJob { - var job NewJob - collection := client.Database("odin").Collection("jobs") - documentReturned := collection.FindOne(context.TODO(), filter) - documentReturned.Decode(&job) - if job.UID == uid { - return job - } - var tmp NewJob - return tmp -} - -// GetUserJobs is used to return a specific user's jobs from MongoDB -// parameters: client (a *mongo.Client), uid (a string of that user's id) -// returns: []NewJob (all jobs in the Mongo instance) -func GetUserJobs(client *mongo.Client, uid string) []NewJob { - var jobs []NewJob - collection := client.Database("odin").Collection("jobs") - documents, _ := collection.Find(context.TODO(), bson.D{}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - var job NewJob - documents.Decode(&job) - if job.UID == uid || uid == "0" { - jobs = append(jobs, job) - } - } - return jobs -} - -// GetAll is used to return all jobs in MongoDB -// parameters: client (a *mongo.Client) -// returns: []NewJob (all jobs in the Mongo instance) -func GetAll(client *mongo.Client) []NewJob { - var jobs []NewJob - collection := client.Database("odin").Collection("jobs") - documents, _ := collection.Find(context.TODO(), bson.D{}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - var job NewJob - documents.Decode(&job) - jobs = append(jobs, job) - } - return jobs -} - -// Format is used to format the output of MongoDB stat contents -// parameters: id, description, valType, value (four strings corresponding to individual job stats) -// returns: string (a space formatted string used for display) -func Format(id string, description string, valType string, value string) string { - return fmt.Sprintf("%-20s%-20s%-20s%-20s\n", id, description, valType, value) -} - -// SchFormat is used to parse and format the output of the MongoDB schedule contents -// parameters: id, name, description, schedule (four strings corresponding to individual job data) -// returns: string (a space formatted string used for display) -func SchFormat(id string, name, string, description string, links string, schedule string) string { - var finalSchedule = "" - var tmpSchedule = "" - if schedule == "0 5 31 2 *" { - finalSchedule = "never" - } else if schedule != "SCHEDULE" { - scheduleArray := strings.Split(schedule, ",") - for i, item := range scheduleArray { - descriptor, _ := cron.NewDescriptor() - tmpSchedule, _ = descriptor.ToDescription(item, cron.Locale_en) - if i+1 == len(scheduleArray) { - finalSchedule += tmpSchedule - } else { - finalSchedule += tmpSchedule + " & " - } - } - } else { - finalSchedule = schedule - } - return fmt.Sprintf("%-20s%-20s%-20s%-20s%-20s\n", id, name, description, links, finalSchedule) -} - -// UpdateJobByValue is used to modify a job in MongoDB -// parameters: client (a *mongo.Client), job (a NewJob structure) -// returns: int64 (value of the number of entries modified) -func UpdateJobByValue(client *mongo.Client, job NewJob) int64 { - update := bson.M{"$set": bson.M{"name": job.Name, "description": job.Description, "schedule": job.Schedule, "runs": job.Runs}} - collection := client.Database("odin").Collection("jobs") - updateResult, err := collection.UpdateOne(context.TODO(), bson.M{"id": job.ID}, update) - client.Disconnect(context.TODO()) - if err != nil { - return int64(0) - } - return updateResult.ModifiedCount -} - -// DeleteJobByValue is used to delete a job in MongoDB -// parameters: parameters: client (a *mongo.Client), filter (a bson encoding of a job id), uid (a string of the user's ID) -// returns: bool (whether a job was deleted or not) -func DeleteJobByValue(client *mongo.Client, filter bson.M, uid string) bool { - job := GetJobByValue(client, filter, uid) - if job.ID == "" || job.UID != uid { - return false - } - collection := client.Database("odin").Collection("jobs") - _, err := collection.DeleteOne(context.TODO(), filter) - client.Disconnect(context.TODO()) - if err != nil { - return false - } - return true -} - -// AddJobLink is used to add links the job is associated with -// parameters: client (a *mongo.Client), from (a string of a job ID to give a new link), to (a string of a job ID to create a link to), uid (a string of the user's ID) -// returns: int64 (value of the number of entries modified) -func AddJobLink(client *mongo.Client, from string, to string, uid string) int64 { - job := GetJobByValue(client, bson.M{"id": string(from)}, uid) - if strings.Contains(job.Links, to) { - return 0 - } - job.Links = job.Links + to + "," - update := bson.M{"$set": bson.M{"links": job.Links}} - collection := client.Database("odin").Collection("jobs") - updateResult, _ := collection.UpdateOne(context.TODO(), bson.M{"id": from}, update) - client.Disconnect(context.TODO()) - return updateResult.ModifiedCount -} - -// DeleteJobLink is used to delete links the job is associated with -// parameters: client (a *mongo.Client), from (a string of a job ID to remove a link from), to (a string of a job ID to remove), uid (a string of the user's ID) -// returns: int64 (value of the number of entries modified) -func DeleteJobLink(client *mongo.Client, from string, to string, uid string) int64 { - var newLinks string - job := GetJobByValue(client, bson.M{"id": string(from)}, uid) - links := strings.Split(job.Links, ",") - for _, link := range links { - if link != to && link != "" { - newLinks = newLinks + link + "," - } - } - update := bson.M{"$set": bson.M{"links": newLinks}} - collection := client.Database("odin").Collection("jobs") - updateResult, _ := collection.UpdateOne(context.TODO(), bson.M{"id": job.ID}, update) - client.Disconnect(context.TODO()) - return updateResult.ModifiedCount -} - - -// RunLinks is used to run jobs linked to a job which has just been executed -// parameters: links (a string array of Job ID's to execute), uid (a uint32 of that user's id), httpAddr (a string port of the master node), store (a fsm.Store containing information about other nodes) -// returns: nil -func RunLinks(links []string, uid uint32, httpAddr string, store fsm.Store) { - client, _ := SetupClient() - var jobs []Node - var node Node - for _, link := range links { - job := GetJobByValue(client, bson.M{"id": string(link)}, fmt.Sprint(uid)) - node.ID, node.Lang, node.File, node.Links = job.ID, job.Language, job.File, job.Links - uid, _ := strconv.ParseUint(job.UID, 10, 32) - gid, _ := strconv.ParseUint(job.GID, 10, 32) - node.UID = uint32(uid) - node.GID = uint32(gid) - jobs = append(jobs, node) - } - client.Disconnect(context.TODO()) - var en ExecNode - jobsArray, _ := json.Marshal(jobs) - en.Items = jobsArray - en.Store = store - buffer, _ := json.Marshal(en) - go MakePostRequest("http://localhost"+httpAddr+"/execute", bytes.NewBuffer(buffer)) -} diff --git a/odin-engine/pkg/repository/nosql/nosql.go b/odin-engine/pkg/repository/nosql/nosql.go index 034de41..80b5391 100644 --- a/odin-engine/pkg/repository/nosql/nosql.go +++ b/odin-engine/pkg/repository/nosql/nosql.go @@ -2,6 +2,10 @@ package nosql import ( "context" + "encoding/json" + "errors" + "fmt" + "strings" "github.com/hidal-go/hidalgo/legacy/nosql/mongo" @@ -27,30 +31,188 @@ func init() { }) } +var ( + ErrInvalidArgument = errors.New("Invalid argument") +) + +// Repository is implementation of Repository interface type Repository struct { db nosql.Database } func (repo *Repository) ensureIndex(ctx context.Context) error { - return repo.db.EnsureIndex( + if err := repo.db.EnsureIndex( ctx, base.ObservabilityTable, nosql.Index{ - Fields: []string{"_id"}, + Fields: []string{"id"}, Type: nosql.StringExact, }, - []nosql.Index{ - { - Fields: []string{"id"}, - Type: nosql.StringExact, - }, + nil, + ); err != nil { + return err + } + + if err := repo.db.EnsureIndex( + ctx, + base.JobTable, + nosql.Index{ + Fields: []string{"id"}, + Type: nosql.StringExact, + }, + nil, + ); err != nil { + return err + } + + return nil +} + +// CreateJob creates a new job for an user +func (repo *Repository) CreateJob(ctx context.Context, data []byte, path string, uid string) (string, error) { + job := &base.Job{} + if err := json.Unmarshal(data, job); err != nil { + return "", err + } + if job.ID == "" { + return "", ErrInvalidArgument + } else if _, err := repo.GetJobById(ctx, job.ID, uid); err == nil { + return "", fmt.Errorf("job with id %s exists", job.ID) + } else if err != nosql.ErrNotFound { + return "", err + } + job.File = path + job.Runs = 0 + doc := marshalJob(job) + _, err := repo.db.Insert(ctx, base.JobTable, []string{job.ID}, doc) + return job.ID, err +} + +// GetJobById returns a job by filtering on a certain value pertaining to that job +func (repo *Repository) GetJobById(ctx context.Context, id string, uid string) (*base.Job, error) { + if id == "" { + return nil, ErrInvalidArgument + } + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(id), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), }, - ) + ).One(ctx) + + if err != nil { + return nil, err + } + + job := &base.Job{} + unmarshalJob(doc, job) + return job, nil +} + +// GetUserJobs returns all jobs belonging to an user +func (repo *Repository) GetUserJobs(ctx context.Context, uid string) ([]*base.Job, error) { + if uid == "" { + return nil, ErrInvalidArgument + } + iter := repo.db.Query(base.JobTable).WithFields(nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }).Iterate() + + if iter.Err() != nil { + return nil, iter.Err() + } + defer iter.Close() + + results := make([]*base.Job, 0) + for iter.Next(ctx) { + doc := iter.Doc() + job := &base.Job{} + unmarshalJob(doc, job) + results = append(results, job) + } + return results, nil } -func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobStats, error) { +// GetAll returns all jobs +func (repo *Repository) GetAll(ctx context.Context) ([]*base.Job, error) { + iter := repo.db.Query(base.JobTable).Iterate() + if iter.Err() != nil { + return nil, iter.Err() + } + defer iter.Close() + + results := make([]*base.Job, 0) + for iter.Next(ctx) { + doc := iter.Doc() + job := &base.Job{} + unmarshalJob(doc, job) + results = append(results, job) + } + return results, nil +} + +// UpdateJob modifies a job +func (repo *Repository) UpdateJob(ctx context.Context, job *base.Job) error { + if job.ID == "" { + return ErrInvalidArgument + } + key := []string{job.ID} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(job.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(job.UID), + }, + ).One(ctx) + if err != nil { + return err + } + doc["name"] = nosql.String(job.Name) + doc["description"] = nosql.String(job.Description) + doc["schedule"] = nosql.String(job.Schedule) + doc["runs"] = nosql.Int(job.Runs) + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) +} + +// DeleteJob deletes an user's job +func (repo *Repository) DeleteJob(ctx context.Context, id string, uid string) error { + if id == "" { + return ErrInvalidArgument + } + return repo.db.Delete(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(id), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).Do(ctx) +} + +// GetJobStats returns stats of a job given the job id +func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]*base.JobStats, error) { + if id == "" { + return nil, ErrInvalidArgument + } iter := repo.db.Query(base.ObservabilityTable).WithFields(nosql.FieldFilter{ - Path: []string{"id"}, + Path: []string{"_id"}, Filter: nosql.Equal, Value: nosql.String(id), }).Iterate() @@ -60,20 +222,136 @@ func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobS } defer iter.Close() - results := make([]base.JobStats, 0) + results := make([]*base.JobStats, 0) for iter.Next(ctx) { doc := iter.Doc() - jobStats := base.JobStats{ - ID: string(doc["id"].(nosql.String)), - Description: string(doc["desc"].(nosql.String)), - Type: string(doc["type"].(nosql.String)), - Value: string(doc["value"].(nosql.String)), - } + jobStats := &base.JobStats{} + unmarshalJobStats(doc, jobStats) results = append(results, jobStats) } return results, nil } +// CreateJobStats creates a new job for an user +func (repo *Repository) CreateJobStats(ctx context.Context, js *base.JobStats) error { + if js.ID == "" { + return ErrInvalidArgument + } else if _, err := repo.GetJobStats(ctx, js.ID); err == nil { + return fmt.Errorf("job stats with id %s exists", js.ID) + } else if err != nosql.ErrNotFound { + return err + } + doc := marshalJobStats(js) + _, err := repo.db.Insert(ctx, base.JobTable, []string{js.ID}, doc) + return err +} + +// AddJobLink is used to add links the job is associated with +func (repo *Repository) AddJobLink(ctx context.Context, from string, to string, uid string) error { + if from == "" || to == "" { + return ErrInvalidArgument + } + key := []string{from} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(from), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).One(ctx) + if err != nil { + return err + } + doc["links"] = nosql.String(string(doc["links"].(nosql.String)) + to + ",") + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) +} + +// DeleteJobLink is used to delete links the job is associated with +func (repo *Repository) DeleteJobLink(ctx context.Context, from string, to string, uid string) error { + if from == "" || to == "" { + return ErrInvalidArgument + } + key := []string{from} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(from), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).One(ctx) + if err != nil { + return err + } + links := strings.Split(string(doc["links"].(nosql.String)), ",") + newLinks := "" + for _, link := range links { + if link != to && link != "" { + newLinks = newLinks + link + "," + } + } + doc["links"] = nosql.String(newLinks) + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) +} + +// Close closes db connection func (repo *Repository) Close() error { return repo.db.Close() } + +func unmarshalJob(doc nosql.Document, job *base.Job) { + job.ID = string(doc["id"].(nosql.String)) + job.UID = string(doc["uid"].(nosql.String)) + job.GID = string(doc["gid"].(nosql.String)) + job.Name = string(doc["name"].(nosql.String)) + job.Description = string(doc["description"].(nosql.String)) + job.Language = string(doc["language"].(nosql.String)) + job.File = string(doc["file"].(nosql.String)) + job.Stats = string(doc["stats"].(nosql.String)) + job.Schedule = string(doc["schedule"].(nosql.String)) + job.Runs = int(doc["runs"].(nosql.Int)) + job.Links = string(doc["links"].(nosql.String)) +} + +func marshalJob(job *base.Job) nosql.Document { + return nosql.Document{ + "id": nosql.String(job.ID), + "uid": nosql.String(job.UID), + "gid": nosql.String(job.GID), + "name": nosql.String(job.Name), + "description": nosql.String(job.Description), + "language": nosql.String(job.Language), + "file": nosql.String(job.File), + "stats": nosql.String(job.Stats), + "schedule": nosql.String(job.Schedule), + "runs": nosql.Int(job.Runs), + "links": nosql.String(job.Links), + } +} + +func unmarshalJobStats(doc nosql.Document, js *base.JobStats) { + js.ID = string(doc["id"].(nosql.String)) + js.Description = string(doc["desc"].(nosql.String)) + js.Type = string(doc["type"].(nosql.String)) + js.Value = string(doc["value"].(nosql.String)) + js.Timestamp = string(doc["timestamp"].(nosql.String)) +} + +func marshalJobStats(js *base.JobStats) nosql.Document { + return nosql.Document{ + "id": nosql.String(js.ID), + "desc": nosql.String(js.Description), + "type": nosql.String(js.Type), + "value": nosql.String(js.Value), + "timestamp": nosql.String(js.Timestamp), + } +} diff --git a/odin-engine/pkg/repository/nosql/nosql_test.go b/odin-engine/pkg/repository/nosql/nosql_test.go index 01dc560..acfeac6 100644 --- a/odin-engine/pkg/repository/nosql/nosql_test.go +++ b/odin-engine/pkg/repository/nosql/nosql_test.go @@ -2,6 +2,7 @@ package nosql import ( "context" + "errors" "testing" "github.com/hidal-go/hidalgo/legacy/nosql" @@ -17,7 +18,7 @@ import ( // use mongo as default nosql test db func testRepo(t *testing.T) *Repository { // TODO: pass mongo url as env variable instead - db, err := mongo.Dial("localhost:27017", base.DefaultDatabase, nosql.Options(nil)) + db, err := mongo.Dial("localhost:27017", base.DefaultDatabase, nil) require.NoError(t, err) return &Repository{ db: db, @@ -32,9 +33,883 @@ func cleanUp(t *testing.T, ctx context.Context) { client.Connect(ctx) err = client.Database(base.DefaultDatabase).Collection(base.ObservabilityTable).Drop(ctx) require.NoError(t, err) + err = client.Database(base.DefaultDatabase).Collection(base.JobTable).Drop(ctx) require.NoError(t, client.Disconnect(ctx)) } +func TestRepository_CreateJob(t *testing.T) { + type args struct { + ctx context.Context + data []byte + path string + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "Simple success", + existing: nil, + args: args{ + ctx: context.Background(), + data: []byte(`{"id":"1", "uid": "2", "name": "sample", "description": "job desc"}`), + path: "sample.yaml", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + File: "sample.yaml", + }, + wantErr: nil, + }, + { + name: "Exist", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + File: "sample.yaml", + }, + }, + args: args{ + ctx: context.Background(), + data: []byte(`{"id":"1", "uid": "2", "name": "sample", "description": "job desc"}`), + path: "sample.yaml", + uid: "2", + }, + want: nil, + wantErr: errors.New("job with id 1 exists"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + err := repo.CreateJob(tt.args.ctx, tt.args.data, tt.args.path, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.want.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.want.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetJobById(t *testing.T) { + type args struct { + ctx context.Context + id string + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "Simple success", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + job, err := repo.GetJobById(tt.args.ctx, tt.args.id, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, job) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetUserJobs(t *testing.T) { + type args struct { + ctx context.Context + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want []*base.Job + wantErr error + }{ + { + name: "Simple success", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "3", + UID: "3", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + }, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{}, + wantErr: nil, + }, + { + name: "No user jobs", + existing: []*base.Job{ + { + ID: "1", + UID: "3", + Description: "sample", + }, + }, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + jobs, err := repo.GetUserJobs(tt.args.ctx, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, jobs) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetAll(t *testing.T) { + type args struct { + ctx context.Context + } + tests := []struct { + name string + args args + want []*base.Job + wantErr error + }{ + { + name: "Simple success 1", + want: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "3", + UID: "3", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + }, + wantErr: nil, + }, + { + name: "Simple success 2", + want: []*base.Job{ + { + ID: "1", + UID: "3", + Description: "sample", + }, + }, + args: args{ + ctx: context.Background(), + }, + wantErr: nil, + }, + { + name: "Not exist", + args: args{ + ctx: context.Background(), + }, + want: []*base.Job{}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.want { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + jobs, err := repo.GetAll(tt.args.ctx) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, jobs) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_UpdateJob(t *testing.T) { + type args struct { + ctx context.Context + job *base.Job + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + count int + wantErr error + }{ + { + name: "Simple success", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + job: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + count: 1, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + job: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.UpdateJob(tt.args.ctx, tt.args.job) + if tt.wantErr == nil { + require.NoError(t, err) + it := repo.db.Query(base.JobTable).Iterate() + require.NoError(t, it.Err()) + c := 0 + for it.Next(tt.args.ctx) { + c++ + } + assert.Equal(t, tt.count, c) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_DeleteJob(t *testing.T) { + type args struct { + ctx context.Context + id string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + count int + wantErr error + }{ + { + name: "Simple success", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + count: 0, + wantErr: nil, + }, + { + name: "Not matched", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + id: "2", + uid: "3", + }, + count: 1, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + count: 0, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + var err error + if tt.existing != nil { + _, err = repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + } + require.NoError(t, err) + err = repo.DeleteJob(tt.args.ctx, tt.args.id, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + it := repo.db.Query(base.JobTable).Iterate() + require.NoError(t, it.Err()) + c := 0 + for it.Next(tt.args.ctx) { + c++ + } + assert.Equal(t, tt.count, c) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_AddJobLink(t *testing.T) { + type args struct { + ctx context.Context + from string + to string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "No existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + wantErr: nil, + }, + { + name: "Existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "3", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,3,", + }, + wantErr: nil, + }, + { + name: "No job", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + { + name: "Invalid argument", + existing: nil, + args: args{ + ctx: context.Background(), + from: "", + to: "2", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + { + name: "Invalid argument 2", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.AddJobLink(tt.args.ctx, tt.args.from, tt.args.to, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_DeleteJobLink(t *testing.T) { + type args struct { + ctx context.Context + from string + to string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "No existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + wantErr: nil, + }, + { + name: "Existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + wantErr: nil, + }, + { + name: "Existing links 2", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,3,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "3,", + }, + wantErr: nil, + }, + { + name: "No job", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + { + name: "Invalid argument", + existing: nil, + args: args{ + ctx: context.Background(), + from: "", + to: "2", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + { + name: "Invalid argument 2", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.DeleteJobLink(tt.args.ctx, tt.args.from, tt.args.to, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + func TestRepository_GetJobStats(t *testing.T) { type args struct { ctx context.Context @@ -42,14 +917,14 @@ func TestRepository_GetJobStats(t *testing.T) { } tests := []struct { name string - existing []base.JobStats + existing []*base.JobStats args args - want []base.JobStats + want []*base.JobStats wantErr error }{ { name: "Simple success", - existing: []base.JobStats{ + existing: []*base.JobStats{ { ID: "1", Description: "sample stats", @@ -61,7 +936,7 @@ func TestRepository_GetJobStats(t *testing.T) { ctx: context.Background(), id: "1", }, - want: []base.JobStats{ + want: []*base.JobStats{ { ID: "1", Description: "sample stats", @@ -78,7 +953,7 @@ func TestRepository_GetJobStats(t *testing.T) { require.NoError(t, repo.ensureIndex(tt.args.ctx)) for _, e := range tt.existing { - _, err := repo.db.Insert(tt.args.ctx, base.ObservabilityTable, nil, nosql.Document{ + _, err := repo.db.Insert(tt.args.ctx, base.ObservabilityTable, []string{e.ID}, nosql.Document{ "id": nosql.String(e.ID), "desc": nosql.String(e.Description), "type": nosql.String(e.Type), diff --git a/odin-engine/pkg/repository/repository.go b/odin-engine/pkg/repository/repository.go index 739aa41..a24ba0b 100644 --- a/odin-engine/pkg/repository/repository.go +++ b/odin-engine/pkg/repository/repository.go @@ -8,6 +8,7 @@ import ( const ( DefaultDatabase = "odin" ObservabilityTable = "observability" + JobTable = "jobs" ) var registry = make(map[string]*Registration) @@ -34,14 +35,61 @@ func GetRegistration(name string) (*Registration, error) { return reg, nil } +// Job is a type to be used for accessing and storing job information +type Job struct { + ID string `yaml:"id"` + UID string `yaml:"uid"` + GID string `yaml:"gid"` + Name string `yaml:"name"` + Description string `yaml:"description"` + Language string `yaml:"language"` + File string `yaml:"file"` + Stats string `yaml:"stats"` + Schedule string `yaml:"schedule"` + Runs int + Links string +} + +// JobStats is a type to be used for accessing and storing job stats information type JobStats struct { ID string Description string Type string Value string + Timestamp string } type Repository interface { - GetJobStats(ctx context.Context, id string) ([]JobStats, error) + // CreateJob creates a new job for an user + CreateJob(ctx context.Context, data []byte, path string, uid string) (string, error) + + // GetJobById returns a job by job id and user id + GetJobById(ctx context.Context, id string, uid string) (*Job, error) + + // GetUserJobs returns all jobs belonging to an user + GetUserJobs(ctx context.Context, uid string) ([]*Job, error) + + // GetAll returns all jobs + GetAll(ctx context.Context) ([]*Job, error) + + // UpdateJob modifies a job + UpdateJob(ctx context.Context, job *Job) error + + // DeleteJob deletes an user's job + DeleteJob(ctx context.Context, id string, uid string) error + + // AddJobLink is used to add links the job is associated with + AddJobLink(ctx context.Context, from string, to string, uid string) error + + // DeleteJobLink is used to delete links the job is associated with + DeleteJobLink(ctx context.Context, from string, to string, uid string) error + + // GetJobStats returns stats of a job given the job id + GetJobStats(ctx context.Context, id string) ([]*JobStats, error) + + // CreateJobStats create new job stats + CreateJobStats(ctx context.Context, js *JobStats) error + + // Close closes db connection Close() error }