Add microbenchmark binary and results

PiperOrigin-RevId: 412881864
diff --git a/.bazelci/presubmit.yml b/.bazelci/presubmit.yml
index 02fccaf..9604b09 100644
--- a/.bazelci/presubmit.yml
+++ b/.bazelci/presubmit.yml
@@ -13,10 +13,20 @@
 # limitations under the License.
 
 tasks:
-  ubuntu2004:
+  ubuntu2004-c++14:
+    platform: "ubuntu2004"
     build_targets:
       - "//..."
+      - "-//experiments/..."
+    build_flags:
+      - "--cxxopt=-std=c++14"
     test_flags:
       - "--test_tag_filters=-benchmark"
     test_targets:
       - "//..."
+  ubuntu2004-c++17:
+    platform: "ubuntu2004"
+    build_targets:
+      - "//..."
+    build_flags:
+      - "--cxxopt=-std=c++17"
diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel
index b42ac3f..e7c5173 100644
--- a/WORKSPACE.bazel
+++ b/WORKSPACE.bazel
@@ -125,3 +125,23 @@
         "https://github.com/google/iree/archive/7e6012468cbaafaaf30302748a2943771b40e2c3.zip",
     ],
 )
+
+# riegeli for file IO
+http_archive(
+  name = "com_github_google_riegeli",
+  sha256 = "3919f42bae25c890b89bf96525939debdd3679abfc6a81725a11397694cfbe07",
+  strip_prefix = "riegeli-accecad7fbbbe1af525ba5e35c6260f63eebf8c5",
+  urls = [
+    "https://github.com/google/riegeli/archive/accecad7fbbbe1af525ba5e35c6260f63eebf8c5.zip",
+  ],
+)
+
+# cppitertools for logging
+http_archive(
+  name = "com_github_ryanhaining_cppitertools",
+  sha256 = "83aedc4f593212d8112eac0b32b5f191219604f3db922cc218fd733ea448118c",
+  strip_prefix = "cppitertools-b2b98e60438f1ed6b04b77cdb6cc5d5516af301b",
+  urls = [
+    "https://github.com/ryanhaining/cppitertools/archive/b2b98e60438f1ed6b04b77cdb6cc5d5516af301b.zip",
+  ]
+)
diff --git a/experiments/BUILD b/experiments/BUILD
new file mode 100644
index 0000000..3fcf86b
--- /dev/null
+++ b/experiments/BUILD
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+package(
+    default_visibility = ["//visibility:private"],
+)
+
+licenses(["notice"])
+
+cc_binary(
+    name = "synthetic_data_benchmarks",
+    srcs = ["synthetic_data_benchmarks.cc"],
+    deps = [
+        "//dpf:distributed_point_function",
+        "//dpf:distributed_point_function_cc_proto",
+        "@com_github_google_benchmark//:benchmark",
+        "@com_github_google_glog//:glog",
+        "@com_github_google_riegeli//riegeli/bytes:fd_reader",
+        "@com_github_google_riegeli//riegeli/lines:line_reading",
+        "@com_github_ryanhaining_cppitertools//:cppitertools",
+        "@com_google_absl//absl/container:btree",
+        "@com_google_absl//absl/flags:flag",
+        "@com_google_absl//absl/flags:parse",
+        "@com_google_absl//absl/flags:usage",
+        "@com_google_absl//absl/random",
+        "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/time",
+    ],
+)
diff --git a/experiments/README.md b/experiments/README.md
new file mode 100644
index 0000000..386dbf8
--- /dev/null
+++ b/experiments/README.md
@@ -0,0 +1,135 @@
+# DPF Microbenchmarks
+
+This folder contains a binary for running DPF microbenchmarks for a two-party
+sparse histogram aggregation. See below for usage and options. In the following,
+we report the results on a fixed set of synthetic input files.
+
+# Parameters
+
+We fix the number of non-zero points in the entire histogram to 2<sup>20</sup>,
+and choose three sets of non-zeros using the following three distributions:
+
+1.  Power law with 90% of nonzeros in 10% of the domain
+2.  Power law with 90% of nonzeros in 50% of the domain
+3.  Uniform
+
+We then evaluate in two evaluation modes:
+
+1.  Hierarchical evaluation. Here, we assume the non-zeros are not known in
+    advance, but instead have to be discovered during the evaluation. To
+    simulate this using microbenchmarks, we identify a prefix hierarchy of the
+    histogram domain, such that the full evaluation of each hierarchy level
+    contains no more evaluation points than 4 times the number of non-zeros
+    (i.e., 2<sup>22</sup>). We then perform a hierarchical DPF evaluation using
+    the resulting hierarchy.
+2.  Direct evaluation. Here, we assume that the set of non-zero indices is known
+    in advance. Thus, we can do a direct DPF evaluation at the given set of
+    non-zeros.
+
+All entries in the tables below record the time needed to expand a single DPF
+key at one of the two servers in the given setting. The evaluation is
+single-threaded and runs on an Intel(R) Xeon(R) CPU @ 2.30GHz.
+
+The size of the values being aggregated is fixed to 32 bits.
+
+## Domain size: 2<sup>32</sup>
+
+Prefix bit lengths to evaluate for hierarchical evaluation: 21,23,25,27,29,31,32
+
+<table>
+  <tr>
+   <td>Distribution
+   </td>
+   <td>1
+   </td>
+   <td>2
+   </td>
+   <td>3
+   </td>
+  </tr>
+  <tr>
+   <td>Hierarchical Evaluation
+   </td>
+   <td>1.36s
+   </td>
+   <td>2.22s
+   </td>
+   <td>3.31s
+   </td>
+  </tr>
+  <tr>
+   <td>Direct Evaluation
+   </td>
+   <td>0.67s
+   </td>
+   <td>0.68s
+   </td>
+   <td>0.70s
+   </td>
+  </tr>
+</table>
+
+## Domain size: 2<sup>128</sup>
+
+Levels to evaluate for hierarchical evaluation:
+21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59,61,63,65,67,69,71,73,75,77,79,81,83,85,87,89,91,93,95,97,99,101,103,105,107,109,111,113,115,117,119,121,123,125,127,128
+
+<table>
+  <tr>
+   <td>Distribution
+   </td>
+   <td>1
+   </td>
+   <td>2
+   </td>
+   <td>3
+   </td>
+  </tr>
+  <tr>
+   <td>Hierarchical Evaluation
+   </td>
+   <td>32.68s
+   </td>
+   <td>35.07s
+   </td>
+   <td>35.95s
+   </td>
+  </tr>
+  <tr>
+   <td>Direct Evaluation
+   </td>
+   <td>3.08s
+   </td>
+   <td>3.13s
+   </td>
+   <td>3.13s
+   </td>
+  </tr>
+</table>
+
+## Reproducing the benchmarks
+
+Usage:
+
+```bash
+bazel run --cxxopt=-std=c++17 -c opt --dynamic_mode=off experiments:synthetic_data_benchmarks -- [options]
+```
+
+Options:
+
+```none
+--input (CSV file containing non-zero buckets in the first column.);
+  default: "";
+--levels_to_evaluate (List of integers specifying the log domain sizes at
+  which to insert hierarchy levels.); default: ;
+--log_domain_size (Logarithm of the domain size. All non-zeros in `input`
+  must be in [0, 2^log_domain_size).); default: 20;
+--max_expansion_factor (Limits the maximum number of elements the expansion
+  at any hierarchy level can have to a multiple of the number of unique
+  buckets in the input file. Must be at least 2.); default: 2;
+--num_iterations (Number of iterations to benchmark.); default: 20;
+--only_nonzeros (Only evaluates at the nonzero indices of the input file
+  passed via --input, instead of performing hierarchical evaluation. If
+  true, all flags related to hierarchy levels will be ignored);
+  default: false;
+```
diff --git a/experiments/synthetic_data_benchmarks.cc b/experiments/synthetic_data_benchmarks.cc
new file mode 100644
index 0000000..c1a252f
--- /dev/null
+++ b/experiments/synthetic_data_benchmarks.cc
@@ -0,0 +1,282 @@
+#include <fcntl.h>
+#include <glog/logging.h>
+
+#include "absl/container/btree_set.h"
+#include "absl/flags/flag.h"
+#include "absl/flags/parse.h"
+#include "absl/flags/usage.h"
+#include "absl/random/random.h"
+#include "absl/strings/numbers.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_split.h"
+#include "absl/time/clock.h"
+#include "benchmark/benchmark.h"  // third_party/benchmark
+#include "dpf/distributed_point_function.h"
+#include "dpf/distributed_point_function.pb.h"
+#include "imap.hpp"
+#include "riegeli/bytes/fd_reader.h"
+#include "riegeli/lines/line_reading.h"
+
+ABSL_FLAG(std::string, input, "",
+          "CSV file containing non-zero buckets in the first column.");
+ABSL_FLAG(int, log_domain_size, 20,
+          "Logarithm of the domain size. All non-zeros in `input` must be in "
+          "[0, 2^log_domain_size).");
+ABSL_FLAG(int, num_iterations, 20, "Number of iterations to benchmark.");
+ABSL_FLAG(int, max_expansion_factor, 2,
+          "Limits the maximum number of elements the expansion at any "
+          "hierarchy level can have to a multiple of the number of unique "
+          "buckets in the input file. Must be at least 2.");
+ABSL_FLAG(bool, only_nonzeros, false,
+          "Only evaluates at the nonzero indices of the input file passed via "
+          "--input, instead of performing hierarchical evaluation. If true, "
+          "all flags related to hierarchy levels will be ignored");
+ABSL_FLAG(std::vector<std::string>, levels_to_evaluate, {},
+          "List of integers specifying the log domain sizes at which to insert "
+          "hierarchy levels.");
+
+#ifndef QCHECK
+#define QCHECK(x) CHECK(x)
+#endif
+
+namespace {
+
+const char* Usage() {
+  return "synthetic_data_benchmarks [OPTIONS]\n\n"
+         "Runs a single DPF key evaluation on the specified domain. If an "
+         "input file is specified with --input, it is read as a CSV file "
+         "containing the bucket IDs to expand in the first column. Otherwise, "
+         "the full domain will be expanded.";
+}
+
+void ValidateFlags() {
+  int log_domain_size = absl::GetFlag(FLAGS_log_domain_size);
+  QCHECK(log_domain_size >= 0) << "--log_domain_size must be non-negative";
+  int num_iterations = absl::GetFlag(FLAGS_num_iterations);
+  QCHECK(num_iterations > 0) << "--num_iterations must be positive";
+  if (absl::GetFlag(FLAGS_only_nonzeros)) {
+    QCHECK(!absl::GetFlag(FLAGS_input).empty())
+        << "--input is required when --only_nonzeros is true";
+  }
+  int max_expansion_factor = absl::GetFlag(FLAGS_max_expansion_factor);
+  QCHECK(max_expansion_factor >= 2)
+      << "--max_expansion_factor must be at least 2";
+  std::vector<std::string> levels_to_evaluate =
+      absl::GetFlag(FLAGS_levels_to_evaluate);
+  for (absl::string_view level_str : levels_to_evaluate) {
+    int level;
+    QCHECK(absl::SimpleAtoi(level_str, &level));
+    QCHECK(level > 0 && level <= log_domain_size)
+        << "--levels_to_evaluate must be in [1, log_domain_size]";
+  }
+}
+
+// Returns the prefixes of the given buckets using the bit lengths specified
+// in `parameters`.
+std::vector<std::vector<absl::uint128>> ComputePrefixes(
+    const absl::btree_set<absl::uint128>& last_level_prefixes,
+    int log_domain_size) {
+  std::vector<std::vector<absl::uint128>> result(log_domain_size + 1);
+  result.back() = std::vector<absl::uint128>(last_level_prefixes.begin(),
+                                             last_level_prefixes.end());
+
+  // Iterate backwards through previous levels, computing prefixes by
+  // appropriately shifting the ones from higher levels.
+  for (int i = static_cast<int>(result.size()) - 1; i > 1; --i) {
+    absl::btree_set<absl::uint128> current_level_prefixes;
+    for (const auto& x : result[i]) {
+      current_level_prefixes.insert(x >> 1);
+    }
+    result[i - 1] = std::vector<absl::uint128>(current_level_prefixes.begin(),
+                                               current_level_prefixes.end());
+  }
+  return result;
+}
+
+// Parses `input_file` as a CSV file and returns the unique integers in the
+// first column as a set.
+absl::btree_set<absl::uint128> ReadUniqueValuesFromFile(
+    absl::string_view input_file) {
+  absl::btree_set<absl::uint128> nonzeros;
+  LOG(INFO) << "Reading input file...";
+  int line_number = 0;
+  riegeli::FdReader reader(input_file, O_RDONLY);
+  absl::string_view line;
+  while (riegeli::ReadLine(reader, line)) {
+    std::vector<absl::string_view> fields =
+        absl::StrSplit(line, ',', absl::SkipWhitespace());
+    QCHECK(!fields.empty()) << "Line " << line_number << " is empty";
+    absl::uint128 nonzero;
+    QCHECK(absl::SimpleAtoi(fields[0], &nonzero))
+        << "Invalid bucket ID on line " << line_number;
+    nonzeros.insert(nonzero);
+    ++line_number;
+  }
+  QCHECK(reader.healthy());
+  LOG(INFO) << "Read " << nonzeros.size() << " nonzeros from " << line_number
+            << " lines";
+  return nonzeros;
+}
+
+std::vector<int> ComputeLevelsToEvaluate(
+    absl::Span<const std::vector<absl::uint128>> prefixes,
+    int log_domain_size) {
+  int num_nonzeros = prefixes.back().size();
+  if (num_nonzeros > 0) {
+    std::vector<int> levels_to_evaluate;
+    // The first level is chosen such that it has size at most expansion_factor
+    // * num_nonzeros.
+    int max_expansion_factor = absl::GetFlag(FLAGS_max_expansion_factor);
+    int first_level =
+        std::min(log_domain_size,
+                 static_cast<int>(std::log2(num_nonzeros) +
+                                  std::log2(max_expansion_factor))) -
+        1;
+    levels_to_evaluate.push_back(first_level);
+    while (levels_to_evaluate.back() < log_domain_size) {
+      int nonzeros_at_last_level =
+          prefixes[levels_to_evaluate.back() + 1].size();
+      // We want to evaluate as many levels as possible so that we get no more
+      // than expansion_factor * num_nonzeros. So 2^bits_to_next_level *
+      // nonzeros_at_last_level < expansion_factor * num_nonzeros.
+      levels_to_evaluate.push_back(std::min(
+          log_domain_size,
+          static_cast<int>(levels_to_evaluate.back() + std::log2(num_nonzeros) +
+                           std::log2(max_expansion_factor) -
+                           std::log2(nonzeros_at_last_level))));
+    }
+    return levels_to_evaluate;
+  }
+  return {log_domain_size};
+}
+
+template <typename T>
+void RunHierarchicalEvaluation(
+    const distributed_point_functions::DistributedPointFunction& dpf,
+    const distributed_point_functions::DpfKey& key,
+    absl::Span<const std::vector<absl::uint128>> prefixes, int num_iterations) {
+  const distributed_point_functions::EvaluationContext ctx =
+      dpf.CreateEvaluationContext(key).value();
+  CHECK_EQ(prefixes.size(), ctx.parameters_size());
+  for (int i = 0; i < num_iterations; ++i) {
+    distributed_point_functions::EvaluationContext ctx_copy = ctx;
+    for (int level = 0; level < static_cast<int>(prefixes.size()); ++level) {
+      std::vector<T> result =
+          dpf.EvaluateUntil<T>(level, prefixes[level], ctx_copy).value();
+      if (i == 0) {
+        LOG(INFO) << "Number of outputs at " << level
+                  << "-th level: " << result.size();
+        LOG(INFO) << "log_domain_size="
+                  << ctx.parameters(level).log_domain_size();
+      }
+      benchmark::DoNotOptimize(result);
+    }
+  }
+}
+
+template <typename T>
+void RunBatchedSinglePointEvaluation(
+    const distributed_point_functions::DistributedPointFunction& dpf,
+    const distributed_point_functions::DpfKey& key,
+    absl::Span<const absl::uint128> nonzeros, int num_iterations) {
+  // Check that we have a single hierarchy level.
+  CHECK_EQ(dpf.parameters().size(), 1);
+  for (int i = 0; i < num_iterations; ++i) {
+    std::vector<T> result = dpf.EvaluateAt<T>(key, 0, nonzeros).value();
+    CHECK_EQ(result.size(), nonzeros.size());
+    benchmark::DoNotOptimize(result);
+  }
+}
+
+}  // namespace
+
+int main(int argc, char* argv[]) {
+  google::InitGoogleLogging(argv[0]);
+  absl::SetProgramUsageMessage(Usage());
+  absl::ParseCommandLine(argc, argv);
+  FLAGS_logtostderr = 1;
+  ValidateFlags();
+
+  // Read nonzeros from input file, compute prefixes,
+  std::string input_file = absl::GetFlag(FLAGS_input);
+  const int log_domain_size = absl::GetFlag(FLAGS_log_domain_size);
+  std::vector<std::vector<absl::uint128>> prefixes(1);
+  if (!input_file.empty()) {
+    absl::btree_set<absl::uint128> nonzeros =
+        ReadUniqueValuesFromFile(input_file);
+    prefixes = ComputePrefixes(nonzeros, log_domain_size);
+  }
+  int num_nonzeros = prefixes.back().size();
+  LOG(INFO) << "Number of nonzeros: " << num_nonzeros;
+
+  // Compute levels to evaluate and choose the correct prefixes.
+  std::vector<std::string> levels_to_evaluate_str =
+      absl::GetFlag(FLAGS_levels_to_evaluate);
+  std::vector<int> levels_to_evaluate(levels_to_evaluate_str.size());
+  bool only_nonzeros = absl::GetFlag(FLAGS_only_nonzeros);
+  for (int i = 0; i < static_cast<int>(levels_to_evaluate.size()); ++i) {
+    CHECK(absl::SimpleAtoi(levels_to_evaluate_str[i], &levels_to_evaluate[i]));
+  }
+  if (levels_to_evaluate.empty()) {
+    if (!only_nonzeros) {
+      levels_to_evaluate = ComputeLevelsToEvaluate(prefixes, log_domain_size);
+    } else {
+      levels_to_evaluate = {log_domain_size};
+    }
+  }
+  LOG(INFO) << "Levels to evaluate: " << absl::StrJoin(levels_to_evaluate, ",");
+  std::vector<std::vector<absl::uint128>> prefixes_to_evaluate(1);
+  prefixes_to_evaluate.reserve(levels_to_evaluate.size());
+  for (int i = 1; i < levels_to_evaluate.size(); ++i) {
+    prefixes_to_evaluate.push_back(prefixes[levels_to_evaluate[i - 1]]);
+  }
+  LOG(INFO) << "Numbers of prefixes per level: "
+            << absl::StrJoin(iter::imap([](auto& c) { return c.size(); },
+                                        prefixes_to_evaluate),
+                             ",");
+  LOG(INFO) << "Numbers of prefixes per bit: "
+            << absl::StrJoin(
+                   iter::imap([](auto& c) { return c.size(); }, prefixes), ",");
+
+  // Set up parameters and create DPF instance.
+  std::vector<distributed_point_functions::DpfParameters> parameters(
+      levels_to_evaluate.size());
+  const int element_bitsize = 32;  // TODO(schoppmann): Make this a flag?
+  for (int i = 0; i < static_cast<int>(parameters.size()); ++i) {
+    parameters[i].mutable_value_type()->mutable_integer()->set_bitsize(
+        element_bitsize);
+    parameters[i].set_log_domain_size(levels_to_evaluate[i]);
+  }
+  std::unique_ptr<distributed_point_functions::DistributedPointFunction> dpf =
+      distributed_point_functions::DistributedPointFunction::CreateIncremental(
+          parameters)
+          .value();
+
+  // Generate DPF key.
+  absl::BitGen rng;
+  absl::uint128 alpha = absl::MakeUint128(absl::Uniform<uint64_t>(rng),
+                                          absl::Uniform<uint64_t>(rng));
+  if (log_domain_size < 128) {
+    alpha %= absl::uint128{1} << log_domain_size;
+  }
+  std::vector<absl::uint128> beta(parameters.size(), 1);
+  distributed_point_functions::DpfKey key;
+  std::tie(key, std::ignore) =
+      dpf->GenerateKeysIncremental(alpha, beta).value();
+
+  // Run the experiment and measure time.
+  int num_iterations = absl::GetFlag(FLAGS_num_iterations);
+  using T = uint32_t;
+  absl::Time start = absl::Now();
+  if (only_nonzeros) {
+    RunBatchedSinglePointEvaluation<T>(*dpf, key, prefixes.back(),
+                                       num_iterations);
+  } else {
+    RunHierarchicalEvaluation<T>(*dpf, key, prefixes_to_evaluate,
+                                 num_iterations);
+  }
+  absl::Duration wallclock = absl::Now() - start;
+  LOG(INFO) << "Wallclock time per iteration: "
+            << wallclock / absl::GetFlag(FLAGS_num_iterations);
+}