From af775c3b7580ddc6d67b9738faeaf61ea3e45561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 2 Dec 2024 12:27:50 +0100 Subject: [PATCH] feature(s3): implement ListCB --- backend/s3/s3.go | 54 ++++++++++++++++++++++++++++++++++++++++-------- fs/walk/walk.go | 1 - 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 8c22b09432acb..a77a5b98313ae 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -1979,25 +1979,28 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec return o, nil } -// listDir lists files and directories to out -func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { - // List the objects and directories - err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { +// listDirCB calls callback on files as they are being listed. +func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool, cb fs.ListCBCallback) error { + // List the objects and directories. + // Batch callbacks by 100 with walk.NewListRHelper (compatible callback type) + // in order to reduce the amount of allocations and function calls. + list := walk.NewListRHelper(fs.ListRCallback(cb)) + err := f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error { entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory) if err != nil { return err } if entry != nil { - entries = append(entries, entry) + return list.Add(entry) } return nil }) if err != nil { - return nil, err + return err } // bucket must be present if listing succeeded f.cache.MarkOK(bucket) - return entries, nil + return list.Flush() } // listBuckets lists the buckets to out @@ -2029,7 +2032,7 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) // // This should return ErrDirNotFound if the directory isn't // found. -func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { bucket, directory := f.split(dir) if bucket == "" { if directory != "" { @@ -2037,7 +2040,40 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e } return f.listBuckets(ctx) } - return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") + + // Use callback for regular listing + var entries fs.DirEntries + err := f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", func(e fs.DirEntries) error { + entries = append(entries, e...) + return nil + }) + if err != nil { + return nil, err + } + return entries, nil +} + +// ListCB calls callback to the objects and directories in dir as they are being listed. +// The callback might be called for just a subset of directory entries. +// When listing buckets, the callback is called just once for all of them. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't found. +func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListCBCallback) error { + bucket, directory := f.split(dir) + if bucket == "" { + if directory != "" { + return fs.ErrorListBucketRequired + } + entries, err := f.listBuckets(ctx) + if err != nil { + return err + } + return cb(entries) + } + return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", cb) } // ListR lists the objects and directories of the Fs starting diff --git a/fs/walk/walk.go b/fs/walk/walk.go index f9a23eef7a3cb..a7cb4183c159e 100644 --- a/fs/walk/walk.go +++ b/fs/walk/walk.go @@ -449,7 +449,6 @@ func walk(ctx context.Context, f fs.Fs, path string, includeAll bool, maxLevel i close(in) wg.Wait() close(errs) - close(errs) // return the first error returned or nil return <-errs }