1// Use ALWAYS at the tag level. Control is performed manually during command
2// line processing.
3#define ATRACE_TAG ATRACE_TAG_ALWAYS
4#include <utils/Trace.h>
5
6#include <base/files/file_util.h>
7#include <base/logging.h>
8#include <base/strings/string_split.h>
9#include <errno.h>
10#include <getopt.h>
11#include <pdx/client.h>
12#include <pdx/default_transport/client_channel_factory.h>
13#include <pdx/default_transport/service_endpoint.h>
14#include <pdx/rpc/buffer_wrapper.h>
15#include <pdx/rpc/default_initialization_allocator.h>
16#include <pdx/rpc/message_buffer.h>
17#include <pdx/rpc/remote_method.h>
18#include <pdx/rpc/serializable.h>
19#include <pdx/service.h>
20#include <sys/prctl.h>
21#include <time.h>
22#include <unistd.h>
23
24#include <atomic>
25#include <cstdlib>
26#include <functional>
27#include <future>
28#include <iomanip>
29#include <ios>
30#include <iostream>
31#include <memory>
32#include <numeric>
33#include <sstream>
34#include <string>
35#include <thread>
36#include <vector>
37
38using android::pdx::Channel;
39using android::pdx::ClientBase;
40using android::pdx::Endpoint;
41using android::pdx::ErrorStatus;
42using android::pdx::Message;
43using android::pdx::Service;
44using android::pdx::ServiceBase;
45using android::pdx::default_transport::ClientChannelFactory;
46using android::pdx::Status;
47using android::pdx::Transaction;
48using android::pdx::rpc::BufferWrapper;
49using android::pdx::rpc::DefaultInitializationAllocator;
50using android::pdx::rpc::MessageBuffer;
51using android::pdx::rpc::DispatchRemoteMethod;
52using android::pdx::rpc::RemoteMethodReturn;
53using android::pdx::rpc::ReplyBuffer;
54using android::pdx::rpc::Void;
55using android::pdx::rpc::WrapBuffer;
56
57namespace {
58
// Size in bytes (4 MiB) of the scratch buffers allocated by the benchmark
// service and of the client receive buffer. The same value is passed to
// DispatchRemoteMethod() for the vector-based operations.
constexpr size_t kMaxMessageSize = 4096 * 1024;
60
// Builds the endpoint path for one service instance by appending the decimal
// |instance_id| directly to |path| (no separator is inserted).
std::string GetServicePath(const std::string& path, int instance_id) {
  std::string service_path(path);
  service_path.append(std::to_string(instance_id));
  return service_path;
}
64
// Sets the name of the calling thread to |name| via prctl(PR_SET_NAME).
// The result of prctl() is not checked.
// NOTE(review): prctl(2) documents that names are limited to 16 bytes
// including the terminator, so longer names are presumably truncated.
void SetThreadName(const std::string& name) {
  const char* const raw_name = name.c_str();
  const auto name_argument = reinterpret_cast<unsigned long>(raw_name);
  prctl(PR_SET_NAME, name_argument, 0, 0, 0);
}
68
// Number of nanoseconds in one second.
constexpr uint64_t kNanosPerSecond = 1000ull * 1000ull * 1000ull;

// Returns the current CLOCK_MONOTONIC reading expressed in nanoseconds.
// The return value of clock_gettime() is not checked.
uint64_t GetClockNs() {
  timespec now;
  clock_gettime(CLOCK_MONOTONIC, &now);
  const uint64_t whole_seconds_ns = kNanosPerSecond * now.tv_sec;
  return whole_seconds_ns + now.tv_nsec;
}
76
// Signed counterpart of sizeof: returns sizeof(T) as an ssize_t, where T is
// deduced from the argument. The argument's value is never read.
template <typename T>
ssize_t ssizeof(const T& /*unused*/) {
  constexpr auto kByteCount = static_cast<ssize_t>(sizeof(T));
  return kByteCount;
}
81
82class SchedStats {
83 public:
84  SchedStats() : SchedStats(gettid()) {}
85  SchedStats(pid_t task_id) : task_id_(task_id) {}
86  SchedStats(const SchedStats&) = default;
87  SchedStats& operator=(const SchedStats&) = default;
88
89  void Update() {
90    const std::string stats_path =
91        "/proc/" + std::to_string(task_id_) + "/schedstat";
92
93    std::string line;
94    base::ReadFileToString(base::FilePath{stats_path}, &line);
95    std::vector<std::string> stats = base::SplitString(
96        line, " ", base::TRIM_WHITESPACE, base::SPLIT_WANT_ALL);
97
98    CHECK_EQ(3u, stats.size());
99
100    // Calculate the deltas since the last update. Each value is absolute since
101    // the task started.
102    uint64_t current_cpu_time_ns = std::stoull(stats[0]);
103    uint64_t current_wait_ns = std::stoull(stats[1]);
104    uint64_t current_timeslices = std::stoull(stats[2]);
105    cpu_time_ns_ = current_cpu_time_ns - last_cpu_time_ns_;
106    wait_ns_ = current_wait_ns - last_wait_ns_;
107    timeslices_ = current_timeslices - last_timeslices_;
108    last_cpu_time_ns_ = current_cpu_time_ns;
109    last_wait_ns_ = current_wait_ns;
110    last_timeslices_ = current_timeslices;
111  }
112
113  pid_t task_id() const { return task_id_; }
114  uint64_t cpu_time_ns() const { return cpu_time_ns_; }
115  uint64_t wait_ns() const { return wait_ns_; }
116  uint64_t timeslices() const { return timeslices_; }
117
118  double cpu_time_s() const {
119    return static_cast<double>(cpu_time_ns_) / kNanosPerSecond;
120  }
121  double wait_s() const {
122    return static_cast<double>(wait_ns_) / kNanosPerSecond;
123  }
124
125 private:
126  int32_t task_id_;
127  uint64_t cpu_time_ns_ = 0;
128  uint64_t last_cpu_time_ns_ = 0;
129  uint64_t wait_ns_ = 0;
130  uint64_t last_wait_ns_ = 0;
131  uint64_t timeslices_ = 0;
132  uint64_t last_timeslices_ = 0;
133
134  PDX_SERIALIZABLE_MEMBERS(SchedStats, task_id_, cpu_time_ns_, wait_ns_,
135                           timeslices_);
136};
137
// Opcodes for client/service protocol. The numeric values are what is sent as
// the message opcode, so they are spelled out explicitly.
struct BenchmarkOps {
  enum : int {
    Nop = 0,
    Read = 1,
    Write = 2,
    Echo = 3,
    Stats = 4,
    WriteVector = 5,
    EchoVector = 6,
    Quit = 7,
  };
};
151
// RPC method declarations for the operations that go through the PDX RPC
// layer. Each PDX_REMOTE_METHOD pairs a method name with its BenchmarkOps
// opcode and its call signature.
struct BenchmarkRPC {
  // Takes no arguments; returns two uint64_t values and a SchedStats. The
  // service fills these with the receive timestamp, the reply timestamp and
  // its per-thread scheduler stats.
  PDX_REMOTE_METHOD(Stats, BenchmarkOps::Stats,
                    std::tuple<uint64_t, uint64_t, SchedStats>(Void));
  // Sends a byte buffer; returns an int (the service returns the buffer size).
  PDX_REMOTE_METHOD(WriteVector, BenchmarkOps::WriteVector,
                    int(const BufferWrapper<std::vector<uint8_t>> data));
  // Sends a byte buffer and receives a byte buffer back.
  PDX_REMOTE_METHOD(EchoVector, BenchmarkOps::EchoVector,
                    BufferWrapper<std::vector<uint8_t>>(
                        const BufferWrapper<std::vector<uint8_t>> data));
};
161
162struct BenchmarkResult {
163  int thread_id = 0;
164  int service_id = 0;
165  double time_delta_s = 0.0;
166  uint64_t bytes_sent = 0;
167  SchedStats sched_stats = {};
168};
169
170// Global command line option values.
171struct Options {
172  bool verbose = false;
173  int threads = 1;
174  int opcode = BenchmarkOps::Read;
175  int blocksize = 1;
176  int count = 1;
177  int instances = 1;
178  int timeout = 1;
179  int warmup = 0;
180} ProgramOptions;
181
// Command line option names (the text following "--" on the command line).
constexpr char kOptionService[] = "service";
constexpr char kOptionClient[] = "client";
constexpr char kOptionVerbose[] = "verbose";
constexpr char kOptionOpcode[] = "op";
constexpr char kOptionBlocksize[] = "bs";
constexpr char kOptionCount[] = "count";
constexpr char kOptionThreads[] = "threads";
constexpr char kOptionInstances[] = "instances";
constexpr char kOptionTimeout[] = "timeout";
constexpr char kOptionTrace[] = "trace";
constexpr char kOptionWarmup[] = "warmup";
194
195// getopt() long options.
196static option long_options[] = {
197    {kOptionService, required_argument, 0, 0},
198    {kOptionClient, required_argument, 0, 0},
199    {kOptionVerbose, no_argument, 0, 0},
200    {kOptionOpcode, required_argument, 0, 0},
201    {kOptionBlocksize, required_argument, 0, 0},
202    {kOptionCount, required_argument, 0, 0},
203    {kOptionThreads, required_argument, 0, 0},
204    {kOptionInstances, required_argument, 0, 0},
205    {kOptionTimeout, required_argument, 0, 0},
206    {kOptionTrace, no_argument, 0, 0},
207    {kOptionWarmup, required_argument, 0, 0},
208    {0, 0, 0, 0},
209};
210
211// Parses the argument for kOptionOpcode and sets the value of
212// ProgramOptions.opcode.
213void ParseOpcodeOption(const std::string& argument) {
214  if (argument == "read") {
215    ProgramOptions.opcode = BenchmarkOps::Read;
216  } else if (argument == "write") {
217    ProgramOptions.opcode = BenchmarkOps::Write;
218  } else if (argument == "echo") {
219    ProgramOptions.opcode = BenchmarkOps::Echo;
220  } else if (argument == "writevec") {
221    ProgramOptions.opcode = BenchmarkOps::WriteVector;
222  } else if (argument == "echovec") {
223    ProgramOptions.opcode = BenchmarkOps::EchoVector;
224  } else if (argument == "quit") {
225    ProgramOptions.opcode = BenchmarkOps::Quit;
226  } else if (argument == "nop") {
227    ProgramOptions.opcode = BenchmarkOps::Nop;
228  } else if (argument == "stats") {
229    ProgramOptions.opcode = BenchmarkOps::Stats;
230  } else {
231    ProgramOptions.opcode = std::stoi(argument);
232  }
233}
234
235// Implements the service side of the benchmark.
236class BenchmarkService : public ServiceBase<BenchmarkService> {
237 public:
238  std::shared_ptr<Channel> OnChannelOpen(Message& message) override {
239    VLOG(1) << "BenchmarkService::OnChannelCreate: cid="
240            << message.GetChannelId();
241    return nullptr;
242  }
243
244  void OnChannelClose(Message& message,
245                      const std::shared_ptr<Channel>& /*channel*/) override {
246    VLOG(1) << "BenchmarkService::OnChannelClose: cid="
247            << message.GetChannelId();
248  }
249
250  Status<void> HandleMessage(Message& message) override {
251    ATRACE_NAME("BenchmarkService::HandleMessage");
252
253    switch (message.GetOp()) {
254      case BenchmarkOps::Nop:
255        VLOG(1) << "BenchmarkService::HandleMessage: op=nop";
256        {
257          ATRACE_NAME("Reply");
258          CHECK(message.Reply(0));
259        }
260        return {};
261
262      case BenchmarkOps::Write: {
263        VLOG(1) << "BenchmarkService::HandleMessage: op=write send_length="
264                << message.GetSendLength()
265                << " receive_length=" << message.GetReceiveLength();
266
267        Status<void> status;
268        if (message.GetSendLength())
269          status = message.ReadAll(send_buffer.data(), message.GetSendLength());
270
271        {
272          ATRACE_NAME("Reply");
273          if (!status)
274            CHECK(message.ReplyError(status.error()));
275          else
276            CHECK(message.Reply(message.GetSendLength()));
277        }
278        return {};
279      }
280
281      case BenchmarkOps::Read: {
282        VLOG(1) << "BenchmarkService::HandleMessage: op=read send_length="
283                << message.GetSendLength()
284                << " receive_length=" << message.GetReceiveLength();
285
286        Status<void> status;
287        if (message.GetReceiveLength()) {
288          status = message.WriteAll(receive_buffer.data(),
289                                    message.GetReceiveLength());
290        }
291
292        {
293          ATRACE_NAME("Reply");
294          if (!status)
295            CHECK(message.ReplyError(status.error()));
296          else
297            CHECK(message.Reply(message.GetReceiveLength()));
298        }
299        return {};
300      }
301
302      case BenchmarkOps::Echo: {
303        VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
304                << message.GetSendLength()
305                << " receive_length=" << message.GetReceiveLength();
306
307        Status<void> status;
308        if (message.GetSendLength())
309          status = message.ReadAll(send_buffer.data(), message.GetSendLength());
310
311        if (!status) {
312          CHECK(message.ReplyError(status.error()));
313          return {};
314        }
315
316        if (message.GetSendLength()) {
317          status =
318              message.WriteAll(send_buffer.data(), message.GetSendLength());
319        }
320
321        {
322          ATRACE_NAME("Reply");
323          if (!status)
324            CHECK(message.ReplyError(status.error()));
325          else
326            CHECK(message.Reply(message.GetSendLength()));
327        }
328        return {};
329      }
330
331      case BenchmarkOps::Stats: {
332        VLOG(1) << "BenchmarkService::HandleMessage: op=echo send_length="
333                << message.GetSendLength()
334                << " receive_length=" << message.GetReceiveLength();
335
336        // Snapshot the stats when the message is received.
337        const uint64_t receive_time_ns = GetClockNs();
338        sched_stats_.Update();
339
340        // Use the RPC system to return the results.
341        RemoteMethodReturn<BenchmarkRPC::Stats>(
342            message, BenchmarkRPC::Stats::Return{receive_time_ns, GetClockNs(),
343                                                 sched_stats_});
344        return {};
345      }
346
347      case BenchmarkOps::WriteVector:
348        VLOG(1) << "BenchmarkService::HandleMessage: op=writevec send_length="
349                << message.GetSendLength()
350                << " receive_length=" << message.GetReceiveLength();
351
352        DispatchRemoteMethod<BenchmarkRPC::WriteVector>(
353            *this, &BenchmarkService::OnWriteVector, message, kMaxMessageSize);
354        return {};
355
356      case BenchmarkOps::EchoVector:
357        VLOG(1) << "BenchmarkService::HandleMessage: op=echovec send_length="
358                << message.GetSendLength()
359                << " receive_length=" << message.GetReceiveLength();
360
361        DispatchRemoteMethod<BenchmarkRPC::EchoVector>(
362            *this, &BenchmarkService::OnEchoVector, message, kMaxMessageSize);
363        return {};
364
365      case BenchmarkOps::Quit:
366        Cancel();
367        return ErrorStatus{ESHUTDOWN};
368
369      default:
370        VLOG(1) << "BenchmarkService::HandleMessage: default case; op="
371                << message.GetOp();
372        return Service::DefaultHandleMessage(message);
373    }
374  }
375
376  // Updates the scheduler stats from procfs for this thread.
377  void UpdateSchedStats() { sched_stats_.Update(); }
378
379 private:
380  friend BASE;
381
382  BenchmarkService(std::unique_ptr<Endpoint> endpoint)
383      : BASE("BenchmarkService", std::move(endpoint)),
384        send_buffer(kMaxMessageSize),
385        receive_buffer(kMaxMessageSize) {}
386
387  std::vector<uint8_t> send_buffer;
388  std::vector<uint8_t> receive_buffer;
389
390  // Each service thread has its own scheduler stats object.
391  static thread_local SchedStats sched_stats_;
392
393  using BufferType = BufferWrapper<
394      std::vector<uint8_t, DefaultInitializationAllocator<uint8_t>>>;
395
396  int OnWriteVector(Message&, const BufferType& data) { return data.size(); }
397  BufferType OnEchoVector(Message&, BufferType&& data) {
398    return std::move(data);
399  }
400
401  BenchmarkService(const BenchmarkService&) = delete;
402  void operator=(const BenchmarkService&) = delete;
403};
404
405thread_local SchedStats BenchmarkService::sched_stats_;
406
407// Implements the client side of the benchmark.
408class BenchmarkClient : public ClientBase<BenchmarkClient> {
409 public:
410  int Nop() {
411    ATRACE_NAME("BenchmarkClient::Nop");
412    VLOG(1) << "BenchmarkClient::Nop";
413    Transaction transaction{*this};
414    return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Nop));
415  }
416
417  int Write(const void* buffer, size_t length) {
418    ATRACE_NAME("BenchmarkClient::Write");
419    VLOG(1) << "BenchmarkClient::Write: buffer=" << buffer
420            << " length=" << length;
421    Transaction transaction{*this};
422    return ReturnStatusOrError(
423        transaction.Send<int>(BenchmarkOps::Write, buffer, length, nullptr, 0));
424    // return write(endpoint_fd(), buffer, length);
425  }
426
427  int Read(void* buffer, size_t length) {
428    ATRACE_NAME("BenchmarkClient::Read");
429    VLOG(1) << "BenchmarkClient::Read: buffer=" << buffer
430            << " length=" << length;
431    Transaction transaction{*this};
432    return ReturnStatusOrError(
433        transaction.Send<int>(BenchmarkOps::Read, nullptr, 0, buffer, length));
434    // return read(endpoint_fd(), buffer, length);
435  }
436
437  int Echo(const void* send_buffer, size_t send_length, void* receive_buffer,
438           size_t receive_length) {
439    ATRACE_NAME("BenchmarkClient::Echo");
440    VLOG(1) << "BenchmarkClient::Echo: send_buffer=" << send_buffer
441            << " send_length=" << send_length
442            << " receive_buffer=" << receive_buffer
443            << " receive_length=" << receive_length;
444    Transaction transaction{*this};
445    return ReturnStatusOrError(
446        transaction.Send<int>(BenchmarkOps::Echo, send_buffer, send_length,
447                              receive_buffer, receive_length));
448  }
449
450  int Stats(std::tuple<uint64_t, uint64_t, SchedStats>* stats_out) {
451    ATRACE_NAME("BenchmarkClient::Stats");
452    VLOG(1) << "BenchmarkClient::Stats";
453
454    auto status = InvokeRemoteMethodInPlace<BenchmarkRPC::Stats>(stats_out);
455    return status ? 0 : -status.error();
456  }
457
458  int WriteVector(const BufferWrapper<std::vector<uint8_t>>& data) {
459    ATRACE_NAME("BenchmarkClient::Stats");
460    VLOG(1) << "BenchmarkClient::Stats";
461
462    auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
463    return ReturnStatusOrError(status);
464  }
465
466  template <typename T>
467  int WriteVector(const BufferWrapper<T>& data) {
468    ATRACE_NAME("BenchmarkClient::WriteVector");
469    VLOG(1) << "BenchmarkClient::WriteVector";
470
471    auto status = InvokeRemoteMethod<BenchmarkRPC::WriteVector>(data);
472    return ReturnStatusOrError(status);
473  }
474
475  template <typename T, typename U>
476  int EchoVector(const BufferWrapper<T>& data, BufferWrapper<U>* data_out) {
477    ATRACE_NAME("BenchmarkClient::EchoVector");
478    VLOG(1) << "BenchmarkClient::EchoVector";
479
480    MessageBuffer<ReplyBuffer>::Reserve(kMaxMessageSize - 1);
481    auto status =
482        InvokeRemoteMethodInPlace<BenchmarkRPC::EchoVector>(data_out, data);
483    return status ? 0 : -status.error();
484  }
485
486  int Quit() {
487    VLOG(1) << "BenchmarkClient::Quit";
488    Transaction transaction{*this};
489    return ReturnStatusOrError(transaction.Send<int>(BenchmarkOps::Echo));
490  }
491
492 private:
493  friend BASE;
494
495  BenchmarkClient(const std::string& service_path)
496      : BASE(ClientChannelFactory::Create(service_path),
497             ProgramOptions.timeout) {}
498
499  BenchmarkClient(const BenchmarkClient&) = delete;
500  void operator=(const BenchmarkClient&) = delete;
501};
502
503// Creates a benchmark service at |path| and dispatches messages.
504int ServiceCommand(const std::string& path) {
505  if (path.empty())
506    return -EINVAL;
507
508  // Start the requested number of dispatch threads.
509  std::vector<std::thread> dispatch_threads;
510  int service_count = ProgramOptions.instances;
511  int service_id_counter = 0;
512  int thread_id_counter = 0;
513  std::atomic<bool> done(false);
514
515  while (service_count--) {
516    std::cerr << "Starting service instance " << service_id_counter
517              << std::endl;
518    auto service = BenchmarkService::Create(
519        android::pdx::default_transport::Endpoint::CreateAndBindSocket(
520            GetServicePath(path, service_id_counter),
521            android::pdx::default_transport::Endpoint::kBlocking));
522    if (!service) {
523      std::cerr << "Failed to create service instance!!" << std::endl;
524      done = true;
525      break;
526    }
527
528    int thread_count = ProgramOptions.threads;
529    while (thread_count--) {
530      std::cerr << "Starting dispatch thread " << thread_id_counter
531                << " service " << service_id_counter << std::endl;
532
533      dispatch_threads.emplace_back(
534          [&](const int thread_id, const int service_id,
535              const std::shared_ptr<BenchmarkService>& local_service) {
536            SetThreadName("service" + std::to_string(service_id));
537
538            // Read the initial schedstats for this thread from procfs.
539            local_service->UpdateSchedStats();
540
541            ATRACE_NAME("BenchmarkService::Dispatch");
542            while (!done) {
543              auto ret = local_service->ReceiveAndDispatch();
544              if (!ret) {
545                if (ret.error() != ESHUTDOWN) {
546                  std::cerr << "Error while dispatching message on thread "
547                            << thread_id << " service " << service_id << ": "
548                            << ret.GetErrorMessage() << std::endl;
549                } else {
550                  std::cerr << "Quitting thread " << thread_id << " service "
551                            << service_id << std::endl;
552                }
553                done = true;
554                return;
555              }
556            }
557          },
558          thread_id_counter++, service_id_counter, service);
559    }
560
561    service_id_counter++;
562  }
563
564  // Wait for the dispatch threads to exit.
565  for (auto& thread : dispatch_threads) {
566    thread.join();
567  }
568
569  return 0;
570}
571
572int ClientCommand(const std::string& path) {
573  // Start the requested number of client threads.
574  std::vector<std::thread> client_threads;
575  std::vector<std::future<BenchmarkResult>> client_results;
576  int service_count = ProgramOptions.instances;
577  int thread_id_counter = 0;
578  int service_id_counter = 0;
579
580  // Aggregate statistics, updated when worker threads exit.
581  std::atomic<uint64_t> total_bytes(0);
582  std::atomic<uint64_t> total_time_ns(0);
583
584  // Samples for variance calculation.
585  std::vector<uint64_t> latency_samples_ns(
586      ProgramOptions.instances * ProgramOptions.threads * ProgramOptions.count);
587  const size_t samples_per_thread = ProgramOptions.count;
588
589  std::vector<uint8_t> send_buffer(ProgramOptions.blocksize);
590  std::vector<uint8_t> receive_buffer(kMaxMessageSize);
591
592  // Barriers for synchronizing thread start.
593  std::vector<std::future<void>> ready_barrier_futures;
594  std::promise<void> go_barrier_promise;
595  std::future<void> go_barrier_future = go_barrier_promise.get_future();
596
597  // Barrier for synchronizing thread tear down.
598  std::promise<void> done_barrier_promise;
599  std::future<void> done_barrier_future = done_barrier_promise.get_future();
600
601  while (service_count--) {
602    int thread_count = ProgramOptions.threads;
603    while (thread_count--) {
604      std::cerr << "Starting client thread " << thread_id_counter << " service "
605                << service_id_counter << std::endl;
606
607      std::promise<BenchmarkResult> result_promise;
608      client_results.push_back(result_promise.get_future());
609
610      std::promise<void> ready_barrier_promise;
611      ready_barrier_futures.push_back(ready_barrier_promise.get_future());
612
613      client_threads.emplace_back(
614          [&](const int thread_id, const int service_id,
615              std::promise<BenchmarkResult> result, std::promise<void> ready) {
616            SetThreadName("client" + std::to_string(thread_id) + "/" +
617                          std::to_string(service_id));
618
619            ATRACE_NAME("BenchmarkClient::Dispatch");
620
621            auto client =
622                BenchmarkClient::Create(GetServicePath(path, service_id));
623            if (!client) {
624              std::cerr << "Failed to create client for service " << service_id
625                        << std::endl;
626              return -ENOMEM;
627            }
628
629            uint64_t* thread_samples =
630                &latency_samples_ns[samples_per_thread * thread_id];
631
632            // Per-thread statistics.
633            uint64_t bytes_sent = 0;
634            uint64_t time_start_ns;
635            uint64_t time_end_ns;
636            SchedStats sched_stats;
637
638            // Signal ready and wait for go.
639            ready.set_value();
640            go_barrier_future.wait();
641
642            // Warmup the scheduler.
643            int warmup = ProgramOptions.warmup;
644            while (warmup--) {
645              for (int i = 0; i < 1000000; i++)
646                ;
647            }
648
649            sched_stats.Update();
650            time_start_ns = GetClockNs();
651
652            int count = ProgramOptions.count;
653            while (count--) {
654              uint64_t iteration_start_ns = GetClockNs();
655
656              switch (ProgramOptions.opcode) {
657                case BenchmarkOps::Nop: {
658                  const int ret = client->Nop();
659                  if (ret < 0) {
660                    std::cerr << "Failed to send nop: " << strerror(-ret)
661                              << std::endl;
662                    return ret;
663                  } else {
664                    VLOG(1) << "Success";
665                  }
666                  break;
667                }
668
669                case BenchmarkOps::Read: {
670                  const int ret = client->Read(receive_buffer.data(),
671                                               ProgramOptions.blocksize);
672                  if (ret < 0) {
673                    std::cerr << "Failed to read: " << strerror(-ret)
674                              << std::endl;
675                    return ret;
676                  } else if (ret != ProgramOptions.blocksize) {
677                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
678                              << "; actual ret=" << ret << std::endl;
679                    return -EINVAL;
680                  } else {
681                    VLOG(1) << "Success";
682                    bytes_sent += ret;
683                  }
684                  break;
685                }
686
687                case BenchmarkOps::Write: {
688                  const int ret =
689                      client->Write(send_buffer.data(), send_buffer.size());
690                  if (ret < 0) {
691                    std::cerr << "Failed to write: " << strerror(-ret)
692                              << std::endl;
693                    return ret;
694                  } else if (ret != ProgramOptions.blocksize) {
695                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
696                              << "; actual ret=" << ret << std::endl;
697                    return -EINVAL;
698                  } else {
699                    VLOG(1) << "Success";
700                    bytes_sent += ret;
701                  }
702                  break;
703                }
704
705                case BenchmarkOps::Echo: {
706                  const int ret = client->Echo(
707                      send_buffer.data(), send_buffer.size(),
708                      receive_buffer.data(), receive_buffer.size());
709                  if (ret < 0) {
710                    std::cerr << "Failed to echo: " << strerror(-ret)
711                              << std::endl;
712                    return ret;
713                  } else if (ret != ProgramOptions.blocksize) {
714                    std::cerr << "Expected ret=" << ProgramOptions.blocksize
715                              << "; actual ret=" << ret << std::endl;
716                    return -EINVAL;
717                  } else {
718                    VLOG(1) << "Success";
719                    bytes_sent += ret * 2;
720                  }
721                  break;
722                }
723
724                case BenchmarkOps::Stats: {
725                  std::tuple<uint64_t, uint64_t, SchedStats> stats;
726                  const int ret = client->Stats(&stats);
727                  if (ret < 0) {
728                    std::cerr << "Failed to get stats: " << strerror(-ret)
729                              << std::endl;
730                    return ret;
731                  } else {
732                    VLOG(1) << "Success";
733                    std::cerr
734                        << "Round trip: receive_time_ns=" << std::get<0>(stats)
735                        << " reply_time_ns=" << std::get<1>(stats)
736                        << " cpu_time_s=" << std::get<2>(stats).cpu_time_s()
737                        << " wait_s=" << std::get<2>(stats).wait_s()
738                        << std::endl;
739                  }
740                  break;
741                }
742
743                case BenchmarkOps::WriteVector: {
744                  const int ret = client->WriteVector(
745                      WrapBuffer(send_buffer.data(), ProgramOptions.blocksize));
746                  if (ret < 0) {
747                    std::cerr << "Failed to write vector: " << strerror(-ret)
748                              << std::endl;
749                    return ret;
750                  } else {
751                    VLOG(1) << "Success";
752                    bytes_sent += ret;
753                  }
754                  break;
755                }
756
757                case BenchmarkOps::EchoVector: {
758                  thread_local BufferWrapper<std::vector<
759                      uint8_t, DefaultInitializationAllocator<uint8_t>>>
760                      response_buffer;
761                  const int ret = client->EchoVector(
762                      WrapBuffer(send_buffer.data(), ProgramOptions.blocksize),
763                      &response_buffer);
764                  if (ret < 0) {
765                    std::cerr << "Failed to echo vector: " << strerror(-ret)
766                              << std::endl;
767                    return ret;
768                  } else {
769                    VLOG(1) << "Success";
770                    bytes_sent += send_buffer.size() + response_buffer.size();
771                  }
772                  break;
773                }
774
775                case BenchmarkOps::Quit: {
776                  const int ret = client->Quit();
777                  if (ret < 0 && ret != -ESHUTDOWN) {
778                    std::cerr << "Failed to send quit: " << strerror(-ret);
779                    return ret;
780                  } else {
781                    VLOG(1) << "Success";
782                  }
783                  break;
784                }
785
786                default:
787                  std::cerr
788                      << "Invalid client operation: " << ProgramOptions.opcode
789                      << std::endl;
790                  return -EINVAL;
791              }
792
793              uint64_t iteration_end_ns = GetClockNs();
794              uint64_t iteration_delta_ns =
795                  iteration_end_ns - iteration_start_ns;
796              thread_samples[count] = iteration_delta_ns;
797
798              if (iteration_delta_ns > (kNanosPerSecond / 100)) {
799                SchedStats stats = sched_stats;
800                stats.Update();
801                std::cerr << "Thread " << thread_id << " iteration_delta_s="
802                          << (static_cast<double>(iteration_delta_ns) /
803                              kNanosPerSecond)
804                          << " " << stats.cpu_time_s() << " " << stats.wait_s()
805                          << std::endl;
806              }
807            }
808
809            time_end_ns = GetClockNs();
810            sched_stats.Update();
811
812            const double time_delta_s =
813                static_cast<double>(time_end_ns - time_start_ns) /
814                kNanosPerSecond;
815
816            total_bytes += bytes_sent;
817            total_time_ns += time_end_ns - time_start_ns;
818
819            result.set_value(
820                {thread_id, service_id, time_delta_s, bytes_sent, sched_stats});
821            done_barrier_future.wait();
822
823            return 0;
824          },
825          thread_id_counter++, service_id_counter, std::move(result_promise),
826          std::move(ready_barrier_promise));
827    }
828
829    service_id_counter++;
830  }
831
832  // Wait for workers to be ready.
833  std::cerr << "Waiting for workers to be ready..." << std::endl;
834  for (auto& ready : ready_barrier_futures)
835    ready.wait();
836
837  // Signal workers to go.
838  std::cerr << "Kicking off benchmark." << std::endl;
839  go_barrier_promise.set_value();
840
  // Wait for all the worker threads to finish.
842  for (auto& result : client_results)
843    result.wait();
844
845  // Report worker thread results.
846  for (auto& result : client_results) {
847    BenchmarkResult benchmark_result = result.get();
848    std::cerr << std::fixed << "Thread " << benchmark_result.thread_id
849              << " service " << benchmark_result.service_id << ":" << std::endl;
850    std::cerr << "\t " << benchmark_result.bytes_sent << " bytes in "
851              << benchmark_result.time_delta_s << " seconds ("
852              << std::setprecision(0) << (benchmark_result.bytes_sent / 1024.0 /
853                                          benchmark_result.time_delta_s)
854              << " K/s; " << std::setprecision(3)
855              << (ProgramOptions.count / benchmark_result.time_delta_s)
856              << " txn/s; " << std::setprecision(9)
857              << (benchmark_result.time_delta_s / ProgramOptions.count)
858              << " s/txn)" << std::endl;
859    std::cerr << "\tStats: " << benchmark_result.sched_stats.cpu_time_s() << " "
860              << (benchmark_result.sched_stats.cpu_time_s() /
861                  ProgramOptions.count)
862              << " " << benchmark_result.sched_stats.wait_s() << " "
863              << (benchmark_result.sched_stats.wait_s() / ProgramOptions.count)
864              << " " << benchmark_result.sched_stats.timeslices() << std::endl;
865  }
866
867  // Signal worker threads to exit.
868  done_barrier_promise.set_value();
869
870  // Wait for the worker threads to exit.
871  for (auto& thread : client_threads) {
872    thread.join();
873  }
874
875  // Report aggregate results.
876  const int total_threads = ProgramOptions.threads * ProgramOptions.instances;
877  const int iterations = ProgramOptions.count;
878  const double total_time_s =
879      static_cast<double>(total_time_ns) / kNanosPerSecond;
880  // This is about how much wall time it took to completely transfer all the
  // payloads.
882  const double average_time_s = total_time_s / total_threads;
883
884  const uint64_t min_sample_time_ns =
885      *std::min_element(latency_samples_ns.begin(), latency_samples_ns.end());
886  const double min_sample_time_s =
887      static_cast<double>(min_sample_time_ns) / kNanosPerSecond;
888
889  const uint64_t max_sample_time_ns =
890      *std::max_element(latency_samples_ns.begin(), latency_samples_ns.end());
891  const double max_sample_time_s =
892      static_cast<double>(max_sample_time_ns) / kNanosPerSecond;
893
894  const double total_sample_time_s =
895      std::accumulate(latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
896                      [](double s, uint64_t ns) {
897                        return s + static_cast<double>(ns) / kNanosPerSecond;
898                      });
899  const double average_sample_time_s =
900      total_sample_time_s / latency_samples_ns.size();
901
902  const double sum_of_squared_deviations = std::accumulate(
903      latency_samples_ns.begin(), latency_samples_ns.end(), 0.0,
904      [&](double s, uint64_t ns) {
905        const double delta =
906            static_cast<double>(ns) / kNanosPerSecond - average_sample_time_s;
907        return s + delta * delta;
908      });
909  const double variance = sum_of_squared_deviations / latency_samples_ns.size();
910  const double standard_deviation = std::sqrt(variance);
911
912  const int num_buckets = 200;
913  const uint64_t sample_range_ns = max_sample_time_ns - min_sample_time_ns;
914  const uint64_t ns_per_bucket = sample_range_ns / num_buckets;
915  std::array<uint64_t, num_buckets> sample_buckets = {{0}};
916
917  // Count samples in each bucket range.
918  for (uint64_t sample_ns : latency_samples_ns) {
919    sample_buckets[(sample_ns - min_sample_time_ns) / (ns_per_bucket + 1)] += 1;
920  }
921
922  // Calculate population percentiles.
923  const uint64_t percent_50 =
924      static_cast<uint64_t>(latency_samples_ns.size() * 0.5);
925  const uint64_t percent_90 =
926      static_cast<uint64_t>(latency_samples_ns.size() * 0.9);
927  const uint64_t percent_95 =
928      static_cast<uint64_t>(latency_samples_ns.size() * 0.95);
929  const uint64_t percent_99 =
930      static_cast<uint64_t>(latency_samples_ns.size() * 0.99);
931
932  uint64_t sample_count = 0;
933  double latency_50th_percentile_s, latency_90th_percentile_s,
934      latency_95th_percentile_s, latency_99th_percentile_s;
935  for (int i = 0; i < num_buckets; i++) {
936    // Report the midpoint of the bucket range as the value of the
937    // corresponding
938    // percentile.
939    const double bucket_midpoint_time_s =
940        (ns_per_bucket * i + 0.5 * ns_per_bucket + min_sample_time_ns) /
941        kNanosPerSecond;
942    if (sample_count < percent_50 &&
943        (sample_count + sample_buckets[i]) >= percent_50) {
944      latency_50th_percentile_s = bucket_midpoint_time_s;
945    }
946    if (sample_count < percent_90 &&
947        (sample_count + sample_buckets[i]) >= percent_90) {
948      latency_90th_percentile_s = bucket_midpoint_time_s;
949    }
950    if (sample_count < percent_95 &&
951        (sample_count + sample_buckets[i]) >= percent_95) {
952      latency_95th_percentile_s = bucket_midpoint_time_s;
953    }
954    if (sample_count < percent_99 &&
955        (sample_count + sample_buckets[i]) >= percent_99) {
956      latency_99th_percentile_s = bucket_midpoint_time_s;
957    }
958    sample_count += sample_buckets[i];
959  }
960
961  std::cerr << std::fixed << "Total throughput over " << total_threads
962            << " threads:\n\t " << total_bytes << " bytes in " << average_time_s
963            << " seconds (" << std::setprecision(0)
964            << (total_bytes / 1024.0 / average_time_s) << " K/s; "
965            << std::setprecision(3)
966            << (iterations * total_threads / average_time_s)
967            << std::setprecision(9) << " txn/s; "
968            << (average_time_s / (iterations * total_threads)) << " s/txn)"
969            << std::endl;
970  std::cerr << "Sample statistics: " << std::endl;
971  std::cerr << total_sample_time_s << " s total sample time" << std::endl;
972  std::cerr << average_sample_time_s << " s avg" << std::endl;
973  std::cerr << standard_deviation << " s std dev" << std::endl;
974  std::cerr << min_sample_time_s << " s min" << std::endl;
975  std::cerr << max_sample_time_s << " s max" << std::endl;
976  std::cerr << "Latency percentiles:" << std::endl;
977  std::cerr << "50th: " << latency_50th_percentile_s << " s" << std::endl;
978  std::cerr << "90th: " << latency_90th_percentile_s << " s" << std::endl;
979  std::cerr << "95th: " << latency_95th_percentile_s << " s" << std::endl;
980  std::cerr << "99th: " << latency_99th_percentile_s << " s" << std::endl;
981
982  std::cout << total_time_ns << " " << std::fixed << std::setprecision(9)
983            << average_sample_time_s << " " << std::fixed
984            << std::setprecision(9) << standard_deviation << std::endl;
985  return 0;
986}
987
// Prints the command line help text to stdout, using |command_name| as the
// program name in the synopsis line. Always returns -1 so that a caller can
// write "return Usage(argv[0]);" to exit with a failure status.
int Usage(const std::string& command_name) {
  // Column alignment of the option descriptions is maintained by hand, so keep
  // clang-format from re-wrapping these lines.
  // clang-format off
  std::cout << "Usage: " << command_name << " [options]" << std::endl;
  std::cout << "\t--verbose                   : Use verbose messages." << std::endl;
  std::cout << "\t--service <endpoint path>   : Start service at the given path." << std::endl;
  std::cout << "\t--client <endpoint path>    : Start client to the given path." << std::endl;
  std::cout << "\t--op <read | write | echo>  : Specify client operation mode." << std::endl;
  std::cout << "\t--bs <block size bytes>     : Specify block size to use." << std::endl;
  std::cout << "\t--count <count>             : Specify number of transactions to make." << std::endl;
  std::cout << "\t--instances <count>         : Specify number of service instances." << std::endl;
  std::cout << "\t--threads <count>           : Specify number of threads per instance." << std::endl;
  std::cout << "\t--timeout <timeout ms | -1> : Timeout to wait for services." << std::endl;
  std::cout << "\t--trace                     : Enable systrace logging." << std::endl;
  std::cout << "\t--warmup <iterations>       : Busy loops before running benchmarks." << std::endl;
  // clang-format on
  return -1;
}
1005
1006}  // anonymous namespace
1007
1008int main(int argc, char** argv) {
1009  logging::LoggingSettings logging_settings;
1010  logging_settings.logging_dest = logging::LOG_TO_SYSTEM_DEBUG_LOG;
1011  logging::InitLogging(logging_settings);
1012
1013  int getopt_code;
1014  int option_index;
1015  std::string option = "";
1016  std::string command = "";
1017  std::string command_argument = "";
1018  bool tracing_enabled = false;
1019
1020  // Process command line options.
1021  while ((getopt_code =
1022              getopt_long(argc, argv, "", long_options, &option_index)) != -1) {
1023    option = long_options[option_index].name;
1024    VLOG(1) << "option=" << option;
1025    switch (getopt_code) {
1026      case 0:
1027        if (option == kOptionVerbose) {
1028          ProgramOptions.verbose = true;
1029          logging::SetMinLogLevel(-1);
1030        } else if (option == kOptionOpcode) {
1031          ParseOpcodeOption(optarg);
1032        } else if (option == kOptionBlocksize) {
1033          ProgramOptions.blocksize = std::stoi(optarg);
1034          if (ProgramOptions.blocksize < 0) {
1035            std::cerr << "Invalid blocksize argument: "
1036                      << ProgramOptions.blocksize << std::endl;
1037            return -EINVAL;
1038          }
1039        } else if (option == kOptionCount) {
1040          ProgramOptions.count = std::stoi(optarg);
1041          if (ProgramOptions.count < 1) {
1042            std::cerr << "Invalid count argument: " << ProgramOptions.count
1043                      << std::endl;
1044            return -EINVAL;
1045          }
1046        } else if (option == kOptionThreads) {
1047          ProgramOptions.threads = std::stoi(optarg);
1048          if (ProgramOptions.threads < 1) {
1049            std::cerr << "Invalid threads argument: " << ProgramOptions.threads
1050                      << std::endl;
1051            return -EINVAL;
1052          }
1053        } else if (option == kOptionInstances) {
1054          ProgramOptions.instances = std::stoi(optarg);
1055          if (ProgramOptions.instances < 1) {
1056            std::cerr << "Invalid instances argument: "
1057                      << ProgramOptions.instances << std::endl;
1058            return -EINVAL;
1059          }
1060        } else if (option == kOptionTimeout) {
1061          ProgramOptions.timeout = std::stoi(optarg);
1062        } else if (option == kOptionTrace) {
1063          tracing_enabled = true;
1064        } else if (option == kOptionWarmup) {
1065          ProgramOptions.warmup = std::stoi(optarg);
1066        } else {
1067          command = option;
1068          if (optarg)
1069            command_argument = optarg;
1070        }
1071        break;
1072    }
1073  }
1074
1075  // Setup ATRACE/systrace based on command line.
1076  atrace_setup();
1077  atrace_set_tracing_enabled(tracing_enabled);
1078
1079  VLOG(1) << "command=" << command << " command_argument=" << command_argument;
1080
1081  if (command == "") {
1082    return Usage(argv[0]);
1083  } else if (command == kOptionService) {
1084    return ServiceCommand(command_argument);
1085  } else if (command == kOptionClient) {
1086    return ClientCommand(command_argument);
1087  } else {
1088    return Usage(argv[0]);
1089  }
1090}
1091