GaugeMetricProducer.cpp revision c40a19d2e43d5de3e036e926bf070220c2c865e6
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "guardrail/StatsdStats.h"
22#include "stats_log_util.h"
23
24#include <cutils/log.h>
25
26using android::util::FIELD_COUNT_REPEATED;
27using android::util::FIELD_TYPE_BOOL;
28using android::util::FIELD_TYPE_FLOAT;
29using android::util::FIELD_TYPE_INT32;
30using android::util::FIELD_TYPE_INT64;
31using android::util::FIELD_TYPE_MESSAGE;
32using android::util::FIELD_TYPE_STRING;
33using android::util::ProtoOutputStream;
34using std::map;
35using std::string;
36using std::unordered_map;
37using std::vector;
38using std::make_shared;
39using std::shared_ptr;
40
41namespace android {
42namespace os {
43namespace statsd {
44
// Proto field numbers used by onDumpReportLocked() when serializing the gauge report.

// Fields of StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_GAUGE_METRICS = 8;

// Fields of GaugeMetricDataWrapper.
constexpr int FIELD_ID_DATA = 1;

// Fields of GaugeMetricData.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;

// Fields of GaugeBucketInfo.
constexpr int FIELD_ID_START_BUCKET_ELAPSED_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
constexpr int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
constexpr int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;
60
61GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
62                                         const int conditionIndex,
63                                         const sp<ConditionWizard>& wizard,
64                                         const int pullTagId, const uint64_t startTimeNs,
65                                         shared_ptr<StatsPullerManager> statsPullerManager)
66    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
67      mStatsPullerManager(statsPullerManager),
68      mPullTagId(pullTagId) {
69    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
70    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
71    int64_t bucketSizeMills = 0;
72    if (metric.has_bucket()) {
73        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
74    } else {
75        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
76    }
77    mBucketSizeNs = bucketSizeMills * 1000000;
78
79    mSamplingType = metric.sampling_type();
80    if (!metric.gauge_fields_filter().include_all()) {
81        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
82    }
83
84    // TODO: use UidMap if uid->pkg_name is required
85    if (metric.has_dimensions_in_what()) {
86        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
87        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
88    }
89
90    if (metric.has_dimensions_in_condition()) {
91        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
92    }
93
94    if (metric.links().size() > 0) {
95        for (const auto& link : metric.links()) {
96            Metric2Condition mc;
97            mc.conditionId = link.condition();
98            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
99            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
100            mMetric2ConditionLinks.push_back(mc);
101        }
102    }
103    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
104
105    // Kicks off the puller immediately.
106    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
107        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
108    }
109
110    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
111         (long long)mBucketSizeNs, (long long)mStartTimeNs);
112}
113
114// for testing
115GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
116                                         const int conditionIndex,
117                                         const sp<ConditionWizard>& wizard, const int pullTagId,
118                                         const int64_t startTimeNs)
119    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
120                          make_shared<StatsPullerManager>()) {
121}
122
123GaugeMetricProducer::~GaugeMetricProducer() {
124    VLOG("~GaugeMetricProducer() called");
125    if (mPullTagId != -1) {
126        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
127    }
128}
129
130void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
131    if (mCurrentSlicedBucket == nullptr ||
132        mCurrentSlicedBucket->size() == 0) {
133        return;
134    }
135
136    fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
137            (unsigned long)mCurrentSlicedBucket->size());
138    if (verbose) {
139        for (const auto& it : *mCurrentSlicedBucket) {
140            fprintf(out, "\t(what)%s\t(condition)%s  %d atoms\n",
141                it.first.getDimensionKeyInWhat().toString().c_str(),
142                it.first.getDimensionKeyInCondition().toString().c_str(),
143                (int)it.second.size());
144        }
145    }
146}
147
148void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
149                                             ProtoOutputStream* protoOutput) {
150    VLOG("gauge metric %lld report now...", (long long)mMetricId);
151
152    flushIfNeededLocked(dumpTimeNs);
153    if (mPastBuckets.empty()) {
154        return;
155    }
156
157    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
158    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
159
160    for (const auto& pair : mPastBuckets) {
161        const MetricDimensionKey& dimensionKey = pair.first;
162
163        VLOG("  dimension key %s", dimensionKey.toString().c_str());
164        uint64_t wrapperToken =
165                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
166
167        // First fill dimension.
168        uint64_t dimensionToken = protoOutput->start(
169                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
170        writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
171        protoOutput->end(dimensionToken);
172
173        if (dimensionKey.hasDimensionKeyInCondition()) {
174            uint64_t dimensionInConditionToken = protoOutput->start(
175                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
176            writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
177            protoOutput->end(dimensionInConditionToken);
178        }
179
180        // Then fill bucket_info (GaugeBucketInfo).
181        for (const auto& bucket : pair.second) {
182            uint64_t bucketInfoToken = protoOutput->start(
183                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
184            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_NANOS,
185                               (long long)bucket.mBucketStartNs);
186            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_NANOS,
187                               (long long)bucket.mBucketEndNs);
188
189            if (!bucket.mGaugeAtoms.empty()) {
190                for (const auto& atom : bucket.mGaugeAtoms) {
191                    uint64_t atomsToken =
192                        protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
193                                           FIELD_ID_ATOM);
194                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
195                    protoOutput->end(atomsToken);
196                }
197                const bool truncateTimestamp =
198                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
199                                mTagId) ==
200                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
201                const int64_t wall_clock_ns = truncateTimestamp ?
202                    truncateTimestampNsToFiveMinutes(getWallClockNs()) : getWallClockNs();
203                for (const auto& atom : bucket.mGaugeAtoms) {
204                    int64_t timestampNs =  truncateTimestamp ?
205                        truncateTimestampNsToFiveMinutes(atom.mTimestamps) : atom.mTimestamps;
206                    protoOutput->write(
207                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
208                        (long long)timestampNs);
209                    protoOutput->write(
210                        FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
211                            FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
212                        (long long)wall_clock_ns);
213                }
214            }
215            protoOutput->end(bucketInfoToken);
216            VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs,
217                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size());
218        }
219        protoOutput->end(wrapperToken);
220    }
221    protoOutput->end(protoToken);
222
223    mPastBuckets.clear();
224    // TODO: Clear mDimensionKeyMap once the report is dumped.
225}
226
227void GaugeMetricProducer::pullLocked() {
228    vector<std::shared_ptr<LogEvent>> allData;
229    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
230        ALOGE("Stats puller failed for tag: %d", mPullTagId);
231        return;
232    }
233    for (const auto& data : allData) {
234        onMatchedLogEventLocked(0, *data);
235    }
236}
237
238void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
239                                                   const uint64_t eventTime) {
240    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
241    flushIfNeededLocked(eventTime);
242    mCondition = conditionMet;
243
244    // Push mode. No need to proactively pull the gauge data.
245    if (mPullTagId == -1) {
246        return;
247    }
248
249    bool triggerPuller = false;
250    switch(mSamplingType) {
251        // When the metric wants to do random sampling and there is already one gauge atom for the
252        // current bucket, do not do it again.
253        case GaugeMetric::RANDOM_ONE_SAMPLE: {
254            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
255            break;
256        }
257        case GaugeMetric::ALL_CONDITION_CHANGES: {
258            triggerPuller = true;
259            break;
260        }
261        default:
262            break;
263    }
264    if (!triggerPuller) {
265        return;
266    }
267
268    vector<std::shared_ptr<LogEvent>> allData;
269    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
270        ALOGE("Stats puller failed for tag: %d", mPullTagId);
271        return;
272    }
273    for (const auto& data : allData) {
274        onMatchedLogEventLocked(0, *data);
275    }
276    flushIfNeededLocked(eventTime);
277}
278
279void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
280    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
281}
282
283std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
284    if (mFieldMatchers.size() > 0) {
285        std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
286        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
287        return gaugeFields;
288    } else {
289        return std::make_shared<vector<FieldValue>>(event.getValues());
290    }
291}
292
293void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
294    std::lock_guard<std::mutex> lock(mMutex);
295    if (allData.size() == 0) {
296        return;
297    }
298    for (const auto& data : allData) {
299        onMatchedLogEventLocked(0, *data);
300    }
301}
302
303bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
304    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
305        return false;
306    }
307    // 1. Report the tuple count if the tuple count > soft limit
308    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
309        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
310        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
311        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
312        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
313            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
314                (long long)mMetricId, newKey.toString().c_str());
315            return true;
316        }
317    }
318
319    return false;
320}
321
322void GaugeMetricProducer::onMatchedLogEventInternalLocked(
323        const size_t matcherIndex, const MetricDimensionKey& eventKey,
324        const ConditionKey& conditionKey, bool condition,
325        const LogEvent& event) {
326    if (condition == false) {
327        return;
328    }
329    uint64_t eventTimeNs = event.GetElapsedTimestampNs();
330    mTagId = event.GetTagId();
331    if (eventTimeNs < mCurrentBucketStartTimeNs) {
332        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
333             (long long)mCurrentBucketStartTimeNs);
334        return;
335    }
336    flushIfNeededLocked(eventTimeNs);
337
338    // When gauge metric wants to randomly sample the output atom, we just simply use the first
339    // gauge in the given bucket.
340    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
341        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
342        return;
343    }
344    if (hitGuardRailLocked(eventKey)) {
345        return;
346    }
347    GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs);
348    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
349    // Anomaly detection on gauge metric only works when there is one numeric
350    // field specified.
351    if (mAnomalyTrackers.size() > 0) {
352        if (gaugeAtom.mFields->size() == 1) {
353            const Value& value = gaugeAtom.mFields->begin()->mValue;
354            long gaugeVal = 0;
355            if (value.getType() == INT) {
356                gaugeVal = (long)value.int_value;
357            } else if (value.getType() == LONG) {
358                gaugeVal = value.long_value;
359            }
360            for (auto& tracker : mAnomalyTrackers) {
361                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
362                                                 gaugeVal);
363            }
364        }
365    }
366}
367
368void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
369    for (const auto& slice : *mCurrentSlicedBucket) {
370        if (slice.second.empty()) {
371            continue;
372        }
373        const Value& value = slice.second.front().mFields->front().mValue;
374        long gaugeVal = 0;
375        if (value.getType() == INT) {
376            gaugeVal = (long)value.int_value;
377        } else if (value.getType() == LONG) {
378            gaugeVal = value.long_value;
379        }
380        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
381    }
382}
383
384void GaugeMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
385    flushIfNeededLocked(dropTimeNs);
386    mPastBuckets.clear();
387}
388
389// When a new matched event comes in, we check if event falls into the current
390// bucket. If not, flush the old counter to past buckets and initialize the new
391// bucket.
392// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
393// the GaugeMetricProducer while holding the lock.
394void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
395    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
396
397    if (eventTimeNs < currentBucketEndTimeNs) {
398        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
399             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
400        return;
401    }
402
403    flushCurrentBucketLocked(eventTimeNs);
404
405    // Adjusts the bucket start and end times.
406    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
407    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
408    mCurrentBucketNum += numBucketsForward;
409    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
410         (long long)mCurrentBucketStartTimeNs);
411}
412
413void GaugeMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
414    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
415
416    GaugeBucket info;
417    info.mBucketStartNs = mCurrentBucketStartTimeNs;
418    if (eventTimeNs < fullBucketEndTimeNs) {
419        info.mBucketEndNs = eventTimeNs;
420    } else {
421        info.mBucketEndNs = fullBucketEndTimeNs;
422    }
423
424    for (const auto& slice : *mCurrentSlicedBucket) {
425        info.mGaugeAtoms = slice.second;
426        auto& bucketList = mPastBuckets[slice.first];
427        bucketList.push_back(info);
428        VLOG("gauge metric %lld, dump key value: %s", (long long)mMetricId,
429             slice.first.toString().c_str());
430    }
431
432    // If we have anomaly trackers, we need to update the partial bucket values.
433    if (mAnomalyTrackers.size() > 0) {
434        updateCurrentSlicedBucketForAnomaly();
435
436        if (eventTimeNs > fullBucketEndTimeNs) {
437            // This is known to be a full bucket, so send this data to the anomaly tracker.
438            for (auto& tracker : mAnomalyTrackers) {
439                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
440            }
441            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
442        }
443    }
444
445    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
446}
447
448size_t GaugeMetricProducer::byteSizeLocked() const {
449    size_t totalSize = 0;
450    for (const auto& pair : mPastBuckets) {
451        totalSize += pair.second.size() * kBucketSize;
452    }
453    return totalSize;
454}
455
456}  // namespace statsd
457}  // namespace os
458}  // namespace android
459