service_impl.cc revision aab53553d6ad0ab821e45328487f9d88912fb71c
1/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "src/tracing/core/service_impl.h"
18
19#include <inttypes.h>
20#include <string.h>
21
22#include <algorithm>
23
24#include "perfetto/base/logging.h"
25#include "perfetto/base/task_runner.h"
26#include "perfetto/protozero/proto_utils.h"
27#include "perfetto/tracing/core/consumer.h"
28#include "perfetto/tracing/core/data_source_config.h"
29#include "perfetto/tracing/core/producer.h"
30#include "perfetto/tracing/core/shared_memory.h"
31#include "perfetto/tracing/core/trace_packet.h"
32
// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or that their arguments are sane / within bounds.
37
38namespace perfetto {
39
40// TODO(fmayer): add ThreadChecker everywhere.
41
42using protozero::proto_utils::ParseVarInt;
43
namespace {
// Page size used for the producers' shared memory buffers. Requested sizes
// that are not a multiple of this fall back to kDefaultShmSize.
constexpr size_t kSystemPageSize = 4096;
// Shared memory size used when the producer's size hint is unusable.
constexpr size_t kDefaultShmSize = kSystemPageSize * 16;  // 64 KB.
// Upper bound applied to the producer's shared memory size hint.
constexpr size_t kMaxShmSize = kSystemPageSize * 1024;    // 4 MB.
// Maximum number of buffers accepted in a single TraceConfig.
constexpr int kMaxBuffersPerConsumer = 128;
}  // namespace
50
51// static
52std::unique_ptr<Service> Service::CreateInstance(
53    std::unique_ptr<SharedMemory::Factory> shm_factory,
54    base::TaskRunner* task_runner) {
55  return std::unique_ptr<Service>(
56      new ServiceImpl(std::move(shm_factory), task_runner));
57}
58
// Stores |task_runner|, takes ownership of |shm_factory| and initializes
// |buffer_ids_| with kMaxTraceBufferID. |task_runner| is DCHECK-ed to be
// non-null.
ServiceImpl::ServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,
                         base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      shm_factory_(std::move(shm_factory)),
      buffer_ids_(kMaxTraceBufferID),
      weak_ptr_factory_(this) {
  PERFETTO_DCHECK(task_runner_);
}
67
// Currently performs no explicit teardown: connected producers are not
// notified nor disconnected here.
ServiceImpl::~ServiceImpl() {
  // TODO(fmayer): handle teardown of all Producer.
}
71
72std::unique_ptr<Service::ProducerEndpoint> ServiceImpl::ConnectProducer(
73    Producer* producer,
74    size_t shared_buffer_size_hint_bytes) {
75  const ProducerID id = ++last_producer_id_;
76  PERFETTO_DLOG("Producer %" PRIu64 " connected", id);
77  size_t shm_size = std::min(shared_buffer_size_hint_bytes, kMaxShmSize);
78  if (shm_size % kSystemPageSize || shm_size < kSystemPageSize)
79    shm_size = kDefaultShmSize;
80
81  // TODO(primiano): right now Create() will suicide in case of OOM if the mmap
82  // fails. We should instead gracefully fail the request and tell the client
83  // to go away.
84  auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
85  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
86      id, this, task_runner_, producer, std::move(shared_memory)));
87  auto it_and_inserted = producers_.emplace(id, endpoint.get());
88  PERFETTO_DCHECK(it_and_inserted.second);
89  task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
90  return std::move(endpoint);
91}
92
93void ServiceImpl::DisconnectProducer(ProducerID id) {
94  PERFETTO_DLOG("Producer %" PRIu64 " disconnected", id);
95  PERFETTO_DCHECK(producers_.count(id));
96  producers_.erase(id);
97}
98
99ServiceImpl::ProducerEndpointImpl* ServiceImpl::GetProducer(
100    ProducerID id) const {
101  auto it = producers_.find(id);
102  if (it == producers_.end())
103    return nullptr;
104  return it->second;
105}
106
107std::unique_ptr<Service::ConsumerEndpoint> ServiceImpl::ConnectConsumer(
108    Consumer* consumer) {
109  PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
110  std::unique_ptr<ConsumerEndpointImpl> endpoint(
111      new ConsumerEndpointImpl(this, task_runner_, consumer));
112  auto it_and_inserted = consumers_.emplace(endpoint.get());
113  PERFETTO_DCHECK(it_and_inserted.second);
114  task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_));
115  return std::move(endpoint);
116}
117
118void ServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
119  PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
120  PERFETTO_DCHECK(consumers_.count(consumer));
121
122  // TODO(primiano) : Check that this is safe (what happens if there are
123  // ReadBuffers() calls posted in the meantime? They need to become noop).
124  if (consumer->tracing_session_id_)
125    FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
126  consumers_.erase(consumer);
127}
128
129void ServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
130                                const TraceConfig& cfg) {
131  PERFETTO_DLOG("Enabling tracing for consumer %p",
132                reinterpret_cast<void*>(consumer));
133  if (consumer->tracing_session_id_) {
134    PERFETTO_DLOG(
135        "A Consumer is trying to EnableTracing() but another tracing session "
136        "is already active (forgot a call to FreeBuffers() ?)");
137    // TODO(primiano): make this a bool and return failure to the IPC layer.
138    return;
139  }
140
141  if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
142    PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size());
143    return;  // TODO(primiano): signal failure to the caller.
144  }
145
146  const TracingSessionID tsid = ++last_tracing_session_id_;
147  TracingSession& ts =
148      tracing_sessions_.emplace(tsid, TracingSession(cfg)).first->second;
149
150  PERFETTO_DLOG("Starting tracing session %" PRIu64, tsid);
151
152  // Initialize the log buffers.
153  bool did_allocate_all_buffers = true;
154
155  // Allocate the trace buffers. Also create a map to translate a consumer
156  // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
157  // corresponding BufferID, which is a global ID namespace for the service and
158  // all producers.
159  ts.buffers_index.reserve(cfg.buffers_size());
160  for (int i = 0; i < cfg.buffers_size(); i++) {
161    const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
162    BufferID global_id = buffer_ids_.Allocate();
163    if (!global_id) {
164      did_allocate_all_buffers = false;  // We ran out of indexes.
165      break;
166    }
167    ts.buffers_index.push_back(global_id);
168    auto it_and_inserted = buffers_.emplace(global_id, TraceBuffer());
169    PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
170    TraceBuffer& trace_buffer = it_and_inserted.first->second;
171    // TODO(primiano): make TraceBuffer::kBufferPageSize dynamic.
172    const size_t buf_size = buffer_cfg.size_kb() * 1024u;
173    if (!trace_buffer.Create(buf_size)) {
174      did_allocate_all_buffers = false;
175      break;
176    }
177  }
178
179  // This can happen if either:
180  // - All the kMaxTraceBufferID slots are taken.
181  // - OOM, or, more relistically, we exhausted virtual memory.
182  // In any case, free all the previously allocated buffers and abort.
183  // TODO(fmayer): add a test to cover this case, this is quite subtle.
184  if (!did_allocate_all_buffers) {
185    for (BufferID global_id : ts.buffers_index) {
186      buffer_ids_.Free(global_id);
187      buffers_.erase(global_id);
188    }
189    tracing_sessions_.erase(tsid);
190    return;  // TODO(primiano): return failure condition?
191  }
192
193  consumer->tracing_session_id_ = tsid;
194
195  // Enable the data sources on the producers.
196  for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
197    // Scan all the registered data sources with a matching name.
198    auto range = data_sources_.equal_range(cfg_data_source.config().name());
199    for (auto it = range.first; it != range.second; it++) {
200      const RegisteredDataSource& reg_data_source = it->second;
201      ProducerEndpointImpl* producer = GetProducer(reg_data_source.producer_id);
202      if (!producer) {
203        PERFETTO_DCHECK(false);  // Something in the unregistration is broken.
204        continue;
205      }
206      CreateDataSourceInstanceForProducer(cfg_data_source, producer, &ts);
207    }
208  }
209
210  // Trigger delayed task if the trace is time limited.
211  if (cfg.duration_ms()) {
212    auto weak_this = weak_ptr_factory_.GetWeakPtr();
213    task_runner_->PostDelayedTask(
214        [weak_this, tsid] {
215          if (weak_this)
216            weak_this->DisableTracing(tsid);
217        },
218        cfg.duration_ms());
219  }
220}
221
222// DisableTracing just stops the data sources but doesn't free up any buffer.
223// This is to allow the consumer to freeze the buffers (by stopping the trace)
224// and then drain the buffers. The actual teardown of the TracingSession happens
225// in FreeBuffers().
226void ServiceImpl::DisableTracing(TracingSessionID tsid) {
227  PERFETTO_DLOG("Disabling tracing session %" PRIu64, tsid);
228  TracingSession* tracing_session = GetTracingSession(tsid);
229  if (!tracing_session) {
230    // Can happen if the consumer calls this before EnableTracing() or after
231    // FreeBuffers().
232    PERFETTO_DLOG("Couldn't find tracing session %" PRIu64, tsid);
233    return;
234  }
235  for (const auto& data_source_inst : tracing_session->data_source_instances) {
236    const ProducerID producer_id = data_source_inst.first;
237    const DataSourceInstanceID ds_inst_id = data_source_inst.second;
238    ProducerEndpointImpl* producer = GetProducer(producer_id);
239    if (!producer)
240      continue;  // This could legitimately happen if a Producer disconnects.
241    producer->producer_->TearDownDataSourceInstance(ds_inst_id);
242  }
243  tracing_session->data_source_instances.clear();
244
245  // Deliberately NOT removing the session from |tracing_session_|, it's still
246  // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
247}
248
249void ServiceImpl::ReadBuffers(TracingSessionID tsid,
250                              ConsumerEndpointImpl* consumer) {
251  PERFETTO_DLOG("Reading buffers for session %" PRIu64, tsid);
252  TracingSession* tracing_session = GetTracingSession(tsid);
253  if (!tracing_session) {
254    PERFETTO_DLOG(
255        "Consumer invoked ReadBuffers() but no tracing session is active");
256    return;  // TODO(primiano): signal failure?
257  }
258  // TODO(primiano): Most of this code is temporary and we should find a better
259  // solution to bookkeep the log buffer (e.g., an allocator-like freelist)
260  // rather than leveraging the SharedMemoryABI (which is intended only for the
261  // Producer <> Service SMB and not for the TraceBuffer itself).
262  auto weak_consumer = consumer->GetWeakPtr();
263  for (size_t buf_idx = 0; buf_idx < tracing_session->num_buffers();
264       buf_idx++) {
265    auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
266    if (tbuf_iter == buffers_.end()) {
267      PERFETTO_DCHECK(false);
268      continue;
269    }
270    TraceBuffer& tbuf = tbuf_iter->second;
271    SharedMemoryABI& abi = *tbuf.abi;
272    for (size_t i = 0; i < tbuf.num_pages(); i++) {
273      const size_t page_idx = (i + tbuf.cur_page) % tbuf.num_pages();
274      if (abi.is_page_free(page_idx))
275        continue;
276      uint32_t layout = abi.page_layout_dbg(page_idx);
277      size_t num_chunks = abi.GetNumChunksForLayout(layout);
278      for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
279        if (abi.GetChunkState(page_idx, chunk_idx) ==
280            SharedMemoryABI::kChunkFree) {
281          continue;
282        }
283        auto chunk = abi.GetChunkUnchecked(page_idx, layout, chunk_idx);
284        uint16_t num_packets;
285        uint8_t flags;
286        std::tie(num_packets, flags) = chunk.GetPacketCountAndFlags();
287        const uint8_t* ptr = chunk.payload_begin();
288
289        // shared_ptr is really a workardound for the fact that is not possible
290        // to std::move() move-only types in labmdas until C++17.
291        std::shared_ptr<std::vector<TracePacket>> packets(
292            new std::vector<TracePacket>());
293        packets->reserve(num_packets);
294
295        for (size_t pack_idx = 0; pack_idx < num_packets; pack_idx++) {
296          uint64_t pack_size = 0;
297          ptr = ParseVarInt(ptr, chunk.end(), &pack_size);
298          // TODO(fmayer): stitching, look at the flags.
299          bool skip = (pack_idx == 0 &&
300                       flags & SharedMemoryABI::ChunkHeader::
301                                   kFirstPacketContinuesFromPrevChunk) ||
302                      (pack_idx == num_packets - 1 &&
303                       flags & SharedMemoryABI::ChunkHeader::
304                                   kLastPacketContinuesOnNextChunk);
305
306          PERFETTO_DLOG("  #%-3zu len:%" PRIu64 " skip: %d", pack_idx,
307                        pack_size, skip);
308          if (ptr > chunk.end() - pack_size) {
309            PERFETTO_DLOG("out of bounds!");
310            break;
311          }
312          if (!skip) {
313            packets->emplace_back();
314            packets->back().AddChunk(Chunk(ptr, pack_size));
315          }
316          ptr += pack_size;
317        }  // for(packet)
318        task_runner_->PostTask([weak_consumer, packets]() {
319          if (weak_consumer)
320            weak_consumer->consumer_->OnTraceData(std::move(*packets),
321                                                  true /*has_more*/);
322        });
323      }  // for(chunk)
324    }    // for(page_idx)
325  }      // for(buffer_id)
326  task_runner_->PostTask([weak_consumer]() {
327    if (weak_consumer)
328      weak_consumer->consumer_->OnTraceData(std::vector<TracePacket>(),
329                                            false /*has_more*/);
330  });
331}
332
333void ServiceImpl::FreeBuffers(TracingSessionID tsid) {
334  PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
335  TracingSession* tracing_session = GetTracingSession(tsid);
336  if (!tracing_session) {
337    PERFETTO_DLOG(
338        "Consumer invoked FreeBuffers() but no tracing session is active");
339    return;  // TODO(primiano): signal failure?
340  }
341  DisableTracing(tsid);
342  for (BufferID buffer_id : tracing_session->buffers_index) {
343    buffer_ids_.Free(buffer_id);
344    PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
345    buffers_.erase(buffer_id);
346  }
347  tracing_sessions_.erase(tsid);
348}
349
350void ServiceImpl::RegisterDataSource(ProducerID producer_id,
351                                     DataSourceID ds_id,
352                                     const DataSourceDescriptor& desc) {
353  PERFETTO_DLOG("Producer %" PRIu64
354                " registered data source \"%s\", ID: %" PRIu64,
355                producer_id, desc.name().c_str(), ds_id);
356
357  PERFETTO_DCHECK(!desc.name().empty());
358  data_sources_.emplace(desc.name(),
359                        RegisteredDataSource{producer_id, ds_id, desc});
360
361  // If there are existing tracing sessions, we need to check if the new
362  // data source is enabled by any of them.
363  if (tracing_sessions_.empty())
364    return;
365
366  ProducerEndpointImpl* producer = GetProducer(producer_id);
367  if (!producer) {
368    PERFETTO_DCHECK(false);
369    return;
370  }
371
372  for (auto& iter : tracing_sessions_) {
373    TracingSession& tracing_session = iter.second;
374    for (const TraceConfig::DataSource& cfg_data_source :
375         tracing_session.config.data_sources()) {
376      if (cfg_data_source.config().name() == desc.name())
377        CreateDataSourceInstanceForProducer(cfg_data_source, producer,
378                                            &tracing_session);
379    }
380  }
381}
382
383void ServiceImpl::CreateDataSourceInstanceForProducer(
384    const TraceConfig::DataSource& cfg_data_source,
385    ProducerEndpointImpl* producer,
386    TracingSession* tracing_session) {
387  // TODO(primiano): match against |producer_name_filter| and add tests
388  // for registration ordering (data sources vs consumers).
389
390  // Create a copy of the DataSourceConfig specified in the trace config. This
391  // will be passed to the producer after translating the |target_buffer| id.
392  // The |target_buffer| parameter passed by the consumer in the trace config is
393  // relative to the buffers declared in the same trace config. This has to be
394  // translated to the global BufferID before passing it to the producers, which
395  // don't know anything about tracing sessions and consumers.
396
397  DataSourceConfig ds_config = cfg_data_source.config();  // Deliberate copy.
398  auto relative_buffer_id = ds_config.target_buffer();
399  if (relative_buffer_id >= tracing_session->num_buffers()) {
400    PERFETTO_LOG(
401        "The TraceConfig for DataSource %s specified a traget_buffer out of "
402        "bound (%d). Skipping it.",
403        ds_config.name().c_str(), relative_buffer_id);
404    return;
405  }
406  BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
407  PERFETTO_DCHECK(global_id);
408  ds_config.set_target_buffer(global_id);
409
410  DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
411  tracing_session->data_source_instances.emplace(producer->id_, inst_id);
412  PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16,
413                ds_config.name().c_str(), global_id);
414  producer->producer_->CreateDataSourceInstance(inst_id, ds_config);
415}
416
417void ServiceImpl::CopyProducerPageIntoLogBuffer(ProducerID producer_id,
418                                                BufferID target_buffer_id,
419                                                const uint8_t* src,
420                                                size_t size) {
421  // TODO(fmayer): right now the page_size in the SMB and the trace_buffers_ can
422  // mismatch. Remove the ability to decide the page size on the Producer.
423
424  auto buf_iter = buffers_.find(target_buffer_id);
425  if (buf_iter == buffers_.end()) {
426    PERFETTO_DLOG("Could not find target buffer %u for producer %" PRIu64,
427                  target_buffer_id, producer_id);
428    return;
429  }
430  TraceBuffer& buf = buf_iter->second;
431
432  // TODO(primiano): we should have a set<BufferID> |allowed_target_buffers| in
433  // ProducerEndpointImpl to perform ACL checks and prevent that the Producer
434  // passes a |target_buffer| which is valid, but that we never asked it to use.
435  // Essentially we want to prevent a malicious producer to inject data into a
436  // log buffer that has nothing to do with it.
437
438  PERFETTO_DCHECK(size == kBufferPageSize);
439  uint8_t* dst = buf.get_next_page();
440
441  // TODO(primiano): use sendfile(). Requires to make the tbuf itself
442  // a file descriptor (just use SharedMemory without sharing it).
443  PERFETTO_DLOG(
444      "Copying page %p from producer %" PRIu64 " into buffer %" PRIu16,
445      reinterpret_cast<const void*>(src), producer_id, target_buffer_id);
446  memcpy(dst, src, size);
447}
448
449ServiceImpl::TracingSession* ServiceImpl::GetTracingSession(
450    TracingSessionID tsid) {
451  auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
452  if (it == tracing_sessions_.end())
453    return nullptr;
454  return &it->second;
455}
456
457////////////////////////////////////////////////////////////////////////////////
458// ServiceImpl::ConsumerEndpointImpl implementation
459////////////////////////////////////////////////////////////////////////////////
460
// Binds the endpoint to its |service| and to the |consumer| it talks to. The
// TaskRunner argument is currently unused.
ServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(ServiceImpl* service,
                                                        base::TaskRunner*,
                                                        Consumer* consumer)
    : service_(service), consumer_(consumer), weak_ptr_factory_(this) {}
465
// Notifies the consumer of the disconnection first, then unregisters this
// endpoint from the service.
ServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
  consumer_->OnDisconnect();
  service_->DisconnectConsumer(this);
}
470
// Forwards the request to the service, identifying this endpoint as the
// requesting consumer.
void ServiceImpl::ConsumerEndpointImpl::EnableTracing(const TraceConfig& cfg) {
  service_->EnableTracing(this, cfg);
}
474
475void ServiceImpl::ConsumerEndpointImpl::DisableTracing() {
476  if (tracing_session_id_) {
477    service_->DisableTracing(tracing_session_id_);
478  } else {
479    PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
480  }
481}
482
483void ServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
484  if (tracing_session_id_) {
485    service_->ReadBuffers(tracing_session_id_, this);
486  } else {
487    PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
488  }
489}
490
491void ServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
492  if (tracing_session_id_) {
493    service_->FreeBuffers(tracing_session_id_);
494  } else {
495    PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
496  }
497}
498
// Returns a weak pointer to this endpoint, obtained from |weak_ptr_factory_|.
// Used by tasks that can outlive the endpoint.
base::WeakPtr<ServiceImpl::ConsumerEndpointImpl>
ServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
  return weak_ptr_factory_.GetWeakPtr();
}
503
504////////////////////////////////////////////////////////////////////////////////
505// ServiceImpl::ProducerEndpointImpl implementation
506////////////////////////////////////////////////////////////////////////////////
507
// Binds the endpoint to its |service| and |producer|, takes ownership of
// |shared_memory| and sets up |shmem_abi_| over the whole shared memory region
// using kSystemPageSize as page size.
ServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
    ProducerID id,
    ServiceImpl* service,
    base::TaskRunner* task_runner,
    Producer* producer,
    std::unique_ptr<SharedMemory> shared_memory)
    : id_(id),
      service_(service),
      task_runner_(task_runner),
      producer_(producer),
      shared_memory_(std::move(shared_memory)),
      shmem_abi_(reinterpret_cast<uint8_t*>(shared_memory_->start()),
                 shared_memory_->size(),
                 kSystemPageSize) {
  // TODO(primiano): make the page-size for the SHM dynamic and find a way to
  // communicate that to the Producer (add a field to the
  // InitializeConnectionResponse IPC).
}
526
// Notifies the producer of the disconnection first, then unregisters this
// endpoint from the service.
ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
  producer_->OnDisconnect();
  service_->DisconnectProducer(id_);
}
531
532void ServiceImpl::ProducerEndpointImpl::RegisterDataSource(
533    const DataSourceDescriptor& desc,
534    RegisterDataSourceCallback callback) {
535  DataSourceID ds_id = ++last_data_source_id_;
536  if (!desc.name().empty()) {
537    service_->RegisterDataSource(id_, ds_id, desc);
538  } else {
539    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
540    ds_id = 0;
541  }
542  task_runner_->PostTask(std::bind(std::move(callback), ds_id));
543}
544
// Placeholder: only validates that |dsid| is non-zero, nothing is actually
// unregistered yet.
// NOTE(review): |dsid| presumably comes from the producer; PERFETTO_CHECK on
// a producer-supplied value looks at odds with the "producers are malicious"
// assumption of this file. Confirm whether the IPC layer filters it.
void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
    DataSourceID dsid) {
  PERFETTO_CHECK(dsid);
  // TODO(primiano): implement the bookkeeping logic.
}
550
551void ServiceImpl::ProducerEndpointImpl::NotifySharedMemoryUpdate(
552    const std::vector<uint32_t>& changed_pages) {
553  for (uint32_t page_idx : changed_pages) {
554    if (page_idx >= shmem_abi_.num_pages())
555      continue;  // Very likely a malicious producer playing dirty.
556
557    if (!shmem_abi_.is_page_complete(page_idx))
558      continue;
559    if (!shmem_abi_.TryAcquireAllChunksForReading(page_idx))
560      continue;
561
562    // TODO(fmayer): we should start collecting individual chunks from non fully
563    // complete pages after a while.
564
565    service_->CopyProducerPageIntoLogBuffer(
566        id_, shmem_abi_.get_target_buffer(page_idx),
567        shmem_abi_.page_start(page_idx), shmem_abi_.page_size());
568
569    shmem_abi_.ReleaseAllChunksAsFree(page_idx);
570  }
571}
572
// Returns the shared memory region of this producer. Ownership stays with the
// endpoint.
SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
  return shared_memory_.get();
}
576
// Not implemented: unconditionally hits PERFETTO_CHECK(false) and never
// returns a TraceWriter.
std::unique_ptr<TraceWriter>
ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID) {
  // TODO(primiano): not implemented yet.
  // This code path is hit only in in-process configuration, where tracing
  // Service and Producer are hosted in the same process. It's a use case we
  // want to support, but not too interesting right now.
  PERFETTO_CHECK(false);
}
585
586////////////////////////////////////////////////////////////////////////////////
587// ServiceImpl::TraceBuffer implementation
588////////////////////////////////////////////////////////////////////////////////
589
// Default construction allocates nothing: the memory is obtained by Create().
ServiceImpl::TraceBuffer::TraceBuffer() = default;
591
592bool ServiceImpl::TraceBuffer::Create(size_t sz) {
593  data = base::PageAllocator::AllocateMayFail(sz);
594  if (!data) {
595    PERFETTO_ELOG("Trace buffer allocation failed (size: %zu, page_size: %zu)",
596                  sz, kBufferPageSize);
597    return false;
598  }
599  size = sz;
600  abi.reset(new SharedMemoryABI(get_page(0), size, kBufferPageSize));
601  return true;
602}
603
// Destructor and move operations are defaulted here, out of line, rather than
// in the class definition.
ServiceImpl::TraceBuffer::~TraceBuffer() = default;
ServiceImpl::TraceBuffer::TraceBuffer(ServiceImpl::TraceBuffer&&) noexcept =
    default;
ServiceImpl::TraceBuffer& ServiceImpl::TraceBuffer::operator=(
    ServiceImpl::TraceBuffer&&) = default;
609
610////////////////////////////////////////////////////////////////////////////////
611// ServiceImpl::TracingSession implementation
612////////////////////////////////////////////////////////////////////////////////
613
// Initializes |config| from |new_config|, the TraceConfig passed to
// EnableTracing() for this session.
ServiceImpl::TracingSession::TracingSession(const TraceConfig& new_config)
    : config(new_config) {}
616
617}  // namespace perfetto
618