From 04455c2d267291a68b62962624bd14556bd47d08 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Sat, 13 Jan 2024 00:20:33 +0000
Subject: [PATCH] Auto-select dirty nif based on byte_size

Using a dirty NIF for smaller objects has a performance penalty, so:

- by default, don't use the dirty NIF when compressing < 250 KB or
  decompressing < 50 KB
- allow the application to override this and specifically request either
  the dirty or the quick method.
---
 c_src/zstd_nif.c    |  6 ++--
 src/zstd.erl        | 32 +++++++++++++++++++--
 test/zstd_tests.erl | 69 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c
index 1287d8e..ddbade3 100644
--- a/c_src/zstd_nif.c
+++ b/c_src/zstd_nif.c
@@ -405,8 +405,10 @@ static int zstd_on_upgrade(ErlNifEnv *env, void **priv, void **old, ERL_NIF_TERM
 }
 
 static ErlNifFunc nif_funcs[] = {
-    { "compress" , 2, zstd_nif_compress },
-    { "decompress" , 1, zstd_nif_decompress },
+    { "dirty_compress" , 2, zstd_nif_compress , ERL_DIRTY_JOB_CPU_BOUND },
+    { "dirty_decompress" , 1, zstd_nif_decompress , ERL_DIRTY_JOB_CPU_BOUND },
+    { "quick_compress" , 2, zstd_nif_compress },
+    { "quick_decompress" , 1, zstd_nif_decompress },
 
     { "new_compression_stream" , 0, zstd_nif_new_compression_stream },
     { "new_decompression_stream" , 0, zstd_nif_new_decompression_stream },
diff --git a/src/zstd.erl b/src/zstd.erl
index 1080bec..e4485d5 100644
--- a/src/zstd.erl
+++ b/src/zstd.erl
@@ -2,6 +2,8 @@
 
 -export([compress/1, compress/2]).
 -export([decompress/1]).
+-export([quick_compress/2, quick_decompress/1]).
+-export([dirty_compress/2, dirty_decompress/1]).
 -export([new_compression_stream/0, new_decompression_stream/0,
          compression_stream_init/1, compression_stream_init/2, decompression_stream_init/1,
          compression_stream_reset/2, compression_stream_reset/1, decompression_stream_reset/1, stream_flush/1,
@@ -12,17 +14,43 @@
 -define(APPNAME, zstd).
 -define(LIBNAME, zstd_nif).
 
+% Thresholds above which it is preferable to use a dirty NIF
+-define(UNCOMPRESSED_SIZE_DIRTY, 250000).
+-define(COMPRESSED_SIZE_DIRTY, 50000).
+
 -spec compress(Uncompressed :: binary()) -> Compressed :: binary().
 compress(Binary) ->
     compress(Binary, 1).
 
 -spec compress(Uncompressed :: binary(), CompressionLevel :: 0..22) -> Compressed :: binary().
-compress(_, _) ->
+compress(Uncompressed, Level) when byte_size(Uncompressed) > ?UNCOMPRESSED_SIZE_DIRTY ->
+    dirty_compress(Uncompressed, Level);
+compress(Uncompressed, Level) ->
+    quick_compress(Uncompressed, Level).
+
+-spec dirty_compress(
+    Uncompressed :: binary(), CompressionLevel :: 0..22) -> Compressed :: binary().
+dirty_compress(_, _) ->
+    erlang:nif_error(?LINE).
+
+-spec quick_compress(
+    Uncompressed :: binary(), CompressionLevel :: 0..22) -> Compressed :: binary().
+quick_compress(_, _) ->
     erlang:nif_error(?LINE).
 
 -spec decompress(Compressed :: binary()) -> Uncompressed :: binary() | error.
-decompress(_) ->
+decompress(Compressed) when byte_size(Compressed) > ?COMPRESSED_SIZE_DIRTY ->
+    dirty_decompress(Compressed);
+decompress(Compressed) ->
+    quick_decompress(Compressed).
+
+-spec dirty_decompress(Compressed :: binary()) -> Uncompressed :: binary() | error.
+dirty_decompress(_) ->
+    erlang:nif_error(?LINE).
+
+-spec quick_decompress(Compressed :: binary()) -> Uncompressed :: binary() | error.
+quick_decompress(_) ->
     erlang:nif_error(?LINE).
 
 -spec new_compression_stream() -> reference().
diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl
index a67ea1b..37113ba 100644
--- a/test/zstd_tests.erl
+++ b/test/zstd_tests.erl
@@ -20,3 +20,72 @@ zstd_stream_test() ->
     {ok, DBin1} = zstd:stream_decompress(DStream, CompressionBin),
     {ok, DBin2} = zstd:stream_decompress(DStream, FlushBin),
     ?assertEqual(Bin, <<DBin1/binary, DBin2/binary>>).
+
+generate_randomkeys(Count, BucketRangeLow, BucketRangeHigh) ->
+    generate_randomkeys(Count, [], BucketRangeLow, BucketRangeHigh).
+
+generate_randomkeys(0, Acc, _BucketLow, _BucketHigh) ->
+    Acc;
+generate_randomkeys(Count, Acc, BucketLow, BRange) ->
+    BNumber =
+        lists:flatten(
+            io_lib:format(
+                "~4..0B", [BucketLow + rand:uniform(BRange)])),
+    KNumber =
+        lists:flatten(
+            io_lib:format("~4..0B", [rand:uniform(1000)])),
+    K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber, null},
+    RandKey =
+        {K, {Count + 1, {active, infinity}, erlang:phash2(K), null}},
+    generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
+
+
+compression_perf_test_() ->
+    {timeout, 60, fun compression_perf_testsizes/0}.
+
+compression_perf_testsizes() ->
+    compression_perf_tester(128),
+    compression_perf_tester(256),
+    compression_perf_tester(512),
+    compression_perf_tester(1024),
+    compression_perf_tester(2048),
+    compression_perf_tester(4096),
+    compression_perf_tester(8192).
+
+compression_perf_tester(N) ->
+    Loops = 100,
+    {TotalCS, TotalDS, TotalDC, TotalDD, TotalQC, TotalQD, TotalAC, TotalAD} =
+        lists:foldl(
+            fun(_A, {CST, DST, CTDT, DTDT, CTQT, DTQT, CTT, DTT}) ->
+                RB0 =
+                    term_to_binary(
+                        {base64:encode(crypto:strong_rand_bytes(N * 8)),
+                            (generate_randomkeys(N, 1, 4))}),
+                {CTD0, CD0} = timer:tc(fun() -> zstd:dirty_compress(RB0, 1) end),
+                {DTD0, DD0} = timer:tc(fun() -> zstd:dirty_decompress(CD0) end),
+                {CTQ0, CQ0} = timer:tc(fun() -> zstd:quick_compress(RB0, 1) end),
+                {DTQ0, DQ0} = timer:tc(fun() -> zstd:quick_decompress(CQ0) end),
+                {CT0, C0} = timer:tc(fun() -> zstd:compress(RB0) end),
+                {DT0, D0} = timer:tc(fun() -> zstd:decompress(C0) end),
+
+                ?assertMatch(RB0, DD0),
+                ?assertMatch(DD0, DQ0),
+                ?assertMatch(DQ0, D0),
+
+                {CST + byte_size(RB0), DST + byte_size(C0),
+                    CTDT + CTD0, DTDT + DTD0, CTQT + CTQ0,
+                    DTQT + DTQ0, CTT + CT0, DTT + DT0}
+            end,
+            {0, 0, 0, 0, 0, 0, 0, 0},
+            lists:seq(1, Loops)
+        ),
+
+    io:format(
+        user,
+        "Over ~w loops tested size ~w compress_size ~w~n"
+        "mean compress time dirty_nif ~w quick_nif ~w auto_nif ~w~n"
+        "mean decompress time dirty_nif ~w quick_nif ~w auto_nif ~w~n~n",
+        [Loops, TotalCS div Loops, TotalDS div Loops,
+            TotalDC div Loops, TotalQC div Loops, TotalAC div Loops,
+            TotalDD div Loops, TotalQD div Loops, TotalAD div Loops]
+    ).
\ No newline at end of file
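
Usage note (not part of the patch): a minimal sketch of how a calling application might rely on the automatic selection introduced here, or override it when the payload size is known up front. The zstd:compress/2, zstd:quick_compress/2 and zstd:dirty_compress/2 calls come from this patch; the wrapper module and function names below are hypothetical.

    %% Hypothetical caller module; assumes the zstd application and its
    %% NIF library are loaded.
    -module(zstd_usage_sketch).
    -export([pack/1, pack_small/1, pack_large/1]).

    %% Default path: zstd:compress/2 dispatches on byte_size/1, using the
    %% quick NIF at or below 250000 bytes and the dirty NIF above that.
    pack(Bin) when is_binary(Bin) ->
        zstd:compress(Bin, 1).

    %% Payload known to be small: request the quick NIF explicitly and
    %% avoid the dirty-scheduler hand-off.
    pack_small(Bin) when is_binary(Bin) ->
        zstd:quick_compress(Bin, 1).

    %% Payload known to be large: request the dirty NIF explicitly so a
    %% long-running compression cannot block a normal scheduler.
    pack_large(Bin) when is_binary(Bin) ->
        zstd:dirty_compress(Bin, 1).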