Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from call_id to task_id for consistency #553

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ curl -H "Content-Type: application/json" -X POST -d '{
}' http://localhost:8080/r/myapp/hello-async
```

You will get a `call_id` in the response:
You will get a `task_id` in the response:

```json
{"call_id":"572415fd-e26e-542b-846f-f1f5870034f2"}
{"task_id":"572415fd-e26e-542b-846f-f1f5870034f2"}
```

If you watch the logs, you will see the function actually runs in the background:
Expand Down
6 changes: 3 additions & 3 deletions api/mqs/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (mq *BoltDbMQ) delayTask(job *models.Task) (*models.Task, error) {
}

func (mq *BoltDbMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
log.Println("Pushed to MQ")

if job.Delay > 0 {
Expand Down Expand Up @@ -315,7 +315,7 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) {
return nil, err
}

_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
log.Println("Reserved")

return &job, nil
Expand All @@ -325,7 +325,7 @@ func (mq *BoltDbMQ) Reserve(ctx context.Context) (*models.Task, error) {
}

func (mq *BoltDbMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
defer log.Println("Deleted")

return mq.db.Update(func(tx *bolt.Tx) error {
Expand Down
6 changes: 3 additions & 3 deletions api/mqs/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (ji *TaskItem) Less(than btree.Item) bool {
}

func (mq *MemoryMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
log.Println("Pushed to MQ")

// It seems to me that using the job ID in the reservation is acceptable since each job can only have one outstanding reservation.
Expand Down Expand Up @@ -173,13 +173,13 @@ func (mq *MemoryMQ) Reserve(ctx context.Context) (*models.Task, error) {
return nil, nil
}

_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
log.Println("Reserved")
return job, mq.pushTimeout(job)
}

func (mq *MemoryMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})

mq.Mutex.Lock()
defer mq.Mutex.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions api/mqs/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (mq *RedisMQ) delayTask(conn redis.Conn, job *models.Task) (*models.Task, e
}

func (mq *RedisMQ) Push(ctx context.Context, job *models.Task) (*models.Task, error) {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
defer log.Println("Pushed to MQ")

conn := mq.pool.Get()
Expand Down Expand Up @@ -280,14 +280,14 @@ func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Task, error) {
return nil, err
}

_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
log.Println("Reserved")

return &job, nil
}

func (mq *RedisMQ) Delete(ctx context.Context, job *models.Task) error {
_, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": job.ID})
_, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": job.ID})
defer log.Println("Deleted")

conn := mq.pool.Get()
Expand Down
2 changes: 1 addition & 1 deletion api/runner/async_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func startAsyncRunners(ctx context.Context, url string, tasks chan task.Request,
continue
}

ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": task.ID})
log.Debug("Running task:", task.ID)

wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion api/runner/func_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (l *DefaultFuncLogger) Writer(ctx context.Context, appName, path, image, re
r, w := io.Pipe()

log := common.Logger(ctx)
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "image": image, "call_id": reqID})
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": appName, "path": path, "image": image, "task_id": reqID})

go func(reader io.Reader) {
scanner := bufio.NewScanner(reader)
Expand Down
4 changes: 2 additions & 2 deletions api/server/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *Server) handleRequest(c *gin.Context, enqueue models.Enqueue) {
ctx := c.MustGet("ctx").(context.Context)

reqID := uuid.NewV5(uuid.Nil, fmt.Sprintf("%s%s%d", c.Request.RemoteAddr, c.Request.URL.Path, time.Now().Unix())).String()
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"call_id": reqID})
ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"task_id": reqID})

var err error
var payload io.Reader
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, foun
// Push to queue
enqueue(c, s.MQ, task)
log.Info("Added new task to queue")
c.JSON(http.StatusAccepted, map[string]string{"call_id": task.ID})
c.JSON(http.StatusAccepted, map[string]string{"task_id": task.ID})

default:
result, err := runner.RunTask(s.tasks, ctx, cfg)
Expand Down
2 changes: 1 addition & 1 deletion api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func prepareMiddleware(ctx context.Context) gin.HandlerFunc {
}

func DefaultEnqueue(ctx context.Context, mq models.MessageQueue, task *models.Task) (*models.Task, error) {
ctx, _ = common.LoggerWithFields(ctx, logrus.Fields{"call_id": task.ID})
ctx, _ = common.LoggerWithFields(ctx, logrus.Fields{"task_id": task.ID})
return mq.Push(ctx, task)
}

Expand Down
2 changes: 1 addition & 1 deletion docs/definitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Options: `sync` and `async`

`type` is defines how the function will be executed. If type is `sync` the request will be hold until the result is ready and flushed.

In `async` functions the request will be ended with a `call_id` and the function will be executed in the background.
In `async` functions the request will be ended with a `task_id` and the function will be executed in the background.

#### memory (number)

Expand Down
10 changes: 5 additions & 5 deletions docs/operating/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ We recommend using [logspout](https://github.com/gliderlabs/logspout) to forward

All logs are emitted in [logfmt](https://godoc.org/github.com/kr/logfmt) format for easy parsing.

## Call ID
## TASK ID

Every function call/request is assigned a `call_id`. If you search your logs, you can track all the activity
for each function call and find errors on a call by call basis. For example, these are the log lines for an aynschronous
Every function call/request is assigned a `task_id`. If you search your logs, you can track all the activity
for each function call and find errors on a call by call basis. For example, these are the log lines for an asynchronous
function call:

![async logs](/docs/assets/async-log-full.png)

Note the easily searchable `call_id=x` format.
Note the easily searchable `task_id=x` format.

```sh
call_id=477949e2-922c-5da9-8633-0b2887b79f6e
task_id=477949e2-922c-5da9-8633-0b2887b79f6e
```

## Metrics
Expand Down