Skip to content
This repository was archived by the owner on Oct 31, 2021. It is now read-only.

Commit a358cd5

Browse files
committed
Big job/plaid updates.
Moved schema migrations to the internal package. I want to add functional schema migrations in the future which might involve more than just database access, but to keep that stuff from being used I want to keep all of that in the internal package. I've added much more to the plaid helper interface. This will serve as an abstraction layer for making API calls to plaid so that we can track them with sentry traces. I've migrated all of the jobs to use the new plaid interface rather than the direct client for now. I've added an institutions table as well as a job to maintain the basic institution data for now. It does not keep track of status yet, but will let us start to add instutition data to the links as they are created in the future.
1 parent 5b1b9cd commit a358cd5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+506
-81
lines changed

pkg/controller/plaid.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (c *Controller) waitForPlaid(ctx iris.Context) {
255255
})
256256

257257
repo := c.mustGetAuthenticatedRepository(ctx)
258-
link, err := repo.GetLink(linkId)
258+
link, err := repo.GetLink(c.getContext(ctx), linkId)
259259
if err != nil {
260260
c.wrapPgError(ctx, err, "failed to retrieve link")
261261
return

pkg/internal/cmd/migrate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"fmt"
55
"github.com/go-pg/pg/v10"
66
"github.com/monetrapp/rest-api/pkg/logging"
7-
"github.com/monetrapp/rest-api/pkg/migrations"
7+
"github.com/monetrapp/rest-api/pkg/internal/migrations"
88
"github.com/spf13/cobra"
99
)
1010

pkg/internal/cmd/serve.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99
"github.com/monetrapp/rest-api/pkg/build"
1010
"github.com/monetrapp/rest-api/pkg/cache"
1111
"github.com/monetrapp/rest-api/pkg/config"
12+
"github.com/monetrapp/rest-api/pkg/internal/migrations"
13+
"github.com/monetrapp/rest-api/pkg/internal/plaid_helper"
1214
"github.com/monetrapp/rest-api/pkg/jobs"
1315
"github.com/monetrapp/rest-api/pkg/logging"
1416
"github.com/monetrapp/rest-api/pkg/metrics"
15-
"github.com/monetrapp/rest-api/pkg/migrations"
1617
"github.com/plaid/plaid-go/plaid"
1718
"github.com/spf13/cobra"
1819
"github.com/stripe/stripe-go/v72"
@@ -126,6 +127,15 @@ func RunServer() error {
126127
}
127128
defer redisController.Close()
128129

130+
plaidHelper := plaid_helper.NewPlaidClient(log, plaid.ClientOptions{
131+
ClientID: configuration.Plaid.ClientID,
132+
Secret: configuration.Plaid.ClientSecret,
133+
Environment: configuration.Plaid.Environment,
134+
HTTPClient: &http.Client{
135+
Timeout: 30 * time.Second,
136+
},
137+
})
138+
129139
plaidClient, err := plaid.NewClient(plaid.ClientOptions{
130140
ClientID: configuration.Plaid.ClientID,
131141
Secret: configuration.Plaid.ClientSecret,
@@ -146,7 +156,7 @@ func RunServer() error {
146156
log.Debugf("stripe webhooks are enabled and will be sent to: %s", configuration.Stripe.WebhooksDomain)
147157
}
148158

149-
jobManager := jobs.NewJobManager(log, redisController.Pool(), db, plaidClient, stats)
159+
jobManager := jobs.NewJobManager(log, redisController.Pool(), db, plaidHelper, stats)
150160
defer jobManager.Close()
151161

152162
app := application.NewApp(configuration, getControllers(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package functional
2+
3+
import (
4+
"fmt"
5+
"github.com/go-pg/migrations/v8"
6+
)
7+
8+
func init() {
9+
FunctionalMigrations = append(FunctionalMigrations, &migrations.Migration{
10+
Version: 2021050999,
11+
UpTx: false,
12+
Up: func(db migrations.DB) error {
13+
fmt.Println("TEST MIGRATION UP")
14+
return nil
15+
},
16+
DownTx: false,
17+
Down: func(db migrations.DB) error {
18+
fmt.Println("TEST MIGRATION DOWN")
19+
return nil
20+
},
21+
})
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package functional
2+
3+
import "github.com/go-pg/migrations/v8"
4+
5+
var (
6+
FunctionalMigrations = []*migrations.Migration{}
7+
)

pkg/migrations/migrations.go pkg/internal/migrations/migrations.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package migrations
22

33
import (
44
"github.com/go-pg/migrations/v8"
5+
"github.com/monetrapp/rest-api/pkg/internal/migrations/functional"
56
"github.com/pkg/errors"
67
"github.com/sirupsen/logrus"
78
"net/http"
@@ -14,7 +15,7 @@ type MonetrMigrationsManager struct {
1415
}
1516

1617
func NewMigrationsManager(log *logrus.Entry, db migrations.DB) (*MonetrMigrationsManager, error) {
17-
collection := migrations.NewCollection()
18+
collection := migrations.NewCollection(functional.FunctionalMigrations...)
1819
if err := collection.DiscoverSQLMigrationsFromFilesystem(http.FS(things), "schema"); err != nil {
1920
return nil, errors.Wrap(err, "failed to discover embedded sql migrations")
2021
}
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE "institutions";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
CREATE TABLE "institutions"
2+
(
3+
institution_id BIGSERIAL NOT NULL,
4+
name TEXT NOT NULL,
5+
plaid_institution_id TEXT,
6+
plaid_products TEXT[],
7+
url TEXT,
8+
primary_color TEXT,
9+
logo TEXT,
10+
CONSTRAINT "pk_institutions" PRIMARY KEY ("institution_id"),
11+
CONSTRAINT "uq_institutions_plaid_institution_id" UNIQUE ("plaid_institution_id")
12+
);
13+
14+
ALTER TABLE "links" ADD COLUMN "institution_id" BIGINT NULL;
15+
ALTER TABLE "links" ADD CONSTRAINT "fk_links_institution" FOREIGN KEY ("institution_id") REFERENCES "institutions" ("institution_id") ON DELETE SET NULL;
16+
17+
-- -- Will use this later to add the proper institutions to the link records.
18+
-- UPDATE "links" AS "link"
19+
-- SET "institution_id" = "instituion"."institution_id"
20+
-- FROM "plaid_links" AS "plaid_link"
21+
-- JOIN "institutions" AS "institution" ON "institution"."plaid_institution_id" = "plaid_link"."institution_id"
22+
-- WHERE "plaid_link"."plaid_link_id" = "link"."plaid_link_id"
23+
File renamed without changes.

pkg/internal/plaid_helper/client.go

+183-1
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,16 @@ import (
66
"github.com/pkg/errors"
77
"github.com/plaid/plaid-go/plaid"
88
"github.com/sirupsen/logrus"
9+
"strings"
10+
"time"
911
)
1012

1113
type Client interface {
14+
GetAccounts(ctx context.Context, accessToken string, options plaid.GetAccountsOptions) ([]plaid.Account, error)
15+
GetAllTransactions(ctx context.Context, accessToken string, start, end time.Time, accountIds []string) ([]plaid.Transaction, error)
16+
GetAllInstitutions(ctx context.Context, countryCodes []string, options plaid.GetInstitutionsOptions) ([]plaid.Institution, error)
17+
GetInstitutions(ctx context.Context, count, offset int, countryCodes []string, options plaid.GetInstitutionsOptions) (total int, _ []plaid.Institution, _ error)
18+
GetInstitution(ctx context.Context, institutionId string, includeMetadata bool, countryCodes []string) (*plaid.Institution, error)
1219
GetWebhookVerificationKey(ctx context.Context, keyId string) (plaid.GetWebhookVerificationKeyResponse, error)
1320
Close() error
1421
}
@@ -36,8 +43,183 @@ type plaidClient struct {
3643
client *plaid.Client
3744
}
3845

46+
func (p *plaidClient) GetAccounts(ctx context.Context, accessToken string, options plaid.GetAccountsOptions) ([]plaid.Account, error) {
47+
span := sentry.StartSpan(ctx, "Plaid - GetAccounts")
48+
defer span.Finish()
49+
if span.Data == nil {
50+
span.Data = map[string]interface{}{}
51+
}
52+
span.Data["options"] = options
53+
54+
result, err := p.client.GetAccountsWithOptions(accessToken, options)
55+
span.Data["plaidRequestId"] = result.RequestID
56+
if err != nil {
57+
span.Status = sentry.SpanStatusInternalError
58+
return nil, errors.Wrap(err, "failed to retrieve plaid accounts")
59+
}
60+
61+
return result.Accounts, nil
62+
}
63+
64+
func (p *plaidClient) GetAllTransactions(ctx context.Context, accessToken string, start, end time.Time, accountIds []string) ([]plaid.Transaction, error) {
65+
span := sentry.StartSpan(ctx, "Plaid - GetAllTransactions")
66+
defer span.Finish()
67+
if span.Data == nil {
68+
span.Data = map[string]interface{}{}
69+
}
70+
71+
span.Data["start"] = start
72+
span.Data["end"] = start
73+
if len(accountIds) > 0 {
74+
span.Data["accountIds"] = accountIds
75+
}
76+
77+
perPage := 100
78+
79+
options := plaid.GetTransactionsOptions{
80+
StartDate: start.Format("2006-01-02"),
81+
EndDate: end.Format("2006-01-02"),
82+
AccountIDs: accountIds,
83+
Count: perPage,
84+
Offset: 0,
85+
}
86+
87+
transactions := make([]plaid.Transaction, 0)
88+
for {
89+
options.Offset = len(transactions)
90+
total, items, err := p.GetTransactions(span.Context(), accessToken, options)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
transactions = append(transactions, items...)
96+
97+
if len(items) < perPage {
98+
break
99+
}
100+
101+
if len(transactions) >= total {
102+
break
103+
}
104+
}
105+
106+
return transactions, nil
107+
}
108+
109+
func (p *plaidClient) GetTransactions(ctx context.Context, accessToken string, options plaid.GetTransactionsOptions) (total int, _ []plaid.Transaction, _ error) {
110+
span := sentry.StartSpan(ctx, "Plaid - GetTransactions")
111+
defer span.Finish()
112+
if span.Data == nil {
113+
span.Data = map[string]interface{}{}
114+
}
115+
116+
span.Data["options"] = options
117+
118+
result, err := p.client.GetTransactionsWithOptions(accessToken, options)
119+
span.Data["plaidRequestId"] = result.RequestID
120+
if err != nil {
121+
return 0, nil, errors.Wrap(err, "failed to retrieve plaid transactions")
122+
}
123+
124+
return result.TotalTransactions, result.Transactions, nil
125+
}
126+
127+
func (p *plaidClient) GetInstitution(ctx context.Context, institutionId string, includeMetadata bool, countryCodes []string) (*plaid.Institution, error) {
128+
span := sentry.StartSpan(ctx, "Plaid - GetInstitution")
129+
defer span.Finish()
130+
if span.Data == nil {
131+
span.Data = map[string]interface{}{}
132+
}
133+
134+
span.Data["institutionId"] = institutionId
135+
span.Data["includeMetadata"] = includeMetadata
136+
span.Data["countryCodes"] = countryCodes
137+
138+
result, err := p.client.GetInstitutionByIDWithOptions(institutionId, countryCodes, plaid.GetInstitutionByIDOptions{
139+
IncludeOptionalMetadata: true,
140+
IncludePaymentInitiationMetadata: false,
141+
IncludeStatus: false,
142+
})
143+
span.Data["plaidRequestId"] = result.RequestID
144+
if err != nil {
145+
return nil, errors.Wrap(err, "failed to retrieve plaid institution")
146+
}
147+
148+
return &result.Institution, nil
149+
}
150+
151+
func (p *plaidClient) GetAllInstitutions(ctx context.Context, countryCodes []string, options plaid.GetInstitutionsOptions) ([]plaid.Institution, error) {
152+
span := sentry.StartSpan(ctx, "Plaid - GetAllInstitutions")
153+
defer span.Finish()
154+
if span.Data == nil {
155+
span.Data = map[string]interface{}{}
156+
}
157+
span.Data["countryCodes"] = countryCodes
158+
span.Data["options"] = options
159+
160+
perPage := 100
161+
institutions := make([]plaid.Institution, 0)
162+
for {
163+
total, items, err := p.GetInstitutions(span.Context(), perPage, len(institutions), countryCodes, options)
164+
if err != nil {
165+
span.Status = sentry.SpanStatusInternalError
166+
return nil, err
167+
}
168+
169+
institutions = append(institutions, items...)
170+
171+
// If we received fewer items than we requested, then we have reached the end.
172+
if len(items) < perPage {
173+
break
174+
}
175+
176+
// If we have received at least what we expect to be the total amount, then we are also done.
177+
if len(institutions) >= total {
178+
break
179+
}
180+
}
181+
182+
return institutions, nil
183+
}
184+
185+
func (p *plaidClient) GetInstitutions(ctx context.Context, count, offset int, countryCodes []string, options plaid.GetInstitutionsOptions) (total int, _ []plaid.Institution, _ error) {
186+
span := sentry.StartSpan(ctx, "Plaid - GetInstitutions")
187+
defer span.Finish()
188+
if span.Data == nil {
189+
span.Data = map[string]interface{}{}
190+
}
191+
192+
span.Data["count"] = count
193+
span.Data["offset"] = offset
194+
span.Data["countryCodes"] = countryCodes
195+
span.Data["options"] = options
196+
197+
log := p.log.WithFields(logrus.Fields{
198+
"count": count,
199+
"offset": offset,
200+
"countryCodes": strings.Join(countryCodes, ","),
201+
})
202+
203+
log.Debug("retrieving plaid institutions")
204+
205+
result, err := p.client.GetInstitutionsWithOptions(count, offset, countryCodes, options)
206+
span.Data["plaidRequestId"] = result.RequestID
207+
log = log.WithField("plaidRequestId", result.RequestID)
208+
if err != nil {
209+
span.Status = sentry.SpanStatusInternalError
210+
log.WithError(err).Errorf("failed to retrieve plaid institutions")
211+
return 0, nil, errors.Wrap(err, "failed to retrieve plaid institutions")
212+
}
213+
214+
log.Debugf("successfully retrieved %d institutions", len(result.Institutions))
215+
216+
span.Status = sentry.SpanStatusOK
217+
218+
return result.Total, result.Institutions, nil
219+
}
220+
39221
func (p *plaidClient) GetWebhookVerificationKey(ctx context.Context, keyId string) (plaid.GetWebhookVerificationKeyResponse, error) {
40-
span := sentry.StartSpan(ctx, "GetWebhookVerificationKey")
222+
span := sentry.StartSpan(ctx, "Plaid - GetWebhookVerificationKey")
41223
defer span.Finish()
42224

43225
result, err := p.client.GetWebhookVerificationKey(keyId)

pkg/jobs/check_institution_status.go

-9
This file was deleted.

pkg/jobs/jobs.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"github.com/go-pg/pg/v10"
66
"github.com/gocraft/work"
77
"github.com/gomodule/redigo/redis"
8+
"github.com/monetrapp/rest-api/pkg/internal/plaid_helper"
89
"github.com/monetrapp/rest-api/pkg/metrics"
910
"github.com/monetrapp/rest-api/pkg/models"
1011
"github.com/monetrapp/rest-api/pkg/pubsub"
1112
"github.com/monetrapp/rest-api/pkg/repository"
1213
"github.com/pkg/errors"
13-
"github.com/plaid/plaid-go/plaid"
1414
"github.com/sirupsen/logrus"
1515
"math"
1616
"time"
@@ -28,12 +28,12 @@ type jobManagerBase struct {
2828
work *work.WorkerPool
2929
queue *work.Enqueuer
3030
db *pg.DB
31-
plaidClient *plaid.Client
31+
plaidClient plaid_helper.Client
3232
stats *metrics.Stats
3333
ps pubsub.PublishSubscribe
3434
}
3535

36-
func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient *plaid.Client, stats *metrics.Stats) JobManager {
36+
func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient plaid_helper.Client, stats *metrics.Stats) JobManager {
3737
manager := &jobManagerBase{
3838
log: log,
3939
// TODO (elliotcourant) Use namespace from config.
@@ -56,6 +56,7 @@ func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient *
5656
manager.work.Job(PullInitialTransactions, manager.pullInitialTransactions)
5757
manager.work.Job(PullLatestTransactions, manager.pullLatestTransactions)
5858
manager.work.Job(RemoveTransactions, manager.removeTransactions)
59+
manager.work.Job(UpdateInstitutions, manager.updateInstitutions)
5960

6061
// Every 30 minutes. 0 */30 * * * *
6162

@@ -65,6 +66,7 @@ func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient *
6566
// Once a day. But also can be triggered by a webhook.
6667
manager.work.PeriodicallyEnqueue("0 0 0 * * *", EnqueuePullAccountBalances)
6768
manager.work.PeriodicallyEnqueue("0 0 0 * * *", EnqueuePullLatestTransactions)
69+
manager.work.PeriodicallyEnqueue("0 0 0 * * *", UpdateInstitutions)
6870

6971
manager.work.Start()
7072
log.Debug("job manager started")

0 commit comments

Comments
 (0)