Add microbenchmark binary and results PiperOrigin-RevId: 412881864
diff --git a/.bazelci/presubmit.yml b/.bazelci/presubmit.yml index 02fccaf..9604b09 100644 --- a/.bazelci/presubmit.yml +++ b/.bazelci/presubmit.yml
@@ -13,10 +13,20 @@ # limitations under the License. tasks: - ubuntu2004: + ubuntu2004-c++14: + platform: "ubuntu2004" build_targets: - "//..." + - "-//experiments/..." + build_flags: + - "--cxxopt=-std=c++14" test_flags: - "--test_tag_filters=-benchmark" test_targets: - "//..." + ubuntu2004-c++17: + platform: "ubuntu2004" + build_targets: + - "//..." + build_flags: + - "--cxxopt=-std=c++17"
diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index b42ac3f..e7c5173 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel
@@ -125,3 +125,23 @@ "https://github.com/google/iree/archive/7e6012468cbaafaaf30302748a2943771b40e2c3.zip", ], ) + +# riegeli for file IO +http_archive( + name = "com_github_google_riegeli", + sha256 = "3919f42bae25c890b89bf96525939debdd3679abfc6a81725a11397694cfbe07", + strip_prefix = "riegeli-accecad7fbbbe1af525ba5e35c6260f63eebf8c5", + urls = [ + "https://github.com/google/riegeli/archive/accecad7fbbbe1af525ba5e35c6260f63eebf8c5.zip", + ], +) + +# cppitertools for logging +http_archive( + name = "com_github_ryanhaining_cppitertools", + sha256 = "83aedc4f593212d8112eac0b32b5f191219604f3db922cc218fd733ea448118c", + strip_prefix = "cppitertools-b2b98e60438f1ed6b04b77cdb6cc5d5516af301b", + urls = [ + "https://github.com/ryanhaining/cppitertools/archive/b2b98e60438f1ed6b04b77cdb6cc5d5516af301b.zip", + ] +)
diff --git a/experiments/BUILD b/experiments/BUILD new file mode 100644 index 0000000..3fcf86b --- /dev/null +++ b/experiments/BUILD
# BUILD file for DPF microbenchmarks. This package only contains C++ targets,
# so the previously present load of `go_proto_library` (rules_go) was unused
# and has been removed.

package(
    default_visibility = ["//visibility:private"],
)

licenses(["notice"])

cc_binary(
    name = "synthetic_data_benchmarks",
    srcs = ["synthetic_data_benchmarks.cc"],
    deps = [
        "//dpf:distributed_point_function",
        "//dpf:distributed_point_function_cc_proto",
        "@com_github_google_benchmark//:benchmark",
        "@com_github_google_glog//:glog",
        "@com_github_google_riegeli//riegeli/bytes:fd_reader",
        "@com_github_google_riegeli//riegeli/lines:line_reading",
        "@com_github_ryanhaining_cppitertools//:cppitertools",
        "@com_google_absl//absl/container:btree",
        "@com_google_absl//absl/flags:flag",
        "@com_google_absl//absl/flags:parse",
        "@com_google_absl//absl/flags:usage",
        "@com_google_absl//absl/random",
        "@com_google_absl//absl/strings",
        "@com_google_absl//absl/time",
    ],
)
diff --git a/experiments/README.md b/experiments/README.md new file mode 100644 index 0000000..386dbf8 --- /dev/null +++ b/experiments/README.md
@@ -0,0 +1,135 @@ +# DPF Microbenchmarks + +This folder contains a binary for running DPF microbenchmarks for a two-party +sparse histogram aggregation. See below for usage and options. In the following, +we report the results on a fixed set of synthetic input files. + +# Parameters + +We fix the number of non-zero points in the entire histogram to 2<sup>20</sup>, +and choose three sets of non-zeros using the following three distributions: + +1. Power law with 90% of nonzeros in 10% of the domain +2. Power law with 90% of nonzeros in 50% of the domain +3. Uniform + +We then evaluate in two evaluation modes: + +1. Hierarchical evaluation. Here, we assume the non-zeros are not known in + advance, but instead have to be discovered during the evaluation. To + simulate this using microbenchmarks, we identify a prefix hierarchy of the + histogram domain, such that the full evaluation of each hierarchy level + contains no more evaluation points than 4 times the number of non-zeros + (i.e., 2<sup>22</sup>). We then perform a hierarchical DPF evaluation using + the resulting hierarchy. +2. Direct evaluation. Here, we assume that the set of non-zero indices is known + in advance. Thus, we can do a direct DPF evaluation at the given set of + non-zeros. + +All entries in the tables below record the time needed to expand a single DPF +key at one of the two servers in the given setting. The evaluation is +single-threaded and runs on an Intel(R) Xeon(R) CPU @ 2.30GHz. + +The size of the values being aggregated is fixed to 32 bits. 
+ +## Domain size: 2<sup>32</sup> + +Prefix bit lengths to evaluate for hierarchical evaluation: 21,23,25,27,29,31,32 + +<table> + <tr> + <td>Distribution + </td> + <td>1 + </td> + <td>2 + </td> + <td>3 + </td> + </tr> + <tr> + <td>Hierarchical Evaluation + </td> + <td>1.36s + </td> + <td>2.22s + </td> + <td>3.31s + </td> + </tr> + <tr> + <td>Direct Evaluation + </td> + <td>0.67s + </td> + <td>0.68s + </td> + <td>0.70s + </td> + </tr> +</table> + +## Domain size: 2<sup>128</sup> + +Levels to evaluate for hierarchical evaluation: +21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59,61,63,65,67,69,71,73,75,77,79,81,83,85,87,89,91,93,95,97,99,101,103,105,107,109,111,113,115,117,119,121,123,125,127,128 + +<table> + <tr> + <td>Distribution + </td> + <td>1 + </td> + <td>2 + </td> + <td>3 + </td> + </tr> + <tr> + <td>Hierarchical Evaluation + </td> + <td>32.68s + </td> + <td>35.07s + </td> + <td>35.95s + </td> + </tr> + <tr> + <td>Direct Evaluation + </td> + <td>3.08s + </td> + <td>3.13s + </td> + <td>3.13s + </td> + </tr> +</table> + +## Reproducing the benchmarks + +Usage: + +```bash +bazel run --cxxopt=-std=c++17 -c opt --dynamic_mode=off experiments:synthetic_data_benchmarks -- [options] +``` + +Options: + +```none +--input (CSV file containing non-zero buckets in the first column.); + default: ""; +--levels_to_evaluate (List of integers specifying the log domain sizes at + which to insert hierarchy levels.); default: ; +--log_domain_size (Logarithm of the domain size. All non-zeros in `input` + must be in [0, 2^log_domain_size).); default: 20; +--max_expansion_factor (Limits the maximum number of elements the expansion + at any hierarchy level can have to a multiple of the number of unique + buckets in the input file. Must be at least 2.); default: 2; +--num_iterations (Number of iterations to benchmark.); default: 20; +--only_nonzeros (Only evaluates at the nonzero indices of the input file + passed via --input, instead of performing hierarchical evaluation. 
If + true, all flags related to hierarchy levels will be ignored); + default: false; +```
diff --git a/experiments/synthetic_data_benchmarks.cc b/experiments/synthetic_data_benchmarks.cc new file mode 100644 index 0000000..c1a252f --- /dev/null +++ b/experiments/synthetic_data_benchmarks.cc
@@ -0,0 +1,282 @@ +#include <fcntl.h> +#include <glog/logging.h> + +#include "absl/container/btree_set.h" +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/flags/usage.h" +#include "absl/random/random.h" +#include "absl/strings/numbers.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/str_join.h" +#include "absl/strings/str_split.h" +#include "absl/time/clock.h" +#include "benchmark/benchmark.h" // third_party/benchmark +#include "dpf/distributed_point_function.h" +#include "dpf/distributed_point_function.pb.h" +#include "imap.hpp" +#include "riegeli/bytes/fd_reader.h" +#include "riegeli/lines/line_reading.h" + +ABSL_FLAG(std::string, input, "", + "CSV file containing non-zero buckets in the first column."); +ABSL_FLAG(int, log_domain_size, 20, + "Logarithm of the domain size. All non-zeros in `input` must be in " + "[0, 2^log_domain_size)."); +ABSL_FLAG(int, num_iterations, 20, "Number of iterations to benchmark."); +ABSL_FLAG(int, max_expansion_factor, 2, + "Limits the maximum number of elements the expansion at any " + "hierarchy level can have to a multiple of the number of unique " + "buckets in the input file. Must be at least 2."); +ABSL_FLAG(bool, only_nonzeros, false, + "Only evaluates at the nonzero indices of the input file passed via " + "--input, instead of performing hierarchical evaluation. If true, " + "all flags related to hierarchy levels will be ignored"); +ABSL_FLAG(std::vector<std::string>, levels_to_evaluate, {}, + "List of integers specifying the log domain sizes at which to insert " + "hierarchy levels."); + +#ifndef QCHECK +#define QCHECK(x) CHECK(x) +#endif + +namespace { + +const char* Usage() { + return "synthetic_data_benchmarks [OPTIONS]\n\n" + "Runs a single DPF key evaluation on the specified domain. If an " + "input file is specified with --input, it is read as a CSV file " + "containing the bucket IDs to expand in the first column. 
Otherwise, " + "the full domain will be expanded."; +} + +void ValidateFlags() { + int log_domain_size = absl::GetFlag(FLAGS_log_domain_size); + QCHECK(log_domain_size >= 0) << "--log_domain_size must be non-negative"; + int num_iterations = absl::GetFlag(FLAGS_num_iterations); + QCHECK(num_iterations > 0) << "--num_iterations must be positive"; + if (absl::GetFlag(FLAGS_only_nonzeros)) { + QCHECK(!absl::GetFlag(FLAGS_input).empty()) + << "--input is required when --only_nonzeros is true"; + } + int max_expansion_factor = absl::GetFlag(FLAGS_max_expansion_factor); + QCHECK(max_expansion_factor >= 2) + << "--max_expansion_factor must be at least 2"; + std::vector<std::string> levels_to_evaluate = + absl::GetFlag(FLAGS_levels_to_evaluate); + for (absl::string_view level_str : levels_to_evaluate) { + int level; + QCHECK(absl::SimpleAtoi(level_str, &level)); + QCHECK(level > 0 && level <= log_domain_size) + << "--levels_to_evaluate must be in [1, log_domain_size]"; + } +} + +// Returns the prefixes of the given buckets using the bit lengths specified +// in `parameters`. +std::vector<std::vector<absl::uint128>> ComputePrefixes( + const absl::btree_set<absl::uint128>& last_level_prefixes, + int log_domain_size) { + std::vector<std::vector<absl::uint128>> result(log_domain_size + 1); + result.back() = std::vector<absl::uint128>(last_level_prefixes.begin(), + last_level_prefixes.end()); + + // Iterate backwards through previous levels, computing prefixes by + // appropriately shifting the ones from higher levels. + for (int i = static_cast<int>(result.size()) - 1; i > 1; --i) { + absl::btree_set<absl::uint128> current_level_prefixes; + for (const auto& x : result[i]) { + current_level_prefixes.insert(x >> 1); + } + result[i - 1] = std::vector<absl::uint128>(current_level_prefixes.begin(), + current_level_prefixes.end()); + } + return result; +} + +// Parses `input_file` as a CSV file and returns the unique integers in the +// first column as a set. 
+absl::btree_set<absl::uint128> ReadUniqueValuesFromFile( + absl::string_view input_file) { + absl::btree_set<absl::uint128> nonzeros; + LOG(INFO) << "Reading input file..."; + int line_number = 0; + riegeli::FdReader reader(input_file, O_RDONLY); + absl::string_view line; + while (riegeli::ReadLine(reader, line)) { + std::vector<absl::string_view> fields = + absl::StrSplit(line, ',', absl::SkipWhitespace()); + QCHECK(!fields.empty()) << "Line " << line_number << " is empty"; + absl::uint128 nonzero; + QCHECK(absl::SimpleAtoi(fields[0], &nonzero)) + << "Invalid bucket ID on line " << line_number; + nonzeros.insert(nonzero); + ++line_number; + } + QCHECK(reader.healthy()); + LOG(INFO) << "Read " << nonzeros.size() << " nonzeros from " << line_number + << " lines"; + return nonzeros; +} + +std::vector<int> ComputeLevelsToEvaluate( + absl::Span<const std::vector<absl::uint128>> prefixes, + int log_domain_size) { + int num_nonzeros = prefixes.back().size(); + if (num_nonzeros > 0) { + std::vector<int> levels_to_evaluate; + // The first level is chosen such that it has size at most expansion_factor + // * num_nonzeros. + int max_expansion_factor = absl::GetFlag(FLAGS_max_expansion_factor); + int first_level = + std::min(log_domain_size, + static_cast<int>(std::log2(num_nonzeros) + + std::log2(max_expansion_factor))) - + 1; + levels_to_evaluate.push_back(first_level); + while (levels_to_evaluate.back() < log_domain_size) { + int nonzeros_at_last_level = + prefixes[levels_to_evaluate.back() + 1].size(); + // We want to evaluate as many levels as possible so that we get no more + // than expansion_factor * num_nonzeros. So 2^bits_to_next_level * + // nonzeros_at_last_level < expansion_factor * num_nonzeros. 
+ levels_to_evaluate.push_back(std::min( + log_domain_size, + static_cast<int>(levels_to_evaluate.back() + std::log2(num_nonzeros) + + std::log2(max_expansion_factor) - + std::log2(nonzeros_at_last_level)))); + } + return levels_to_evaluate; + } + return {log_domain_size}; +} + +template <typename T> +void RunHierarchicalEvaluation( + const distributed_point_functions::DistributedPointFunction& dpf, + const distributed_point_functions::DpfKey& key, + absl::Span<const std::vector<absl::uint128>> prefixes, int num_iterations) { + const distributed_point_functions::EvaluationContext ctx = + dpf.CreateEvaluationContext(key).value(); + CHECK_EQ(prefixes.size(), ctx.parameters_size()); + for (int i = 0; i < num_iterations; ++i) { + distributed_point_functions::EvaluationContext ctx_copy = ctx; + for (int level = 0; level < static_cast<int>(prefixes.size()); ++level) { + std::vector<T> result = + dpf.EvaluateUntil<T>(level, prefixes[level], ctx_copy).value(); + if (i == 0) { + LOG(INFO) << "Number of outputs at " << level + << "-th level: " << result.size(); + LOG(INFO) << "log_domain_size=" + << ctx.parameters(level).log_domain_size(); + } + benchmark::DoNotOptimize(result); + } + } +} + +template <typename T> +void RunBatchedSinglePointEvaluation( + const distributed_point_functions::DistributedPointFunction& dpf, + const distributed_point_functions::DpfKey& key, + absl::Span<const absl::uint128> nonzeros, int num_iterations) { + // Check that we have a single hierarchy level. 
+ CHECK_EQ(dpf.parameters().size(), 1); + for (int i = 0; i < num_iterations; ++i) { + std::vector<T> result = dpf.EvaluateAt<T>(key, 0, nonzeros).value(); + CHECK_EQ(result.size(), nonzeros.size()); + benchmark::DoNotOptimize(result); + } +} + +} // namespace + +int main(int argc, char* argv[]) { + google::InitGoogleLogging(argv[0]); + absl::SetProgramUsageMessage(Usage()); + absl::ParseCommandLine(argc, argv); + FLAGS_logtostderr = 1; + ValidateFlags(); + + // Read nonzeros from input file, compute prefixes, + std::string input_file = absl::GetFlag(FLAGS_input); + const int log_domain_size = absl::GetFlag(FLAGS_log_domain_size); + std::vector<std::vector<absl::uint128>> prefixes(1); + if (!input_file.empty()) { + absl::btree_set<absl::uint128> nonzeros = + ReadUniqueValuesFromFile(input_file); + prefixes = ComputePrefixes(nonzeros, log_domain_size); + } + int num_nonzeros = prefixes.back().size(); + LOG(INFO) << "Number of nonzeros: " << num_nonzeros; + + // Compute levels to evaluate and choose the correct prefixes. 
+ std::vector<std::string> levels_to_evaluate_str = + absl::GetFlag(FLAGS_levels_to_evaluate); + std::vector<int> levels_to_evaluate(levels_to_evaluate_str.size()); + bool only_nonzeros = absl::GetFlag(FLAGS_only_nonzeros); + for (int i = 0; i < static_cast<int>(levels_to_evaluate.size()); ++i) { + CHECK(absl::SimpleAtoi(levels_to_evaluate_str[i], &levels_to_evaluate[i])); + } + if (levels_to_evaluate.empty()) { + if (!only_nonzeros) { + levels_to_evaluate = ComputeLevelsToEvaluate(prefixes, log_domain_size); + } else { + levels_to_evaluate = {log_domain_size}; + } + } + LOG(INFO) << "Levels to evaluate: " << absl::StrJoin(levels_to_evaluate, ","); + std::vector<std::vector<absl::uint128>> prefixes_to_evaluate(1); + prefixes_to_evaluate.reserve(levels_to_evaluate.size()); + for (int i = 1; i < levels_to_evaluate.size(); ++i) { + prefixes_to_evaluate.push_back(prefixes[levels_to_evaluate[i - 1]]); + } + LOG(INFO) << "Numbers of prefixes per level: " + << absl::StrJoin(iter::imap([](auto& c) { return c.size(); }, + prefixes_to_evaluate), + ","); + LOG(INFO) << "Numbers of prefixes per bit: " + << absl::StrJoin( + iter::imap([](auto& c) { return c.size(); }, prefixes), ","); + + // Set up parameters and create DPF instance. + std::vector<distributed_point_functions::DpfParameters> parameters( + levels_to_evaluate.size()); + const int element_bitsize = 32; // TODO(schoppmann): Make this a flag? + for (int i = 0; i < static_cast<int>(parameters.size()); ++i) { + parameters[i].mutable_value_type()->mutable_integer()->set_bitsize( + element_bitsize); + parameters[i].set_log_domain_size(levels_to_evaluate[i]); + } + std::unique_ptr<distributed_point_functions::DistributedPointFunction> dpf = + distributed_point_functions::DistributedPointFunction::CreateIncremental( + parameters) + .value(); + + // Generate DPF key. 
+ absl::BitGen rng; + absl::uint128 alpha = absl::MakeUint128(absl::Uniform<uint64_t>(rng), + absl::Uniform<uint64_t>(rng)); + if (log_domain_size < 128) { + alpha %= absl::uint128{1} << log_domain_size; + } + std::vector<absl::uint128> beta(parameters.size(), 1); + distributed_point_functions::DpfKey key; + std::tie(key, std::ignore) = + dpf->GenerateKeysIncremental(alpha, beta).value(); + + // Run the experiment and measure time. + int num_iterations = absl::GetFlag(FLAGS_num_iterations); + using T = uint32_t; + absl::Time start = absl::Now(); + if (only_nonzeros) { + RunBatchedSinglePointEvaluation<T>(*dpf, key, prefixes.back(), + num_iterations); + } else { + RunHierarchicalEvaluation<T>(*dpf, key, prefixes_to_evaluate, + num_iterations); + } + absl::Duration wallclock = absl::Now() - start; + LOG(INFO) << "Wallclock time per iteration: " + << wallclock / absl::GetFlag(FLAGS_num_iterations); +}