diff --git a/ipfs/pointers.go b/ipfs/pointers.go index c84284d88d..f79afcf5bc 100644 --- a/ipfs/pointers.go +++ b/ipfs/pointers.go @@ -89,6 +89,7 @@ func PutPointerToPeer(dht *routing.IpfsDHT, ctx context.Context, peer peer.ID, p func GetPointersFromPeer(dht *routing.IpfsDHT, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) { pmes := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, key.Bytes(), 0) + log.Debugf("Fetching pointers from: %v\n", p.Pretty()) resp, err := dht.SendRequest(ctx, p, pmes) if err != nil { return []*ps.PeerInfo{}, err diff --git a/mobile/node.go b/mobile/node.go index 1ab091a016..5030e302b9 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -480,12 +480,13 @@ func (n *Node) start() error { SendAck: n.OpenBazaarNode.SendOfflineAck, SendError: n.OpenBazaarNode.SendError, }) + go MR.ResetPointerList() go MR.Run() n.OpenBazaarNode.MessageRetriever = MR PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator) go PR.Run() n.OpenBazaarNode.PointerRepublisher = PR - MR.Wait() + // MR.Wait() n.OpenBazaarNode.PublishLock.Unlock() publishUnlocked = true diff --git a/net/retriever/retriever.go b/net/retriever/retriever.go index a7399a28dc..c4c69dce3e 100644 --- a/net/retriever/retriever.go +++ b/net/retriever/retriever.go @@ -29,7 +29,11 @@ import ( const DefaultPointerPrefixLength = 14 -var log = logging.MustGetLogger("retriever") +var ( + // Initialize a clear pointerList for the DHT on start + pointerList = []string{} + log = logging.MustGetLogger("retriever") +) type MRConfig struct { Db repo.Datastore @@ -66,6 +70,20 @@ type offlineMessage struct { env pb.Envelope } +func stringInSlice(str string, list []string) bool { + for _, v := range list { + if v == str { + return true + } + } + return false +} + +// Reset on startup +func (m *MessageRetriever) ResetPointerList() { + pointerList = []string{} +} + func NewMessageRetriever(cfg MRConfig) *MessageRetriever { var client *http.Client if cfg.Dialer != nil { @@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() { peers := time.NewTicker(time.Minute) defer dht.Stop() defer peers.Stop() - go m.fetchPointersFromDHT() go m.fetchPointersFromPushNodes() + go m.fetchPointersFromDHT() for { select { case <-dht.C: @@ -159,7 +177,9 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) { inFlight := make(map[string]bool) // Iterate over the pointers, adding 1 to the waitgroup for each pointer found for p := range peerOut { - if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !inFlight[p.Addrs[0].String()] { + if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] { + pointerList = append(pointerList, p.Addrs[0].String()) + log.Debugf("Looking for pointer [%v] at %v\n", p.ID.Pretty(), p.Addrs) inFlight[p.Addrs[0].String()] = true log.Debugf("Found pointer with location %s", p.Addrs[0].String()) // IPFS @@ -215,12 +235,15 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI wg.Add(1) go func(pid peer.ID) { defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*35) defer cancel() + time.Sleep(time.Second * 15) provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k) if err != nil { + log.Errorf("Could not get pointers from push node because: %v", err) return } + log.Debugf("Successfully queried %s for pointers", pid.Pretty()) for _, pi := range provs { peerOut <- *pi } diff --git a/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go b/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go index bc02a43aed..d8416ab585 100644 --- a/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go +++ b/vendor/gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/lookup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket" pb "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/pb" @@ -65,6 +66,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee // since the query doesnt actually pass our context down // we have to hack this here. whyrusleeping isnt a huge fan of goprocess parent := ctx + ctx, _ = context.WithTimeout(ctx, time.Second*3) query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { // For DHT query command notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -72,6 +74,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee ID: p, }) + ctx, _ = context.WithTimeout(ctx, time.Second*3) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { logger.Debugf("error getting closer peers: %s", err) diff --git a/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go b/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go index 0cc7e0dc59..6f52b98d1a 100644 --- a/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go +++ b/vendor/gx/ipfs/QmdR6WN3TUEAVQ9KWE2UiFJikWTbUvgBJay6mjB4yUJebq/go-libp2p-kad-dht/lookup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket" cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid" @@ -65,6 +66,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee // since the query doesnt actually pass our context down // we have to hack this here. whyrusleeping isnt a huge fan of goprocess parent := ctx + ctx, _ = context.WithTimeout(ctx, time.Second*3) query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { // For DHT query command notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -72,6 +74,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee ID: p, }) + ctx, _ = context.WithTimeout(ctx, time.Second*3) pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key)) if err != nil { logger.Debugf("error getting closer peers: %s", err)