From 9713e06bc272a9ecd6080b8c424b3504a32ab41f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=C2=A0=C5=81owicki?= Date: Tue, 23 Oct 2018 21:40:57 +0200 Subject: [PATCH] Tasks history. Fixes #17. --- README.md | 1 + api/api.go | 26 +++++++ conf/conf.go | 40 +++++++--- coordinator/coordinator.go | 2 +- coordinator/zk/zk.go | 79 +++++++++---------- dev/config.json | 1 + docs/api.apib | 64 ++++++++++++++-- docs/api.html | 61 ++++++++++++--- mesos/handlers.go | 61 ++++++++++----- mesos/mesos.go | 1 + model/model.go | 20 ++--- rhythm.go | 8 +- storage/storage.go | 2 + storage/zk/zk.go | 152 ++++++++++++++++++++++++++++++++++++- 14 files changed, 416 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index c9e9a65..dfdf682 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ Options: * digest (optional and used only if `scheme` is set to `"digest"`) * user (optional) * password (optional) + * taskttl (optional) - number of milliseconds a finished task's record is kept (`24 hours` by default). Example: ```javascript diff --git a/api/api.go b/api/api.go index a23487b..d2dfecb 100644 --- a/api/api.go +++ b/api/api.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "sort" "time" "github.com/gorilla/mux" @@ -44,6 +45,7 @@ type storage interface { GetJob(group string, project string, id string) (*model.Job, error) SaveJob(j *model.Job) error DeleteJob(group string, project string, id string) error + GetTasks(group string, project string, id string) ([]*model.Task, error) } type handler struct { @@ -104,6 +106,29 @@ func getJobs(a authorizer, s storage, w http.ResponseWriter, r *http.Request) er return nil } +func getTasks(a authorizer, s storage, w http.ResponseWriter, r *http.Request) error { + vars := mux.Vars(r) + group := vars["group"] + project := vars["project"] + lvl, err := a.GetProjectAccessLevel(r, group, project) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return err + } + if lvl == auth.NoAccess { + w.WriteHeader(http.StatusForbidden) + return errForbidden + } + tasks, err := s.GetTasks(group, project, vars["id"]) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return err + } + sort.Slice(tasks, func(i, j int) bool { return tasks[i].End.Before(tasks[j].End) }) + encoder(w).Encode(tasks) + return nil +} + func getGroupJobs(a authorizer, s storage, w http.ResponseWriter, r *http.Request) error { jobs, err := s.GetGroupJobs(mux.Vars(r)["group"]) if err != nil { @@ -450,6 +475,7 @@ func New(c *conf.API, s storage, state State) { v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, getJob}).Methods("GET") v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, deleteJob}).Methods("DELETE") v1.Handle("/jobs/{group}/{project}/{id}", &handler{a, s, updateJob}).Methods("PUT") + v1.Handle("/jobs/{group}/{project}/{id}/tasks", &handler{a, s, getTasks}).Methods("GET") v1.Handle("/metrics", promhttp.Handler()) srv := &http.Server{ Handler: r, diff --git a/conf/conf.go b/conf/conf.go index 696152f..51f1a57 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -3,6 +3,7 @@ package conf import ( "encoding/json" "io/ioutil" + "reflect" "time" ) @@ -67,6 +68,7 @@ type StorageZK struct { Addrs []string Timeout time.Duration Auth ZKAuth + TaskTTL time.Duration } const CoordinatorBackendZK = "zookeeper" @@ -77,10 +79,11 @@ type Coordinator struct { } type CoordinatorZK struct { - Dir string - Addrs []string - Timeout time.Duration - Auth ZKAuth + Dir string + Addrs []string + Timeout time.Duration + Auth ZKAuth + ElectionDir string }
const ( @@ -167,6 +170,22 @@ type LoggingSentry struct { Tags map[string]string } +func millisecondFieldsToDuration(v reflect.Value) { + for i := 0; i < v.NumField(); i++ { + if v.Field(i).Kind() == reflect.Struct { + millisecondFieldsToDuration(v.Field(i)) + } else { + if v.Field(i).Type() == reflect.TypeOf(time.Second) { + if v.Field(i).CanSet() { + d := v.Field(i).Interface().(time.Duration) + d *= time.Millisecond + v.Field(i).Set(reflect.ValueOf(d)) + } + } + } + } +} + func New(path string) (*Conf, error) { file, err := ioutil.ReadFile(path) if err != nil { @@ -193,6 +212,7 @@ func New(path string) (*Conf, error) { Auth: ZKAuth{ Scheme: ZKAuthSchemeWorld, }, + TaskTTL: 1000 * 3600 * 24, // 24h }, }, Coordinator: Coordinator{ @@ -214,7 +234,7 @@ func New(path string) (*Conf, error) { }, }, Mesos: Mesos{ - FailoverTimeout: 1000 * 3600 * 24 * 7, // 7 days + FailoverTimeout: 1000 * 3600 * 24 * 7, // 7d Roles: []string{"*"}, Auth: MesosAuth{ Type: MesosAuthTypeNone, @@ -229,10 +249,10 @@ func New(path string) (*Conf, error) { if err != nil { return nil, err } - conf.Mesos.FailoverTimeout *= time.Millisecond - conf.Secrets.Vault.Timeout *= time.Millisecond - conf.Storage.ZooKeeper.Timeout *= time.Millisecond - conf.Coordinator.ZooKeeper.Timeout *= time.Millisecond - conf.API.Auth.LDAP.Timeout *= time.Millisecond + conf.Coordinator.ZooKeeper.ElectionDir = "election/mesos_scheduler" + // All time.Duration fields from Conf should be in milliseconds so + // conversion to time elapsed in nanoseconds (represented by time.Duration) + // is needed. + millisecondFieldsToDuration(reflect.ValueOf(conf).Elem()) return conf, nil } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index af5c42b..e231d0d 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -9,7 +9,7 @@ import ( ) type coordinator interface { - WaitUntilLeader() (context.Context, error) + WaitUntilLeader() context.Context } func New(c *conf.Coordinator) coordinator { diff --git a/coordinator/zk/zk.go b/coordinator/zk/zk.go index bbea446..e350d8e 100644 --- a/coordinator/zk/zk.go +++ b/coordinator/zk/zk.go @@ -6,6 +6,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/mlowicki/rhythm/conf" "github.com/mlowicki/rhythm/zkutil" @@ -13,46 +14,42 @@ import ( log "github.com/sirupsen/logrus" ) -const electionDir = "election" - type Coordinator struct { - dir string - conn *zk.Conn - acl func(perms int32) []zk.ACL - ticket string - eventChan <-chan zk.Event - cancel context.CancelFunc + dir string + electionDir string + conn *zk.Conn + acl func(perms int32) []zk.ACL + ticket string + eventChan <-chan zk.Event + cancel context.CancelFunc sync.Mutex } -func (coord *Coordinator) WaitUntilLeader() (context.Context, error) { - isLeader, ch, err := coord.isLeader() - if err != nil { - return nil, err - } - if !isLeader { - for { - log.Println("Not elected as leader. Waiting...") - <-ch - isLeader, ch, err = coord.isLeader() - if err != nil { - return nil, err - } else if isLeader { - break - } +func (coord *Coordinator) WaitUntilLeader() context.Context { + for { + isLeader, ch, err := coord.isLeader() + if err != nil { + log.Errorf("Failed checking if elected as leader: %s", err) + <-time.After(time.Second) + continue } + if isLeader { + break + } + log.Debug("Not elected as leader. 
Waiting...") + <-ch + continue } - log.Println("Elected as leader") ctx := context.Background() ctx, cancel := context.WithCancel(ctx) coord.Lock() coord.cancel = cancel coord.Unlock() - return ctx, nil + return ctx } func (coord *Coordinator) register() error { - name, err := coord.conn.Create(coord.dir+"/"+electionDir+"/", []byte{}, zk.FlagEphemeral|zk.FlagSequence, coord.acl(zk.PermAll)) + name, err := coord.conn.Create(coord.dir+"/"+coord.electionDir+"/", []byte{}, zk.FlagEphemeral|zk.FlagSequence, coord.acl(zk.PermAll)) if err != nil { return err } @@ -73,7 +70,7 @@ func (coord *Coordinator) isLeader() (bool, <-chan zk.Event, error) { return false, nil, fmt.Errorf("Registration failed: %s", err) } } - tickets, _, eventChan, err := coord.conn.ChildrenW(coord.dir + "/" + electionDir) + tickets, _, eventChan, err := coord.conn.ChildrenW(coord.dir + "/" + coord.electionDir) if err != nil { return false, nil, fmt.Errorf("Failed getting registration tickets: %s", err) } @@ -108,15 +105,18 @@ func (coord *Coordinator) initZK() error { return err } } - path := coord.dir + "/" + electionDir - exists, _, err = coord.conn.Exists(path) - if err != nil { - return fmt.Errorf("Failed checking if election directory exists: %s", err) - } - if !exists { - _, err = coord.conn.Create(path, []byte{}, 0, coord.acl(zk.PermAll)) + path := coord.dir + for _, part := range strings.Split(coord.electionDir, "/") { + path += "/" + part + exists, _, err = coord.conn.Exists(path) if err != nil { - return fmt.Errorf("Failed creating election directory: %s", err) + return fmt.Errorf("Failed checking if election directory exists: %s", err) + } + if !exists { + _, err = coord.conn.Create(path, []byte{}, 0, coord.acl(zk.PermAll)) + if err != nil { + return fmt.Errorf("Failed creating election directory: %s", err) + } } } return nil @@ -132,10 +132,11 @@ func New(c *conf.CoordinatorZK) (*Coordinator, error) { return nil, err } coord := Coordinator{ - conn: conn, - acl: acl, - dir: "/" + c.Dir, - eventChan: eventChan, + conn: conn, + acl: acl, + dir: "/" + c.Dir, + eventChan: eventChan, + electionDir: c.ElectionDir, } err = coord.initZK() if err != nil { diff --git a/dev/config.json b/dev/config.json index 7753159..adc20e2 100644 --- a/dev/config.json +++ b/dev/config.json @@ -7,6 +7,7 @@ "storage": { "backend": "zookeeper", "zookeeper": { + "taskttl": 300000, "dir": "rhythm", "timeout": 10000, "auth": { diff --git a/docs/api.apib b/docs/api.apib index 7556aa3..17b8894 100644 --- a/docs/api.apib +++ b/docs/api.apib @@ -61,7 +61,7 @@ Available under /api/v1/. "Cron": "*/1 * * * *" }, "CreatedAt": "2018-10-09T21:47:35.576513772+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -97,7 +97,7 @@ Available under /api/v1/. "Cron": "*/2 * * * *" }, "CreatedAt": "2018-10-09T21:52:48.196252269+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -274,7 +274,7 @@ Available under /api/v1/. "Cron": "*/1 * * * *" }, "CreatedAt": "2018-10-09T21:47:35.576513772+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -310,7 +310,7 @@ Available under /api/v1/. "Cron": "*/2 * * * *" }, "CreatedAt": "2018-10-09T21:52:48.196252269+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -359,7 +359,7 @@ Available under /api/v1/. 
"Cron": "*/1 * * * *" }, "CreatedAt": "2018-10-09T21:47:35.576513772+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -395,7 +395,7 @@ Available under /api/v1/. "Cron": "*/2 * * * *" }, "CreatedAt": "2018-10-09T21:52:48.196252269+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": {}, @@ -444,7 +444,7 @@ Available under /api/v1/. "Cron": "*/1 * * * *" }, "CreatedAt": "2018-10-08T21:57:50.718196854+02:00", - "LastStartAt": "0001-01-01T00:00:00Z", + "LastStart": "0001-01-01T00:00:00Z", "TaskID": "", "AgentID": "", "Env": { @@ -593,6 +593,56 @@ Available under /api/v1/. + Response 204 +## Tasks [/api/v1/jobs/{group}/{project}/{job}/tasks] + +### List history of job's tasks (runs) [GET] + +Sorted in ascending order (oldest tasks first). +If job doesn't exist then empty list is returned with 200 HTTP status code. +Properties `Message`, `Reason` and `Source` are set to empty strings only for successful tasks. + ++ Parameters + + group: a (required, string) - ID of the group + + project: b (required, string) - ID of the project + + job: c (required, string) - ID of the job + ++ Response 200 (application/json) + + [{ + "Start": "2018-10-30T18:09:56.195107735+01:00", + "End": "2018-10-30T18:09:57.621237867+01:00", + "TaskID": "group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60", + "ExecutorID": "group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60", + "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0", + "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000", + "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60", + "Message": "", + "Reason": "", + "Source": "" + },{ + "Start": "2018-10-30T18:11:07.865192348+01:00", + "End": "2018-10-30T18:11:09.390047795+01:00", + "TaskID": "group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021", + "ExecutorID": "group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021", + "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0", + "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000", + "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021", + "Message": "", + "Reason": "", + "Source": "" + },{ + "Start": "2018-10-30T18:13:07.889906135+01:00", + "End": "2018-10-30T18:13:09.245713177+01:00", + "TaskID": "group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0", + "ExecutorID": "group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0", + "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0", + "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000", + "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0", + "Message": "Failed to create executor directory '/tmp/mesos/agent/slaves/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0/runs/80031a57-829e-4d0c-aeb3-2cf6d0bc30bf': Failed to chown directory to 'someone': No such user 'someone'", + "Reason": "REASON_EXECUTOR_TERMINATED", + "Source": "SOURCE_AGENT" + }] + ## Metrics [/api/v1/metrics] Backed by 
[Prometheus instrumenting library](https://github.com/prometheus/client_golang#instrumenting-applications). diff --git a/docs/api.html b/docs/api.html index 1b3fa65..144fe8c 100644 --- a/docs/api.html +++ b/docs/api.html @@ -1,4 +1,4 @@ -Rhythm API Back to top

Rhythm API

API documentation for Apache Mesos framework - Rhythm.

+Rhythm API Back to top

Rhythm API

API documentation for Apache Mesos framework - Rhythm.

Authorization

Rhythm has the concept of an authorization backend. The authorization backend tells the level of access for a particular project. There are 3 access levels:

Response  204

Tasks

List history of job's tasks (runs)
GET/api/v1/jobs/{group}/{project}/{job}/tasks

Sorted in ascending order (oldest tasks first). +If the job doesn’t exist, an empty list is returned with a 200 HTTP status code. +Properties Message, Reason and Source are set only for failed tasks; for successful tasks they are empty strings.

+

Example URI

GET /api/v1/jobs/a/b/c/tasks
URI Parameters
HideShow
group
string (required) Example: a

ID of the group

+
project
string (required) Example: b

ID of the project

+
job
string (required) Example: c

ID of the job

+
Response  200
HideShow
Headers
Content-Type: application/json
Body
[
+  {
+    "Start": "2018-10-30T18:09:56.195107735+01:00",
+    "End": "2018-10-30T18:09:57.621237867+01:00",
+    "TaskID": "group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60",
+    "ExecutorID": "group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60",
+    "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0",
+    "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000",
+    "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:fa3623ff-819a-4ceb-a62c-1ce52797fb60",
+    "Message": "",
+    "Reason": "",
+    "Source": ""
+  },
+  {
+    "Start": "2018-10-30T18:11:07.865192348+01:00",
+    "End": "2018-10-30T18:11:09.390047795+01:00",
+    "TaskID": "group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021",
+    "ExecutorID": "group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021",
+    "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0",
+    "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000",
+    "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:b1fc414e-3745-4389-99a8-f1d7dd4da021",
+    "Message": "",
+    "Reason": "",
+    "Source": ""
+  },
+  {
+    "Start": "2018-10-30T18:13:07.889906135+01:00",
+    "End": "2018-10-30T18:13:09.245713177+01:00",
+    "TaskID": "group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0",
+    "ExecutorID": "group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0",
+    "AgentID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0",
+    "FrameworkID": "3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000",
+    "ExecutorURL": "http://example.com:5050/#/agents/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0",
+    "Message": "Failed to create executor directory '/tmp/mesos/agent/slaves/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-S0/frameworks/3be69eb1-6b0b-4ab7-a7d1-3d3a813be77a-0000/executors/group:project:id:a837bd5d-4050-4592-93e3-7c6b6bf424b0/runs/80031a57-829e-4d0c-aeb3-2cf6d0bc30bf': Failed to chown directory to 'someone': No such user 'someone'",
+    "Reason": "REASON_EXECUTOR_TERMINATED",
+    "Source": "SOURCE_AGENT"
+  }
+]

Metrics

Backed by Prometheus instrumenting library.

Metrics
GET/api/v1/metrics

Example URI

GET /api/v1/metrics
Response  200
HideShow
Headers
Content-Type: text/plain
Body
...
 # HELP go_goroutines Number of goroutines that currently exist.
 # TYPE go_goroutines gauge
diff --git a/mesos/handlers.go b/mesos/handlers.go
index 002fbd8..9068b89 100644
--- a/mesos/handlers.go
+++ b/mesos/handlers.go
@@ -49,11 +49,16 @@ func buildSubscribedEventHandler(fidStore store.Singleton, failoverTimeout time.
 	)
 }
 
+func parseAbsoluteJobID(absoluteID string) (group, project, id string) {
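+	// Assumes absoluteID has the validated "group:project:id" form produced by taskID2JobID.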
+	chunks := strings.Split(absoluteID, ":")
+	return chunks[0], chunks[1], chunks[2]
+}
+
 func buildUpdateEventHandler(stor storage, cli calls.Caller, reconciler *reconciliation.Reconciliation, frameworkIDStore, leaderURLStore store.Singleton) eventrules.Rule {
 	return controller.AckStatusUpdates(cli).AndThen().HandleF(func(ctx context.Context, e *scheduler.Event) error {
 		status := e.GetUpdate().GetStatus()
 		reconciler.HandleUpdate(e.GetUpdate())
-		id, err := taskID2JobID(status.TaskID.Value)
+		absoluteJobID, err := taskID2JobID(status.TaskID.Value)
 		if err != nil {
 			log.WithFields(log.Fields{
 				"taskID": status.TaskID.Value,
@@ -61,15 +66,15 @@ func buildUpdateEventHandler(stor storage, cli calls.Caller, reconciler *reconci
 			return nil
 		}
 		state := status.GetState()
-		log.Printf("Task state update: %s (%s)", id, state)
-		chunks := strings.Split(id, ":")
-		job, err := stor.GetJob(chunks[0], chunks[1], chunks[2])
+		log.Printf("Task state update: %s (%s)", absoluteJobID, state)
+		group, project, jobID := parseAbsoluteJobID(absoluteJobID)
+		job, err := stor.GetJob(group, project, jobID)
 		if err != nil {
-			log.Printf("Failed to get job for task: %s", id)
+			log.Printf("Failed to get job for task: %s", absoluteJobID)
 			return nil
 		}
 		if job == nil {
-			log.Printf("Update for unknown job: %s", id)
+			log.Printf("Update for unknown job: %s", absoluteJobID)
 			return nil
 		}
 		switch state {
@@ -84,6 +89,13 @@ func buildUpdateEventHandler(stor storage, cli calls.Caller, reconciler *reconci
 			job.TaskID = ""
 			job.AgentID = ""
 			job.State = model.IDLE
+			frameworkID := store.GetIgnoreErrors(frameworkIDStore)()
+			leaderURL := store.GetIgnoreErrors(leaderURLStore)()
+			task := createTask(&status, &frameworkID, &leaderURL, job.LastStart)
+			err = stor.AddTask(group, project, jobID, task)
+			if err != nil {
+				log.Errorf("Failed to save task while handling FINISHED state: %s", err)
+			}
 			taskStateUpdatesCount.WithLabelValues("finished").Inc()
 		case mesos.TASK_LOST:
 			/*
@@ -108,7 +120,17 @@ func buildUpdateEventHandler(stor storage, cli calls.Caller, reconciler *reconci
 		case mesos.TASK_ERROR:
 			frameworkID := store.GetIgnoreErrors(frameworkIDStore)()
 			leaderURL := store.GetIgnoreErrors(leaderURLStore)()
-			handleFailedTask(job, &status, &frameworkID, &leaderURL)
+			task := createTask(&status, &frameworkID, &leaderURL, job.LastStart)
+			logFailedTask(job, &status)
+			job.State = model.FAILED
+			job.LastFailedTask = *task
+			job.TaskID = ""
+			job.AgentID = ""
+			err = stor.AddTask(group, project, jobID, task)
+			if err != nil {
+				log.Errorf("Failed to save task while handling failed state: %s", err)
+			}
+			taskStateUpdatesCount.WithLabelValues("failed").Inc()
 		default:
 			log.Panicf("Unknown state: %s", state)
 		}
@@ -120,28 +142,31 @@ func buildUpdateEventHandler(stor storage, cli calls.Caller, reconciler *reconci
 	})
 }
 
-func handleFailedTask(job *model.Job, status *mesos.TaskStatus, frameworkID, leaderURL *string) {
+func logFailedTask(job *model.Job, status *mesos.TaskStatus) {
 	msg := status.GetMessage()
 	reason := status.GetReason().String()
 	src := status.GetSource().String()
 	log.Errorf("Task failed: %s (%s; %s; %s; %s)", job, status.GetState(), msg, reason, src)
-	job.State = model.FAILED
+}
+
+func createTask(status *mesos.TaskStatus, frameworkID, leaderURL *string, start time.Time) *model.Task {
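+	// Builds a task history record spanning from the given start time (the job's last start at both call sites) to now.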
 	executorID := status.GetExecutorID().GetValue()
 	agentID := status.GetAgentID().GetValue()
-	job.LastFailedTask = model.FailedTask{
-		Message:     msg,
-		Reason:      reason,
-		Source:      src,
-		When:        time.Now(),
+	task := model.Task{
+		Start:       start,
+		End:         time.Now(),
 		TaskID:      status.TaskID.GetValue(),
 		ExecutorID:  executorID,
 		AgentID:     agentID,
 		FrameworkID: *frameworkID,
 		ExecutorURL: fmt.Sprintf("%s/#/agents/%s/frameworks/%s/executors/%s", *leaderURL, agentID, *frameworkID, executorID),
 	}
-	job.TaskID = ""
-	job.AgentID = ""
-	taskStateUpdatesCount.WithLabelValues("failed").Inc()
+	if status.GetState() != mesos.TASK_FINISHED {
+		task.Message = status.GetMessage()
+		task.Reason = status.GetReason().String()
+		task.Source = status.GetSource().String()
+	}
+	return &task
 }
 
 func buildOffersEventHandler(stor storage, cli calls.Caller, secr secrets) events.HandlerFunc {
@@ -213,7 +238,7 @@ func handleOffer(ctx context.Context, cli calls.Caller, off *mesos.Offer, jobs [
 	}
 	for i, job := range jobsUsed {
 		job.State = model.STAGING
-		job.LastStartAt = time.Now()
+		job.LastStart = time.Now()
 		job.TaskID = tasks[i].TaskID.GetValue()
 		job.AgentID = tasks[i].AgentID.GetValue()
 		err := stor.SaveJob(job)
diff --git a/mesos/mesos.go b/mesos/mesos.go
index 89f7095..6a3fb0c 100644
--- a/mesos/mesos.go
+++ b/mesos/mesos.go
@@ -25,6 +25,7 @@ type storage interface {
 	GetFrameworkID() (string, error)
 	GetRunnableJobs() ([]*model.Job, error)
 	SaveJob(j *model.Job) error
+	AddTask(group, project, id string, task *model.Task) error
 }
 
 func newFrameworkInfo(conf *conf.Mesos, idStore store.Singleton) *mesos.FrameworkInfo {
diff --git a/model/model.go b/model/model.go
index 9330187..c6d133b 100644
--- a/model/model.go
+++ b/model/model.go
@@ -59,14 +59,14 @@ type Job struct {
 	ID             string
 	Schedule       JobSchedule
 	CreatedAt      time.Time
-	LastStartAt    time.Time
+	LastStart      time.Time
 	TaskID         string
 	AgentID        string
 	Env            map[string]string
 	Secrets        map[string]string
 	Container      JobContainer
 	State          State
-	LastFailedTask FailedTask
+	LastFailedTask Task
 	CPUs           float64
 	Mem            float64
 	Disk           float64
@@ -77,16 +77,18 @@ type Job struct {
 	Labels         map[string]string
 }
 
-type FailedTask struct {
-	Message     string
-	Reason      string
-	Source      string
-	When        time.Time
+type Task struct {
+	Start       time.Time
+	End         time.Time
 	TaskID      string
 	ExecutorID  string
 	AgentID     string
 	FrameworkID string
 	ExecutorURL string
+	// Set only for failed tasks; empty strings for successful ones.
+	Message string
+	Reason  string
+	Source  string
 }
 
 func (j *Job) String() string {
@@ -102,10 +104,10 @@ func (j *Job) NextRun() time.Time {
 		log.Panic(err)
 	}
 	var t time.Time
-	if j.LastStartAt.Before(j.CreatedAt) {
+	if j.LastStart.Before(j.CreatedAt) {
 		t = j.CreatedAt
 	} else {
-		t = j.LastStartAt
+		t = j.LastStart
 	}
 	return sched.Next(t)
 }
diff --git a/rhythm.go b/rhythm.go
index c387524..00456d7 100644
--- a/rhythm.go
+++ b/rhythm.go
@@ -83,12 +83,8 @@ func main() {
 	api.New(&conf.API, stor, api.State{func() bool { return isLeader.Get() }, version})
 	secr := secrets.New(&conf.Secrets)
 	for {
-		ctx, err := coord.WaitUntilLeader()
-		if err != nil {
-			log.Errorf("Error waiting for being a leader: %s", err)
-			<-time.After(time.Second)
-			continue
-		}
+		log.Info("Waiting until elected as Mesos scheduler leader")
+		ctx := coord.WaitUntilLeader()
 		isLeader.Set(true)
 		err = mesos.Run(conf, ctx, stor, secr)
 		isLeader.Set(false)
diff --git a/storage/storage.go b/storage/storage.go
index d97c067..f151fc9 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -17,6 +17,8 @@ type storage interface {
 	GetRunnableJobs() ([]*model.Job, error)
 	SaveJob(j *model.Job) error
 	SetFrameworkID(id string) error
+	AddTask(group, project, id string, task *model.Task) error
+	GetTasks(group string, project string, id string) ([]*model.Task, error)
 }
 
 func New(c *conf.Storage) storage {
diff --git a/storage/zk/zk.go b/storage/zk/zk.go
index a709097..92f8bc6 100644
--- a/storage/zk/zk.go
+++ b/storage/zk/zk.go
@@ -1,15 +1,21 @@
 package zk
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/mlowicki/rhythm/conf"
+	zkcoord "github.com/mlowicki/rhythm/coordinator/zk"
 	"github.com/mlowicki/rhythm/model"
 	"github.com/mlowicki/rhythm/zkutil"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/samuel/go-zookeeper/zk"
+	log "github.com/sirupsen/logrus"
 )
 
 type state struct {
@@ -21,11 +27,21 @@ const (
 	stateDir = "state"
 )
 
+var tasksCleanupCount = prometheus.NewCounter(prometheus.CounterOpts{
+	Name: "storage_zookeeper_tasks_cleanups",
+	Help: "Number of old tasks cleanups.",
+})
+
+func init() {
+	prometheus.MustRegister(tasksCleanupCount)
+}
+
 func NewStorage(c *conf.StorageZK) (*storage, error) {
 	s := &storage{
 		dir:     "/" + c.Dir,
 		addrs:   c.Addrs,
 		timeout: c.Timeout,
+		taskTTL: c.TaskTTL,
 	}
 	err := s.connect()
 	if err != nil {
@@ -40,6 +56,18 @@ func NewStorage(c *conf.StorageZK) (*storage, error) {
 	if err != nil {
 		return nil, err
 	}
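+	// A dedicated leader election (under "election/tasks_cleanup") ensures only one instance prunes old task records.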
+	coordConf := &conf.CoordinatorZK{
+		Dir:         c.Dir,
+		Addrs:       c.Addrs,
+		Timeout:     c.Timeout,
+		Auth:        c.Auth,
+		ElectionDir: "election/tasks_cleanup",
+	}
+	coord, err := zkcoord.New(coordConf)
+	if err != nil {
+		return nil, err
+	}
+	s.runTasksCleanupScheduler(coord)
 	return s, nil
 }
 
@@ -49,6 +77,36 @@ type storage struct {
 	conn    *zk.Conn
 	acl     func(perms int32) []zk.ACL
 	timeout time.Duration
+	taskTTL time.Duration
+}
+
+func (s *storage) runTasksCleanupScheduler(coord *zkcoord.Coordinator) {
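+	// In the background: wait until elected as cleanup leader, then prune expired tasks every hour until leadership is lost.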
+	interval := time.Hour
+	go func() {
+		for {
+			log.Info("Waiting until elected as tasks cleanup leader")
+			ctx := coord.WaitUntilLeader()
+			log.Info("Elected as tasks cleanup leader")
+			timer := time.After(interval)
+		inner:
+			for {
+				select {
+				case <-timer:
+					log.Debug("Old tasks cleanup started")
+					deleted, err := s.tasksCleanup(ctx)
+					if err != nil {
+						log.Errorf("Old tasks cleanup failed: %s", err)
+					} else {
+						log.Debugf("Old tasks cleanup finished. Deleted tasks: %d", deleted)
+						tasksCleanupCount.Inc()
+					}
+					timer = time.After(interval)
+				case <-ctx.Done():
+					break inner
+				}
+			}
+		}
+	}()
 }
 
 func (s *storage) connect() error {
@@ -193,6 +251,31 @@ func (s *storage) GetJobs() ([]*model.Job, error) {
 	return jobs, nil
 }
 
+func (s *storage) GetTasks(group, project, id string) ([]*model.Task, error) {
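+	// Children come back from ZooKeeper in no particular order; callers (e.g. the API's getTasks handler) sort the result.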
+	tasks := []*model.Task{}
+	base := s.dir + "/" + jobsDir + "/" + group + ":" + project + ":" + id
+	children, _, err := s.conn.Children(base)
+	if err != nil {
+		if err == zk.ErrNoNode {
+			return tasks, nil
+		}
+		return tasks, err
+	}
+	for _, child := range children {
+		payload, _, err := s.conn.Get(base + "/" + child)
+		if err != nil {
+			return tasks, err
+		}
+		var task model.Task
+		err = json.Unmarshal(payload, &task)
+		if err != nil {
+			return tasks, err
+		}
+		tasks = append(tasks, &task)
+	}
+	return tasks, nil
+}
+
 func (s *storage) GetRunnableJobs() ([]*model.Job, error) {
 	runnable := []*model.Job{}
 	jobs, err := s.GetJobs()
@@ -210,6 +293,59 @@ func (s *storage) GetRunnableJobs() ([]*model.Job, error) {
 	return runnable, nil
 }
 
+func (s *storage) tasksCleanup(ctx context.Context) (int64, error) {
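+	// Task znodes are named "<end-unix-timestamp>@<taskID>" (see AddTask), so age is derived from the name alone.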
+	deleted := int64(0)
+	base := s.dir + "/" + jobsDir
+	jobIDs, _, err := s.conn.Children(base)
+	if err != nil {
+		return 0, err
+	}
+	if ctx.Err() != nil {
+		return deleted, nil
+	}
+	for _, jobID := range jobIDs {
+		keys, _, err := s.conn.Children(base + "/" + jobID)
+		if err != nil {
+			log.Errorf("Failed getting task IDs: %s", err)
+			continue
+		}
+		if ctx.Err() != nil {
+			return deleted, nil
+		}
+		for _, key := range keys {
+			chunks := strings.SplitN(key, "@", 2)
+			timestamp, err := strconv.ParseInt(chunks[0], 10, 64)
+			if err != nil {
+				log.Errorf("Failed parsing timestamp: %s", err)
+				continue
+			}
+			if time.Since(time.Unix(timestamp, 0)) > s.taskTTL {
+				err = s.conn.Delete(base+"/"+jobID+"/"+key, 0)
+				if err != nil {
+					log.Errorf("Failed removing old task: %s", err)
+					continue
+				}
+				deleted++
+			}
+			if ctx.Err() != nil {
+				return deleted, nil
+			}
+		}
+	}
+	return deleted, nil
+}
+
+func (s *storage) AddTask(group, project, id string, task *model.Task) error {
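+	// The znode name "<end-unix-timestamp>@<taskID>" lets tasksCleanup expire records without reading payloads.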
+	encoded, err := json.Marshal(task)
+	if err != nil {
+		return err
+	}
+	path := fmt.Sprintf("%s/%s:%s:%s/%d@%s", s.dir+"/"+jobsDir, group, project,
+		id, task.End.Unix(), task.TaskID)
+	_, err = s.conn.Create(path, encoded, 0, s.acl(zk.PermAll))
+	return err
+}
+
 func (s *storage) SaveJob(job *model.Job) error {
 	encoded, err := json.Marshal(job)
 	if err != nil {
@@ -235,13 +371,23 @@ func (s *storage) SaveJob(job *model.Job) error {
 }
 
 func (s *storage) DeleteJob(group string, project string, id string) error {
-	path := fmt.Sprintf("%s/%s:%s:%s", s.dir+"/"+jobsDir, group, project, id)
-	exists, stat, err := s.conn.Exists(path)
+	jobPath := fmt.Sprintf("%s/%s:%s:%s", s.dir+"/"+jobsDir, group, project, id)
+	exists, stat, err := s.conn.Exists(jobPath)
 	if err != nil {
 		return err
 	}
 	if exists {
-		err = s.conn.Delete(path, stat.Version)
+		keys, _, err := s.conn.Children(jobPath)
+		if err != nil {
+			return err
+		}
+		for _, key := range keys {
+			err = s.conn.Delete(jobPath+"/"+key, 0)
+			if err != nil {
+				return err
+			}
+		}
+		err = s.conn.Delete(jobPath, stat.Version)
 		if err != nil {
 			return err
 		}