Skip to content

Commit

Permalink
🐛 An issue that caused cloud data corruption siyuan-note/siyuan#9144
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Sep 9, 2023
1 parent 5978b16 commit 4b41832
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 61 deletions.
2 changes: 1 addition & 1 deletion cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Cloud interface {
GetRepos() (repos []*Repo, size int64, err error)

// UploadObject 用于上传对象,overwrite 参数用于指示是否覆盖已有对象。
UploadObject(filePath string, overwrite bool) (err error)
UploadObject(filePath string, overwrite bool) (length int64, err error)

// DownloadObject 用于下载对象数据 data。
DownloadObject(filePath string) (data []byte, err error)
Expand Down
10 changes: 9 additions & 1 deletion cloud/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,19 @@ func (s3 *S3) GetRepos() (repos []*Repo, size int64, err error) {
return
}

func (s3 *S3) UploadObject(filePath string, overwrite bool) (err error) {
func (s3 *S3) UploadObject(filePath string, overwrite bool) (length int64, err error) {
svc := s3.getService()
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(s3.S3.Timeout)*time.Second)
defer cancelFn()

absFilePath := filepath.Join(s3.Conf.RepoPath, filePath)
info, err := os.Stat(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
length = info.Size()

file, err := os.Open(absFilePath)
if nil != err {
return
Expand Down
9 changes: 8 additions & 1 deletion cloud/siyuan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloud
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"sort"
Expand Down Expand Up @@ -49,8 +50,14 @@ func NewSiYuan(baseCloud *BaseCloud) *SiYuan {
return &SiYuan{BaseCloud: baseCloud}
}

func (siyuan *SiYuan) UploadObject(filePath string, overwrite bool) (err error) {
func (siyuan *SiYuan) UploadObject(filePath string, overwrite bool) (length int64, err error) {
absFilePath := filepath.Join(siyuan.Conf.RepoPath, filePath)
info, err := os.Stat(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
length = info.Size()

key := path.Join("siyuan", siyuan.Conf.UserID, "repo", siyuan.Conf.Dir, filePath)
keyUploadToken, scopeUploadToken, err := siyuan.requestScopeKeyUploadToken(key, overwrite)
Expand Down
13 changes: 10 additions & 3 deletions cloud/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"io/fs"
"math"
"os"
"path"
"path/filepath"
"sort"
Expand All @@ -28,7 +29,6 @@ import (

"github.com/88250/gulu"
"github.com/siyuan-note/dejavu/entity"
"github.com/siyuan-note/filelock"
"github.com/siyuan-note/logging"
"github.com/studio-b12/gowebdav"
)
Expand Down Expand Up @@ -62,9 +62,16 @@ func (webdav *WebDAV) GetRepos() (repos []*Repo, size int64, err error) {
return
}

func (webdav *WebDAV) UploadObject(filePath string, overwrite bool) (err error) {
func (webdav *WebDAV) UploadObject(filePath string, overwrite bool) (length int64, err error) {
absFilePath := filepath.Join(webdav.Conf.RepoPath, filePath)
data, err := filelock.ReadFile(absFilePath)
info, err := os.Stat(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
length = info.Size()

data, err := os.ReadFile(absFilePath)
if nil != err {
return
}
Expand Down
72 changes: 18 additions & 54 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,12 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
go func() {
defer waitGroup.Done()

uploadBytes, uploadErr := repo.updateCloudCheckIndex(checkIndex, context)
uploadErr := repo.updateCloudCheckIndex(checkIndex, context)
if nil != uploadErr {
logging.LogErrorf("update cloud check index failed: %s", uploadErr)
err = uploadErr
return
}
trafficStat.UploadFileCount++
trafficStat.UploadBytes += uploadBytes
}()

// 尝试上传修复云端缺失的数据对象
Expand Down Expand Up @@ -752,15 +750,8 @@ func (repo *Repo) existDataFile(files []*entity.File, file *entity.File) bool {

func (repo *Repo) updateCloudRef(ref string, context map[string]interface{}) (uploadBytes int64, err error) {
eventbus.Publish(eventbus.EvtCloudBeforeUploadRef, context, ref)

absFilePath := filepath.Join(repo.Path, ref)
info, err := os.Stat(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
uploadBytes = info.Size()
err = repo.cloud.UploadObject(ref, true)
length, err := repo.cloud.UploadObject(ref, true)
uploadBytes += length
return
}

Expand Down Expand Up @@ -841,7 +832,8 @@ func (repo *Repo) uploadCloudMissingObjects(trafficStat *TrafficStat, context ma
filePath := "objects/" + objectPath
count++
eventbus.Publish(eventbus.EvtCloudBeforeFixObjects, context, count, total)
if uoErr := repo.cloud.UploadObject(filePath, false); nil != uoErr {
_, uoErr := repo.cloud.UploadObject(filePath, false)
if nil != uoErr {
uploadErr = uoErr
err = uploadErr
return
Expand Down Expand Up @@ -900,13 +892,13 @@ func (repo *Repo) uploadCloudMissingObjects(trafficStat *TrafficStat, context ma
return
}

if err = repo.cloud.UploadObject(checkReportKey, true); nil != err {
if _, err = repo.cloud.UploadObject(checkReportKey, true); nil != err {
logging.LogErrorf("upload check report failed: %s", err)
}
return
}

func (repo *Repo) updateCloudCheckIndex(checkIndex *entity.CheckIndex, context map[string]interface{}) (uploadBytes int64, err error) {
func (repo *Repo) updateCloudCheckIndex(checkIndex *entity.CheckIndex, context map[string]interface{}) (err error) {
eventbus.Publish(eventbus.EvtCloudBeforeUploadCheckIndex, context)

data, marshalErr := gulu.JSON.MarshalIndentJSON(checkIndex, "", "\t")
Expand All @@ -928,7 +920,7 @@ func (repo *Repo) updateCloudCheckIndex(checkIndex *entity.CheckIndex, context m
return
}

if err = repo.cloud.UploadObject("check/indexes/"+checkIndex.ID, false); nil != err {
if _, err = repo.cloud.UploadObject("check/indexes/"+checkIndex.ID, false); nil != err {
logging.LogErrorf("upload check index failed: %s", err)
return
}
Expand Down Expand Up @@ -996,23 +988,15 @@ func (repo *Repo) updateCloudIndexesV2(latest *entity.Index, context map[string]
return
}

err = repo.cloud.UploadObject("indexes-v2.json", true)
uploadBytes = int64(len(data))
length, err := repo.cloud.UploadObject("indexes-v2.json", true)
uploadBytes = length
return
}

func (repo *Repo) uploadIndex(index *entity.Index, context map[string]interface{}) (uploadBytes int64, err error) {
absFilePath := filepath.Join(repo.Path, "indexes", index.ID)
info, err := os.Stat(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
length := info.Size()
uploadBytes += length

eventbus.Publish(eventbus.EvtCloudBeforeUploadIndex, context, index.ID)
err = repo.cloud.UploadObject(path.Join("indexes", index.ID), false)
length, err := repo.cloud.UploadObject(path.Join("indexes", index.ID), false)
uploadBytes += length
return
}

Expand All @@ -1021,18 +1005,6 @@ func (repo *Repo) uploadFiles(upsertFiles []*entity.File, context map[string]int
return
}

for _, upsertFile := range upsertFiles {
absFilePath := filepath.Join(repo.Path, "objects", upsertFile.ID[:2], upsertFile.ID[2:])
info, statErr := os.Stat(absFilePath)
if nil != statErr {
err = statErr
logging.LogErrorf("stat failed: %s", err)
return
}
length := info.Size()
uploadBytes += length
}

waitGroup := &sync.WaitGroup{}
var uploadErr error
poolSize := 8
Expand All @@ -1050,11 +1022,13 @@ func (repo *Repo) uploadFiles(upsertFiles []*entity.File, context map[string]int
filePath := path.Join("objects", upsertFileID[:2], upsertFileID[2:])
count++
eventbus.Publish(eventbus.EvtCloudBeforeUploadFile, context, count, total)
if uoErr := repo.cloud.UploadObject(filePath, false); nil != uoErr {
length, uoErr := repo.cloud.UploadObject(filePath, false)
if nil != uoErr {
uploadErr = uoErr
err = uploadErr
return
}
uploadBytes += length
})
if nil != err {
return
Expand Down Expand Up @@ -1082,18 +1056,6 @@ func (repo *Repo) uploadChunks(upsertChunkIDs []string, context map[string]inter
return
}

for _, upsertChunkID := range upsertChunkIDs {
absFilePath := filepath.Join(repo.Path, "objects", upsertChunkID[:2], upsertChunkID[2:])
info, statErr := os.Stat(absFilePath)
if nil != statErr {
err = statErr
logging.LogErrorf("stat failed: %s", err)
return
}
length := info.Size()
uploadBytes += length
}

waitGroup := &sync.WaitGroup{}
var uploadErr error
poolSize := 8
Expand All @@ -1111,11 +1073,13 @@ func (repo *Repo) uploadChunks(upsertChunkIDs []string, context map[string]inter
filePath := path.Join("objects", upsertChunkID[:2], upsertChunkID[2:])
count++
eventbus.Publish(eventbus.EvtCloudBeforeUploadChunk, context, count, total)
if uoErr := repo.cloud.UploadObject(filePath, false); nil != uoErr {
length, uoErr := repo.cloud.UploadObject(filePath, false)
if nil != uoErr {
uploadErr = uoErr
err = uploadErr
return
}
uploadBytes += length
})
if nil != err {
return
Expand Down
2 changes: 1 addition & 1 deletion sync_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (repo *Repo) lockCloud0(currentDeviceID string) (err error) {
return
}

err = repo.cloud.UploadObject(lockSyncKey, true)
_, err = repo.cloud.UploadObject(lockSyncKey, true)
if nil != err {
if errors.Is(err, cloud.ErrSystemTimeIncorrect) || errors.Is(err, cloud.ErrCloudAuthFailed) || errors.Is(err, cloud.ErrDeprecatedVersion) ||
errors.Is(err, cloud.ErrCloudCheckFailed) {
Expand Down

0 comments on commit 4b41832

Please sign in to comment.