Skip to content

Commit 319dba1

Browse files
committed
feat(outbox): implements outboxsql adapter
1 parent 7dadcba commit 319dba1

File tree

7 files changed

+277
-0
lines changed

7 files changed

+277
-0
lines changed

outbox/adapter/sql/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package outboxsql
2+
3+
type Config struct {
4+
TableName string
5+
}

outbox/adapter/sql/db_ctx.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package outboxsql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
7+
"github.com/Masterminds/squirrel"
8+
)
9+
10+
type dbCtx struct {
11+
db *sql.DB
12+
}
13+
14+
func (d *dbCtx) executor(ctx context.Context) squirrel.BaseRunner {
15+
tx, ok := txFromContext(ctx)
16+
if !ok {
17+
return d.db
18+
}
19+
return tx
20+
}
21+
22+
type txContextKey struct{}
23+
24+
func withTx(ctx context.Context, tx *sql.Tx) context.Context {
25+
return context.WithValue(ctx, txContextKey{}, tx)
26+
}
27+
28+
func txFromContext(ctx context.Context) (*sql.Tx, bool) {
29+
tx, ok := ctx.Value(txContextKey{}).(*sql.Tx)
30+
return tx, ok
31+
}

outbox/adapter/sql/go.mod

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module github.com/notjustmoney/protobox/outbox/adapter/sql
2+
3+
go 1.23.4
4+
5+
require (
6+
github.com/Masterminds/squirrel v1.5.4
7+
github.com/notjustmoney/protobox v0.0.0-20250128131955-5f749f4fabb3
8+
github.com/samber/lo v1.49.1
9+
)
10+
11+
require (
12+
github.com/google/uuid v1.6.0 // indirect
13+
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
14+
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
15+
golang.org/x/sync v0.10.0 // indirect
16+
golang.org/x/text v0.21.0 // indirect
17+
)

outbox/adapter/sql/go.sum

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
2+
github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
3+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
7+
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
8+
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
9+
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
10+
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
11+
github.com/notjustmoney/protobox v0.0.0-20250128131955-5f749f4fabb3 h1:TZRn8YQrtF5Xj2DTFVuDkoYwjwOOBdNyrJUdDwK3nvk=
12+
github.com/notjustmoney/protobox v0.0.0-20250128131955-5f749f4fabb3/go.mod h1:c47ESQ/AG/c9d7YZIFTk5BsgTnTmNbIUpSsXojs2dC8=
13+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
14+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
15+
github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew=
16+
github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o=
17+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
18+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
19+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
20+
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
21+
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
22+
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
23+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
24+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
25+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

outbox/adapter/sql/marker.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (c) 2025. protobox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package outboxsql
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"strings"
23+
24+
sq "github.com/Masterminds/squirrel"
25+
"github.com/samber/lo"
26+
27+
"github.com/notjustmoney/protobox/outbox"
28+
)
29+
30+
type MarkerFunc func(ctx context.Context, messages []outbox.PublishedMessage) (int, error)
31+
32+
func (f MarkerFunc) Mark(ctx context.Context, messages []outbox.PublishedMessage) (int, error) {
33+
return f(ctx, messages)
34+
}
35+
36+
func Marker(
37+
dbCtx *dbCtx,
38+
cfg Config,
39+
) MarkerFunc {
40+
return func(ctx context.Context, messages []outbox.PublishedMessage) (int, error) {
41+
args := make([]interface{}, 0, len(messages)*3)
42+
for i := 0; i < len(messages); i++ {
43+
args = append(args, messages[i].ID)
44+
args = append(args, messages[i].ProcessedAt.UTC())
45+
args = append(args, messages[i].Error)
46+
}
47+
48+
if _, err := sq.
49+
Update(cfg.TableName).
50+
Set("processed_at", sq.Expr("v.processed_at")).
51+
Set("error", sq.Expr("v.error")).
52+
Suffix("FROM").
53+
SuffixExpr(
54+
sq.Alias(
55+
sq.Expr(fmt.Sprintf("VALUES %s", strings.Join(lo.Map(messages, func(_ outbox.PublishedMessage, index int) string {
56+
return fmt.Sprintf("($%d, $%d::timestamp with time zone, $%d)", index*3+1, index*3+2, index*3+3)
57+
}), ",")), args...),
58+
"v(id, processed_at, error)",
59+
),
60+
).
61+
Suffix("WHERE " + cfg.TableName + ".id = v.id::uuid").
62+
PlaceholderFormat(sq.Dollar).
63+
RunWith(dbCtx.executor(ctx)).
64+
ExecContext(ctx); err != nil {
65+
return 0, err
66+
}
67+
68+
return len(messages), nil
69+
}
70+
}

outbox/adapter/sql/poller.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2025. protobox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package outboxsql
18+
19+
import (
20+
"context"
21+
22+
sq "github.com/Masterminds/squirrel"
23+
"github.com/notjustmoney/protobox/outbox"
24+
)
25+
26+
type PollerFunc func(ctx context.Context, cfg outbox.PollConfig) ([]outbox.Message, error)
27+
28+
func (f PollerFunc) Poll(ctx context.Context, cfg outbox.PollConfig) ([]outbox.Message, error) {
29+
return f(ctx, cfg)
30+
}
31+
32+
func Poller(dbCtx *dbCtx, cfg Config) PollerFunc {
33+
return func(ctx context.Context, pollCfg outbox.PollConfig) ([]outbox.Message, error) {
34+
rows, err := sq.
35+
Select("id", "topic", "payload").
36+
From(cfg.TableName).
37+
Where(sq.Eq{"processed_at": nil}).
38+
OrderBy("created_at").
39+
Limit(uint64(pollCfg.BatchSize)).
40+
Suffix("FOR UPDATE SKIP LOCKED").
41+
PlaceholderFormat(sq.Dollar).
42+
RunWith(dbCtx.executor(ctx)).
43+
QueryContext(ctx)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
var messages []outbox.Message
49+
for rows.Next() {
50+
var message outbox.Message
51+
if err := rows.Scan(&message.ID, &message.Topic, &message.Payload); err != nil {
52+
return nil, err
53+
}
54+
messages = append(messages, message)
55+
}
56+
57+
return messages, nil
58+
}
59+
}

outbox/adapter/sql/relay_handler.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (c) 2025. protobox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package outboxsql
18+
19+
import (
20+
"context"
21+
"database/sql"
22+
"fmt"
23+
24+
"github.com/notjustmoney/protobox/outbox"
25+
)
26+
27+
type RelayHandler struct {
28+
db *sql.DB
29+
relayHandler *outbox.RelayHandler
30+
}
31+
32+
func NewRelayHandler(
33+
db *sql.DB,
34+
publisher outbox.Publisher,
35+
cfg Config,
36+
) *RelayHandler {
37+
dbCtx := &dbCtx{db: db}
38+
return &RelayHandler{
39+
db: db,
40+
relayHandler: outbox.NewRelayHandler(
41+
Poller(dbCtx, cfg),
42+
publisher,
43+
Marker(dbCtx, cfg),
44+
),
45+
}
46+
}
47+
48+
func (h *RelayHandler) Handle(ctx context.Context) (int, error) {
49+
tx, err := h.db.BeginTx(ctx, nil)
50+
if err != nil {
51+
return 0, fmt.Errorf("outboxsql: %w", err)
52+
}
53+
defer func() {
54+
if p := recover(); p != nil {
55+
_ = tx.Rollback()
56+
panic(p)
57+
}
58+
if err != nil {
59+
_ = tx.Rollback()
60+
return
61+
}
62+
_ = tx.Commit()
63+
}()
64+
65+
processed, err := h.Handle(ctx)
66+
if err != nil {
67+
return 0, fmt.Errorf("outboxsql: %w", err)
68+
}
69+
return processed, nil
70+
}

0 commit comments

Comments
 (0)