GaugeMetricProducer.cpp revision aaadf6663fdc8787947d67bc14841d20094bd072
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "guardrail/StatsdStats.h"
22#include "stats_log_util.h"
23
24#include <cutils/log.h>
25
26using android::util::FIELD_COUNT_REPEATED;
27using android::util::FIELD_TYPE_BOOL;
28using android::util::FIELD_TYPE_FLOAT;
29using android::util::FIELD_TYPE_INT32;
30using android::util::FIELD_TYPE_INT64;
31using android::util::FIELD_TYPE_MESSAGE;
32using android::util::FIELD_TYPE_STRING;
33using android::util::ProtoOutputStream;
34using std::map;
35using std::string;
36using std::unordered_map;
37using std::vector;
38using std::make_shared;
39using std::shared_ptr;
40
41namespace android {
42namespace os {
43namespace statsd {
44
// Proto field numbers used when serializing a gauge metric report.

// StatsLogReport
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_GAUGE_METRICS = 8;

// GaugeMetricDataWrapper
constexpr int FIELD_ID_DATA = 1;

// GaugeMetricData
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;

// GaugeBucketInfo
constexpr int FIELD_ID_START_BUCKET_ELAPSED_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
constexpr int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
59
60GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
61                                         const int conditionIndex,
62                                         const sp<ConditionWizard>& wizard,
63                                         const int pullTagId, const uint64_t startTimeNs,
64                                         shared_ptr<StatsPullerManager> statsPullerManager)
65    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
66      mStatsPullerManager(statsPullerManager),
67      mPullTagId(pullTagId) {
68    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
69    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
70    int64_t bucketSizeMills = 0;
71    if (metric.has_bucket()) {
72        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
73    } else {
74        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
75    }
76    mBucketSizeNs = bucketSizeMills * 1000000;
77
78    mSamplingType = metric.sampling_type();
79    if (!metric.gauge_fields_filter().include_all()) {
80        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
81    }
82
83    // TODO: use UidMap if uid->pkg_name is required
84    if (metric.has_dimensions_in_what()) {
85        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
86        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
87    }
88
89    if (metric.has_dimensions_in_condition()) {
90        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
91    }
92
93    if (metric.links().size() > 0) {
94        for (const auto& link : metric.links()) {
95            Metric2Condition mc;
96            mc.conditionId = link.condition();
97            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
98            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
99            mMetric2ConditionLinks.push_back(mc);
100        }
101    }
102    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
103
104    // Kicks off the puller immediately.
105    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
106        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
107    }
108
109    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
110         (long long)mBucketSizeNs, (long long)mStartTimeNs);
111}
112
113// for testing
114GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
115                                         const int conditionIndex,
116                                         const sp<ConditionWizard>& wizard, const int pullTagId,
117                                         const int64_t startTimeNs)
118    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
119                          make_shared<StatsPullerManager>()) {
120}
121
122GaugeMetricProducer::~GaugeMetricProducer() {
123    VLOG("~GaugeMetricProducer() called");
124    if (mPullTagId != -1) {
125        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
126    }
127}
128
129void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
130    if (mCurrentSlicedBucket == nullptr ||
131        mCurrentSlicedBucket->size() == 0) {
132        return;
133    }
134
135    fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
136            (unsigned long)mCurrentSlicedBucket->size());
137    if (verbose) {
138        for (const auto& it : *mCurrentSlicedBucket) {
139            fprintf(out, "\t(what)%s\t(condition)%s  %d atoms\n",
140                it.first.getDimensionKeyInWhat().toString().c_str(),
141                it.first.getDimensionKeyInCondition().toString().c_str(),
142                (int)it.second.size());
143        }
144    }
145}
146
147void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
148                                             ProtoOutputStream* protoOutput) {
149    VLOG("gauge metric %lld report now...", (long long)mMetricId);
150
151    flushIfNeededLocked(dumpTimeNs);
152    if (mPastBuckets.empty()) {
153        return;
154    }
155
156    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
157    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
158
159    for (const auto& pair : mPastBuckets) {
160        const MetricDimensionKey& dimensionKey = pair.first;
161
162        VLOG("  dimension key %s", dimensionKey.toString().c_str());
163        uint64_t wrapperToken =
164                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
165
166        // First fill dimension.
167        uint64_t dimensionToken = protoOutput->start(
168                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
169        writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
170        protoOutput->end(dimensionToken);
171
172        if (dimensionKey.hasDimensionKeyInCondition()) {
173            uint64_t dimensionInConditionToken = protoOutput->start(
174                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
175            writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
176            protoOutput->end(dimensionInConditionToken);
177        }
178
179        // Then fill bucket_info (GaugeBucketInfo).
180        for (const auto& bucket : pair.second) {
181            uint64_t bucketInfoToken = protoOutput->start(
182                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
183            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_NANOS,
184                               (long long)bucket.mBucketStartNs);
185            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_NANOS,
186                               (long long)bucket.mBucketEndNs);
187
188            if (!bucket.mGaugeAtoms.empty()) {
189                uint64_t atomsToken =
190                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
191                for (const auto& atom : bucket.mGaugeAtoms) {
192                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
193                }
194                protoOutput->end(atomsToken);
195                for (const auto& atom : bucket.mGaugeAtoms) {
196                    const bool truncateTimestamp =
197                        android::util::kNotTruncatingTimestampAtomWhiteList.find(mTagId) ==
198                        android::util::kNotTruncatingTimestampAtomWhiteList.end();
199                    int64_t timestampNs =  truncateTimestamp ?
200                        truncateTimestampNsToFiveMinutes(atom.mTimestamps) : atom.mTimestamps;
201                    protoOutput->write(
202                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
203                        (long long)timestampNs);
204                }
205            }
206            protoOutput->end(bucketInfoToken);
207            VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs,
208                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size());
209        }
210        protoOutput->end(wrapperToken);
211    }
212    protoOutput->end(protoToken);
213
214    mPastBuckets.clear();
215    // TODO: Clear mDimensionKeyMap once the report is dumped.
216}
217
218void GaugeMetricProducer::pullLocked() {
219    vector<std::shared_ptr<LogEvent>> allData;
220    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
221        ALOGE("Stats puller failed for tag: %d", mPullTagId);
222        return;
223    }
224    for (const auto& data : allData) {
225        onMatchedLogEventLocked(0, *data);
226    }
227}
228
229void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
230                                                   const uint64_t eventTime) {
231    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
232    flushIfNeededLocked(eventTime);
233    mCondition = conditionMet;
234
235    // Push mode. No need to proactively pull the gauge data.
236    if (mPullTagId == -1) {
237        return;
238    }
239
240    bool triggerPuller = false;
241    switch(mSamplingType) {
242        // When the metric wants to do random sampling and there is already one gauge atom for the
243        // current bucket, do not do it again.
244        case GaugeMetric::RANDOM_ONE_SAMPLE: {
245            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
246            break;
247        }
248        case GaugeMetric::ALL_CONDITION_CHANGES: {
249            triggerPuller = true;
250            break;
251        }
252        default:
253            break;
254    }
255    if (!triggerPuller) {
256        return;
257    }
258
259    vector<std::shared_ptr<LogEvent>> allData;
260    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
261        ALOGE("Stats puller failed for tag: %d", mPullTagId);
262        return;
263    }
264    for (const auto& data : allData) {
265        onMatchedLogEventLocked(0, *data);
266    }
267    flushIfNeededLocked(eventTime);
268}
269
270void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
271    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
272}
273
274std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
275    if (mFieldMatchers.size() > 0) {
276        std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
277        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
278        return gaugeFields;
279    } else {
280        return std::make_shared<vector<FieldValue>>(event.getValues());
281    }
282}
283
284void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
285    std::lock_guard<std::mutex> lock(mMutex);
286    if (allData.size() == 0) {
287        return;
288    }
289    for (const auto& data : allData) {
290        onMatchedLogEventLocked(0, *data);
291    }
292}
293
294bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
295    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
296        return false;
297    }
298    // 1. Report the tuple count if the tuple count > soft limit
299    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
300        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
301        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
302        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
303        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
304            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
305                (long long)mMetricId, newKey.toString().c_str());
306            return true;
307        }
308    }
309
310    return false;
311}
312
313void GaugeMetricProducer::onMatchedLogEventInternalLocked(
314        const size_t matcherIndex, const MetricDimensionKey& eventKey,
315        const ConditionKey& conditionKey, bool condition,
316        const LogEvent& event) {
317    if (condition == false) {
318        return;
319    }
320    uint64_t eventTimeNs = event.GetElapsedTimestampNs();
321    mTagId = event.GetTagId();
322    if (eventTimeNs < mCurrentBucketStartTimeNs) {
323        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
324             (long long)mCurrentBucketStartTimeNs);
325        return;
326    }
327    flushIfNeededLocked(eventTimeNs);
328
329    // When gauge metric wants to randomly sample the output atom, we just simply use the first
330    // gauge in the given bucket.
331    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
332        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
333        return;
334    }
335    if (hitGuardRailLocked(eventKey)) {
336        return;
337    }
338    GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs);
339    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
340    // Anomaly detection on gauge metric only works when there is one numeric
341    // field specified.
342    if (mAnomalyTrackers.size() > 0) {
343        if (gaugeAtom.mFields->size() == 1) {
344            const Value& value = gaugeAtom.mFields->begin()->mValue;
345            long gaugeVal = 0;
346            if (value.getType() == INT) {
347                gaugeVal = (long)value.int_value;
348            } else if (value.getType() == LONG) {
349                gaugeVal = value.long_value;
350            }
351            for (auto& tracker : mAnomalyTrackers) {
352                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
353                                                 gaugeVal);
354            }
355        }
356    }
357}
358
359void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
360    for (const auto& slice : *mCurrentSlicedBucket) {
361        if (slice.second.empty()) {
362            continue;
363        }
364        const Value& value = slice.second.front().mFields->front().mValue;
365        long gaugeVal = 0;
366        if (value.getType() == INT) {
367            gaugeVal = (long)value.int_value;
368        } else if (value.getType() == LONG) {
369            gaugeVal = value.long_value;
370        }
371        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
372    }
373}
374
375void GaugeMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
376    flushIfNeededLocked(dropTimeNs);
377    mPastBuckets.clear();
378}
379
380// When a new matched event comes in, we check if event falls into the current
381// bucket. If not, flush the old counter to past buckets and initialize the new
382// bucket.
383// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
384// the GaugeMetricProducer while holding the lock.
385void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
386    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
387
388    if (eventTimeNs < currentBucketEndTimeNs) {
389        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
390             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
391        return;
392    }
393
394    flushCurrentBucketLocked(eventTimeNs);
395
396    // Adjusts the bucket start and end times.
397    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
398    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
399    mCurrentBucketNum += numBucketsForward;
400    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
401         (long long)mCurrentBucketStartTimeNs);
402}
403
404void GaugeMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
405    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
406
407    GaugeBucket info;
408    info.mBucketStartNs = mCurrentBucketStartTimeNs;
409    if (eventTimeNs < fullBucketEndTimeNs) {
410        info.mBucketEndNs = eventTimeNs;
411    } else {
412        info.mBucketEndNs = fullBucketEndTimeNs;
413    }
414
415    for (const auto& slice : *mCurrentSlicedBucket) {
416        info.mGaugeAtoms = slice.second;
417        auto& bucketList = mPastBuckets[slice.first];
418        bucketList.push_back(info);
419        VLOG("gauge metric %lld, dump key value: %s", (long long)mMetricId,
420             slice.first.toString().c_str());
421    }
422
423    // If we have anomaly trackers, we need to update the partial bucket values.
424    if (mAnomalyTrackers.size() > 0) {
425        updateCurrentSlicedBucketForAnomaly();
426
427        if (eventTimeNs > fullBucketEndTimeNs) {
428            // This is known to be a full bucket, so send this data to the anomaly tracker.
429            for (auto& tracker : mAnomalyTrackers) {
430                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
431            }
432            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
433        }
434    }
435
436    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
437}
438
439size_t GaugeMetricProducer::byteSizeLocked() const {
440    size_t totalSize = 0;
441    for (const auto& pair : mPastBuckets) {
442        totalSize += pair.second.size() * kBucketSize;
443    }
444    return totalSize;
445}
446
447}  // namespace statsd
448}  // namespace os
449}  // namespace android
450