GaugeMetricProducer.cpp revision 27785a8a4a684c831c18f7189a6fa1b98c3573e6
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "guardrail/StatsdStats.h"
22#include "dimension.h"
23#include "stats_log_util.h"
24
25#include <cutils/log.h>
26
27using android::util::FIELD_COUNT_REPEATED;
28using android::util::FIELD_TYPE_BOOL;
29using android::util::FIELD_TYPE_FLOAT;
30using android::util::FIELD_TYPE_INT32;
31using android::util::FIELD_TYPE_INT64;
32using android::util::FIELD_TYPE_MESSAGE;
33using android::util::FIELD_TYPE_STRING;
34using android::util::ProtoOutputStream;
35using std::map;
36using std::string;
37using std::unordered_map;
38using std::vector;
39using std::make_shared;
40using std::shared_ptr;
41
42namespace android {
43namespace os {
44namespace statsd {
45
// Proto field ids used by onDumpReportLocked() when writing the report.
// Each group is labelled with the message the ids belong to.

// StatsLogReport
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_START_REPORT_NANOS = 2;
constexpr int FIELD_ID_END_REPORT_NANOS = 3;
constexpr int FIELD_ID_GAUGE_METRICS = 8;

// GaugeMetricDataWrapper
constexpr int FIELD_ID_DATA = 1;

// GaugeMetricData
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;

// GaugeBucketInfo
constexpr int FIELD_ID_START_BUCKET_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
constexpr int FIELD_ID_TIMESTAMP = 4;

62
63GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
64                                         const int conditionIndex,
65                                         const sp<ConditionWizard>& wizard,
66                                         const int pullTagId, const uint64_t startTimeNs,
67                                         shared_ptr<StatsPullerManager> statsPullerManager)
68    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
69      mStatsPullerManager(statsPullerManager),
70      mPullTagId(pullTagId) {
71    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
72    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
73    int64_t bucketSizeMills = 0;
74    if (metric.has_bucket()) {
75        bucketSizeMills = TimeUnitToBucketSizeInMillis(metric.bucket());
76    } else {
77        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
78    }
79    mBucketSizeNs = bucketSizeMills * 1000000;
80
81    mSamplingType = metric.sampling_type();
82    mFieldFilter = metric.gauge_fields_filter();
83
84    // TODO: use UidMap if uid->pkg_name is required
85    mDimensionsInWhat = metric.dimensions_in_what();
86    mDimensionsInCondition = metric.dimensions_in_condition();
87
88    if (metric.links().size() > 0) {
89        mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
90                               metric.links().end());
91    }
92    mConditionSliced = (metric.links().size() > 0)||
93        (mDimensionsInCondition.has_field() && mDimensionsInCondition.child_size() > 0);
94
95    // Kicks off the puller immediately.
96    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
97        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
98    }
99
100    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
101         (long long)mBucketSizeNs, (long long)mStartTimeNs);
102}
103
104// for testing
105GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
106                                         const int conditionIndex,
107                                         const sp<ConditionWizard>& wizard, const int pullTagId,
108                                         const int64_t startTimeNs)
109    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
110                          make_shared<StatsPullerManager>()) {
111}
112
113GaugeMetricProducer::~GaugeMetricProducer() {
114    VLOG("~GaugeMetricProducer() called");
115    if (mPullTagId != -1) {
116        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
117    }
118}
119
120void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, StatsLogReport* report) {
121    flushIfNeededLocked(dumpTimeNs);
122    ProtoOutputStream pbOutput;
123    onDumpReportLocked(dumpTimeNs, &pbOutput);
124    parseProtoOutputStream(pbOutput, report);
125}
126
127void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
128                                             ProtoOutputStream* protoOutput) {
129    VLOG("gauge metric %lld report now...", (long long)mMetricId);
130
131    flushIfNeededLocked(dumpTimeNs);
132    if (mPastBuckets.empty()) {
133        return;
134    }
135
136    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
137    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
138    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
139
140    for (const auto& pair : mPastBuckets) {
141        const MetricDimensionKey& dimensionKey = pair.first;
142
143        VLOG("  dimension key %s", dimensionKey.c_str());
144        long long wrapperToken =
145                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
146
147        // First fill dimension.
148        long long dimensionToken = protoOutput->start(
149                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
150        writeDimensionsValueProtoToStream(
151            dimensionKey.getDimensionKeyInWhat().getDimensionsValue(), protoOutput);
152        protoOutput->end(dimensionToken);
153
154        if (dimensionKey.hasDimensionKeyInCondition()) {
155            long long dimensionInConditionToken = protoOutput->start(
156                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
157            writeDimensionsValueProtoToStream(
158                dimensionKey.getDimensionKeyInCondition().getDimensionsValue(), protoOutput);
159            protoOutput->end(dimensionInConditionToken);
160        }
161
162        // Then fill bucket_info (GaugeBucketInfo).
163        for (const auto& bucket : pair.second) {
164            long long bucketInfoToken = protoOutput->start(
165                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
166            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
167                               (long long)bucket.mBucketStartNs);
168            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
169                               (long long)bucket.mBucketEndNs);
170
171            if (!bucket.mGaugeAtoms.empty()) {
172                long long atomsToken =
173                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
174                for (const auto& atom : bucket.mGaugeAtoms) {
175                    writeFieldValueTreeToStream(*atom.mFields, protoOutput);
176                }
177                protoOutput->end(atomsToken);
178
179                for (const auto& atom : bucket.mGaugeAtoms) {
180                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_TIMESTAMP,
181                                       (long long)atom.mTimestamps);
182                }
183            }
184            protoOutput->end(bucketInfoToken);
185            VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs,
186                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size());
187        }
188        protoOutput->end(wrapperToken);
189    }
190    protoOutput->end(protoToken);
191    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
192
193    mPastBuckets.clear();
194    // TODO: Clear mDimensionKeyMap once the report is dumped.
195}
196
197void GaugeMetricProducer::pullLocked() {
198    vector<std::shared_ptr<LogEvent>> allData;
199    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
200        ALOGE("Stats puller failed for tag: %d", mPullTagId);
201        return;
202    }
203    for (const auto& data : allData) {
204        onMatchedLogEventLocked(0, *data);
205    }
206}
207
208void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
209                                                   const uint64_t eventTime) {
210    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
211    flushIfNeededLocked(eventTime);
212    mCondition = conditionMet;
213
214    // Push mode. No need to proactively pull the gauge data.
215    if (mPullTagId == -1) {
216        return;
217    }
218
219    bool triggerPuller = false;
220    switch(mSamplingType) {
221        // When the metric wants to do random sampling and there is already one gauge atom for the
222        // current bucket, do not do it again.
223        case GaugeMetric::RANDOM_ONE_SAMPLE: {
224            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
225            break;
226        }
227        case GaugeMetric::ALL_CONDITION_CHANGES: {
228            triggerPuller = true;
229            break;
230        }
231        default:
232            break;
233    }
234    if (!triggerPuller) {
235        return;
236    }
237
238    vector<std::shared_ptr<LogEvent>> allData;
239    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
240        ALOGE("Stats puller failed for tag: %d", mPullTagId);
241        return;
242    }
243    for (const auto& data : allData) {
244        onMatchedLogEventLocked(0, *data);
245    }
246    flushIfNeededLocked(eventTime);
247}
248
249void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
250    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
251}
252
253std::shared_ptr<FieldValueMap> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
254    std::shared_ptr<FieldValueMap> gaugeFields =
255        std::make_shared<FieldValueMap>(event.getFieldValueMap());
256    if (!mFieldFilter.include_all()) {
257        filterFields(mFieldFilter.fields(), gaugeFields.get());
258    }
259    return gaugeFields;
260}
261
262void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
263    std::lock_guard<std::mutex> lock(mMutex);
264    if (allData.size() == 0) {
265        return;
266    }
267    for (const auto& data : allData) {
268        onMatchedLogEventLocked(0, *data);
269    }
270}
271
272bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
273    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
274        return false;
275    }
276    // 1. Report the tuple count if the tuple count > soft limit
277    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
278        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
279        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
280        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
281        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
282            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
283                (long long)mMetricId, newKey.c_str());
284            return true;
285        }
286    }
287
288    return false;
289}
290
291void GaugeMetricProducer::onMatchedLogEventInternalLocked(
292        const size_t matcherIndex, const MetricDimensionKey& eventKey,
293        const ConditionKey& conditionKey, bool condition,
294        const LogEvent& event) {
295    if (condition == false) {
296        return;
297    }
298    uint64_t eventTimeNs = event.GetTimestampNs();
299    if (eventTimeNs < mCurrentBucketStartTimeNs) {
300        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
301             (long long)mCurrentBucketStartTimeNs);
302        return;
303    }
304    flushIfNeededLocked(eventTimeNs);
305
306    // When gauge metric wants to randomly sample the output atom, we just simply use the first
307    // gauge in the given bucket.
308    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
309        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
310        return;
311    }
312    if (hitGuardRailLocked(eventKey)) {
313        return;
314    }
315    GaugeAtom gaugeAtom;
316    gaugeAtom.mFields = getGaugeFields(event);
317    gaugeAtom.mTimestamps = eventTimeNs;
318    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
319    // Anomaly detection on gauge metric only works when there is one numeric
320    // field specified.
321    if (mAnomalyTrackers.size() > 0) {
322        if (gaugeAtom.mFields->size() == 1) {
323            const DimensionsValue& dimensionsValue = gaugeAtom.mFields->begin()->second;
324            long gaugeVal = 0;
325            if (dimensionsValue.has_value_int()) {
326                gaugeVal = (long)dimensionsValue.value_int();
327            } else if (dimensionsValue.has_value_long()) {
328                gaugeVal = dimensionsValue.value_long();
329            }
330            for (auto& tracker : mAnomalyTrackers) {
331                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
332                                                 gaugeVal);
333            }
334        }
335    }
336}
337
338void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
339    status_t err = NO_ERROR;
340    for (const auto& slice : *mCurrentSlicedBucket) {
341        if (slice.second.empty() || slice.second.front().mFields->empty()) {
342            continue;
343        }
344        const DimensionsValue& dimensionsValue = slice.second.front().mFields->begin()->second;
345        long gaugeVal = 0;
346        if (dimensionsValue.has_value_int()) {
347            gaugeVal = (long)dimensionsValue.value_int();
348        } else if (dimensionsValue.has_value_long()) {
349            gaugeVal = dimensionsValue.value_long();
350        }
351        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
352    }
353}
354
355// When a new matched event comes in, we check if event falls into the current
356// bucket. If not, flush the old counter to past buckets and initialize the new
357// bucket.
358// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
359// the GaugeMetricProducer while holding the lock.
360void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
361    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
362
363    if (eventTimeNs < currentBucketEndTimeNs) {
364        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
365             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
366        return;
367    }
368
369    flushCurrentBucketLocked(eventTimeNs);
370
371    // Adjusts the bucket start and end times.
372    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
373    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
374    mCurrentBucketNum += numBucketsForward;
375    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
376         (long long)mCurrentBucketStartTimeNs);
377}
378
379void GaugeMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
380    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
381
382    GaugeBucket info;
383    info.mBucketStartNs = mCurrentBucketStartTimeNs;
384    if (eventTimeNs < fullBucketEndTimeNs) {
385        info.mBucketEndNs = eventTimeNs;
386    } else {
387        info.mBucketEndNs = fullBucketEndTimeNs;
388    }
389    info.mBucketNum = mCurrentBucketNum;
390
391    for (const auto& slice : *mCurrentSlicedBucket) {
392        info.mGaugeAtoms = slice.second;
393        auto& bucketList = mPastBuckets[slice.first];
394        bucketList.push_back(info);
395        VLOG("gauge metric %lld, dump key value: %s", (long long)mMetricId, slice.first.c_str());
396    }
397
398    // If we have anomaly trackers, we need to update the partial bucket values.
399    if (mAnomalyTrackers.size() > 0) {
400        updateCurrentSlicedBucketForAnomaly();
401
402        if (eventTimeNs > fullBucketEndTimeNs) {
403            // This is known to be a full bucket, so send this data to the anomaly tracker.
404            for (auto& tracker : mAnomalyTrackers) {
405                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
406            }
407            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
408        }
409    }
410
411    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
412}
413
414size_t GaugeMetricProducer::byteSizeLocked() const {
415    size_t totalSize = 0;
416    for (const auto& pair : mPastBuckets) {
417        totalSize += pair.second.size() * kBucketSize;
418    }
419    return totalSize;
420}
421
422}  // namespace statsd
423}  // namespace os
424}  // namespace android
425