Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sharding for parity with flatfs #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ The config file should include the following:
"rootDirectory": "$bucketsubdirectory",
"accessKey": "",
"secretKey": "",
"keyTransform": "$keytransformmethod"
"shardFunc": "$shardfunc"
},
"mountpoint": "/blocks",
"prefix": "s3.datastore",
Expand All @@ -83,16 +83,19 @@ The config file should include the following:

If the access and secret key are blank they will be loaded from the usual ~/.aws/.

The key transform allows you to specify how data is stored behind S3 keys. It must be one of the available methods:
The shard function allows you to specify how data is stored behind S3 keys. It must be one of the available methods:

`default`
`/repo/s3/shard/v1/identity/0`
- No sharding.

`suffix`
- Shards by storing block data at a key with a `data` suffix. E.g. `CIQJ7IHPGOFUJT5UMXIW6CUDSNH6AVKMEOXI3UM3VLYJRZUISUMGCXQ/data`
`/repo/s3/shard/v1/prefix/<n>`
- Shards by storing block data under a key prefixed with the first n characters of its CID. E.g. `CIQJ7IHP/CIQJ7IHPGOFUJT5UMXIW6CUDSNH6AVKMEOXI3UM3VLYJRZUISUMGCXQ`

`next-to-last/2`
- Shards by storing block data based on the second to last 2 characters of its key. E.g. `CX/CIQJ7IHPGOFUJT5UMXIW6CUDSNH6AVKMEOXI3UM3VLYJRZUISUMGCXQ`
`/repo/s3/shard/v1/suffix/<n>`
- Shards by storing block data under a key prefixed with the last n characters of its CID. E.g. `CXQ/CIQJ7IHPGOFUJT5UMXIW6CUDSNH6AVKMEOXI3UM3VLYJRZUISUMGCXQ`

`/repo/s3/shard/v1/next-to-last/<n>`
- Shards by storing block data under a key prefixed with the n characters that immediately precede the last character of its CID. E.g. `CX/CIQJ7IHPGOFUJT5UMXIW6CUDSNH6AVKMEOXI3UM3VLYJRZUISUMGCXQ`

If you are on another S3 compatible provider, e.g. Linode, then your config should be:

Expand All @@ -112,7 +115,7 @@ If you are on another S3 compatible provider, e.g. Linode, then your config shou
"regionEndpoint": "us-east-1.linodeobjects.com",
"accessKey": "",
"secretKey": "",
"keyTransform": "$keytransformmethod"
"shardFunc": "$shardfunc"
},
"mountpoint": "/blocks",
"prefix": "s3.datastore",
Expand All @@ -137,7 +140,9 @@ This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/c
### Local Development

```
go install
go mod vendor
go test
```

### Want to hack on IPFS?
Expand Down
12 changes: 6 additions & 6 deletions plugin/s3ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ func (s3p S3Plugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
}
}

var keyTransform string
if v, ok := m["keyTransform"]; ok {
var shardFunc string
if v, ok := m["shardFunc"]; ok {
if v == "" {
keyTransform = "default"
shardFunc = "/repo/s3/shard/v1/identity/0"
} else {
keyTransform, ok = v.(string)
shardFunc, ok = v.(string)
if !ok {
return nil, fmt.Errorf("s3ds: keyTransform is not a valid key transform method")
return nil, fmt.Errorf("s3ds: shardFunc is not a valid string")
}
}
}
Expand All @@ -121,7 +121,7 @@ func (s3p S3Plugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
Workers: workers,
RegionEndpoint: endpoint,
CredentialsEndpoint: credentialsEndpoint,
KeyTransform: keyTransform,
ShardFunc: shardFunc,
},
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/s3ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestS3PluginDatastoreConfigParser(t *testing.T) {
"regionEndpoint": "someendpoint",
"workers": 42.0,
"credentialsEndpoint": "somecredendpoint",
"keyTransform": "default",
"shardFunc": "/repo/s3/shard/v1/next-to-last/2",
},
Want: &S3Config{cfg: s3ds.Config{
Region: "someregion",
Expand All @@ -59,7 +59,7 @@ func TestS3PluginDatastoreConfigParser(t *testing.T) {
RegionEndpoint: "someendpoint",
Workers: 42,
CredentialsEndpoint: "somecredendpoint",
KeyTransform: "default",
ShardFunc: "/repo/s3/shard/v1/next-to-last/2",
}},
},
}
Expand Down
37 changes: 15 additions & 22 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,7 @@ type Config struct {
RootDirectory string
Workers int
CredentialsEndpoint string
KeyTransform string
}

var KeyTransforms = map[string]func(ds.Key) string{
"default": func(k ds.Key) string {
return k.String()
},
"suffix": func(k ds.Key) string {
return k.String() + "/data"
},
"next-to-last/2": func(k ds.Key) string {
s := k.String()
offset := 1
start := len(s) - 2 - offset
return s[start:start+2] + "/" + s
},
ShardFunc string
}

func NewS3Datastore(conf Config) (*S3Bucket, error) {
Expand Down Expand Up @@ -124,7 +109,7 @@ func NewS3Datastore(conf Config) (*S3Bucket, error) {
func (s *S3Bucket) Put(ctx context.Context, k ds.Key, value []byte) error {
_, err := s.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(s.s3Path(prepareKey(s.Config, k))),
Key: aws.String(s.s3Path(s.prepareKey(k))),
Body: bytes.NewReader(value),
})
return err
Expand All @@ -137,7 +122,7 @@ func (s *S3Bucket) Sync(ctx context.Context, prefix ds.Key) error {
func (s *S3Bucket) Get(ctx context.Context, k ds.Key) ([]byte, error) {
resp, err := s.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(s.s3Path(prepareKey(s.Config, k))),
Key: aws.String(s.s3Path(s.prepareKey(k))),
})
if err != nil {
if isNotFound(err) {
Expand All @@ -164,7 +149,7 @@ func (s *S3Bucket) Has(ctx context.Context, k ds.Key) (exists bool, err error) {
func (s *S3Bucket) GetSize(ctx context.Context, k ds.Key) (size int, err error) {
resp, err := s.S3.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(s.s3Path(prepareKey(s.Config, k))),
Key: aws.String(s.s3Path(s.prepareKey(k))),
})
if err != nil {
if s3Err, ok := err.(awserr.Error); ok && s3Err.Code() == "NotFound" {
Expand All @@ -178,7 +163,7 @@ func (s *S3Bucket) GetSize(ctx context.Context, k ds.Key) (size int, err error)
func (s *S3Bucket) Delete(ctx context.Context, k ds.Key) error {
_, err := s.S3.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(s.s3Path(prepareKey(s.Config, k))),
Key: aws.String(s.s3Path(s.prepareKey(k))),
})
if isNotFound(err) {
// delete is idempotent
Expand All @@ -187,8 +172,16 @@ func (s *S3Bucket) Delete(ctx context.Context, k ds.Key) error {
return err
}

func prepareKey(cfg Config, k ds.Key) string {
return KeyTransforms[cfg.KeyTransform](k)
// prepareKey builds the sharded object path for k by joining the shard
// directory and the object name returned by encode.
func (s *S3Bucket) prepareKey(k ds.Key) string {
	// encode yields (dir, file); both are forwarded as the path elements.
	return path.Join(s.encode(k))
}

// encode splits a datastore key into the shard directory and the object name
// stored beneath it.
//
// The object name is the key's string form with its first byte removed
// (presumably the leading "/" of a datastore key; confirm against ds.Key).
// The directory is computed by applying the shard function named in
// s.Config.ShardFunc to that name.
//
// encode panics with a descriptive message when s.Config.ShardFunc cannot be
// parsed. Previously the parse error was discarded and the failure showed up
// as an anonymous nil-pointer dereference.
// NOTE(review): the identifier is re-parsed on every call; validating and
// caching it once when the datastore is constructed would let this error be
// returned to the caller instead of panicking here.
func (s *S3Bucket) encode(k ds.Key) (dir, file string) {
	noslash := k.String()[1:]

	id, err := ParseShardFunc(s.Config.ShardFunc)
	if err != nil {
		panic("s3ds: invalid shard function \"" + s.Config.ShardFunc + "\": " + err.Error())
	}

	return id.fun(noslash), noslash
}

func (s *S3Bucket) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
Expand Down
8 changes: 4 additions & 4 deletions s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func TestSuiteLocalS3(t *testing.T) {
t.Skipf("skipping test suit; LOCAL_S3 is not set.")
}

localBucketName, localBucketNameSet := os.LookupEnv("LOCAL_BUCKET_NAME")
if !localBucketNameSet {
localBucketName = fmt.Sprintf("localbucketname%d", time.Now().UnixNano())
localBucketName, localBucketNameSet := os.LookupEnv("LOCAL_BUCKET_NAME")
if !localBucketNameSet {
localBucketName = fmt.Sprintf("localbucketname%d", time.Now().UnixNano())
}

config := Config{
Expand All @@ -30,7 +30,7 @@ func TestSuiteLocalS3(t *testing.T) {
Region: "local",
AccessKey: "test",
SecretKey: "testdslocal",
KeyTransform: "default",
ShardFunc: "/repo/s3/shard/v1/identity/0",
}

s3ds, err := NewS3Datastore(config)
Expand Down
123 changes: 123 additions & 0 deletions shard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package s3ds

import (
"fmt"
"strconv"
"strings"
)

// Default sharding scheme and its string identifier.
var (
	// IPFS_DEF_SHARD is the shard identifier produced by NextToLast(2).
	IPFS_DEF_SHARD = NextToLast(2)

	// IPFS_DEF_SHARD_STR is the string form of IPFS_DEF_SHARD.
	IPFS_DEF_SHARD_STR = IPFS_DEF_SHARD.String()
)

// PREFIX is the leading path of every shard identifier string. ShardIdV1.String
// emits it first and ParseShardFunc rejects identifiers that do not start
// with it.
const PREFIX = "/repo/s3/shard/"

// TODO: Read existing sharding method
// const SHARDING_FN = "SHARDING"
// const README_FN = "_README"

// ShardFunc maps a key name (without its leading slash) to the name of the
// shard directory the key is stored under.
type ShardFunc func(string) string

// ShardIdV1 describes a version 1 sharding scheme: a named function, its
// integer parameter, and the ShardFunc that implements it.
type ShardIdV1 struct {
	// funName is the function name as it appears in the identifier string
	// ("identity", "prefix", "suffix" or "next-to-last").
	funName string
	// param is the integer parameter of the function (a character count for
	// the sized functions; 0 for identity).
	param int
	// fun computes the shard directory for a key name.
	fun ShardFunc
}

// String renders the identifier in its textual form:
// PREFIX followed by "v1/<funName>/<param>".
func (f *ShardIdV1) String() string {
	const version = "v1"
	return fmt.Sprintf("%s%s/%s/%d", PREFIX, version, f.funName, f.param)
}

// Func returns the ShardFunc that computes the shard directory for this
// identifier.
func (f *ShardIdV1) Func() ShardFunc {
	return f.fun
}

// Identity returns the "identity" scheme: its shard function always yields an
// empty directory name, so keys are not sharded. The parameter is fixed at 0.
func Identity() *ShardIdV1 {
	noShard := func(string) string { return "" }

	id := new(ShardIdV1)
	id.funName = "identity"
	id.param = 0
	id.fun = noShard
	return id
}

// Prefix returns the "prefix" scheme: the shard directory is the first
// prefixLen bytes of the key name. Names shorter than prefixLen are padded on
// the right with "_" before being cut.
func Prefix(prefixLen int) *ShardIdV1 {
	pad := strings.Repeat("_", prefixLen)

	shard := func(noslash string) string {
		// Appending the padding guarantees at least prefixLen bytes to slice.
		padded := noslash + pad
		return padded[:prefixLen]
	}

	id := new(ShardIdV1)
	id.funName = "prefix"
	id.param = prefixLen
	id.fun = shard
	return id
}

// Suffix returns the "suffix" scheme: the shard directory is the last
// suffixLen bytes of the key name. Names shorter than suffixLen are padded on
// the left with "_" before being cut.
func Suffix(suffixLen int) *ShardIdV1 {
	pad := strings.Repeat("_", suffixLen)

	shard := func(noslash string) string {
		// Prepending the padding guarantees at least suffixLen bytes to slice.
		padded := pad + noslash
		start := len(padded) - suffixLen
		return padded[start:]
	}

	id := new(ShardIdV1)
	id.funName = "suffix"
	id.param = suffixLen
	id.fun = shard
	return id
}

// NextToLast returns the "next-to-last" scheme: the shard directory is the
// suffixLen bytes of the key name that immediately precede its final byte.
// Names that are too short are padded on the left with "_" first.
func NextToLast(suffixLen int) *ShardIdV1 {
	// One extra byte of padding accounts for the skipped final byte.
	pad := strings.Repeat("_", suffixLen+1)

	shard := func(noslash string) string {
		padded := pad + noslash
		end := len(padded) - 1 // index of the final byte, which is excluded
		return padded[end-suffixLen : end]
	}

	id := new(ShardIdV1)
	id.funName = "next-to-last"
	id.param = suffixLen
	id.fun = shard
	return id
}

// ParseShardFunc parses a shard identifier of the form
//
//	/repo/s3/shard/v1/<function>/<param>
//
// and returns the matching sharding scheme. Surrounding whitespace is ignored.
// <function> must be one of "identity", "prefix", "suffix" or "next-to-last"
// and <param> must be a non-negative decimal integer. For "identity" the
// parameter is required syntactically but its value is not used.
//
// An error is returned when the identifier is empty, lacks the expected
// prefix, does not have exactly three components after the prefix, names a
// version other than "v1", has a non-numeric or negative parameter, or names
// an unknown function.
func ParseShardFunc(str string) (*ShardIdV1, error) {
	full := strings.TrimSpace(str)
	if len(full) == 0 {
		return nil, fmt.Errorf("empty shard identifier")
	}

	rest := strings.TrimPrefix(full, PREFIX)
	if rest == full { // nothing trimmed
		return nil, fmt.Errorf("invalid or no prefix in shard identifier: %s", full)
	}

	parts := strings.Split(rest, "/")
	if len(parts) != 3 {
		// Report the identifier as given, not the remainder after the prefix.
		return nil, fmt.Errorf("invalid shard identifier: %s", full)
	}

	version, funName := parts[0], parts[1]
	if version != "v1" {
		return nil, fmt.Errorf("expected 'v1' for version string got: %s", version)
	}

	param, err := strconv.Atoi(parts[2])
	if err != nil {
		return nil, fmt.Errorf("invalid parameter: %w", err)
	}
	// A negative length would make the constructors below panic (they build
	// their padding with strings.Repeat), so reject it here with an error.
	if param < 0 {
		return nil, fmt.Errorf("invalid parameter: %d is negative", param)
	}

	switch funName {
	case "identity":
		return Identity(), nil
	case "prefix":
		return Prefix(param), nil
	case "suffix":
		return Suffix(param), nil
	case "next-to-last":
		return NextToLast(param), nil
	default:
		return nil, fmt.Errorf("expected 'identity', 'prefix', 'suffix' or 'next-to-last' got: %s", funName)
	}
}
Loading