GaugeMetricProducer.cpp revision 81245fd53a0bd627fa87e3a69dd667c7d6696ede
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "../guardrail/StatsdStats.h"
21#include "GaugeMetricProducer.h"
22#include "../stats_log_util.h"
23
24#include <cutils/log.h>
25
26using android::util::FIELD_COUNT_REPEATED;
27using android::util::FIELD_TYPE_BOOL;
28using android::util::FIELD_TYPE_FLOAT;
29using android::util::FIELD_TYPE_INT32;
30using android::util::FIELD_TYPE_INT64;
31using android::util::FIELD_TYPE_MESSAGE;
32using android::util::FIELD_TYPE_STRING;
33using android::util::ProtoOutputStream;
34using std::map;
35using std::string;
36using std::unordered_map;
37using std::vector;
38using std::make_shared;
39using std::shared_ptr;
40
41namespace android {
42namespace os {
43namespace statsd {
44
// Proto field numbers used when serializing the report in onDumpReportLocked().

// Fields of StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_GAUGE_METRICS = 8;
// Fields of GaugeMetricDataWrapper (and of its skipped-bucket entries).
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;
constexpr int FIELD_ID_SKIPPED_START = 1;
constexpr int FIELD_ID_SKIPPED_END = 2;
// Fields of GaugeMetricData.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;
// Fields of GaugeBucketInfo.
constexpr int FIELD_ID_START_BUCKET_ELAPSED_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
constexpr int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
constexpr int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;
63
64GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
65                                         const int conditionIndex,
66                                         const sp<ConditionWizard>& wizard, const int pullTagId,
67                                         const int64_t timeBaseNs, const int64_t startTimeNs,
68                                         shared_ptr<StatsPullerManager> statsPullerManager)
69    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
70      mStatsPullerManager(statsPullerManager),
71      mPullTagId(pullTagId),
72      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
73      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
74                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
75                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
76                                  : StatsdStats::kDimensionKeySizeSoftLimit),
77      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
78                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
79                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
80                                  : StatsdStats::kDimensionKeySizeHardLimit) {
81    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
82    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
83    int64_t bucketSizeMills = 0;
84    if (metric.has_bucket()) {
85        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
86    } else {
87        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
88    }
89    mBucketSizeNs = bucketSizeMills * 1000000;
90
91    mSamplingType = metric.sampling_type();
92    if (!metric.gauge_fields_filter().include_all()) {
93        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
94    }
95
96    // TODO: use UidMap if uid->pkg_name is required
97    if (metric.has_dimensions_in_what()) {
98        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
99        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
100    }
101
102    if (metric.has_dimensions_in_condition()) {
103        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
104    }
105
106    if (metric.links().size() > 0) {
107        for (const auto& link : metric.links()) {
108            Metric2Condition mc;
109            mc.conditionId = link.condition();
110            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
111            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
112            mMetric2ConditionLinks.push_back(mc);
113        }
114    }
115    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
116
117    flushIfNeededLocked(startTimeNs);
118    // Kicks off the puller immediately.
119    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
120        mStatsPullerManager->RegisterReceiver(
121                mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs);
122    }
123
124    VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
125         (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs,
126         mConditionSliced);
127}
128
129// for testing
130GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
131                                         const int conditionIndex,
132                                         const sp<ConditionWizard>& wizard, const int pullTagId,
133                                         const int64_t timeBaseNs, const int64_t startTimeNs)
134    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs,
135                          make_shared<StatsPullerManager>()) {
136}
137
138GaugeMetricProducer::~GaugeMetricProducer() {
139    VLOG("~GaugeMetricProducer() called");
140    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
141        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
142    }
143}
144
145void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
146    if (mCurrentSlicedBucket == nullptr ||
147        mCurrentSlicedBucket->size() == 0) {
148        return;
149    }
150
151    fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
152            (unsigned long)mCurrentSlicedBucket->size());
153    if (verbose) {
154        for (const auto& it : *mCurrentSlicedBucket) {
155            fprintf(out, "\t(what)%s\t(condition)%s  %d atoms\n",
156                it.first.getDimensionKeyInWhat().toString().c_str(),
157                it.first.getDimensionKeyInCondition().toString().c_str(),
158                (int)it.second.size());
159        }
160    }
161}
162
163void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
164                                             const bool include_current_partial_bucket,
165                                             ProtoOutputStream* protoOutput) {
166    VLOG("Gauge metric %lld report now...", (long long)mMetricId);
167    if (include_current_partial_bucket) {
168        flushLocked(dumpTimeNs);
169    } else {
170        flushIfNeededLocked(dumpTimeNs);
171    }
172
173    flushIfNeededLocked(dumpTimeNs);
174    if (mPastBuckets.empty()) {
175        return;
176    }
177
178    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
179    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
180
181    for (const auto& pair : mSkippedBuckets) {
182        uint64_t wrapperToken =
183                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
184        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
185        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
186        protoOutput->end(wrapperToken);
187    }
188    mSkippedBuckets.clear();
189
190    for (const auto& pair : mPastBuckets) {
191        const MetricDimensionKey& dimensionKey = pair.first;
192
193        VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
194        uint64_t wrapperToken =
195                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
196
197        // First fill dimension.
198        uint64_t dimensionToken = protoOutput->start(
199                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
200        writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
201        protoOutput->end(dimensionToken);
202
203        if (dimensionKey.hasDimensionKeyInCondition()) {
204            uint64_t dimensionInConditionToken = protoOutput->start(
205                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
206            writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
207            protoOutput->end(dimensionInConditionToken);
208        }
209
210        // Then fill bucket_info (GaugeBucketInfo).
211        for (const auto& bucket : pair.second) {
212            uint64_t bucketInfoToken = protoOutput->start(
213                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
214            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_NANOS,
215                               (long long)bucket.mBucketStartNs);
216            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_NANOS,
217                               (long long)bucket.mBucketEndNs);
218
219            if (!bucket.mGaugeAtoms.empty()) {
220                for (const auto& atom : bucket.mGaugeAtoms) {
221                    uint64_t atomsToken =
222                        protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
223                                           FIELD_ID_ATOM);
224                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
225                    protoOutput->end(atomsToken);
226                }
227                const bool truncateTimestamp =
228                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
229                                mTagId) ==
230                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
231                for (const auto& atom : bucket.mGaugeAtoms) {
232                    const int64_t elapsedTimestampNs =  truncateTimestamp ?
233                        truncateTimestampNsToFiveMinutes(atom.mElapsedTimestamps) :
234                            atom.mElapsedTimestamps;
235                    const int64_t wallClockNs = truncateTimestamp ?
236                        truncateTimestampNsToFiveMinutes(atom.mWallClockTimestampNs) :
237                            atom.mWallClockTimestampNs;
238                    protoOutput->write(
239                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
240                        (long long)elapsedTimestampNs);
241                    protoOutput->write(
242                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
243                            FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
244                        (long long)wallClockNs);
245                }
246            }
247            protoOutput->end(bucketInfoToken);
248            VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
249                 (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
250                 (int)bucket.mGaugeAtoms.size());
251        }
252        protoOutput->end(wrapperToken);
253    }
254    protoOutput->end(protoToken);
255
256    mPastBuckets.clear();
257    // TODO: Clear mDimensionKeyMap once the report is dumped.
258}
259
260void GaugeMetricProducer::pullLocked(const int64_t timestampNs) {
261    bool triggerPuller = false;
262    switch(mSamplingType) {
263        // When the metric wants to do random sampling and there is already one gauge atom for the
264        // current bucket, do not do it again.
265        case GaugeMetric::RANDOM_ONE_SAMPLE: {
266            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
267            break;
268        }
269        case GaugeMetric::ALL_CONDITION_CHANGES: {
270            triggerPuller = true;
271            break;
272        }
273        default:
274            break;
275    }
276    if (!triggerPuller) {
277        return;
278    }
279
280    vector<std::shared_ptr<LogEvent>> allData;
281    if (!mStatsPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
282        ALOGE("Gauge Stats puller failed for tag: %d", mPullTagId);
283        return;
284    }
285
286    for (const auto& data : allData) {
287        onMatchedLogEventLocked(0, *data);
288    }
289}
290
291void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
292                                                   const int64_t eventTimeNs) {
293    VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);
294    flushIfNeededLocked(eventTimeNs);
295    mCondition = conditionMet;
296
297    if (mPullTagId != -1 && mCondition) {
298        pullLocked(eventTimeNs);
299    }  // else: Push mode. No need to proactively pull the gauge data.
300}
301
302void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
303                                                           const int64_t eventTimeNs) {
304    VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
305         overallCondition);
306    flushIfNeededLocked(eventTimeNs);
307    // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will
308    // pull for every dimension.
309    mCondition = overallCondition;
310    if (mPullTagId != -1) {
311        pullLocked(eventTimeNs);
312    }  // else: Push mode. No need to proactively pull the gauge data.
313}
314
315std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
316    if (mFieldMatchers.size() > 0) {
317        std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
318        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
319        return gaugeFields;
320    } else {
321        return std::make_shared<vector<FieldValue>>(event.getValues());
322    }
323}
324
325void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
326    std::lock_guard<std::mutex> lock(mMutex);
327    if (allData.size() == 0) {
328        return;
329    }
330    for (const auto& data : allData) {
331        onMatchedLogEventLocked(0, *data);
332    }
333}
334
335bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
336    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
337        return false;
338    }
339    // 1. Report the tuple count if the tuple count > soft limit
340    if (mCurrentSlicedBucket->size() > mDimensionSoftLimit - 1) {
341        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
342        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
343        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
344        if (newTupleCount > mDimensionHardLimit) {
345            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
346                (long long)mMetricId, newKey.toString().c_str());
347            return true;
348        }
349    }
350
351    return false;
352}
353
354void GaugeMetricProducer::onMatchedLogEventInternalLocked(
355        const size_t matcherIndex, const MetricDimensionKey& eventKey,
356        const ConditionKey& conditionKey, bool condition,
357        const LogEvent& event) {
358    if (condition == false) {
359        return;
360    }
361    int64_t eventTimeNs = event.GetElapsedTimestampNs();
362    mTagId = event.GetTagId();
363    if (eventTimeNs < mCurrentBucketStartTimeNs) {
364        VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
365             (long long)mCurrentBucketStartTimeNs);
366        return;
367    }
368    flushIfNeededLocked(eventTimeNs);
369
370    // When gauge metric wants to randomly sample the output atom, we just simply use the first
371    // gauge in the given bucket.
372    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
373        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
374        return;
375    }
376    if (hitGuardRailLocked(eventKey)) {
377        return;
378    }
379    GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs, getWallClockNs());
380    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
381    // Anomaly detection on gauge metric only works when there is one numeric
382    // field specified.
383    if (mAnomalyTrackers.size() > 0) {
384        if (gaugeAtom.mFields->size() == 1) {
385            const Value& value = gaugeAtom.mFields->begin()->mValue;
386            long gaugeVal = 0;
387            if (value.getType() == INT) {
388                gaugeVal = (long)value.int_value;
389            } else if (value.getType() == LONG) {
390                gaugeVal = value.long_value;
391            }
392            for (auto& tracker : mAnomalyTrackers) {
393                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
394                                                 gaugeVal);
395            }
396        }
397    }
398}
399
400void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
401    for (const auto& slice : *mCurrentSlicedBucket) {
402        if (slice.second.empty()) {
403            continue;
404        }
405        const Value& value = slice.second.front().mFields->front().mValue;
406        long gaugeVal = 0;
407        if (value.getType() == INT) {
408            gaugeVal = (long)value.int_value;
409        } else if (value.getType() == LONG) {
410            gaugeVal = value.long_value;
411        }
412        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
413    }
414}
415
416void GaugeMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
417    flushIfNeededLocked(dropTimeNs);
418    mPastBuckets.clear();
419}
420
421// When a new matched event comes in, we check if event falls into the current
422// bucket. If not, flush the old counter to past buckets and initialize the new
423// bucket.
424// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
425// the GaugeMetricProducer while holding the lock.
426void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
427    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
428
429    if (eventTimeNs < currentBucketEndTimeNs) {
430        VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
431             (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
432        return;
433    }
434
435    flushCurrentBucketLocked(eventTimeNs);
436
437    // Adjusts the bucket start and end times.
438    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
439    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
440    mCurrentBucketNum += numBucketsForward;
441    VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
442         (long long)mCurrentBucketStartTimeNs);
443}
444
445void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
446    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
447
448    GaugeBucket info;
449    info.mBucketStartNs = mCurrentBucketStartTimeNs;
450    if (eventTimeNs < fullBucketEndTimeNs) {
451        info.mBucketEndNs = eventTimeNs;
452    } else {
453        info.mBucketEndNs = fullBucketEndTimeNs;
454    }
455
456    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
457        for (const auto& slice : *mCurrentSlicedBucket) {
458            info.mGaugeAtoms = slice.second;
459            auto& bucketList = mPastBuckets[slice.first];
460            bucketList.push_back(info);
461            VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
462                 slice.first.toString().c_str());
463        }
464    } else {
465        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
466    }
467
468    // If we have anomaly trackers, we need to update the partial bucket values.
469    if (mAnomalyTrackers.size() > 0) {
470        updateCurrentSlicedBucketForAnomaly();
471
472        if (eventTimeNs > fullBucketEndTimeNs) {
473            // This is known to be a full bucket, so send this data to the anomaly tracker.
474            for (auto& tracker : mAnomalyTrackers) {
475                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
476            }
477            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
478        }
479    }
480
481    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
482}
483
484size_t GaugeMetricProducer::byteSizeLocked() const {
485    size_t totalSize = 0;
486    for (const auto& pair : mPastBuckets) {
487        totalSize += pair.second.size() * kBucketSize;
488    }
489    return totalSize;
490}
491
492}  // namespace statsd
493}  // namespace os
494}  // namespace android
495