Skip to content

Commit

Permalink
sync: add zstd support
Browse files Browse the repository at this point in the history
  • Loading branch information
miku committed May 23, 2022
1 parent 22ef62d commit 660babd
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
27 changes: 21 additions & 6 deletions atomic/file.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package atomic

import (
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"

"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
)

// File behaves like os.File, but does an atomic rename operation at Close.
Expand Down Expand Up @@ -57,28 +59,41 @@ func (f *File) Abort() error {
return nil
}

// Compress and return path to compressed file.
func Compress(filename string) (string, error) {
func CompressType(filename string, ty string) (string, error) {
tf, err := ioutil.TempFile("", "span-atomic-")
if err != nil {
return "", err
}
defer tf.Close()
zw := gzip.NewWriter(tf)
var w io.WriteCloser
switch {
case ty == "zstd":
w, err = zstd.NewWriter(tf)
if err != nil {
return "", err
}
default:
w = gzip.NewWriter(tf)
}
f, err := os.Open(filename)
if err != nil {
return "", err
}
defer f.Close()
if _, err := io.Copy(zw, f); err != nil {
if _, err := io.Copy(w, f); err != nil {
return "", err
}
if err := zw.Close(); err != nil {
if err := w.Close(); err != nil {
return "", err
}
return tf.Name(), nil
}

// Compress and return path to compressed file.
func Compress(filename string) (string, error) {
return CompressType(filename, "gzip")
}

// WriteFile writes the data to a temp file and atomically move if everything else succeeds.
func WriteFile(filename string, data []byte, perm os.FileMode) error {
dir, name := path.Split(filename)
Expand Down
31 changes: 16 additions & 15 deletions cmd/span-crossref-sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,20 +283,21 @@ import (
)

var (
cacheDir = flag.String("c", path.Join(xdg.CacheHome, "span/crossref-sync"), "cache directory")
apiEndpoint = flag.String("a", "https://api.crossref.org/works", "works api")
apiFilter = flag.String("f", "index", "filter")
apiEmail = flag.String("m", "[email protected]", "email address")
numRows = flag.Int("r", 1000, "number of docs per request")
userAgent = flag.String("ua", "span-crossref-sync/dev (https://github.com/miku/span)", "user agent string")
modeCount = flag.Bool("C", false, "just sum up all total results values")
debug = flag.Bool("debug", false, "print out intervals")
verbose = flag.Bool("verbose", false, "be verbose")
outputFile = flag.String("o", "", "output filename (stdout, otherwise)")
timeout = flag.Duration("t", 60*time.Second, "connectiont timeout")
maxRetries = flag.Int("x", 10, "max retries")
mode = flag.String("mode", "t", "t=tabs, s=sync")
intervals = flag.String("i", "d", "intervals: d=daily, w=weekly, m=monthly")
cacheDir = flag.String("c", path.Join(xdg.CacheHome, "span/crossref-sync"), "cache directory")
apiEndpoint = flag.String("a", "https://api.crossref.org/works", "works api")
apiFilter = flag.String("f", "index", "filter")
apiEmail = flag.String("m", "[email protected]", "email address")
numRows = flag.Int("r", 1000, "number of docs per request")
userAgent = flag.String("ua", "span-crossref-sync/dev (https://github.com/miku/span)", "user agent string")
modeCount = flag.Bool("C", false, "just sum up all total results values")
debug = flag.Bool("debug", false, "print out intervals")
verbose = flag.Bool("verbose", false, "be verbose")
outputFile = flag.String("o", "", "output filename (stdout, otherwise)")
timeout = flag.Duration("t", 60*time.Second, "connectiont timeout")
maxRetries = flag.Int("x", 10, "max retries")
mode = flag.String("mode", "t", "t=tabs, s=sync")
intervals = flag.String("i", "d", "intervals: d=daily, w=weekly, m=monthly")
compressProgram = flag.String("p", "gzip", "compress program: gzip or zstd")

syncStart xflag.Date = xflag.Date{Time: dateutil.MustParse("2021-01-01")}
syncEnd xflag.Date = xflag.Date{Time: time.Now().UTC().Add(-24 * time.Hour)}
Expand Down Expand Up @@ -548,7 +549,7 @@ func main() {
if err := cacheFile.Close(); err != nil {
log.Fatal(err)
}
compressed, err := atomic.Compress(cachePath)
compressed, err := atomic.CompressType(cachePath, *compressProgram)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/jmoiron/sqlx v1.3.4
github.com/joho/godotenv v1.4.0 // indirect
github.com/kennygrant/sanitize v1.2.4
github.com/klauspost/compress v1.14.2
github.com/klauspost/compress v1.15.4
github.com/klauspost/pgzip v1.2.5
github.com/kr/pretty v0.3.0 // indirect
github.com/lytics/logrus v0.0.0-20170528191427-4389a17ed024
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ github.com/kennygrant/sanitize v1.2.4 h1:gN25/otpP5vAsO2djbMhF/LQX6R7+O1TB4yv8Nz
github.com/kennygrant/sanitize v1.2.4/go.mod h1:LGsjYYtgxbetdg5owWB2mpgUL6e2nfw2eObZ0u0qvak=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.4 h1:1kn4/7MepF/CHmYub99/nNX8az0IJjfSOU/jbnTVfqQ=
github.com/klauspost/compress v1.15.4/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down

0 comments on commit 660babd

Please sign in to comment.