Skip to content

Commit

Permalink
feature(walk): call callback as files are listed within dir
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Dec 2, 2024
1 parent fcf7848 commit cc910f3
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 0 deletions.
1 change: 1 addition & 0 deletions fs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ConfigInfo struct {
Suffix string `yaml:"suffix"`
SuffixKeepExtension bool `yaml:"suffix_keep_extension"`
UseListR bool `yaml:"use_list_r"`
UseListCB bool `yaml:"use_list_cb"`
BufferSize SizeSuffix `yaml:"buffer_size"`
BwLimit BwTimetable `yaml:"bw_limit"`
BwLimitFile BwTimetable `yaml:"bw_limit_file"`
Expand Down
23 changes: 23 additions & 0 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,29 @@ type ListRer interface {
ListR(ctx context.Context, dir string, callback ListRCallback) error
}

// ListCBCallback defines a callback function for ListCB to use.
//
// It is called for each tranche of entries read from the listing and
// if it returns an error, the listing stops.
type ListCBCallback func(entry DirEntries) error

// ListCBer extends Fs with ListCB.
type ListCBer interface {
Fs
// ListCB calls callback on directory entries as they are being listed.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
ListCB(ctx context.Context, dir string, callback ListCBCallback) error
}

// RangeSeeker is the interface that wraps the RangeSeek method.
//
// Some of the returns from Object.Open() may optionally implement
Expand Down
31 changes: 31 additions & 0 deletions fs/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,37 @@ func DirSorted(ctx context.Context, f fs.Fs, includeAll bool, dir string) (entri
return filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f))
}

// Func is copied from imports github.com/rclone/rclone/fs/walk
// in order to avoid import cycle.
type Func func(path string, entries fs.DirEntries, err error) error

// DirCBFunc is the type of DirCB function.
type DirCBFunc func(ctx context.Context, fs fs.ListCBer, includeAll bool, dir string, cb Func) error

// DirCB works like DirSorted but uses ListCB instead of List for file listing.
func DirCB(ctx context.Context, f fs.ListCBer, includeAll bool, dir string, cb Func) error {
fi := filter.GetConfig(ctx)
// Get unfiltered entries from the fs
return f.ListCB(ctx, dir, func(entries fs.DirEntries) error {
// This should happen only if exclude files lives in the
// starting directory, otherwise ListDirSorted should not be
// called.
if !includeAll && fi.ListContainsExcludeFile(entries) {
fs.Debugf(dir, "Excluded")
return nil
}
var err error
entries, err = filterAndSortDir(ctx, entries, includeAll, dir, fi.IncludeObject, fi.IncludeDirectory(ctx, f))
if err != nil {
return err
}
if len(entries) > 0 {
return cb(dir, entries, nil)
}
return nil
})
}

// filter (if required) and check the entries, then sort them
func filterAndSortDir(ctx context.Context, entries fs.DirEntries, includeAll bool, dir string,
IncludeObject func(ctx context.Context, o fs.Object) bool,
Expand Down
103 changes: 103 additions & 0 deletions fs/walk/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func Walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i
if (maxLevel < 0 || maxLevel > 1) && ci.UseListR && f.Features().ListR != nil {
return walkListR(ctx, f, path, includeAll, maxLevel, fn)
}
if fcb, ok := f.(fs.ListCBer); ci.UseListCB && ok {
return walkCB(ctx, fcb, path, includeAll, maxLevel, fn, list.DirCB)
}
return walkListDirSorted(ctx, f, path, includeAll, maxLevel, fn)
}

Expand Down Expand Up @@ -446,6 +449,106 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i
close(in)
wg.Wait()
close(errs)
close(errs)
// return the first error returned or nil
return <-errs
}

func walkCB(ctx context.Context, f fs.ListCBer, path string, includeAll bool, maxLevel int, fn Func, listDir list.DirCBFunc) error {
var (
wg sync.WaitGroup // sync closing of go routines
traversing sync.WaitGroup // running directory traversals
doClose sync.Once // close the channel once
mu sync.Mutex // stop fn being called concurrently
ci = fs.GetConfig(ctx) // current config
)
// listJob describe a directory listing that needs to be done
type listJob struct {
remote string
depth int
}

in := make(chan listJob, ci.Checkers)
errs := make(chan error, 1)
quit := make(chan struct{})
closeQuit := func() {
doClose.Do(func() {
close(quit)
go func() {
for range in {
traversing.Done()
}
}()
})
}
for i := 0; i < ci.Checkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-in:
if !ok {
return
}
// Diff from walk
// Start
var jobs []listJob
err := listDir(ctx, f, includeAll, job.remote, func(path string, entries fs.DirEntries, err error) error {
if err == nil && job.depth != 0 {
entries.ForDir(func(dir fs.Directory) {
// Recurse for the directory
jobs = append(jobs, listJob{
remote: dir.Remote(),
depth: job.depth - 1,
})
})
}
mu.Lock()
defer mu.Unlock()
return fn(path, entries, err)
})
// End
// NB once we have passed entries to fn we mustn't touch it again
if err != nil && err != ErrorSkipDir {
traversing.Done()
err = fs.CountError(err)
fs.Errorf(job.remote, "error listing: %v", err)
closeQuit()
// Send error to error channel if space
select {
case errs <- err:
default:
}
continue
}
if err == nil && len(jobs) > 0 {
traversing.Add(len(jobs))
go func() {
// Now we have traversed this directory, send these
// jobs off for traversal in the background
for _, newJob := range jobs {
in <- newJob
}
}()
}
traversing.Done()
case <-quit:
return
}
}
}()
}
// Start the process
traversing.Add(1)
in <- listJob{
remote: path,
depth: maxLevel - 1,
}
traversing.Wait()
close(in)
wg.Wait()
close(errs)
// return the first error returned or nil
return <-errs
}
Expand Down

0 comments on commit cc910f3

Please sign in to comment.