Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove HostIP and SiamuxAddr from ContractMetadata #1712

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
default: minor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking this is a major change since it breaks the API.

---

# Remove HostIP and SiamuxAddr from ContractMetadata

#1712 by @chris124567

Implement https://github.com/SiaFoundation/renterd/issues/1691
4 changes: 0 additions & 4 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ type (
InitialRenterFunds types.Currency `json:"initialRenterFunds"`
Spending ContractSpending `json:"spending"`

// following fields are decorated
HostIP string `json:"hostIP"`
SiamuxAddr string `json:"siamuxAddr,omitempty"`

// following fields are only set on archived contracts
ArchivalReason string `json:"archivalReason,omitempty"`
RenewedTo types.FileContractID `json:"renewedTo,omitempty"`
Expand Down
2 changes: 0 additions & 2 deletions api/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,8 @@ func (c ContractMetadata) PrometheusMetric() (metrics []prometheus.Metric) {
{
Name: "renterd_contract",
Labels: map[string]any{
"host_ip": c.HostIP,
"state": c.State,
"host_key": c.HostKey.String(),
"siamux_addr": c.SiamuxAddr,
"contract_price": c.ContractPrice.Siacoins(),
},
Value: c.InitialRenterFunds.Siacoins(),
Expand Down
6 changes: 3 additions & 3 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func (b *Bus) broadcastContract(ctx context.Context, fcid types.FileContractID)
return txn.ID(), nil
} else {
// fetch revision
rev, err := b.rhp2Client.SignedRevision(ctx, c.HostIP, c.HostKey, renterKey, fcid, time.Minute)
rev, err := b.rhp2Client.SignedRevision(ctx, host.NetAddress, c.HostKey, renterKey, fcid, time.Minute)
if err != nil {
return types.TransactionID{}, fmt.Errorf("couldn't fetch revision; %w", err)
}
Expand Down Expand Up @@ -797,15 +797,15 @@ func (b *Bus) renewContractV1(ctx context.Context, cs consensus.State, gp api.Go
renterKey := b.masterKey.DeriveContractKey(c.HostKey)

// fetch the revision
rev, err := b.rhp3Client.Revision(ctx, c.ID, c.HostKey, c.SiamuxAddr)
rev, err := b.rhp3Client.Revision(ctx, c.ID, c.HostKey, hs.SiamuxAddr())
if err != nil {
return api.ContractMetadata{}, err
}

// renew contract
gc := gouging.NewChecker(gp.GougingSettings, gp.ConsensusState)
prepareRenew := b.prepareRenew(cs, rev, hs.Address, b.w.Address(), renterFunds, minNewCollateral, endHeight, expectedNewStorage)
newRevision, txnSet, contractPrice, fundAmount, err := b.rhp3Client.Renew(ctx, gc, rev, renterKey, c.HostKey, c.SiamuxAddr, prepareRenew, b.w.SignTransaction)
newRevision, txnSet, contractPrice, fundAmount, err := b.rhp3Client.Renew(ctx, gc, rev, renterKey, c.HostKey, hs.SiamuxAddr(), prepareRenew, b.w.SignTransaction)
if err != nil {
return api.ContractMetadata{}, err
}
Expand Down
15 changes: 10 additions & 5 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
}
} else {
// latest revision
rev, err := b.rhp3Client.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, cm.SiamuxAddr)
rev, err := b.rhp3Client.Revision(jc.Request.Context(), req.ContractID, cm.HostKey, host.Settings.SiamuxAddr())
if jc.Check("failed to fetch contract revision", err) != nil {
return
}
Expand All @@ -119,7 +119,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
}

// price table
pt, err := b.rhp3Client.PriceTable(jc.Request.Context(), cm.HostKey, cm.SiamuxAddr, rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk))
pt, err := b.rhp3Client.PriceTable(jc.Request.Context(), cm.HostKey, host.Settings.SiamuxAddr(), rhp3.PreparePriceTableContractPayment(&rev, req.AccountID, rk))
if jc.Check("failed to fetch price table", err) != nil {
return
}
Expand All @@ -139,7 +139,7 @@ func (b *Bus) accountsFundHandler(jc jape.Context) {
}

// fund the account
err = b.rhp3Client.FundAccount(jc.Request.Context(), &rev, cm.HostKey, cm.SiamuxAddr, deposit, req.AccountID, pt.HostPriceTable, rk)
err = b.rhp3Client.FundAccount(jc.Request.Context(), &rev, cm.HostKey, host.Settings.SiamuxAddr(), deposit, req.AccountID, pt.HostPriceTable, rk)
if jc.Check("failed to fund account", err) != nil {
return
}
Expand Down Expand Up @@ -859,7 +859,7 @@ func (b *Bus) contractLatestRevisionHandlerGET(jc jape.Context) {
Size: revision.Filesize,
})
} else {
revision, err := b.rhp3Client.Revision(jc.Request.Context(), fcid, contract.HostKey, contract.SiamuxAddr)
revision, err := b.rhp3Client.Revision(jc.Request.Context(), fcid, contract.HostKey, host.Settings.SiamuxAddr())
if jc.Check("failed to fetch revision", err) != nil {
return
}
Expand Down Expand Up @@ -922,6 +922,11 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {
} else if jc.Check("couldn't fetch contract", err) != nil {
return
}
// host
host, err := b.store.Host(jc.Request.Context(), c.HostKey)
if jc.Check("failed to fetch host for contract", err) != nil {
return
}

// build map of uploading sectors
pending := make(map[types.Hash256]struct{})
Expand All @@ -931,7 +936,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// prune the contract
rk := b.masterKey.DeriveContractKey(c.HostKey)
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, c.HostIP, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
rev, spending, pruned, remaining, err := b.rhp2Client.PruneContract(pruneCtx, rk, gc, host.NetAddress, c.HostKey, fcid, c.RevisionNumber, func(fcid types.FileContractID, roots []types.Hash256) ([]uint64, error) {
indices, err := b.store.PrunableContractRoots(ctx, fcid, roots)
if err != nil {
return nil, err
Expand Down
30 changes: 22 additions & 8 deletions internal/upload/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ var (
)

type (
HostStore interface {
Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error)
}

ContractStore interface {
RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error)
}
Expand Down Expand Up @@ -75,10 +79,10 @@ type (
cs ContractStore
cl locking.ContractLocker
hm host.HostManager
hs HostStore
logger *zap.SugaredLogger

hk types.PublicKey
siamuxAddr string
signalNewUpload chan struct{}
shutdownCtx context.Context

Expand All @@ -98,16 +102,21 @@ type (
}
)

func New(ctx context.Context, cl locking.ContractLocker, cs ContractStore, hm host.HostManager, c api.ContractMetadata, l *zap.SugaredLogger) *Uploader {
func New(ctx context.Context, cl locking.ContractLocker, cs ContractStore, hm host.HostManager, hs HostStore, c api.ContractMetadata, l *zap.SugaredLogger) (*Uploader, error) {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
host, err := hs.Host(ctx, c.HostKey)
if err != nil {
return nil, err
}

return &Uploader{
cl: cl,
cs: cs,
hm: hm,
hs: hs,
logger: l,

// static
hk: c.HostKey,
siamuxAddr: c.SiamuxAddr,
shutdownCtx: ctx,
signalNewUpload: make(chan struct{}, 1),

Expand All @@ -116,11 +125,11 @@ func New(ctx context.Context, cl locking.ContractLocker, cs ContractStore, hm ho
statsSectorUploadSpeedBytesPerMS: utils.NewDataPoints(0),

// covered by mutex
host: hm.Host(c.HostKey, c.ID, c.SiamuxAddr),
host: hm.Host(c.HostKey, c.ID, host.Settings.SiamuxAddr()),
fcid: c.ID,
endHeight: c.WindowEnd,
queue: make([]*SectorUploadReq, 0),
}
}, nil
}

func (u *Uploader) AvgUploadSpeedBytesPerMS() float64 {
Expand Down Expand Up @@ -149,14 +158,19 @@ func (u *Uploader) PublicKey() types.PublicKey {
return u.hk
}

func (u *Uploader) Refresh(c api.ContractMetadata) {
func (u *Uploader) Refresh(c api.ContractMetadata) error {
u.mu.Lock()
defer u.mu.Unlock()

u.host = u.hm.Host(c.HostKey, c.ID, c.SiamuxAddr)
host, err := u.hs.Host(u.shutdownCtx, c.HostKey)
if err != nil {
return err
}

u.host = u.hm.Host(c.HostKey, c.ID, host.Settings.SiamuxAddr())
u.fcid = c.ID
u.siamuxAddr = c.SiamuxAddr
u.endHeight = c.WindowEnd
return nil
}

func (u *Uploader) Start() {
Expand Down
14 changes: 12 additions & 2 deletions internal/upload/uploader/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ import (
func TestUploaderStopped(t *testing.T) {
cs := mocks.NewContractStore()
hm := mocks.NewHostManager()
hs := mocks.NewHostStore()
c := mocks.NewContract(types.PublicKey{1}, types.FileContractID{1})
cl := mocks.NewContractLocker()

ul := New(context.Background(), cl, cs, hm, c.Metadata(), zap.NewNop().Sugar())
hs.AddHost()
ul, err := New(context.Background(), cl, cs, hm, hs, c.Metadata(), zap.NewNop().Sugar())
if err != nil {
t.Fatal(err)
}
ul.Stop(errors.New("test"))

req := SectorUploadReq{
Expand Down Expand Up @@ -106,12 +111,17 @@ func TestHandleSectorUpload(t *testing.T) {
func TestRefreshUploader(t *testing.T) {
cs := mocks.NewContractStore()
hm := mocks.NewHostManager()
hs := mocks.NewHostStore()
cl := mocks.NewContractLocker()

// create uploader
hs.AddHost()
hk := types.PublicKey{1}
c1 := cs.AddContract(hk)
ul := New(context.Background(), cl, cs, hm, c1.Metadata(), zap.NewNop().Sugar())
ul, err := New(context.Background(), cl, cs, hm, hs, c1.Metadata(), zap.NewNop().Sugar())
if err != nil {
t.Fatal(err)
}

// renew the first contract
fmt.Println(c1.ID())
Expand Down
1 change: 0 additions & 1 deletion stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ func TestSQLContractStore(t *testing.T) {
}

// assert it's equal
c.HostIP = inserted.HostIP
if !reflect.DeepEqual(inserted, c) {
t.Fatal("contract mismatch", cmp.Diff(inserted, c))
}
Expand Down
11 changes: 0 additions & 11 deletions stores/sql/rows.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sql

import (
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)
Expand Down Expand Up @@ -56,14 +55,6 @@ func (r *ContractRow) Scan(s Scanner) error {
}

func (r *ContractRow) ContractMetadata() api.ContractMetadata {
var siamuxAddr string
if r.NetAddress != "" && r.SiamuxPort != "" {
siamuxAddr = rhpv2.HostSettings{
NetAddress: r.NetAddress,
SiaMuxPort: r.SiamuxPort,
}.SiamuxAddr()
}

spending := api.ContractSpending{
Uploads: types.Currency(r.UploadSpending),
FundAccount: types.Currency(r.FundAccountSpending),
Expand All @@ -73,7 +64,6 @@ func (r *ContractRow) ContractMetadata() api.ContractMetadata {

return api.ContractMetadata{
ID: types.FileContractID(r.FCID),
HostIP: r.NetAddress,
HostKey: types.PublicKey(r.HostKey),
V2: r.V2,

Expand All @@ -86,7 +76,6 @@ func (r *ContractRow) ContractMetadata() api.ContractMetadata {
RenewedTo: types.FileContractID(r.RenewedTo),
RevisionHeight: r.RevisionHeight,
RevisionNumber: r.RevisionNumber,
SiamuxAddr: siamuxAddr,
Size: r.Size,
Spending: spending,
StartHeight: r.StartHeight,
Expand Down
22 changes: 15 additions & 7 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
type (
uploadManager struct {
hm host.HostManager
hs HostStore
mm memory.MemoryManager
os ObjectStore
cl ContractLocker
Expand Down Expand Up @@ -138,7 +139,7 @@ func (w *Worker) initUploadManager(uploadKey *utils.UploadKey, maxMemory, maxOve
panic("upload manager already initialized") // developer error
}

w.uploadManager = newUploadManager(w.shutdownCtx, uploadKey, w, w.bus, w.bus, w.bus, maxMemory, maxOverdrive, overdriveTimeout, logger)
w.uploadManager = newUploadManager(w.shutdownCtx, uploadKey, w, w.bus, w.bus, w.bus, w.bus, maxMemory, maxOverdrive, overdriveTimeout, logger)
}

func (w *Worker) upload(ctx context.Context, bucket, key string, rs api.RedundancySettings, r io.Reader, contracts []api.ContractMetadata, opts ...UploadOption) (_ string, err error) {
Expand Down Expand Up @@ -289,10 +290,11 @@ func (w *Worker) uploadPackedSlab(ctx context.Context, mem memory.Memory, ps api
return nil
}

func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm host.HostManager, os ObjectStore, cl ContractLocker, cs ContractStore, maxMemory, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.Logger) *uploadManager {
func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm host.HostManager, hs HostStore, os ObjectStore, cl ContractLocker, cs ContractStore, maxMemory, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.Logger) *uploadManager {
logger = logger.Named("uploadmanager")
return &uploadManager{
hm: hm,
hs: hs,
mm: memory.NewManager(maxMemory, logger),
os: os,
cl: cl,
Expand All @@ -312,8 +314,8 @@ func newUploadManager(ctx context.Context, uploadKey *utils.UploadKey, hm host.H
}
}

func (mgr *uploadManager) newUploader(cl ContractLocker, cs ContractStore, hm host.HostManager, c api.ContractMetadata) *uploader.Uploader {
return uploader.New(mgr.shutdownCtx, cl, cs, hm, c, mgr.logger)
func (mgr *uploadManager) newUploader(cl ContractLocker, cs ContractStore, hm host.HostManager, hs HostStore, c api.ContractMetadata) (*uploader.Uploader, error) {
return uploader.New(mgr.shutdownCtx, cl, cs, hm, hs, c, mgr.logger)
}

func (mgr *uploadManager) Stats() uploadManagerStats {
Expand Down Expand Up @@ -649,7 +651,9 @@ func (mgr *uploadManager) newUpload(totalShards int, contracts []api.ContractMet
defer mgr.mu.Unlock()

// refresh the uploaders
mgr.refreshUploaders(contracts, bh)
if err := mgr.refreshUploaders(contracts, bh); err != nil {
return nil, fmt.Errorf("failed to refresh uploaders: %w", err)
}

// check if we have enough contracts
if len(contracts) < totalShards {
Expand All @@ -671,7 +675,7 @@ func (mgr *uploadManager) newUpload(totalShards int, contracts []api.ContractMet
}, nil
}

func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) {
func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh uint64) error {
// build map of renewals
renewals := make(map[types.FileContractID]api.ContractMetadata)
for _, c := range contracts {
Expand Down Expand Up @@ -706,13 +710,17 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
// add missing uploaders
for _, c := range contracts {
if _, exists := existing[c.ID]; !exists && bh < c.WindowEnd {
uploader := mgr.newUploader(mgr.cl, mgr.cs, mgr.hm, c)
uploader, err := mgr.newUploader(mgr.cl, mgr.cs, mgr.hm, mgr.hs, c)
if err != nil {
return err
}
refreshed = append(refreshed, uploader)
go uploader.Start()
}
}

mgr.uploaders = refreshed
return nil
}

func (u *upload) newSlabUpload(ctx context.Context, shards [][]byte, uploaders []*uploader.Uploader, mem memory.Memory, maxOverdrive uint64) (*slabUpload, chan uploader.SectorUploadResp) {
Expand Down
Loading
Loading