Skip to content

Commit

Permalink
I think I broke stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
rkowalick committed Feb 8, 2019
1 parent 28673b6 commit 0b2f9c7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 35 deletions.
8 changes: 4 additions & 4 deletions node/block_propagate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestBlockPropsManyNodes(t *testing.T) {
assert := assert.New(t)

numNodes := 4
minerAddr, nodes := makeNodes(ctx, t, assert, numNodes)
minerAddr, nodes := makeNodes(t, assert, numNodes)
startNodes(t, nodes)
defer stopNodes(nodes)

Expand Down Expand Up @@ -101,7 +101,7 @@ func TestChainSync(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)

minerAddr, nodes := makeNodes(ctx, t, assert, 2)
minerAddr, nodes := makeNodes(t, assert, 2)
startNodes(t, nodes)
defer stopNodes(nodes)

Expand Down Expand Up @@ -140,7 +140,7 @@ func (r *zeroRewarder) GasReward(ctx context.Context, st state.Tree, minerAddr a
}

// makeNodes makes at least two nodes, a miner and a client; numNodes is the total wanted
func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions, numNodes int) (address.Address, []*Node) {
func makeNodes(t *testing.T, assertions *assert.Assertions, numNodes int) (address.Address, []*Node) {
seed := MakeChainSeed(t, TestGenCfg)
configOpts := []ConfigOpt{RewarderConfigOption(&zeroRewarder{})}
minerNode := MakeNodeWithChainSeed(t, seed, configOpts,
Expand All @@ -149,7 +149,7 @@ func makeNodes(ctx context.Context, t *testing.T, assertions *assert.Assertions,
)
seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.DealsDatastore(), minerNode.PorcelainAPI)
_, err := storage.NewMiner(mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.DealsDatastore(), minerNode.PorcelainAPI)
assertions.NoError(err)

nodes := []*Node{minerNode}
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func initStorageMinerForNode(ctx context.Context, node *Node) (*storage.Miner, e
return nil, errors.Wrap(err, "no mining owner available, skipping storage miner setup")
}

miner, err := storage.NewMiner(ctx, minerAddr, miningOwnerAddr, node, node.Repo.DealsDatastore(), node.PorcelainAPI)
miner, err := storage.NewMiner(minerAddr, miningOwnerAddr, node, node.Repo.DealsDatastore(), node.PorcelainAPI)
if err != nil {
return nil, errors.Wrap(err, "failed to instantiate storage miner")
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestNodeStartMining(t *testing.T) {

seed.GiveKey(t, minerNode, 0)
mineraddr, minerOwnerAddr := seed.GiveMiner(t, minerNode, 0)
_, err := storage.NewMiner(ctx, mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.DealsDatastore(), porcelainAPI)
_, err := storage.NewMiner(mineraddr, minerOwnerAddr, minerNode, minerNode.Repo.DealsDatastore(), porcelainAPI)
assert.NoError(err)

assert.NoError(minerNode.Start(ctx))
Expand Down
41 changes: 16 additions & 25 deletions protocol/storage/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore"
"gx/ipfs/Qmf4xQhNomPNhrtZc67qSnfJSjxjXs9LWvknJtSXwimPrM/go-datastore/query"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor/builtin/miner"
Expand Down Expand Up @@ -67,15 +66,16 @@ type Miner struct {
porcelainAPI porcelainAPI
node node

proposalAcceptor func(ctx context.Context, m *Miner, p *deal.Proposal) (*deal.Response, error)
proposalRejector func(ctx context.Context, m *Miner, p *deal.Proposal, reason string) (*deal.Response, error)
proposalAcceptor func(m *Miner, p *deal.Proposal) (*deal.Response, error)
proposalRejector func(m *Miner, p *deal.Proposal, reason string) (*deal.Response, error)
}

// porcelainAPI is the subset of the porcelain API that storage.Miner needs.
type porcelainAPI interface {
MessageSendWithRetry(ctx context.Context, numRetries uint, waitDuration time.Duration, from, to address.Address, val *types.AttoFIL, method string, gasPrice types.AttoFIL, gasLimit types.GasUnits, params ...interface{}) error
MessageQuery(ctx context.Context, optFrom, to address.Address, method string, params ...interface{}) ([][]byte, *exec.FunctionSignature, error)
ConfigGet(dottedPath string) (interface{}, error)
DealsLs() (<-chan *deal.Deal, <-chan error)
}

// node is subset of node on which this protocol depends. These deps
Expand Down Expand Up @@ -103,7 +103,7 @@ func init() {
}

// NewMiner is
func NewMiner(ctx context.Context, minerAddr, minerOwnerAddr address.Address, nd node, dealsDs repo.Datastore, porcelainAPI porcelainAPI) (*Miner, error) {
func NewMiner(minerAddr, minerOwnerAddr address.Address, nd node, dealsDs repo.Datastore, porcelainAPI porcelainAPI) (*Miner, error) {
sm := &Miner{
minerAddr: minerAddr,
minerOwnerAddr: minerOwnerAddr,
Expand Down Expand Up @@ -169,15 +169,15 @@ func (sm *Miner) receiveStorageProposal(ctx context.Context, p *deal.Proposal) (
return nil, errors.New("Could not retrieve storagePrice from config")
}
if p.TotalPrice.LessThan(storagePriceAF) {
return sm.proposalRejector(ctx, sm, p,
return sm.proposalRejector(sm, p,
fmt.Sprintf("proposed price %s is less that miner's current asking price: %s", p.TotalPrice, storagePriceAF))
}

// Payment is valid, everything else checks out, let's accept this proposal
return sm.proposalAcceptor(ctx, sm, p)
return sm.proposalAcceptor(sm, p)
}

func acceptProposal(ctx context.Context, sm *Miner, p *deal.Proposal) (*deal.Response, error) {
func acceptProposal(sm *Miner, p *deal.Proposal) (*deal.Response, error) {
if sm.node.SectorBuilder() == nil {
return nil, errors.New("Mining disabled, can not process proposal")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func acceptProposal(ctx context.Context, sm *Miner, p *deal.Proposal) (*deal.Res
return resp, nil
}

func rejectProposal(ctx context.Context, sm *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
func rejectProposal(sm *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
proposalCid, err := convert.ToCid(p)
if err != nil {
return nil, errors.Wrap(err, "failed to get cid of proposal")
Expand Down Expand Up @@ -641,7 +641,7 @@ func (sm *Miner) submitPoSt(start, end *types.BlockHeight, inputs []generatePost
}

// Query responds to a query for the proposal referenced by the given cid
func (sm *Miner) Query(ctx context.Context, c cid.Cid) *deal.Response {
func (sm *Miner) Query(c cid.Cid) *deal.Response {
sm.dealsLk.Lock()
defer sm.dealsLk.Unlock()
d, ok := sm.deals[c]
Expand All @@ -664,8 +664,7 @@ func (sm *Miner) handleQueryDeal(s inet.Stream) {
return
}

ctx := context.Background()
resp := sm.Query(ctx, q.Cid)
resp := sm.Query(q.Cid)

if err := cbu.NewMsgWriter(s).WriteMsg(resp); err != nil {
log.Errorf("failed to write query response: %s", err)
Expand All @@ -688,23 +687,15 @@ func getFileSize(ctx context.Context, c cid.Cid, dserv ipld.DAGService) (uint64,
}

func (sm *Miner) loadDeals() error {
res, err := sm.dealsDs.Query(query.Query{
Prefix: "/" + deal.ClientDatastorePrefix,
})
if err != nil {
return errors.Wrap(err, "failed to query deals from datastore")
}

sm.deals = make(map[cid.Cid]*deal.Deal)

for entry := range res.Next() {
var storageDeal deal.Deal
if err := cbor.DecodeInto(entry.Value, &storageDeal); err != nil {
return errors.Wrap(err, "failed to unmarshal deals from datastore")
}
sm.deals[storageDeal.Response.ProposalCid] = &storageDeal
deals, doneOrError := sm.porcelainAPI.DealsLs()
select {
case storageDeal := <-deals:
sm.deals[storageDeal.Response.ProposalCid] = storageDeal
case errOrNil := <-doneOrError:
return errOrNil
}

return nil
}

Expand Down
24 changes: 20 additions & 4 deletions protocol/storage/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ func (mtp *minerTestPorcelain) ConfigGet(dottedPath string) (interface{}, error)
return mtp.config.Get(dottedPath)
}

func (mtp *minerTestPorcelain) DealsLs() (<-chan *deal.Deal, <-chan error) {
out, errOrDoneC := make(chan *deal.Deal), make(chan error)
go func() {
defer close(out)
defer close(errOrDoneC)
out <- &deal.Deal{Miner: address.Address{}, Proposal: &deal.Proposal{}, Response: &deal.Response{
State: deal.Accepted,
Message: "OK",
ProposalCid: cid.Cid{},
}}
errOrDoneC <- nil
}()

return out, errOrDoneC
}

func TestReceiveStorageProposal(t *testing.T) {
t.Run("Accepts proposals with sufficient TotalPrice", func(t *testing.T) {
assert := assert.New(t)
Expand All @@ -51,11 +67,11 @@ func TestReceiveStorageProposal(t *testing.T) {
porcelainAPI := newminerTestPorcelain()
miner := Miner{
porcelainAPI: porcelainAPI,
proposalAcceptor: func(ctx context.Context, m *Miner, p *deal.Proposal) (*deal.Response, error) {
proposalAcceptor: func(m *Miner, p *deal.Proposal) (*deal.Response, error) {
accepted = true
return &deal.Response{}, nil
},
proposalRejector: func(ctx context.Context, m *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
proposalRejector: func(m *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
rejected = true
return &deal.Response{Message: reason}, nil
},
Expand Down Expand Up @@ -85,11 +101,11 @@ func TestReceiveStorageProposal(t *testing.T) {
porcelainAPI := newminerTestPorcelain()
miner := Miner{
porcelainAPI: porcelainAPI,
proposalAcceptor: func(ctx context.Context, m *Miner, p *deal.Proposal) (*deal.Response, error) {
proposalAcceptor: func(m *Miner, p *deal.Proposal) (*deal.Response, error) {
accepted = true
return &deal.Response{}, nil
},
proposalRejector: func(ctx context.Context, m *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
proposalRejector: func(m *Miner, p *deal.Proposal, reason string) (*deal.Response, error) {
rejected = true
return &deal.Response{Message: reason}, nil
},
Expand Down

0 comments on commit 0b2f9c7

Please sign in to comment.