Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisting Storage Deals #1526

Merged
merged 1 commit into from
Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type DaemonInitConfig struct {
ClusterNightly bool
// AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval
AutoSealIntervalSeconds uint
DefaultAddress address.Address
}

// DaemonInitOpt is the signature a daemon init option has to fulfill.
Expand Down Expand Up @@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt {
dc.AutoSealIntervalSeconds = autoSealIntervalSeconds
}
}

// DefaultAddress sets the daemons's default address to the provided address.
func DefaultAddress(address address.Address) DaemonInitOpt {
return func(dc *DaemonInitConfig) {
dc.DefaultAddress = address
}
}
8 changes: 8 additions & 0 deletions api/impl/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error
}
}

if cfg.DefaultAddress != (address.Address{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newConfig := rep.Config()
newConfig.Wallet.DefaultAddress = cfg.DefaultAddress
if err := rep.ReplaceConfig(newConfig); err != nil {
return err
}
}

if cfg.ClusterTest && cfg.ClusterNightly {
return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`)
}
Expand Down
6 changes: 3 additions & 3 deletions commands/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ be 2, 1 hour would be 120, and 1 day would be 2880.
Type: storage.DealResponse{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, resp *storage.DealResponse) error {
fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck
fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck
fmt.Fprintf(w, "DealID: %s\n", resp.Proposal.String()) // nolint: errcheck
fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck
fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck
fmt.Fprintf(w, "DealID: %s\n", resp.ProposalCid.String()) // nolint: errcheck
return nil
}),
},
Expand Down
81 changes: 64 additions & 17 deletions commands/client_daemon_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,82 @@
package commands

import (
"fmt"
"strings"
rosalinekarr marked this conversation as resolved.
Show resolved Hide resolved
"sync"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/filecoin-project/go-filecoin/fixtures"
th "github.com/filecoin-project/go-filecoin/testhelpers"
"github.com/stretchr/testify/assert"
)

func TestListAsks(t *testing.T) {
t.Parallel()
assert := assert.New(t)

peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer peer.ShutdownSuccess()
d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start()
defer d.ShutdownSuccess()
peer.ConnectSuccess(d)
minerDaemon := th.NewDaemon(t,
lkowalick marked this conversation as resolved.
Show resolved Hide resolved
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
).Start()
defer minerDaemon.ShutdownSuccess()

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")

listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines()
assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput)
}

func TestStorageDealsAfterRestart(t *testing.T) {
t.Skip("Temporarily skipped to be fixed in subsequent refactor work")

minerDaemon := th.NewDaemon(t,
th.WithMiner(fixtures.TestMiners[0]),
th.KeyFile(fixtures.KeyFilePaths()[0]),
th.DefaultAddress(fixtures.TestAddresses[0]),
th.AutoSealInterval("1"),
).Start()
defer minerDaemon.ShutdownSuccess()

clientDaemon := th.NewDaemon(t,
th.KeyFile(fixtures.KeyFilePaths()[1]),
th.DefaultAddress(fixtures.TestAddresses[1]),
).Start()
defer clientDaemon.ShutdownSuccess()

minerDaemon.UpdatePeerID()
minerDaemon.RunSuccess("mining", "start")

minerDaemon.ConnectSuccess(clientDaemon)

minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10")
dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines()

proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines()

splitOnSpace := strings.Split(proposeDealOutput, " ")

dealCid := splitOnSpace[len(splitOnSpace)-1]

minerDaemon.Restart()
minerDaemon.RunSuccess("mining", "start")

// create a miner with one ask
minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2])
d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10")
clientDaemon.Restart()
lkowalick marked this conversation as resolved.
Show resolved Hide resolved

// check ls
expectedBaseHeight := 2
expectedExpiry := expectedBaseHeight + 10
ls := d.RunSuccess("client", "list-asks")
expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry)
assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n"))
minerDaemon.ConnectSuccess(clientDaemon)

var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
queryDealOutput := clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout()
lkowalick marked this conversation as resolved.
Show resolved Hide resolved
if strings.Contains(queryDealOutput, "posted") {
wg.Done()
break
}
time.Sleep(500 * time.Millisecond)
}
}()
th.WaitTimeout(&wg, 120*time.Second)
}
11 changes: 11 additions & 0 deletions commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var initCmd = &cmds.Command{
cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"),
cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"),
cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"),
cmdkit.StringOption(DefaultAddress, "when set, sets the daemons's default address to the provided address"),
cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)),
cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."),
cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"),
Expand All @@ -45,6 +46,15 @@ var initCmd = &cmds.Command{
}
}

var defaultAddress address.Address
if m, ok := req.Options[DefaultAddress].(string); ok {
var err error
defaultAddress, err = address.NewFromString(m)
if err != nil {
return err
}
}

return GetAPI(env).Daemon().Init(
req.Context,
api.RepoDir(repoDir),
Expand All @@ -54,6 +64,7 @@ var initCmd = &cmds.Command{
api.ClusterTest(clusterTest),
api.ClusterNightly(clusterNightly),
api.AutoSealIntervalSeconds(autoSealIntervalSeconds),
api.DefaultAddress(defaultAddress),
)
},
Encoders: cmds.EncoderMap{
Expand Down
3 changes: 3 additions & 0 deletions commands/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the daemon using dev mode (--dev)
WithMiner = "with-miner"

// DefaultAddress when set, sets the daemons's default address to the provided address
DefaultAddress = "default-address"

// GenesisFile is the path of file containing archive of genesis block DAG data
GenesisFile = "genesisfile"

Expand Down
2 changes: 1 addition & 1 deletion node/block_propagate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions)
)
seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.PlumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), minerNode.PlumbingAPI)
assertions.NoError(err)
clientNode := MakeNodeWithChainSeed(t, seed, configOpts)
nodes := []*Node{minerNode, clientNode}
Expand Down
8 changes: 6 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ func (node *Node) Start(ctx context.Context) error {
node.Host(),
node.Lookup(),
node.CallQueryMethod)
node.StorageMinerClient = storage.NewClient(cni)
var err error
node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore())
if err != nil {
return errors.Wrap(err, "Could not make new storage client")
}

node.RetrievalClient = retrieval.NewClient(node)
node.RetrievalMiner = retrieval.NewMiner(node)
Expand Down Expand Up @@ -846,7 +850,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
}

miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI)
miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.MinerDealsDatastore(), node.Repo.DealsAwaitingSealDatastore(), node.PlumbingAPI)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate storage miner")
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestNodeStartMining(t *testing.T) {

seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI)
assert.NoError(err)

assert.NoError(minerNode.Start(ctx))
Expand Down
87 changes: 67 additions & 20 deletions protocol/storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
"math/big"
"sync"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor"
"gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors"
"gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream"
"gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
"github.com/filecoin-project/go-filecoin/address"
cbu "github.com/filecoin-project/go-filecoin/cborutil"
"github.com/filecoin-project/go-filecoin/lookup"
"github.com/filecoin-project/go-filecoin/repo"
"github.com/filecoin-project/go-filecoin/types"
)

Expand All @@ -29,26 +32,36 @@ type clientNode interface {
GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error)
}

type clientDeal struct {
Miner address.Address
Proposal *DealProposal
Response *DealResponse
}

// Client is used to make deals directly with storage miners.
type Client struct {
deals map[cid.Cid]*clientDealState
deals map[cid.Cid]*clientDeal
dealsDs repo.Datastore
dealsLk sync.Mutex

lkowalick marked this conversation as resolved.
Show resolved Hide resolved
node clientNode
}

type clientDealState struct {
miner address.Address
proposal *DealProposal
lastState *DealResponse
func init() {
cbor.RegisterCborType(clientDeal{})
}

// NewClient creaters a new storage miner client.
func NewClient(nd clientNode) *Client {
return &Client{
deals: make(map[cid.Cid]*clientDealState),
node: nd,
// NewClient creates a new storage client.
func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) {
smc := &Client{
deals: make(map[cid.Cid]*clientDeal),
node: nd,
dealsDs: dealsDs,
}
if err := smc.loadDeals(); err != nil {
return nil, errors.Wrap(err, "failed to load client deals")
}
return smc, nil
}

// ProposeDeal is
Expand Down Expand Up @@ -114,18 +127,17 @@ func (smc *Client) ProposeDeal(ctx context.Context, miner address.Address, data
func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *DealProposal) error {
smc.dealsLk.Lock()
defer smc.dealsLk.Unlock()
_, ok := smc.deals[resp.Proposal]
_, ok := smc.deals[resp.ProposalCid]
if ok {
return fmt.Errorf("deal [%s] is already in progress", resp.Proposal.String())
return fmt.Errorf("deal [%s] is already in progress", resp.ProposalCid.String())
}

smc.deals[resp.Proposal] = &clientDealState{
lastState: resp,
miner: miner,
proposal: p,
smc.deals[resp.ProposalCid] = &clientDeal{
Miner: miner,
Proposal: p,
Response: resp,
}

return nil
return smc.saveDeal(resp.ProposalCid)
}

func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) error {
Expand All @@ -149,7 +161,7 @@ func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) {
return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c)
}

return st.miner, nil
return st.Miner, nil
}

// QueryDeal queries an in-progress proposal.
Expand Down Expand Up @@ -237,3 +249,38 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres

return ask.Price, nil
}

func (smc *Client) loadDeals() error {
res, err := smc.dealsDs.Query(query.Query{})
if err != nil {
return errors.Wrap(err, "failed to query deals from datastore")
}

smc.deals = make(map[cid.Cid]*clientDeal)

for entry := range res.Next() {
var deal clientDeal
if err := cbor.DecodeInto(entry.Value, &deal); err != nil {
return errors.Wrap(err, "failed to unmarshal deals from datastore")
}
smc.deals[deal.Response.ProposalCid] = &deal
}

return nil
}

func (smc *Client) saveDeal(cid cid.Cid) error {
deal, ok := smc.deals[cid]
if !ok {
return errors.Errorf("Could not find client deal with cid: %s", cid.String())
}
datum, err := cbor.DumpObject(deal)
if err != nil {
return errors.Wrap(err, "could not marshal storageDeal")
}
err = smc.dealsDs.Put(datastore.NewKey(cid.String()), datum)
if err != nil {
return errors.Wrap(err, "could not save client deal to disk, in-memory deals differ from persisted deals!")
}
return nil
}
Loading