Skip to content

Commit

Permalink
feat(azureblob): implement ListCB
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski committed Dec 3, 2024
1 parent bfebdcd commit 9f3550e
Showing 1 changed file with 77 additions and 63 deletions.
140 changes: 77 additions & 63 deletions backend/azureblob/azureblob.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Package azureblob provides an interface to the Microsoft Azure blob object storage system

//go:build !plan9 && !solaris && !js && go1.14
// +build !plan9,!solaris,!js,go1.14

package azureblob
Expand Down Expand Up @@ -861,6 +862,30 @@ func (f *Fs) list(ctx context.Context, container, directory, prefix string, addC
return nil
}

// listCB calls list with cb wrapped in walk.NewListRHelper.
func (f *Fs) listCB(ctx context.Context, container, directory, prefix string, addContainer, recourse bool, cb fs.ListRCallback) error {
if !f.containerOK(container) {
return fs.ErrorDirNotFound
}
list := walk.NewListRHelper(cb)
err := f.list(ctx, container, directory, prefix, addContainer, recourse, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
}
if entry != nil {
return list.Add(entry)
}
return nil
})
if err != nil {
return err
}
// container must be present if listing succeeded
f.cache.MarkOK(container)
return list.Flush()
}

// Convert a list item into a DirEntry
func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItemInternal, isDirectory bool) (fs.DirEntry, error) {
if isDirectory {
Expand Down Expand Up @@ -889,29 +914,6 @@ func (f *Fs) containerOK(container string) bool {
return false
}

// listDir lists a single directory
func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
if !f.containerOK(container) {
return nil, fs.ErrorDirNotFound
}
err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
}
if entry != nil {
entries = append(entries, entry)
}
return nil
})
if err != nil {
return nil, err
}
// container must be present if listing succeeded
f.cache.MarkOK(container)
return entries, nil
}

// listContainers returns all the containers to out
func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) {
if f.isLimited {
Expand Down Expand Up @@ -944,15 +946,45 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
container, directory := f.split(dir)
if container == "" {
if directory != "" {
return nil, fs.ErrorListBucketRequired
}
return f.listContainers(ctx)
}
return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")

// Use callback for regular listing
var entries fs.DirEntries
err := f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, func(e fs.DirEntries) error {
entries = append(entries, e...)
return nil
})
if err != nil {
return nil, err
}
return entries, nil
}

// ListCB calls callback to the objects and directories in dir as they are being listed.
// The callback might be called for just a subset of directory entries.
// When listing buckets, the callback is called just once for all of them.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't found.
func (f *Fs) ListCB(ctx context.Context, dir string, cb fs.ListRCallback) error {
container, directory := f.split(dir)
if container == "" {
entries, err := f.listContainers(ctx)
if err != nil {
return err
}
return cb(entries)
}
return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", false, cb)
}

// ListR lists the objects and directories of the Fs starting
Expand All @@ -971,48 +1003,28 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
func (f *Fs) ListR(ctx context.Context, dir string, cb fs.ListRCallback) error {
container, directory := f.split(dir)
list := walk.NewListRHelper(callback)
listR := func(container, directory, prefix string, addContainer bool) error {
return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItemInternal, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
}
return list.Add(entry)
})
}
if container == "" {
entries, err := f.listContainers(ctx)
if err != nil {
return err
}
for _, entry := range entries {
err = list.Add(entry)
// Call callback on container right before listing its files
err = cb(fs.DirEntries{entry})
if err != nil {
return err
}
container := entry.Remote()
err = listR(container, "", f.rootDirectory, true)
err = f.listCB(ctx, container, "", f.rootDirectory, true, true, cb)
if err != nil {
return err
}
// container must be present if listing succeeded
f.cache.MarkOK(container)
}
} else {
if !f.containerOK(container) {
return fs.ErrorDirNotFound
}
err = listR(container, directory, f.rootDirectory, f.rootContainer == "")
if err != nil {
return err
}
// container must be present if listing succeeded
f.cache.MarkOK(container)
return nil
}
return list.Flush()
return f.listCB(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, cb)
}

// listContainerFn is called from listContainersToFn to handle a container
Expand Down Expand Up @@ -1049,7 +1061,7 @@ func (f *Fs) listContainersToFn(fn listContainerFn) error {

// Put the object into the container
//
// Copy the reader in to the new object which is returned
// # Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
Expand Down Expand Up @@ -1181,9 +1193,9 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {

// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given
// # This is stored with the remote path given
//
// It returns the destination Object and a possible error
// # It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
Expand Down Expand Up @@ -1303,11 +1315,12 @@ func (o *Object) setMetadata(metadata azblob.Metadata) {
// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
// o.meta
//
// o.id
// o.modTime
// o.size
// o.md5
// o.meta
func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) {
metadata := info.NewMetadata()
size := info.ContentLength()
Expand Down Expand Up @@ -1357,10 +1370,11 @@ func (o *Object) clearMetaData() {
// readMetaData gets the metadata if it hasn't already been fetched
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
//
// o.id
// o.modTime
// o.size
// o.md5
func (o *Object) readMetaData() (err error) {
if !o.modTime.IsZero() {
return nil
Expand Down

0 comments on commit 9f3550e

Please sign in to comment.