diff --git a/api/daemon.go b/api/daemon.go index d94120d7c0..2383ede841 100644 --- a/api/daemon.go +++ b/api/daemon.go @@ -32,6 +32,7 @@ type DaemonInitConfig struct { ClusterNightly bool // AutoSealIntervalSeconds, when set, configures the daemon to check for and seal any staged sectors on an interval AutoSealIntervalSeconds uint + DefaultAddress address.Address } // DaemonInitOpt is the signature a daemon init option has to fulfill. @@ -85,3 +86,10 @@ func AutoSealIntervalSeconds(autoSealIntervalSeconds uint) DaemonInitOpt { dc.AutoSealIntervalSeconds = autoSealIntervalSeconds } } + +// DefaultAddress sets the daemons's default address to the provided address. +func DefaultAddress(address address.Address) DaemonInitOpt { + return func(dc *DaemonInitConfig) { + dc.DefaultAddress = address + } +} diff --git a/api/impl/daemon.go b/api/impl/daemon.go index 83fb2fea35..79625d21ac 100644 --- a/api/impl/daemon.go +++ b/api/impl/daemon.go @@ -99,6 +99,14 @@ func (nd *nodeDaemon) Init(ctx context.Context, opts ...api.DaemonInitOpt) error } } + if cfg.DefaultAddress != (address.Address{}) { + newConfig := rep.Config() + newConfig.Wallet.DefaultAddress = cfg.DefaultAddress + if err := rep.ReplaceConfig(newConfig); err != nil { + return err + } + } + if cfg.ClusterTest && cfg.ClusterNightly { return fmt.Errorf(`cannot use both "--cluster-test" and "--cluster-nightly" options`) } diff --git a/commands/client.go b/commands/client.go index d701eb5c93..f07f28b415 100644 --- a/commands/client.go +++ b/commands/client.go @@ -142,9 +142,9 @@ be 2, 1 hour would be 120, and 1 day would be 2880. 
Type: storage.DealResponse{}, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, resp *storage.DealResponse) error { - fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck - fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck - fmt.Fprintf(w, "DealID: %s\n", resp.Proposal.String()) // nolint: errcheck + fmt.Fprintf(w, "State: %s\n", resp.State.String()) // nolint: errcheck + fmt.Fprintf(w, "Message: %s\n", resp.Message) // nolint: errcheck + fmt.Fprintf(w, "DealID: %s\n", resp.ProposalCid.String()) // nolint: errcheck return nil }), }, diff --git a/commands/client_daemon_test.go b/commands/client_daemon_test.go index b6850cf569..28efe78dce 100644 --- a/commands/client_daemon_test.go +++ b/commands/client_daemon_test.go @@ -1,35 +1,82 @@ package commands import ( - "fmt" "strings" + "sync" "testing" - - "github.com/stretchr/testify/assert" + "time" "github.com/filecoin-project/go-filecoin/fixtures" th "github.com/filecoin-project/go-filecoin/testhelpers" + "github.com/stretchr/testify/assert" ) func TestListAsks(t *testing.T) { t.Parallel() assert := assert.New(t) - peer := th.NewDaemon(t, th.WithMiner(fixtures.TestMiners[0]), th.KeyFile(fixtures.KeyFilePaths()[2])).Start() - defer peer.ShutdownSuccess() - d := th.NewDaemon(t, th.KeyFile(fixtures.KeyFilePaths()[2])).Start() - defer d.ShutdownSuccess() - peer.ConnectSuccess(d) + minerDaemon := th.NewDaemon(t, + th.WithMiner(fixtures.TestMiners[0]), + th.KeyFile(fixtures.KeyFilePaths()[0]), + th.DefaultAddress(fixtures.TestAddresses[0]), + ).Start() + defer minerDaemon.ShutdownSuccess() + + minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10") + + listAsksOutput := minerDaemon.RunSuccess("client", "list-asks").ReadStdoutTrimNewlines() + assert.Equal(fixtures.TestMiners[0]+" 000 20 11", listAsksOutput) +} + +func TestStorageDealsAfterRestart(t *testing.T) { + t.Skip("Temporarily skipped to be fixed 
in subsequent refactor work") + + minerDaemon := th.NewDaemon(t, + th.WithMiner(fixtures.TestMiners[0]), + th.KeyFile(fixtures.KeyFilePaths()[0]), + th.DefaultAddress(fixtures.TestAddresses[0]), + th.AutoSealInterval("1"), + ).Start() + defer minerDaemon.ShutdownSuccess() + + clientDaemon := th.NewDaemon(t, + th.KeyFile(fixtures.KeyFilePaths()[1]), + th.DefaultAddress(fixtures.TestAddresses[1]), + ).Start() + defer clientDaemon.ShutdownSuccess() + + minerDaemon.UpdatePeerID() + minerDaemon.RunSuccess("mining", "start") + + minerDaemon.ConnectSuccess(clientDaemon) + + minerDaemon.CreateAsk(minerDaemon, fixtures.TestMiners[0], fixtures.TestAddresses[0], "20", "10") + dataCid := clientDaemon.RunWithStdin(strings.NewReader("HODLHODLHODL"), "client", "import").ReadStdoutTrimNewlines() + + proposeDealOutput := clientDaemon.RunSuccess("client", "propose-storage-deal", fixtures.TestMiners[0], dataCid, "0", "5").ReadStdoutTrimNewlines() + + splitOnSpace := strings.Split(proposeDealOutput, " ") + + dealCid := splitOnSpace[len(splitOnSpace)-1] + + minerDaemon.Restart() + minerDaemon.RunSuccess("mining", "start") - // create a miner with one ask - minerAddr := d.CreateMinerAddr(peer, fixtures.TestAddresses[2]) - d.CreateAsk(peer, minerAddr.String(), fixtures.TestAddresses[2], "20", "10") + clientDaemon.Restart() - // check ls - expectedBaseHeight := 2 - expectedExpiry := expectedBaseHeight + 10 - ls := d.RunSuccess("client", "list-asks") - expectedResult := fmt.Sprintf("%s 000 20 %d", minerAddr.String(), expectedExpiry) - assert.Equal(expectedResult, strings.Trim(ls.ReadStdout(), "\n")) + minerDaemon.ConnectSuccess(clientDaemon) + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + queryDealOutput := clientDaemon.RunSuccess("client", "query-storage-deal", dealCid).ReadStdout() + if strings.Contains(queryDealOutput, "posted") { + wg.Done() + break + } + time.Sleep(500 * time.Millisecond) + } + }() + th.WaitTimeout(&wg, 120*time.Second) } diff --git a/commands/init.go 
b/commands/init.go index 096c187fde..10629cc46f 100644 --- a/commands/init.go +++ b/commands/init.go @@ -20,6 +20,7 @@ var initCmd = &cmds.Command{ cmdkit.StringOption(GenesisFile, "path of file or HTTP(S) URL containing archive of genesis block DAG data"), cmdkit.StringOption(PeerKeyFile, "path of file containing key to use for new node's libp2p identity"), cmdkit.StringOption(WithMiner, "when set, creates a custom genesis block with a pre generated miner account, requires running the daemon using dev mode (--dev)"), + cmdkit.StringOption(DefaultAddress, "when set, sets the daemons's default address to the provided address"), cmdkit.UintOption(AutoSealIntervalSeconds, "when set to a number > 0, configures the daemon to check for and seal any staged sectors on an interval.").WithDefault(uint(120)), cmdkit.BoolOption(ClusterTest, "when set, populates config bootstrap addrs with the dns multiaddrs of the test cluster and other test cluster specific bootstrap parameters."), cmdkit.BoolOption(ClusterNightly, "when set, populates config bootstrap addrs with the dns multiaddrs of the nightly cluster and other nightly cluster specific bootstrap parameters"), @@ -45,6 +46,15 @@ var initCmd = &cmds.Command{ } } + var defaultAddress address.Address + if m, ok := req.Options[DefaultAddress].(string); ok { + var err error + defaultAddress, err = address.NewFromString(m) + if err != nil { + return err + } + } + return GetAPI(env).Daemon().Init( req.Context, api.RepoDir(repoDir), @@ -54,6 +64,7 @@ var initCmd = &cmds.Command{ api.ClusterTest(clusterTest), api.ClusterNightly(clusterNightly), api.AutoSealIntervalSeconds(autoSealIntervalSeconds), + api.DefaultAddress(defaultAddress), ) }, Encoders: cmds.EncoderMap{ diff --git a/commands/main.go b/commands/main.go index c9fb8543e5..471b920732 100644 --- a/commands/main.go +++ b/commands/main.go @@ -55,6 +55,9 @@ const ( // WithMiner when set, creates a custom genesis block with a pre generated miner account, requires to run the 
daemon using dev mode (--dev) WithMiner = "with-miner" + // DefaultAddress when set, sets the daemons's default address to the provided address + DefaultAddress = "default-address" + // GenesisFile is the path of file containing archive of genesis block DAG data GenesisFile = "genesisfile" diff --git a/node/block_propagate_test.go b/node/block_propagate_test.go index 09f26b95ac..aef7b6b121 100644 --- a/node/block_propagate_test.go +++ b/node/block_propagate_test.go @@ -140,7 +140,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions) ) seed.GiveKey(t, minerNode, 0) mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0) - _, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.PlumbingAPI) + _, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), minerNode.PlumbingAPI) assertions.NoError(err) clientNode := MakeNodeWithChainSeed(t, seed, configOpts) nodes := []*Node{minerNode, clientNode} diff --git a/node/node.go b/node/node.go index ce8f8acad6..833b6a3156 100644 --- a/node/node.go +++ b/node/node.go @@ -422,7 +422,11 @@ func (node *Node) Start(ctx context.Context) error { node.Host(), node.Lookup(), node.CallQueryMethod) - node.StorageMinerClient = storage.NewClient(cni) + var err error + node.StorageMinerClient, err = storage.NewClient(cni, node.Repo.ClientDealsDatastore()) + if err != nil { + return errors.Wrap(err, "Could not make new storage client") + } node.RetrievalClient = retrieval.NewClient(node) node.RetrievalMiner = retrieval.NewMiner(node) @@ -846,7 +850,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup") } - miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.PlumbingAPI) + miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, 
node.Repo.MinerDealsDatastore(), node.Repo.DealsAwaitingSealDatastore(), node.PlumbingAPI) if err != nil { return nil, errors.Wrap(err, "failed to instantiate storage miner") } diff --git a/node/node_test.go b/node/node_test.go index 038d172cec..60e579cff1 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -161,7 +161,7 @@ func TestNodeStartMining(t *testing.T) { seed.GiveKey(t, minerNode, 0) mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0) - _, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI) + _, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI) assert.NoError(err) assert.NoError(minerNode.Start(ctx)) diff --git a/protocol/storage/client.go b/protocol/storage/client.go index 1ca61e89d9..5ffa2246e1 100644 --- a/protocol/storage/client.go +++ b/protocol/storage/client.go @@ -6,18 +6,21 @@ import ( "math/big" "sync" - cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor" "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" "gx/ipfs/QmabLh8TrJ3emfAoQk5AbqbLTbMyj7XqumMFmAFxa9epo8/go-multistream" "gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" + "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore" + "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query" "github.com/filecoin-project/go-filecoin/abi" "github.com/filecoin-project/go-filecoin/actor/builtin/miner" "github.com/filecoin-project/go-filecoin/address" cbu "github.com/filecoin-project/go-filecoin/cborutil" "github.com/filecoin-project/go-filecoin/lookup" + "github.com/filecoin-project/go-filecoin/repo" "github.com/filecoin-project/go-filecoin/types" ) @@ 
-29,26 +32,36 @@ type clientNode interface { GetAskPrice(ctx context.Context, miner address.Address, askid uint64) (*types.AttoFIL, error) } +type clientDeal struct { + Miner address.Address + Proposal *DealProposal + Response *DealResponse +} + // Client is used to make deals directly with storage miners. type Client struct { - deals map[cid.Cid]*clientDealState + deals map[cid.Cid]*clientDeal + dealsDs repo.Datastore dealsLk sync.Mutex node clientNode } -type clientDealState struct { - miner address.Address - proposal *DealProposal - lastState *DealResponse +func init() { + cbor.RegisterCborType(clientDeal{}) } -// NewClient creaters a new storage miner client. -func NewClient(nd clientNode) *Client { - return &Client{ - deals: make(map[cid.Cid]*clientDealState), - node: nd, +// NewClient creates a new storage client. +func NewClient(nd clientNode, dealsDs repo.Datastore) (*Client, error) { + smc := &Client{ + deals: make(map[cid.Cid]*clientDeal), + node: nd, + dealsDs: dealsDs, + } + if err := smc.loadDeals(); err != nil { + return nil, errors.Wrap(err, "failed to load client deals") } + return smc, nil } // ProposeDeal is @@ -114,18 +127,17 @@ func (smc *Client) ProposeDeal(ctx context.Context, miner address.Address, data func (smc *Client) recordResponse(resp *DealResponse, miner address.Address, p *DealProposal) error { smc.dealsLk.Lock() defer smc.dealsLk.Unlock() - _, ok := smc.deals[resp.Proposal] + _, ok := smc.deals[resp.ProposalCid] if ok { - return fmt.Errorf("deal [%s] is already in progress", resp.Proposal.String()) + return fmt.Errorf("deal [%s] is already in progress", resp.ProposalCid.String()) } - smc.deals[resp.Proposal] = &clientDealState{ - lastState: resp, - miner: miner, - proposal: p, + smc.deals[resp.ProposalCid] = &clientDeal{ + Miner: miner, + Proposal: p, + Response: resp, } - - return nil + return smc.saveDeal(resp.ProposalCid) } func (smc *Client) checkDealResponse(ctx context.Context, resp *DealResponse) error { @@ -149,7 +161,7 @@ 
func (smc *Client) minerForProposal(c cid.Cid) (address.Address, error) { return address.Address{}, fmt.Errorf("no such proposal by cid: %s", c) } - return st.miner, nil + return st.Miner, nil } // QueryDeal queries an in-progress proposal. @@ -237,3 +249,38 @@ func (cni *ClientNodeImpl) GetAskPrice(ctx context.Context, maddr address.Addres return ask.Price, nil } + +func (smc *Client) loadDeals() error { + res, err := smc.dealsDs.Query(query.Query{}) + if err != nil { + return errors.Wrap(err, "failed to query deals from datastore") + } + + smc.deals = make(map[cid.Cid]*clientDeal) + + for entry := range res.Next() { + var deal clientDeal + if err := cbor.DecodeInto(entry.Value, &deal); err != nil { + return errors.Wrap(err, "failed to unmarshal deals from datastore") + } + smc.deals[deal.Response.ProposalCid] = &deal + } + + return nil +} + +func (smc *Client) saveDeal(cid cid.Cid) error { + deal, ok := smc.deals[cid] + if !ok { + return errors.Errorf("Could not find client deal with cid: %s", cid.String()) + } + datum, err := cbor.DumpObject(deal) + if err != nil { + return errors.Wrap(err, "could not marshal storageDeal") + } + err = smc.dealsDs.Put(datastore.NewKey(cid.String()), datum) + if err != nil { + return errors.Wrap(err, "could not save client deal to disk, in-memory deals differ from persisted deals!") + } + return nil +} diff --git a/protocol/storage/miner.go b/protocol/storage/miner.go index 20db4da33d..7e6e813826 100644 --- a/protocol/storage/miner.go +++ b/protocol/storage/miner.go @@ -2,7 +2,10 @@ package storage import ( "context" + "encoding/json" "fmt" + "github.com/filecoin-project/go-filecoin/repo" + "github.com/filecoin-project/go-filecoin/util/convert" "math/big" "math/rand" "strconv" @@ -13,13 +16,16 @@ import ( unixfs "gx/ipfs/QmQXze9tG878pa4Euya4rrDpyTNX3kQe4dhCaBzBozGgpe/go-unixfs" "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" hamt "gx/ipfs/QmRXf2uUSdGSunRJsM9wXSUNVwLUGCY3So5fAs7h2CBJVf/go-hamt-ipld" + cbor 
"gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor" dag "gx/ipfs/QmTQdH4848iTVCJmKXYyRiK72HufWTLYQQ8iN3JaQ8K1Hq/go-merkledag" "gx/ipfs/QmVmDhyTTUcQXFD1rRQ64fGLMSAoaQvNH3hwuaCFAPq2hy/errors" bserv "gx/ipfs/QmYPZzd9VqmJDwxUnThfeSbV1Y5o53aVPDijTB7j7rS9Ep/go-blockservice" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" - host "gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host" + "gx/ipfs/QmaoXrM4Z41PD48JY36YqQGKQpLGjyLA2cKcLsES7YddAq/go-libp2p-host" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" + "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore" + "gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query" "github.com/filecoin-project/go-filecoin/abi" "github.com/filecoin-project/go-filecoin/actor/builtin/miner" @@ -42,28 +48,31 @@ const queryDealProtocol = protocol.ID("/fil/storage/qry/1.0.0") const submitPostGasPrice = 0 const submitPostGasLimit = 100000000000 +const dealsAwatingSeal = "dealsAwaitingSeal" + // Miner represents a storage miner. type Miner struct { minerAddr address.Address minerOwnerAddr address.Address // deals is a list of deals we made. It is indexed by the CID of the proposal. - deals map[cid.Cid]*storageDealState + deals map[cid.Cid]*storageDeal + dealsDs repo.Datastore dealsLk sync.Mutex postInProcessLk sync.Mutex postInProcess *types.BlockHeight - dealsAwaitingSeal *dealsAwaitingSealStruct + dealsAwaitingSealDs repo.Datastore + dealsAwaitingSeal *dealsAwaitingSealStruct plumbingAPI plumbing node node } -type storageDealState struct { - proposal *DealProposal - - state *DealResponse +type storageDeal struct { + Proposal *DealProposal + Response *DealResponse } // plumbing is the subset of the plumbing API that storage.Miner needs. 
@@ -95,19 +104,33 @@ type generatePostInput struct { sectorID uint64 } +func init() { + cbor.RegisterCborType(storageDeal{}) + cbor.RegisterCborType(dealsAwaitingSealStruct{}) +} + // NewMiner is -func NewMiner(ctx context.Context, minerAddr, minerOwnerAddr address.Address, nd node, plumbingAPI plumbing) (*Miner, error) { +func NewMiner(ctx context.Context, minerAddr, minerOwnerAddr address.Address, nd node, dealsDs repo.Datastore, dealsAwaitingSealDs repo.Datastore, plumbingAPI plumbing) (*Miner, error) { sm := &Miner{ - minerAddr: minerAddr, - minerOwnerAddr: minerOwnerAddr, - deals: make(map[cid.Cid]*storageDealState), - node: nd, - plumbingAPI: plumbingAPI, + minerAddr: minerAddr, + minerOwnerAddr: minerOwnerAddr, + deals: make(map[cid.Cid]*storageDeal), + plumbingAPI: plumbingAPI, + dealsDs: dealsDs, + dealsAwaitingSealDs: dealsAwaitingSealDs, + node: nd, + } + + if err := sm.loadDealsAwaitingSeal(); err != nil { + return nil, errors.Wrap(err, "failed to load dealAwaitingSeal when creating miner") } - sm.dealsAwaitingSeal = newDealsAwaitingSeal() sm.dealsAwaitingSeal.onSuccess = sm.onCommitSuccess sm.dealsAwaitingSeal.onFail = sm.onCommitFail + if err := sm.loadDeals(); err != nil { + return nil, errors.Wrap(err, "failed to load miner deals when creating miner") + } + nd.Host().SetStreamHandler(makeDealProtocol, sm.handleMakeDeal) nd.Host().SetStreamHandler(queryDealProtocol, sm.handleQueryDeal) @@ -154,45 +177,53 @@ func (sm *Miner) acceptProposal(ctx context.Context, p *DealProposal) (*DealResp return nil, errors.New("Mining disabled, can not process proposal") } - // TODO: we don't really actually want to put this in our general storage - // but we just want to get its cid, as a way to uniquely track it - propcid, err := sm.node.CborStore().Put(ctx, p) + proposalCid, err := convert.ToCid(p) if err != nil { return nil, errors.Wrap(err, "failed to get cid of proposal") } resp := &DealResponse{ - State: Accepted, - Proposal: propcid, - Signature: 
types.Signature("signaturrreee"), + State: Accepted, + ProposalCid: proposalCid, + Signature: types.Signature("signaturrreee"), } sm.dealsLk.Lock() defer sm.dealsLk.Unlock() - // TODO: clear out deals when appropriate. - sm.deals[propcid] = &storageDealState{ - proposal: p, - state: resp, + sm.deals[proposalCid] = &storageDeal{ + Proposal: p, + Response: resp, + } + if err := sm.saveDeal(proposalCid); err != nil { + sm.deals[proposalCid].Response.State = Failed + sm.deals[proposalCid].Response.Message = "Could not persist deal due to internal error" + return nil, errors.Wrap(err, "failed to save miner deal") } // TODO: use some sort of nicer scheduler - go sm.processStorageDeal(propcid) + go sm.processStorageDeal(proposalCid) return resp, nil } -func (sm *Miner) getStorageDeal(c cid.Cid) *storageDealState { +func (sm *Miner) getStorageDeal(c cid.Cid) *storageDeal { sm.dealsLk.Lock() defer sm.dealsLk.Unlock() return sm.deals[c] } -func (sm *Miner) updateDealState(c cid.Cid, f func(*DealResponse)) { +func (sm *Miner) updateDealResponse(proposalCid cid.Cid, f func(*DealResponse)) error { sm.dealsLk.Lock() defer sm.dealsLk.Unlock() - f(sm.deals[c].state) - log.Debugf("Miner.updateDealState(%s) - %d", c.String(), sm.deals[c].state) + f(sm.deals[proposalCid].Response) + err := sm.saveDeal(proposalCid) + if err != nil { + return errors.Wrap(err, "failed to store updated deal response in datastore") + } + + log.Debugf("Miner.updateDealResponse(%s) - %d", proposalCid.String(), sm.deals[proposalCid].Response) + return nil } func (sm *Miner) processStorageDeal(c cid.Cid) { @@ -201,7 +232,7 @@ func (sm *Miner) processStorageDeal(c cid.Cid) { defer cancel() d := sm.getStorageDeal(c) - if d.state.State != Accepted { + if d.Response.State != Accepted { // TODO: handle resumption of deal processing across miner restarts log.Error("attempted to process an already started deal") return @@ -211,27 +242,33 @@ func (sm *Miner) processStorageDeal(c cid.Cid) { // TODO: this is not a 
great way to do this. At least use a session // Also, this needs to be fetched into a staging area for miners to prepare and seal in data log.Debug("Miner.processStorageDeal - FetchGraph") - if err := dag.FetchGraph(ctx, d.proposal.PieceRef, dag.NewDAGService(sm.node.BlockService())); err != nil { + if err := dag.FetchGraph(ctx, d.Proposal.PieceRef, dag.NewDAGService(sm.node.BlockService())); err != nil { log.Errorf("failed to fetch data: %s", err) - sm.updateDealState(c, func(resp *DealResponse) { + err := sm.updateDealResponse(c, func(resp *DealResponse) { resp.Message = "Transfer failed" resp.State = Failed // TODO: signature? }) + if err != nil { + log.Errorf("could not update deal to 'Failed' state: %s", err) + } return } fail := func(message, logerr string) { log.Errorf(logerr) - sm.updateDealState(c, func(resp *DealResponse) { + err := sm.updateDealResponse(c, func(resp *DealResponse) { resp.Message = message resp.State = Failed }) + if err != nil { + log.Errorf("could not update deal to 'Failed' state in fail callback: %s", err) + } } pi := &sectorbuilder.PieceInfo{ - Ref: d.proposal.PieceRef, - Size: d.proposal.Size.Uint64(), + Ref: d.Proposal.PieceRef, + Size: d.Proposal.Size.Uint64(), } // There is a race here that requires us to use dealsAwaitingSeal below. If the @@ -249,13 +286,19 @@ func (sm *Miner) processStorageDeal(c cid.Cid) { return } - sm.updateDealState(c, func(resp *DealResponse) { + err = sm.updateDealResponse(c, func(resp *DealResponse) { resp.State = Staged }) + if err != nil { + log.Errorf("could not update deal to 'Staged' state: %s", err) + } // Careful: this might update state to success or failure so it should go after // updating state to Staged.
sm.dealsAwaitingSeal.add(sectorID, c) + if err := sm.saveDealsAwaitingSeal(); err != nil { + log.Errorf("could not save deal awaiting seal: %s", err) + } } // dealsAwaitingSealStruct is a container for keeping track of which sectors have @@ -266,29 +309,51 @@ func (sm *Miner) processStorageDeal(c cid.Cid) { type dealsAwaitingSealStruct struct { l sync.Mutex // Maps from sector id to the deal cids with pieces in the sector. - sectorsToDeals map[uint64][]cid.Cid + SectorsToDeals map[uint64][]cid.Cid // Maps from sector id to sector. - successfulSectors map[uint64]*sectorbuilder.SealedSectorMetadata + SuccessfulSectors map[uint64]*sectorbuilder.SealedSectorMetadata // Maps from sector id to seal failure error string. - failedSectors map[uint64]string + FailedSectors map[uint64]string onSuccess func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) onFail func(dealCid cid.Cid, message string) } -func newDealsAwaitingSeal() *dealsAwaitingSealStruct { - return &dealsAwaitingSealStruct{ - sectorsToDeals: make(map[uint64][]cid.Cid), - successfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), - failedSectors: make(map[uint64]string), +func (sm *Miner) loadDealsAwaitingSeal() error { + sm.dealsAwaitingSeal = &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + } + + result, notFound := sm.dealsAwaitingSealDs.Get(datastore.NewKey(dealsAwatingSeal)) + if notFound == nil { + if err := json.Unmarshal(result, &sm.dealsAwaitingSeal); err != nil { + return errors.Wrap(err, "failed to unmarshal deals awaiting seal from datastore") + } + } + + return nil +} + +func (sm *Miner) saveDealsAwaitingSeal() error { + marshalledDealsAwaitingSeal, err := json.Marshal(sm.dealsAwaitingSeal) + if err != nil { + return errors.Wrap(err, "Could not marshal dealsAwaitingSeal") + } + err = 
sm.dealsAwaitingSealDs.Put(datastore.NewKey(dealsAwatingSeal), marshalledDealsAwaitingSeal) + if err != nil { + return errors.Wrap(err, "could not save deal awaiting seal record to disk, in-memory deals differ from persisted deals!") } + + return nil } func (dealsAwaitingSeal *dealsAwaitingSealStruct) add(sectorID uint64, dealCid cid.Cid) { dealsAwaitingSeal.l.Lock() defer dealsAwaitingSeal.l.Unlock() - if sector, ok := dealsAwaitingSeal.successfulSectors[sectorID]; ok { + if sector, ok := dealsAwaitingSeal.SuccessfulSectors[sectorID]; ok { dealsAwaitingSeal.onSuccess(dealCid, sector) // Don't keep references to sectors around forever. Assume that at most // one success-before-add call will happen (eg, in a test). Sector sealing @@ -297,17 +362,17 @@ func (dealsAwaitingSeal *dealsAwaitingSealStruct) add(sectorID uint64, dealCid c // the state around for longer for some reason we need to limit how many // sectors we hang onto, eg keep a fixed-length slice of successes // and failures and shift the oldest off and the newest on. - delete(dealsAwaitingSeal.successfulSectors, sectorID) - } else if message, ok := dealsAwaitingSeal.failedSectors[sectorID]; ok { + delete(dealsAwaitingSeal.SuccessfulSectors, sectorID) + } else if message, ok := dealsAwaitingSeal.FailedSectors[sectorID]; ok { dealsAwaitingSeal.onFail(dealCid, message) // Same as above. - delete(dealsAwaitingSeal.failedSectors, sectorID) + delete(dealsAwaitingSeal.FailedSectors, sectorID) } else { - deals, ok := dealsAwaitingSeal.sectorsToDeals[sectorID] + deals, ok := dealsAwaitingSeal.SectorsToDeals[sectorID] if ok { - dealsAwaitingSeal.sectorsToDeals[sectorID] = append(deals, dealCid) + dealsAwaitingSeal.SectorsToDeals[sectorID] = append(deals, dealCid) } else { - dealsAwaitingSeal.sectorsToDeals[sectorID] = []cid.Cid{dealCid} + dealsAwaitingSeal.SectorsToDeals[sectorID] = []cid.Cid{dealCid} } } } @@ -316,24 +381,24 @@ func (dealsAwaitingSeal *dealsAwaitingSealStruct) success(sector *sectorbuilder. 
dealsAwaitingSeal.l.Lock() defer dealsAwaitingSeal.l.Unlock() - dealsAwaitingSeal.successfulSectors[sector.SectorID] = sector + dealsAwaitingSeal.SuccessfulSectors[sector.SectorID] = sector - for _, dealCid := range dealsAwaitingSeal.sectorsToDeals[sector.SectorID] { + for _, dealCid := range dealsAwaitingSeal.SectorsToDeals[sector.SectorID] { dealsAwaitingSeal.onSuccess(dealCid, sector) } - delete(dealsAwaitingSeal.sectorsToDeals, sector.SectorID) + delete(dealsAwaitingSeal.SectorsToDeals, sector.SectorID) } func (dealsAwaitingSeal *dealsAwaitingSealStruct) fail(sectorID uint64, message string) { dealsAwaitingSeal.l.Lock() defer dealsAwaitingSeal.l.Unlock() - dealsAwaitingSeal.failedSectors[sectorID] = message + dealsAwaitingSeal.FailedSectors[sectorID] = message - for _, dealCid := range dealsAwaitingSeal.sectorsToDeals[sectorID] { + for _, dealCid := range dealsAwaitingSeal.SectorsToDeals[sectorID] { dealsAwaitingSeal.onFail(dealCid, message) } - delete(dealsAwaitingSeal.sectorsToDeals, sectorID) + delete(dealsAwaitingSeal.SectorsToDeals, sectorID) } // OnCommitmentAddedToChain is a callback, called when a sector seal message was posted to the chain. 
@@ -342,18 +407,21 @@ func (sm *Miner) OnCommitmentAddedToChain(sector *sectorbuilder.SealedSectorMeta log.Debug("Miner.OnCommitmentAddedToChain") if err != nil { - // we failed to seal this sector, cancel all the deals - errMsg := fmt.Sprintf("failed sealing sector: %v: %s; canceling all outstanding deals", sectorID, err) - log.Errorf(errMsg) + errMsg := fmt.Sprintf("failed sealing sector: %v: %s:", sectorID, err) + log.Error(errMsg) + sm.dealsAwaitingSeal.fail(sector.SectorID, errMsg) + } else { + sm.dealsAwaitingSeal.success(sector) + } + if err := sm.saveDealsAwaitingSeal(); err != nil { + errMsg := fmt.Sprintf("failed persisting deals awaiting seal: %s", err) + log.Error(errMsg) sm.dealsAwaitingSeal.fail(sector.SectorID, errMsg) - return } - - sm.dealsAwaitingSeal.success(sector) } func (sm *Miner) onCommitSuccess(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) { - sm.updateDealState(dealCid, func(resp *DealResponse) { + err := sm.updateDealResponse(dealCid, func(resp *DealResponse) { resp.State = Posted resp.ProofInfo = &ProofInfo{ SectorID: sector.SectorID, @@ -361,13 +429,19 @@ func (sm *Miner) onCommitSuccess(dealCid cid.Cid, sector *sectorbuilder.SealedSe CommD: sector.CommD[:], } }) + if err != nil { + log.Errorf("commit succeeded but could not update deal to 'Posted' state: %s", err) + } } func (sm *Miner) onCommitFail(dealCid cid.Cid, message string) { - sm.updateDealState(dealCid, func(resp *DealResponse) { + err := sm.updateDealResponse(dealCid, func(resp *DealResponse) { resp.Message = message resp.State = Failed }) + if err != nil { + log.Errorf("commit failure but could not update deal to 'Failed' state: %s", err) + } } // OnNewHeaviestTipSet is a callback called by node, everytime the the latest head is updated.
@@ -558,7 +630,7 @@ func (sm *Miner) Query(ctx context.Context, c cid.Cid) *DealResponse { } } - return d.state + return d.Response } func (sm *Miner) handleQueryDeal(s inet.Stream) { @@ -592,3 +664,34 @@ func getFileSize(ctx context.Context, c cid.Cid, dserv ipld.DAGService) (uint64, return 0, fmt.Errorf("unrecognized node type: %T", fnode) } } + +func (sm *Miner) loadDeals() error { + res, err := sm.dealsDs.Query(query.Query{}) + if err != nil { + return errors.Wrap(err, "failed to query deals from datastore") + } + + sm.deals = make(map[cid.Cid]*storageDeal) + + for entry := range res.Next() { + var deal storageDeal + if err := cbor.DecodeInto(entry.Value, &deal); err != nil { + return errors.Wrap(err, "failed to unmarshal deals from datastore") + } + sm.deals[deal.Response.ProposalCid] = &deal + } + + return nil +} + +func (sm *Miner) saveDeal(proposalCid cid.Cid) error { + marshalledDeal, err := cbor.DumpObject(sm.deals[proposalCid]) + if err != nil { + return errors.Wrap(err, "Could not marshal storageDeal") + } + err = sm.dealsDs.Put(datastore.NewKey(proposalCid.String()), marshalledDeal) + if err != nil { + return errors.Wrap(err, "could not save client storage deal") + } + return nil +} diff --git a/protocol/storage/miner_test.go b/protocol/storage/miner_test.go index 64d6d71f5d..ab9fca840c 100644 --- a/protocol/storage/miner_test.go +++ b/protocol/storage/miner_test.go @@ -1,15 +1,17 @@ package storage import ( - cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" "testing" "github.com/filecoin-project/go-filecoin/proofs/sectorbuilder" + "github.com/filecoin-project/go-filecoin/repo" "github.com/filecoin-project/go-filecoin/types" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" ) -func TestNewDealsAwaitingSeal(t *testing.T) { +func TestDealsAwaitingSeal(t *testing.T) { newCid := types.NewCidForTestGetter() cid0 := newCid() cid1 := newCid() @@ 
-21,11 +23,38 @@ func TestNewDealsAwaitingSeal(t *testing.T) { wantMessage := "boom" + t.Run("saveDealsAwaitingSeal saves, loadDealsAwaitingSeal loads", func(t *testing.T) { + t.Parallel() + assert := assert.New(t) + require := require.New(t) + + miner := &Miner{ + dealsAwaitingSeal: &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + }, + dealsAwaitingSealDs: repo.NewInMemoryRepo().DealsAwaitingSealDatastore(), + } + + miner.dealsAwaitingSeal.add(wantSectorID, cid0) + + require.NoError(miner.saveDealsAwaitingSeal()) + miner.dealsAwaitingSeal = &dealsAwaitingSealStruct{} + require.NoError(miner.loadDealsAwaitingSeal()) + + assert.Equal(cid0, miner.dealsAwaitingSeal.SectorsToDeals[42][0]) + }) + t.Run("add before success", func(t *testing.T) { t.Parallel() assert := assert.New(t) - dealsAwaitingSeal := newDealsAwaitingSeal() + dealsAwaitingSeal := &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + } gotCids := []cid.Cid{} dealsAwaitingSeal.onSuccess = func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) { assert.Equal(sector, wantSector) @@ -44,7 +73,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) { t.Parallel() assert := assert.New(t) - dealsAwaitingSeal := newDealsAwaitingSeal() + dealsAwaitingSeal := &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + } gotCids := []cid.Cid{} dealsAwaitingSeal.onSuccess = func(dealCid cid.Cid, sector *sectorbuilder.SealedSectorMetadata) { assert.Equal(sector, wantSector) @@ -63,7 +96,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) { t.Parallel() assert := assert.New(t) - dealsAwaitingSeal := 
newDealsAwaitingSeal() + dealsAwaitingSeal := &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + } gotCids := []cid.Cid{} dealsAwaitingSeal.onFail = func(dealCid cid.Cid, message string) { assert.Equal(message, wantMessage) @@ -82,7 +119,11 @@ func TestNewDealsAwaitingSeal(t *testing.T) { t.Parallel() assert := assert.New(t) - dealsAwaitingSeal := newDealsAwaitingSeal() + dealsAwaitingSeal := &dealsAwaitingSealStruct{ + SectorsToDeals: make(map[uint64][]cid.Cid), + SuccessfulSectors: make(map[uint64]*sectorbuilder.SealedSectorMetadata), + FailedSectors: make(map[uint64]string), + } gotCids := []cid.Cid{} dealsAwaitingSeal.onFail = func(dealCid cid.Cid, message string) { assert.Equal(message, wantMessage) diff --git a/protocol/storage/storage_protocol_test.go b/protocol/storage/storage_protocol_test.go index 20645d3adc..42ab4d23b1 100644 --- a/protocol/storage/storage_protocol_test.go +++ b/protocol/storage/storage_protocol_test.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/go-filecoin/plumbing/mthdsig" "github.com/filecoin-project/go-filecoin/proofs" . 
"github.com/filecoin-project/go-filecoin/protocol/storage" + th "github.com/filecoin-project/go-filecoin/testhelpers" "github.com/filecoin-project/go-filecoin/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -104,10 +105,10 @@ func TestStorageProtocolBasic(t *testing.T) { return [][]byte{enc}, 0, nil }, ) - c := NewClient(cni) - m, err := NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, plumbingAPI) + c, err := NewClient(cni, clientNode.Repo.ClientDealsDatastore()) + require.NoError(err) + _, err = NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.MinerDealsDatastore(), minerNode.Repo.DealsAwaitingSealDatastore(), plumbingAPI) assert.NoError(err) - _ = m assert.NoError(minerNode.Start(ctx)) assert.NoError(clientNode.Start(ctx)) @@ -173,7 +174,7 @@ func TestStorageProtocolBasic(t *testing.T) { ref, err := c.ProposeDeal(ctx, mineraddr, protonode.Cid(), 1, 150) assert.NoError(err) requireQueryDeal := func() (DealState, string) { - resp, err := c.QueryDeal(ctx, ref.Proposal) + resp, err := c.QueryDeal(ctx, ref.ProposalCid) require.NoError(err) return resp.State, resp.Message } @@ -191,7 +192,7 @@ func TestStorageProtocolBasic(t *testing.T) { } require.True(done) - if waitTimeout(&wg, 120*time.Second) { + if th.WaitTimeout(&wg, 120*time.Second) { state, message := requireQueryDeal() require.NotEqual(Failed, state, message) require.Failf("TestStorageProtocolBasic failed", "waiting for submission timed out. 
Saw %d blocks with %d messages while waiting", bCount, mCount) @@ -202,7 +203,7 @@ func TestStorageProtocolBasic(t *testing.T) { // Now all things should be ready done = false for i := 0; i < 10; i++ { - resp, err := c.QueryDeal(ctx, ref.Proposal) + resp, err := c.QueryDeal(ctx, ref.ProposalCid) assert.NoError(err) assert.NotEqual(Failed, resp.State, resp.Message) if resp.State == Posted { @@ -215,19 +216,3 @@ func TestStorageProtocolBasic(t *testing.T) { assert.True(done, "failed to finish transfer") } - -// waitTimeout waits for the waitgroup for the specified max timeout. -// Returns true if waiting timed out. -func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { - c := make(chan struct{}) - go func() { - defer close(c) - wg.Wait() - }() - select { - case <-c: - return false // completed normally - case <-time.After(timeout): - return true // timed out - } -} diff --git a/protocol/storage/types.go b/protocol/storage/types.go index 2dbae6a748..c648178378 100644 --- a/protocol/storage/types.go +++ b/protocol/storage/types.go @@ -41,7 +41,7 @@ type DealResponse struct { Message string // Proposal is the cid of the StorageDealProposal object this response is for - Proposal cid.Cid + ProposalCid cid.Cid // ProofInfo is a collection of information needed to convince the client that // the miner has sealed the data into a sector. diff --git a/repo/fsrepo.go b/repo/fsrepo.go index be7be029e0..4ee8993290 100644 --- a/repo/fsrepo.go +++ b/repo/fsrepo.go @@ -24,15 +24,18 @@ import ( const ( // APIFile is the filename containing the filecoin node's api address. 
- APIFile = "api" - configFilename = "config.json" - tempConfigFilename = ".config.json.temp" - lockFile = "repo.lock" - versionFilename = "version" - walletDatastorePrefix = "wallet" - chainDatastorePrefix = "chain" - snapshotStorePrefix = "snapshots" - snapshotFilenamePrefix = "snapshot" + APIFile = "api" + configFilename = "config.json" + tempConfigFilename = ".config.json.temp" + lockFile = "repo.lock" + versionFilename = "version" + walletDatastorePrefix = "wallet" + chainDatastorePrefix = "chain" + minerDealsDatastorePrefix = "miner_deals" + clientDealsDatastorePrefix = "client_deals" + dealsAwaitingSealDatastorePrefix = "deals_awaiting_seal" + snapshotStorePrefix = "snapshots" + snapshotFilenamePrefix = "snapshot" ) // NoRepoError is returned when trying to open a repo where one does not exist @@ -52,12 +55,15 @@ type FSRepo struct { version uint // lk protects the config file - lk sync.RWMutex - cfg *config.Config - ds Datastore - keystore keystore.Keystore - walletDs Datastore - chainDs Datastore + lk sync.RWMutex + cfg *config.Config + ds Datastore + keystore keystore.Keystore + walletDs Datastore + chainDs Datastore + minerDealsDs Datastore + clientDealsDs Datastore + dealsAwaitingSealDs Datastore // lockfile is the file system lock to prevent others from opening the same repo. 
lockfile io.Closer @@ -127,6 +133,18 @@ func (r *FSRepo) loadFromDisk() error { if err := r.openChainDatastore(); err != nil { return errors.Wrap(err, "failed to open chain datastore") } + + if err := r.openMinerDealsDatastore(); err != nil { + return errors.Wrap(err, "failed to open miner deals datastore") + } + + if err := r.openClientDealsDatastore(); err != nil { + return errors.Wrap(err, "failed to open client deals datastore") + } + + if err := r.openDealsAwaitingSealDatastore(); err != nil { + return errors.Wrap(err, "failed to open deals awaiting seal datastore") + } return nil } @@ -216,6 +234,21 @@ func (r *FSRepo) ChainDatastore() Datastore { return r.chainDs } +// MinerDealsDatastore returns the deals datastore for a miner. +func (r *FSRepo) MinerDealsDatastore() Datastore { + return r.minerDealsDs +} + +// ClientDealsDatastore returns the deals datastore for a client. +func (r *FSRepo) ClientDealsDatastore() Datastore { + return r.clientDealsDs +} + +// DealsAwaitingSealDatastore returns the deals awaiting seal datastore. 
+func (r *FSRepo) DealsAwaitingSealDatastore() Datastore { + return r.dealsAwaitingSealDs +} + // Version returns the version of the repo func (r *FSRepo) Version() uint { return r.version @@ -233,11 +266,23 @@ func (r *FSRepo) Close() error { } if err := r.walletDs.Close(); err != nil { - return errors.Wrap(err, "failed to close datastore") + return errors.Wrap(err, "failed to close wallet datastore") } if err := r.chainDs.Close(); err != nil { - return errors.Wrap(err, "failed to close datastore") + return errors.Wrap(err, "failed to close chain datastore") + } + + if err := r.minerDealsDs.Close(); err != nil { + return errors.Wrap(err, "failed to close miner deals datastore") + } + + if err := r.clientDealsDs.Close(); err != nil { + return errors.Wrap(err, "failed to close client deals datastore") + } + + if err := r.dealsAwaitingSealDs.Close(); err != nil { + return errors.Wrap(err, "failed to close deals awaiting seal datastore") } if err := r.removeAPIFile(); err != nil { @@ -351,6 +396,39 @@ func (r *FSRepo) openWalletDatastore() error { return nil } +func (r *FSRepo) openMinerDealsDatastore() error { + ds, err := badgerds.NewDatastore(filepath.Join(r.path, minerDealsDatastorePrefix), nil) + if err != nil { + return err + } + + r.minerDealsDs = ds + + return nil +} + +func (r *FSRepo) openClientDealsDatastore() error { + ds, err := badgerds.NewDatastore(filepath.Join(r.path, clientDealsDatastorePrefix), nil) + if err != nil { + return err + } + + r.clientDealsDs = ds + + return nil +} + +func (r *FSRepo) openDealsAwaitingSealDatastore() error { + ds, err := badgerds.NewDatastore(filepath.Join(r.path, dealsAwaitingSealDatastorePrefix), nil) + if err != nil { + return err + } + + r.dealsAwaitingSealDs = ds + + return nil +} + func initVersion(p string, version uint) error { return ioutil.WriteFile(filepath.Join(p, versionFilename), []byte(strconv.Itoa(int(version))), 0644) } diff --git a/repo/mem.go b/repo/mem.go index 833b7c8f15..ebd4ad7654 100644 --- 
a/repo/mem.go +++ b/repo/mem.go @@ -16,16 +16,19 @@ import ( // implementation of the Repo interface. type MemRepo struct { // lk guards the config - lk sync.RWMutex - C *config.Config - D Datastore - Ks keystore.Keystore - W Datastore - Chain Datastore - version uint - apiAddress string - stagingDir string - sealedDir string + lk sync.RWMutex + C *config.Config + D Datastore + Ks keystore.Keystore + W Datastore + Chain Datastore + MinerDealsDs Datastore + ClientDealsDs Datastore + DealsAwaitingSealDs Datastore + version uint + apiAddress string + stagingDir string + sealedDir string } var _ Repo = (*MemRepo)(nil) @@ -50,14 +53,17 @@ func NewInMemoryRepo() *MemRepo { // sector-storage. func NewInMemoryRepoWithSectorDirectories(staging, sealedDir string) *MemRepo { return &MemRepo{ - C: config.NewDefaultConfig(), - D: dss.MutexWrap(datastore.NewMapDatastore()), - Ks: keystore.MutexWrap(keystore.NewMemKeystore()), - W: dss.MutexWrap(datastore.NewMapDatastore()), - Chain: dss.MutexWrap(datastore.NewMapDatastore()), - version: Version, - stagingDir: staging, - sealedDir: sealedDir, + C: config.NewDefaultConfig(), + D: dss.MutexWrap(datastore.NewMapDatastore()), + Ks: keystore.MutexWrap(keystore.NewMemKeystore()), + W: dss.MutexWrap(datastore.NewMapDatastore()), + Chain: dss.MutexWrap(datastore.NewMapDatastore()), + MinerDealsDs: dss.MutexWrap(datastore.NewMapDatastore()), + ClientDealsDs: dss.MutexWrap(datastore.NewMapDatastore()), + DealsAwaitingSealDs: dss.MutexWrap(datastore.NewMapDatastore()), + version: Version, + stagingDir: staging, + sealedDir: sealedDir, } } @@ -99,6 +105,21 @@ func (mr *MemRepo) ChainDatastore() Datastore { return mr.Chain } +// MinerDealsDatastore returns the deals datastore for miners. +func (mr *MemRepo) MinerDealsDatastore() Datastore { + return mr.MinerDealsDs +} + +// ClientDealsDatastore returns the deals datastore for clients. 
+func (mr *MemRepo) ClientDealsDatastore() Datastore { + return mr.ClientDealsDs +} + +// DealsAwaitingSealDatastore returns the deals awaiting seal datastore. +func (mr *MemRepo) DealsAwaitingSealDatastore() Datastore { + return mr.DealsAwaitingSealDs +} + // Version returns the version of the repo. func (mr *MemRepo) Version() uint { return mr.version diff --git a/repo/repo.go b/repo/repo.go index 39238c85f4..4db0f2600b 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -35,6 +35,15 @@ type Repo interface { // ChainDatastore is a specific storage solution, only used to store already validated chain data. ChainDatastore() Datastore + // MinerDealsDatastore holds deals data. + MinerDealsDatastore() Datastore + + // ClientDealsDatastore holds deals data. + ClientDealsDatastore() Datastore + + // DealsAwaitingSealDatastore holds deals awaiting seal data. + DealsAwaitingSealDatastore() Datastore + // SetAPIAddr sets the address of the running API. SetAPIAddr(string) error diff --git a/testhelpers/commands.go b/testhelpers/commands.go index c73cc4043e..bbc526fdff 100644 --- a/testhelpers/commands.go +++ b/testhelpers/commands.go @@ -96,27 +96,27 @@ func RunSuccessLines(td *TestDaemon, args ...string) []string { // TestDaemon is used to manage a Filecoin daemon instance for testing purposes. type TestDaemon struct { - cmdAddr string - swarmAddr string - repoDir string - genesisFile string - keyFiles []string - withMiner string + cmdAddr string + swarmAddr string + repoDir string + genesisFile string + keyFiles []string + withMiner string + autoSealInterval string firstRun bool init bool - // The filecoin daemon process - process *exec.Cmd - lk sync.Mutex Stdin io.Writer Stdout io.Reader Stderr io.Reader - test *testing.T - - cmdTimeout time.Duration + process *exec.Cmd + test *testing.T + cmdTimeout time.Duration + defaultAddress string + daemonArgs []string } // RepoDir returns the repo directory of the test daemon. 
@@ -355,6 +355,8 @@ func (td *TestDaemon) ReadStderr() string { // Start starts up the daemon. func (td *TestDaemon) Start() *TestDaemon { + td.createNewProcess() + require.NoError(td.test, td.process.Start()) err := td.WaitForAPI() @@ -376,7 +378,23 @@ func (td *TestDaemon) Start() *TestDaemon { return td } -// Shutdown stops the daemon. +// Stop stops the daemon +func (td *TestDaemon) Stop() *TestDaemon { + if err := td.process.Process.Signal(syscall.SIGINT); err != nil { + panic(err) + } + if _, err := td.process.Process.Wait(); err != nil { + panic(err) + } + return td +} + +// Restart restarts the daemon +func (td *TestDaemon) Restart() *TestDaemon { + return td.Stop().Start() +} + +// Shutdown stops the daemon and deletes the repository. func (td *TestDaemon) Shutdown() { if err := td.process.Process.Signal(syscall.SIGTERM); err != nil { td.test.Errorf("Daemon Stderr:\n%s", td.ReadStderr()) @@ -480,6 +498,24 @@ func (td *TestDaemon) CreateAsk(peer *TestDaemon, minerAddr string, fromAddr str return &askID } +// UpdatePeerID updates a miner's peer ID +func (td *TestDaemon) UpdatePeerID() { + require := require.New(td.test) + assert := assert.New(td.test) + + var idOutput map[string]interface{} + peerIDJSON := td.RunSuccess("id").ReadStdout() + err := json.Unmarshal([]byte(peerIDJSON), &idOutput) + require.NoError(err) + updateCidStr := td.RunSuccess("miner", "update-peerid", "--price=0", "--limit=999999", td.GetMinerAddress().String(), idOutput["ID"].(string)).ReadStdoutTrimNewlines() + updateCid, err := cid.Parse(updateCidStr) + require.NoError(err) + assert.NotNil(updateCid) + + td.RunSuccess("mining once") + td.WaitForMessageRequireSuccess(updateCid) +} + // WaitForMessageRequireSuccess accepts a message cid and blocks until a message with matching cid is included in a // block. The receipt is then inspected to ensure that the corresponding message receipt had a 0 exit code. 
func (td *TestDaemon) WaitForMessageRequireSuccess(msgCid cid.Cid) *types.MessageReceipt { @@ -674,6 +710,20 @@ func KeyFile(kf string) func(*TestDaemon) { } } +// DefaultAddress sets the daemon's default wallet address during init +func DefaultAddress(defaultAddr string) func(*TestDaemon) { + return func(td *TestDaemon) { + td.defaultAddress = defaultAddr + } +} + +// AutoSealInterval specifies an interval for automatically sealing +func AutoSealInterval(autoSealInterval string) func(*TestDaemon) { + return func(td *TestDaemon) { + td.autoSealInterval = autoSealInterval + } +} + // GenesisFile allows setting the `genesisFile` config option on the daemon. func GenesisFile(a string) func(*TestDaemon) { return func(td *TestDaemon) { @@ -700,11 +750,10 @@ func NewDaemon(t *testing.T, options ...func(*TestDaemon)) *TestDaemon { } td := &TestDaemon{ - test: t, - repoDir: dir, - init: true, // we want to init unless told otherwise - firstRun: true, - + test: t, + repoDir: dir, + init: true, // we want to init unless told otherwise + firstRun: true, cmdTimeout: DefaultDaemonCmdTimeout, genesisFile: GenesisFilePath(), // default file includes all test addresses, } @@ -718,9 +767,7 @@ func NewDaemon(t *testing.T, options ...func(*TestDaemon)) *TestDaemon { blockTimeFlag := fmt.Sprintf("--block-time=%s", BlockTimeTest) // build command options - initopts := []string{ - repoDirFlag, - } + initopts := []string{repoDirFlag} if td.genesisFile != "" { initopts = append(initopts, fmt.Sprintf("--genesisfile=%s", td.genesisFile)) @@ -730,6 +777,14 @@ initopts = append(initopts, fmt.Sprintf("--with-miner=%s", td.withMiner)) } + if td.defaultAddress != "" { + initopts = append(initopts, fmt.Sprintf("--default-address=%s", td.defaultAddress)) + } + + if td.autoSealInterval != "" { + initopts = append(initopts, fmt.Sprintf("--auto-seal-interval-seconds=%s", td.autoSealInterval)) + } + if td.init 
{ t.Logf("run: go-filecoin init %s", initopts) out, err := RunInit(td, initopts...) @@ -755,29 +810,7 @@ func NewDaemon(t *testing.T, options ...func(*TestDaemon)) *TestDaemon { swarmListenFlag := fmt.Sprintf("--swarmlisten=%s", td.swarmAddr) cmdAPIAddrFlag := fmt.Sprintf("--cmdapiaddr=%s", td.cmdAddr) - finalArgs := []string{"daemon", repoDirFlag, cmdAPIAddrFlag, swarmListenFlag, blockTimeFlag} - td.test.Logf("(%s) run: %q\n", td.swarmAddr, strings.Join(finalArgs, " ")) - - // define filecoin daemon process - td.process = exec.Command(filecoinBin, finalArgs...) - // disable REUSEPORT, it creates problems in tests - td.process.Env = append(os.Environ(), "IPFS_REUSEPORT=false") - - // setup process pipes - td.Stdout, err = td.process.StdoutPipe() - if err != nil { - t.Fatal(err) - } - // uncomment this and comment out the following 4 lines to output daemon stderr to os stderr - //td.process.Stderr = os.Stderr - td.Stderr, err = td.process.StderrPipe() - if err != nil { - t.Fatal(err) - } - td.Stdin, err = td.process.StdinPipe() - if err != nil { - t.Fatal(err) - } + td.daemonArgs = []string{filecoinBin, "daemon", repoDirFlag, cmdAPIAddrFlag, swarmListenFlag, blockTimeFlag} return td } @@ -809,3 +842,28 @@ func ProjectRoot(paths ...string) string { return filepath.Join(allPaths...) } + +func (td *TestDaemon) createNewProcess() { + td.test.Logf("(%s) run: %q\n", td.swarmAddr, strings.Join(td.daemonArgs, " ")) + + td.process = exec.Command(td.daemonArgs[0], td.daemonArgs[1:]...) 
+ // disable REUSEPORT, it creates problems in tests + td.process.Env = append(os.Environ(), "IPFS_REUSEPORT=false") + + // setup process pipes + var err error + td.Stdout, err = td.process.StdoutPipe() + if err != nil { + td.test.Fatal(err) + } + // uncomment this and comment out the following 4 lines to output daemon stderr to os stderr + //td.process.Stderr = os.Stderr + td.Stderr, err = td.process.StderrPipe() + if err != nil { + td.test.Fatal(err) + } + td.Stdin, err = td.process.StdinPipe() + if err != nil { + td.test.Fatal(err) + } +} diff --git a/testhelpers/util.go b/testhelpers/util.go index 2d9c2232f7..a1b7255b4b 100644 --- a/testhelpers/util.go +++ b/testhelpers/util.go @@ -5,6 +5,7 @@ import ( "net" "os" "path/filepath" + "sync" "time" "gx/ipfs/QmdcULN1WCzgoQmcCaUAmEhwcxHYsDrbZ2LvRJKCL8dMrK/go-homedir" @@ -82,3 +83,19 @@ func WaitForIt(count int, delay time.Duration, cb func() (bool, error)) error { return nil } + +// WaitTimeout waits for the waitgroup for the specified max timeout. +// Returns true if waiting timed out. 
+func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} diff --git a/util/convert/convert.go b/util/convert/convert.go new file mode 100644 index 0000000000..174331beef --- /dev/null +++ b/util/convert/convert.go @@ -0,0 +1,18 @@ +package convert + +import ( + "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" + cbor "gx/ipfs/QmRoARq3nkUb13HSKZGepCZSWe5GrVPwx7xURJGZ7KWv9V/go-ipld-cbor" + + "github.com/filecoin-project/go-filecoin/types" + "github.com/pkg/errors" +) + +// ToCid gets the Cid for the argument passed in +func ToCid(object interface{}) (cid.Cid, error) { + cborNode, err := cbor.WrapObject(object, types.DefaultHashFunction, -1) + if err != nil { + return cid.Cid{}, errors.Wrap(err, "failed to get cid of proposal") + } + return cborNode.Cid(), nil +}