Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unlock GVL for simple compression and decompression #84

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ jobs:
with:
ruby-version: ${{ matrix.ruby-version }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- name: Install dependencies
run: bundle install
- name: Compile
run: bundle exec rake compile
- name: Run tests
run: bundle exec rspec
- name: Run benchmarks
working-directory: benchmarks
run: |
bundle install
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_comporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_decomporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_streaming_comporess.rb city.json
THREADS=4 /usr/bin/time -f "command %C\treal %e\tuser %U\tsys %S\tCPU %P" bundle exec ruby multi_thread_streaming_decomporess.rb city.json
bundle exec ruby large_bytes.rb
11 changes: 4 additions & 7 deletions benchmarks/large_bytes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
require 'zstd-ruby'
require 'securerandom'

source_data = ""
512.times { source_data += SecureRandom.uuid }
# Using 1<<17 causes an out-of-memory failure on GitHub Actions
source_data = Random.bytes(1<<16)

puts "source_data.size:#{source_data.size}"

# Test compressing and decompressing our source data repeatedly. The cycles
# are intended to exercise the library and reproduce a memory leak.
100_000.times do |i|
10.times do |i|
compressed_data = Zstd.compress(source_data)
expanded_data = Zstd.decompress(compressed_data)
unless expanded_data == source_data
puts "Error: expanded data does not match source data"
end
if i % 1000 == 0
puts " - #{i}: c:#{compressed_data.size} e:#{expanded_data.size} memory:#{`ps -o rss= -p #{Process.pid}`.to_i}"
end

puts " - #{i}: c:#{compressed_data.size} e:#{expanded_data.size} memory:#{`ps -o rss= -p #{Process.pid}`.to_i}"
end
22 changes: 22 additions & 0 deletions benchmarks/multi_thread_comporess.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Benchmark: compress one sample file GUESSES times, with the work shared by
# THREADS worker threads that pull items from a common queue.
#
# Usage: [GUESSES=n] [THREADS=n] ruby multi_thread_comporess.rb <file name under ./samples>
$LOAD_PATH.unshift '../lib'
require 'zstd-ruby'
require 'thread'

# Number of compress calls in total, and number of worker threads.
GUESSES = (ENV['GUESSES'] || 1000).to_i
THREADS = (ENV['THREADS'] || 1).to_i

p GUESSES: GUESSES, THREADS: THREADS

sample_file_name = ARGV[0]
json_string = File.read("./samples/#{sample_file_name}")

# Fill the queue up front: GUESSES work items, then one nil per worker so that
# every thread pops exactly one stop marker and leaves its loop.
queue = Queue.new
GUESSES.times { queue << json_string }
THREADS.times { queue << nil }

THREADS.times.map {
  Thread.new {
    # Compress the item that was popped (not the outer variable), so the worker
    # really consumes what the queue hands out, like the decompress benchmark.
    while str = queue.pop
      Zstd.compress(str)
    end
  }
}.each(&:join)
23 changes: 23 additions & 0 deletions benchmarks/multi_thread_decomporess.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Benchmark: decompress one pre-compressed sample GUESSES times, with the work
# shared by THREADS worker threads that pull items from a common queue.
$LOAD_PATH.unshift '../lib'
require 'zstd-ruby'
require 'thread'

GUESSES = (ENV['GUESSES'] || 1000).to_i
THREADS = (ENV['THREADS'] || 1).to_i

p GUESSES: GUESSES, THREADS: THREADS

# Compress the sample once; only decompression is repeated by the workers.
sample_path = "./samples/#{ARGV[0]}"
compressed = Zstd.compress(File.read(sample_path))

# GUESSES work items followed by one nil stop marker per worker.
jobs = Queue.new
GUESSES.times { jobs << compressed }
THREADS.times { jobs << nil }

workers = Array.new(THREADS) do
  Thread.new do
    # A popped nil tells this worker there is nothing left to do.
    until (payload = jobs.pop).nil?
      Zstd.decompress(payload)
    end
  end
end
workers.each(&:join)
Binary file modified benchmarks/results/city.json.gzip
Binary file not shown.
74 changes: 68 additions & 6 deletions ext/zstdruby/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,37 @@ static size_t zstd_stream_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output,
#endif
}

/*
 * Argument bundle for compress_wrapper().
 *
 * rb_thread_call_without_gvl() hands the callback a single void*, so the
 * ZSTD_compress2() arguments and its result travel together in this struct.
 * zstd_compress() initializes it positionally: keep the field order in sync
 * with that initializer.
 */
struct compress_params {
ZSTD_CCtx* ctx;       /* compression context passed to ZSTD_compress2() */
char* output_data;    /* destination buffer */
size_t output_size;   /* size passed for the destination buffer */
char* input_data;     /* source buffer */
size_t input_size;    /* size passed for the source buffer */
size_t ret;           /* out: return value of ZSTD_compress2(), set by compress_wrapper() */
};

/*
 * Callback with the void* (*)(void*) shape expected by
 * rb_thread_call_without_gvl().
 *
 * args points to a struct compress_params. The call to ZSTD_compress2() is
 * made with the fields of that struct and its return value is stored in
 * ->ret; the function's own return value is always NULL.
 */
static void* compress_wrapper(void* args)
{
  struct compress_params* const p = (struct compress_params*)args;
  size_t const result = ZSTD_compress2(p->ctx,
                                       p->output_data, p->output_size,
                                       p->input_data, p->input_size);

  p->ret = result;
  return NULL;
}

/*
 * One-shot compression through ZSTD_compress2(), optionally run via
 * rb_thread_call_without_gvl().
 *
 * gvl == true  : ZSTD_compress2() is called directly.
 * gvl == false : the call is dispatched through rb_thread_call_without_gvl()
 *                (only when HAVE_RUBY_THREAD_H is defined; otherwise the
 *                direct call is used regardless of gvl).
 *
 * Returns whatever ZSTD_compress2() returned; callers check it with
 * ZSTD_isError().
 */
static size_t zstd_compress(ZSTD_CCtx* const ctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    struct compress_params call = {
      .ctx = ctx,
      .output_data = output_data,
      .output_size = output_size,
      .input_data = input_data,
      .input_size = input_size,
      .ret = 0,
    };

    /* No unblock function is registered (third and fourth arguments are NULL). */
    rb_thread_call_without_gvl(compress_wrapper, &call, NULL, NULL);
    return call.ret;
  }
#endif
  /* Direct path: gvl was requested, or ruby/thread.h is unavailable. */
  return ZSTD_compress2(ctx, output_data, output_size, input_data, input_size);
}

static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
{
ID kwargs_keys[1];
Expand All @@ -92,33 +123,64 @@ static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
}
}

struct decompress_params {
struct stream_decompress_params {
ZSTD_DCtx* dctx;
ZSTD_outBuffer* output;
ZSTD_inBuffer* input;
size_t ret;
};

static void* decompress_wrapper(void* args)
static void* stream_decompress_wrapper(void* args)
{
struct decompress_params* params = args;
struct stream_decompress_params* params = args;
params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input);
return NULL;
}

static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
static size_t zstd_stream_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
if (gvl) {
return ZSTD_decompressStream(dctx, output, input);
} else {
struct decompress_params params = { dctx, output, input };
rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL);
struct stream_decompress_params params = { dctx, output, input };
rb_thread_call_without_gvl(stream_decompress_wrapper, &params, NULL, NULL);
return params.ret;
}
#else
return ZSTD_decompressStream(dctx, output, input);
#endif
}

/*
 * Argument bundle for decompress_wrapper().
 *
 * rb_thread_call_without_gvl() hands the callback a single void*, so the
 * ZSTD_decompressDCtx() arguments and its result travel together in this
 * struct. zstd_decompress() initializes it positionally: keep the field order
 * in sync with that initializer.
 */
struct decompress_params {
ZSTD_DCtx* dctx;      /* decompression context passed to ZSTD_decompressDCtx() */
char* output_data;    /* destination buffer */
size_t output_size;   /* size passed for the destination buffer */
char* input_data;     /* source buffer */
size_t input_size;    /* size passed for the source buffer */
size_t ret;           /* out: return value of ZSTD_decompressDCtx(), set by decompress_wrapper() */
};

/*
 * Callback with the void* (*)(void*) shape expected by
 * rb_thread_call_without_gvl().
 *
 * args points to a struct decompress_params. The call to
 * ZSTD_decompressDCtx() is made with the fields of that struct and its return
 * value is stored in ->ret; the function's own return value is always NULL.
 */
static void* decompress_wrapper(void* args)
{
  struct decompress_params* const p = (struct decompress_params*)args;
  size_t const result = ZSTD_decompressDCtx(p->dctx,
                                            p->output_data, p->output_size,
                                            p->input_data, p->input_size);

  p->ret = result;
  return NULL;
}

/*
 * One-shot decompression through ZSTD_decompressDCtx(), optionally run via
 * rb_thread_call_without_gvl().
 *
 * gvl == true  : ZSTD_decompressDCtx() is called directly.
 * gvl == false : the call is dispatched through rb_thread_call_without_gvl()
 *                (only when HAVE_RUBY_THREAD_H is defined; otherwise the
 *                direct call is used regardless of gvl).
 *
 * Returns whatever ZSTD_decompressDCtx() returned; callers check it with
 * ZSTD_isError().
 */
static size_t zstd_decompress(ZSTD_DCtx* const dctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
{
#ifdef HAVE_RUBY_THREAD_H
  if (!gvl) {
    struct decompress_params call = {
      .dctx = dctx,
      .output_data = output_data,
      .output_size = output_size,
      .input_data = input_data,
      .input_size = input_size,
      .ret = 0,
    };

    /* No unblock function is registered (third and fourth arguments are NULL). */
    rb_thread_call_without_gvl(decompress_wrapper, &call, NULL, NULL);
    return call.ret;
  }
#endif
  /* Direct path: gvl was requested, or ruby/thread.h is unavailable. */
  return ZSTD_decompressDCtx(dctx, output_data, output_size, input_data, input_size);
}

#endif /* ZSTD_RUBY_H */
2 changes: 1 addition & 1 deletion ext/zstdruby/streaming_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
VALUE result = rb_str_new(0, 0);
while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const ret = zstd_decompress(sd->dctx, &output, &input, false);
size_t const ret = zstd_stream_decompress(sd->dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
}
Expand Down
10 changes: 5 additions & 5 deletions ext/zstdruby/zstdruby.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self)
char* input_data = RSTRING_PTR(input_value);
size_t input_size = RSTRING_LEN(input_value);

size_t const max_compressed_size = ZSTD_compressBound(input_size);
size_t max_compressed_size = ZSTD_compressBound(input_size);
VALUE output = rb_str_new(NULL, max_compressed_size);
const char* output_data = RSTRING_PTR(output);
char* output_data = RSTRING_PTR(output);

size_t const ret = ZSTD_compress2(ctx,(void*)output_data, max_compressed_size, (void*)input_data, input_size);
size_t const ret = zstd_compress(ctx, output_data, max_compressed_size, input_data, input_size, false);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
}
Expand Down Expand Up @@ -96,7 +96,7 @@ static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t
rb_str_resize(output_string, output.size);
output.dst = RSTRING_PTR(output_string);

size_t ret = zstd_decompress(dctx, &output, &input, true);
size_t ret = zstd_stream_decompress(dctx, &output, &input, false);
if (ZSTD_isError(ret)) {
ZSTD_freeDCtx(dctx);
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret));
Expand Down Expand Up @@ -134,7 +134,7 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self)
VALUE output = rb_str_new(NULL, uncompressed_size);
char* output_data = RSTRING_PTR(output);

size_t const decompress_size = ZSTD_decompressDCtx(dctx, output_data, uncompressed_size, input_data, input_size);
size_t const decompress_size = zstd_decompress(dctx, output_data, uncompressed_size, input_data, input_size, false);
if (ZSTD_isError(decompress_size)) {
rb_raise(rb_eRuntimeError, "%s: %s", "decompress error", ZSTD_getErrorName(decompress_size));
}
Expand Down