#include <android-base/logging.h>
#include <android/native_window.h>
#include <benchmark/benchmark.h>
#include <binder/IPCThreadState.h>
#include <binder/IServiceManager.h>
#include <binder/ProcessState.h>
#include <dvr/dvr_api.h>
#include <gui/BufferItem.h>
#include <gui/BufferItemConsumer.h>
#include <gui/BufferQueue.h>
#include <gui/Surface.h>
#include <private/dvr/epoll_file_descriptor.h>
#include <utils/Trace.h>

#include <array>
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <dlfcn.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/wait.h>
#include <unistd.h>
23
// Use ALWAYS at the tag level. Control is performed manually during command
// line processing (see atrace_set_tracing_enabled() in main()).
//
// Any earlier definition of ATRACE_TAG (e.g. from an included header) is
// dropped first so the redefinition below does not trigger a warning.
// NOTE(review): this override comes after <utils/Trace.h> is included, so it
// relies on the ATRACE_* macros reading ATRACE_TAG where they are expanded —
// confirm if the trace headers change.
#ifdef ATRACE_TAG
#undef ATRACE_TAG
#endif
#define ATRACE_TAG ATRACE_TAG_ALWAYS
30
31using namespace android;
32using ::benchmark::State;
33
34static const String16 kBinderService = String16("bufferTransport");
35static const uint32_t kBufferWidth = 100;
36static const uint32_t kBufferHeight = 1;
37static const uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB;
38static const uint64_t kBufferUsage =
39    GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN;
40static const uint32_t kBufferLayer = 1;
41static const int kMaxAcquiredImages = 1;
42static const int kQueueDepth = 2;  // We are double buffering for this test.
43static const size_t kMaxQueueCounts = 128;
44static const int kInvalidFence = -1;
45
46enum BufferTransportServiceCode {
47  CREATE_BUFFER_QUEUE = IBinder::FIRST_CALL_TRANSACTION,
48};
49
50// A binder services that minics a compositor that consumes buffers. It provides
51// one Binder interface to create a new Surface for buffer producer to write
52// into; while itself will carry out no-op buffer consuming by acquiring then
53// releasing the buffer immediately.
54class BufferTransportService : public BBinder {
55 public:
56  BufferTransportService() = default;
57  ~BufferTransportService() = default;
58
59  virtual status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply,
60                              uint32_t flags = 0) {
61    (void)flags;
62    (void)data;
63    switch (code) {
64      case CREATE_BUFFER_QUEUE: {
65        auto new_queue = std::make_shared<BufferQueueHolder>(this);
66        reply->writeStrongBinder(
67            IGraphicBufferProducer::asBinder(new_queue->producer));
68        buffer_queues_.push_back(new_queue);
69        return NO_ERROR;
70      }
71      default:
72        return UNKNOWN_TRANSACTION;
73    };
74  }
75
76 private:
77  struct FrameListener : public ConsumerBase::FrameAvailableListener {
78   public:
79    FrameListener(BufferTransportService* /*service*/,
80                  sp<BufferItemConsumer> buffer_item_consumer)
81        : buffer_item_consumer_(buffer_item_consumer) {}
82
83    void onFrameAvailable(const BufferItem& /*item*/) override {
84      BufferItem buffer;
85      status_t ret = 0;
86      {
87        ATRACE_NAME("AcquireBuffer");
88        ret = buffer_item_consumer_->acquireBuffer(&buffer, /*presentWhen=*/0,
89                                                   /*waitForFence=*/false);
90      }
91
92      if (ret != NO_ERROR) {
93        LOG(ERROR) << "Failed to acquire next buffer.";
94        return;
95      }
96
97      {
98        ATRACE_NAME("ReleaseBuffer");
99        ret = buffer_item_consumer_->releaseBuffer(buffer);
100      }
101
102      if (ret != NO_ERROR) {
103        LOG(ERROR) << "Failed to release buffer.";
104        return;
105      }
106    }
107
108   private:
109    sp<BufferItemConsumer> buffer_item_consumer_;
110  };
111
112  struct BufferQueueHolder {
113    explicit BufferQueueHolder(BufferTransportService* service) {
114      BufferQueue::createBufferQueue(&producer, &consumer);
115
116      sp<BufferItemConsumer> buffer_item_consumer =
117          new BufferItemConsumer(consumer, kBufferUsage, kMaxAcquiredImages,
118                                 /*controlledByApp=*/true);
119      buffer_item_consumer->setName(String8("BinderBufferTransport"));
120      frame_listener_ = new FrameListener(service, buffer_item_consumer);
121      buffer_item_consumer->setFrameAvailableListener(frame_listener_);
122    }
123
124    sp<IGraphicBufferProducer> producer;
125    sp<IGraphicBufferConsumer> consumer;
126
127   private:
128    sp<FrameListener> frame_listener_;
129  };
130
131  std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
132};
133
134// A virtual interfaces that abstracts the common BufferQueue operations, so
135// that the test suite can use the same test case to drive different types of
136// transport backends.
137class BufferTransport {
138 public:
139  virtual ~BufferTransport() {}
140
141  virtual int Start() = 0;
142  virtual sp<Surface> CreateSurface() = 0;
143};
144
145// Binder-based buffer transport backend.
146//
147// On Start() a new process will be swapned to run a Binder server that
148// actually consumes the buffer.
149// On CreateSurface() a new Binder BufferQueue will be created, which the
150// service holds the concrete binder node of the IGraphicBufferProducer while
151// sending the binder proxy to the client. In another word, the producer side
152// operations are carried out process while the consumer side operations are
153// carried out within the BufferTransportService's own process.
154class BinderBufferTransport : public BufferTransport {
155 public:
156  BinderBufferTransport() {}
157
158  int Start() override {
159    sp<IServiceManager> sm = defaultServiceManager();
160    service_ = sm->getService(kBinderService);
161    if (service_ == nullptr) {
162      LOG(ERROR) << "Failed to get the benchmark service.";
163      return -EIO;
164    }
165
166    LOG(INFO) << "Binder server is ready for client.";
167    return 0;
168  }
169
170  sp<Surface> CreateSurface() override {
171    Parcel data;
172    Parcel reply;
173    int error = service_->transact(CREATE_BUFFER_QUEUE, data, &reply);
174    if (error != NO_ERROR) {
175      LOG(ERROR) << "Failed to get buffer queue over binder.";
176      return nullptr;
177    }
178
179    sp<IBinder> binder;
180    error = reply.readNullableStrongBinder(&binder);
181    if (error != NO_ERROR) {
182      LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder.";
183      return nullptr;
184    }
185
186    auto producer = interface_cast<IGraphicBufferProducer>(binder);
187    if (producer == nullptr) {
188      LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder.";
189      return nullptr;
190    }
191
192    sp<Surface> surface = new Surface(producer, /*controlledByApp=*/true);
193
194    // Set buffer dimension.
195    ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
196    ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight,
197                                     kBufferFormat);
198
199    return surface;
200  }
201
202 private:
203  sp<IBinder> service_;
204};
205
206class DvrApi {
207 public:
208  DvrApi() {
209    handle_ = dlopen("libdvr.so", RTLD_NOW | RTLD_LOCAL);
210    CHECK(handle_);
211
212    auto dvr_get_api =
213        reinterpret_cast<decltype(&dvrGetApi)>(dlsym(handle_, "dvrGetApi"));
214    int ret = dvr_get_api(&api_, sizeof(api_), /*version=*/1);
215
216    CHECK(ret == 0);
217  }
218
219  ~DvrApi() { dlclose(handle_); }
220
221  const DvrApi_v1& Api() { return api_; }
222
223 private:
224  void* handle_ = nullptr;
225  DvrApi_v1 api_;
226};
227
228// BufferHub/PDX-based buffer transport.
229//
230// On Start() a new thread will be swapned to run an epoll polling thread which
231// minics the behavior of a compositor. Similar to Binder-based backend, the
232// buffer available handler is also a no-op: Buffer gets acquired and released
233// immediately.
234// On CreateSurface() a pair of dvr::ProducerQueue and dvr::ConsumerQueue will
235// be created. The epoll thread holds on the consumer queue and dequeues buffer
236// from it; while the producer queue will be wrapped in a Surface and returned
237// to test suite.
238class BufferHubTransport : public BufferTransport {
239 public:
240  virtual ~BufferHubTransport() {
241    stopped_.store(true);
242    if (reader_thread_.joinable()) {
243      reader_thread_.join();
244    }
245  }
246
247  int Start() override {
248    int ret = epoll_fd_.Create();
249    if (ret < 0) {
250      LOG(ERROR) << "Failed to create epoll fd: %s", strerror(-ret);
251      return -1;
252    }
253
254    // Create the reader thread.
255    reader_thread_ = std::thread([this]() {
256      int ret = dvr_.Api().PerformanceSetSchedulerPolicy(0, "graphics");
257      if (ret < 0) {
258        LOG(ERROR) << "Failed to set scheduler policy, ret=" << ret;
259        return;
260      }
261
262      stopped_.store(false);
263      LOG(INFO) << "Reader Thread Running...";
264
265      while (!stopped_.load()) {
266        std::array<epoll_event, kMaxQueueCounts> events;
267
268        // Don't sleep forever so that we will have a chance to wake up.
269        const int ret = epoll_fd_.Wait(events.data(), events.size(),
270                                       /*timeout=*/100);
271        if (ret < 0) {
272          LOG(ERROR) << "Error polling consumer queues.";
273          continue;
274        }
275        if (ret == 0) {
276          continue;
277        }
278
279        const int num_events = ret;
280        for (int i = 0; i < num_events; i++) {
281          uint32_t index = events[i].data.u32;
282          dvr_.Api().ReadBufferQueueHandleEvents(
283              buffer_queues_[index]->GetReadQueue());
284        }
285      }
286
287      LOG(INFO) << "Reader Thread Exiting...";
288    });
289
290    return 0;
291  }
292
293  sp<Surface> CreateSurface() override {
294    auto new_queue = std::make_shared<BufferQueueHolder>();
295    if (!new_queue->IsReady()) {
296      LOG(ERROR) << "Failed to create BufferHub-based BufferQueue.";
297      return nullptr;
298    }
299
300    // Set buffer dimension.
301    ANativeWindow_setBuffersGeometry(new_queue->GetSurface(), kBufferWidth,
302                                     kBufferHeight, kBufferFormat);
303
304    // Use the next position as buffer_queue index.
305    uint32_t index = buffer_queues_.size();
306    epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u32 = index}};
307    int queue_fd =
308        dvr_.Api().ReadBufferQueueGetEventFd(new_queue->GetReadQueue());
309    const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, queue_fd, &event);
310    if (ret < 0) {
311      LOG(ERROR) << "Failed to track consumer queue: " << strerror(-ret)
312                 << ", consumer queue fd: " << queue_fd;
313      return nullptr;
314    }
315
316    buffer_queues_.push_back(new_queue);
317    ANativeWindow_acquire(new_queue->GetSurface());
318    return static_cast<Surface*>(new_queue->GetSurface());
319  }
320
321 private:
322  struct BufferQueueHolder {
323    BufferQueueHolder() {
324      int ret = 0;
325      ret = dvr_.Api().WriteBufferQueueCreate(
326          kBufferWidth, kBufferHeight, kBufferFormat, kBufferLayer,
327          kBufferUsage, 0, sizeof(DvrNativeBufferMetadata), &write_queue_);
328      if (ret < 0) {
329        LOG(ERROR) << "Failed to create write buffer queue, ret=" << ret;
330        return;
331      }
332
333      ret = dvr_.Api().WriteBufferQueueCreateReadQueue(write_queue_,
334                                                       &read_queue_);
335      if (ret < 0) {
336        LOG(ERROR) << "Failed to create read buffer queue, ret=" << ret;
337        return;
338      }
339
340      ret = dvr_.Api().ReadBufferQueueSetBufferAvailableCallback(
341          read_queue_, BufferAvailableCallback, this);
342      if (ret < 0) {
343        LOG(ERROR) << "Failed to create buffer available callback, ret=" << ret;
344        return;
345      }
346
347      ret =
348          dvr_.Api().WriteBufferQueueGetANativeWindow(write_queue_, &surface_);
349      if (ret < 0) {
350        LOG(ERROR) << "Failed to create surface, ret=" << ret;
351        return;
352      }
353    }
354
355    static void BufferAvailableCallback(void* context) {
356      BufferQueueHolder* thiz = static_cast<BufferQueueHolder*>(context);
357      thiz->HandleBufferAvailable();
358    }
359
360    DvrReadBufferQueue* GetReadQueue() { return read_queue_; }
361
362    ANativeWindow* GetSurface() { return surface_; }
363
364    bool IsReady() {
365      return write_queue_ != nullptr && read_queue_ != nullptr &&
366             surface_ != nullptr;
367    }
368
369    void HandleBufferAvailable() {
370      int ret = 0;
371      DvrNativeBufferMetadata meta;
372      DvrReadBuffer* buffer = nullptr;
373      DvrNativeBufferMetadata metadata;
374      int acquire_fence = kInvalidFence;
375
376      {
377        ATRACE_NAME("AcquireBuffer");
378        ret = dvr_.Api().ReadBufferQueueAcquireBuffer(
379            read_queue_, 0, &buffer, &metadata, &acquire_fence);
380      }
381      if (ret < 0) {
382        LOG(ERROR) << "Failed to acquire consumer buffer, error: " << ret;
383        return;
384      }
385
386      if (buffer != nullptr) {
387        ATRACE_NAME("ReleaseBuffer");
388        ret = dvr_.Api().ReadBufferQueueReleaseBuffer(read_queue_, buffer,
389                                                      &meta, kInvalidFence);
390      }
391      if (ret < 0) {
392        LOG(ERROR) << "Failed to release consumer buffer, error: " << ret;
393      }
394    }
395
396   private:
397    DvrWriteBufferQueue* write_queue_ = nullptr;
398    DvrReadBufferQueue* read_queue_ = nullptr;
399    ANativeWindow* surface_ = nullptr;
400  };
401
402  static DvrApi dvr_;
403  std::atomic<bool> stopped_;
404  std::thread reader_thread_;
405
406  dvr::EpollFileDescriptor epoll_fd_;
407  std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
408};
409
410DvrApi BufferHubTransport::dvr_ = {};
411
// Transport backends, selected through the benchmark's first argument
// (state.range(0) in BufferTransportBenchmark::SetUp()).
enum TransportType {
  kBinderBufferTransport = 0,
  kBufferHubTransport = 1,
};
416
417// Main test suite, which supports two transport backend: 1) BinderBufferQueue,
418// 2) BufferHubQueue. The test case drives the producer end of both transport
419// backend by queuing buffers into the buffer queue by using ANativeWindow API.
420class BufferTransportBenchmark : public ::benchmark::Fixture {
421 public:
422  void SetUp(State& state) override {
423    if (state.thread_index == 0) {
424      const int transport = state.range(0);
425      switch (transport) {
426        case kBinderBufferTransport:
427          transport_.reset(new BinderBufferTransport);
428          break;
429        case kBufferHubTransport:
430          transport_.reset(new BufferHubTransport);
431          break;
432        default:
433          CHECK(false) << "Unknown test case.";
434          break;
435      }
436
437      CHECK(transport_);
438      const int ret = transport_->Start();
439      CHECK_EQ(ret, 0);
440
441      LOG(INFO) << "Transport backend running, transport=" << transport << ".";
442
443      // Create surfaces for each thread.
444      surfaces_.resize(state.threads);
445      for (int i = 0; i < state.threads; i++) {
446        // Common setup every thread needs.
447        surfaces_[i] = transport_->CreateSurface();
448        CHECK(surfaces_[i]);
449
450        LOG(INFO) << "Surface initialized on thread " << i << ".";
451      }
452    }
453  }
454
455  void TearDown(State& state) override {
456    if (state.thread_index == 0) {
457      surfaces_.clear();
458      transport_.reset();
459      LOG(INFO) << "Tear down benchmark.";
460    }
461  }
462
463 protected:
464  std::unique_ptr<BufferTransport> transport_;
465  std::vector<sp<Surface>> surfaces_;
466};
467
468BENCHMARK_DEFINE_F(BufferTransportBenchmark, Producers)(State& state) {
469  ANativeWindow* window = nullptr;
470  ANativeWindow_Buffer buffer;
471  int32_t error = 0;
472  double total_gain_buffer_us = 0;
473  double total_post_buffer_us = 0;
474  int iterations = 0;
475
476  while (state.KeepRunning()) {
477    if (window == nullptr) {
478      CHECK(surfaces_[state.thread_index]);
479      window = static_cast<ANativeWindow*>(surfaces_[state.thread_index].get());
480
481      // Lock buffers a couple time from the queue, so that we have the buffer
482      // allocated.
483      for (int i = 0; i < kQueueDepth; i++) {
484        error = ANativeWindow_lock(window, &buffer,
485                                   /*inOutDirtyBounds=*/nullptr);
486        CHECK_EQ(error, 0);
487        error = ANativeWindow_unlockAndPost(window);
488        CHECK_EQ(error, 0);
489      }
490    }
491
492    {
493      ATRACE_NAME("GainBuffer");
494      auto t1 = std::chrono::high_resolution_clock::now();
495      error = ANativeWindow_lock(window, &buffer,
496                                 /*inOutDirtyBounds=*/nullptr);
497      auto t2 = std::chrono::high_resolution_clock::now();
498      std::chrono::duration<double, std::micro> delta_us = t2 - t1;
499      total_gain_buffer_us += delta_us.count();
500    }
501    CHECK_EQ(error, 0);
502
503    {
504      ATRACE_NAME("PostBuffer");
505      auto t1 = std::chrono::high_resolution_clock::now();
506      error = ANativeWindow_unlockAndPost(window);
507      auto t2 = std::chrono::high_resolution_clock::now();
508      std::chrono::duration<double, std::micro> delta_us = t2 - t1;
509      total_post_buffer_us += delta_us.count();
510    }
511    CHECK_EQ(error, 0);
512
513    iterations++;
514  }
515
516  state.counters["gain_buffer_us"] = ::benchmark::Counter(
517      total_gain_buffer_us / iterations, ::benchmark::Counter::kAvgThreads);
518  state.counters["post_buffer_us"] = ::benchmark::Counter(
519      total_post_buffer_us / iterations, ::benchmark::Counter::kAvgThreads);
520  state.counters["producer_us"] = ::benchmark::Counter(
521      (total_gain_buffer_us + total_post_buffer_us) / iterations,
522      ::benchmark::Counter::kAvgThreads);
523}
524
525BENCHMARK_REGISTER_F(BufferTransportBenchmark, Producers)
526    ->Unit(::benchmark::kMicrosecond)
527    ->Ranges({{kBinderBufferTransport, kBufferHubTransport}})
528    ->ThreadRange(1, 32);
529
530static void runBinderServer() {
531  ProcessState::self()->setThreadPoolMaxThreadCount(0);
532  ProcessState::self()->startThreadPool();
533
534  sp<IServiceManager> sm = defaultServiceManager();
535  sp<BufferTransportService> service = new BufferTransportService;
536  sm->addService(kBinderService, service, false);
537
538  LOG(INFO) << "Binder server running...";
539
540  while (true) {
541    int stat, retval;
542    retval = wait(&stat);
543    if (retval == -1 && errno == ECHILD) {
544      break;
545    }
546  }
547
548  LOG(INFO) << "Service Exiting...";
549}
550
// To run the binder-based benchmark, use:
//   adb shell buffer_transport_benchmark --benchmark_filter="BufferTransportBenchmark/Producers/0/"
//
// To run the bufferhub-based benchmark, use:
//   adb shell buffer_transport_benchmark --benchmark_filter="BufferTransportBenchmark/Producers/1/"
558int main(int argc, char** argv) {
559  bool tracing_enabled = false;
560
561  // Parse arguments in addition to "--benchmark_filter" paramters.
562  for (int i = 1; i < argc; i++) {
563    if (std::string(argv[i]) == "--help") {
564      std::cout << "Usage: binderThroughputTest [OPTIONS]" << std::endl;
565      std::cout << "\t--trace: Enable systrace logging." << std::endl;
566      return 0;
567    }
568    if (std::string(argv[i]) == "--trace") {
569      tracing_enabled = true;
570      continue;
571    }
572  }
573
574  // Setup ATRACE/systrace based on command line.
575  atrace_setup();
576  atrace_set_tracing_enabled(tracing_enabled);
577
578  pid_t pid = fork();
579  if (pid == 0) {
580    // Child, i.e. the client side.
581    ProcessState::self()->startThreadPool();
582
583    ::benchmark::Initialize(&argc, argv);
584    ::benchmark::RunSpecifiedBenchmarks();
585  } else {
586    LOG(INFO) << "Benchmark process pid: " << pid;
587    runBinderServer();
588  }
589}
590