Skip to content

Commit

Permalink
feature(s3): implement ListCB
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Dec 2, 2024
1 parent cc910f3 commit af775c3
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
54 changes: 45 additions & 9 deletions backend/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,25 +1979,28 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec
return o, nil
}

// listDir lists files and directories to out
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
// List the objects and directories
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error {
// listDirCB calls callback on files as they are being listed.
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool, cb fs.ListCBCallback) error {
// List the objects and directories.
// Batch callbacks by 100 with walk.NewListRHelper (compatible callback type)
// in order to reduce the amount of allocations and function calls.
list := walk.NewListRHelper(fs.ListRCallback(cb))
err := f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error {
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
if err != nil {
return err
}
if entry != nil {
entries = append(entries, entry)
return list.Add(entry)
}
return nil
})
if err != nil {
return nil, err
return err
}
// bucket must be present if listing succeeded
f.cache.MarkOK(bucket)
return entries, nil
return list.Flush()
}

// listBuckets lists the buckets to out
Expand Down Expand Up @@ -2029,15 +2032,48 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error)
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
bucket, directory := f.split(dir)
if bucket == "" {
if directory != "" {
return nil, fs.ErrorListBucketRequired
}
return f.listBuckets(ctx)
}
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")

// Use callback for regular listing
var entries fs.DirEntries
err := f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", func(e fs.DirEntries) error {
entries = append(entries, e...)
return nil
})
if err != nil {
return nil, err
}
return entries, nil
}

// ListCB calls callback to the objects and directories in dir as they are being listed.
// The callback might be called for just a subset of directory entries.
// When listing buckets, the callback is called just once for all of them.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't found.
func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListCBCallback) error {
bucket, directory := f.split(dir)
if bucket == "" {
if directory != "" {
return fs.ErrorListBucketRequired
}
entries, err := f.listBuckets(ctx)
if err != nil {
return err
}
return cb(entries)
}
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", cb)
}

// ListR lists the objects and directories of the Fs starting
Expand Down
1 change: 0 additions & 1 deletion fs/walk/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i
close(in)
wg.Wait()
close(errs)
close(errs)
// return the first error returned or nil
return <-errs
}
Expand Down

0 comments on commit af775c3

Please sign in to comment.