GaugeMetricProducer.cpp revision b814481ad1f8d0e429d799b1571a6272e1a7f6c5
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "guardrail/StatsdStats.h"
22#include "dimension.h"
23#include "stats_log_util.h"
24
25#include <cutils/log.h>
26
27using android::util::FIELD_COUNT_REPEATED;
28using android::util::FIELD_TYPE_BOOL;
29using android::util::FIELD_TYPE_FLOAT;
30using android::util::FIELD_TYPE_INT32;
31using android::util::FIELD_TYPE_INT64;
32using android::util::FIELD_TYPE_MESSAGE;
33using android::util::FIELD_TYPE_STRING;
34using android::util::ProtoOutputStream;
35using std::map;
36using std::string;
37using std::unordered_map;
38using std::vector;
39using std::make_shared;
40using std::shared_ptr;
41
42namespace android {
43namespace os {
44namespace statsd {
45
// Proto field ids used when serializing a gauge metric report into a ProtoOutputStream.

// Fields of StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_START_REPORT_NANOS = 2;
constexpr int FIELD_ID_END_REPORT_NANOS = 3;
constexpr int FIELD_ID_GAUGE_METRICS = 8;

// Fields of GaugeMetricDataWrapper.
constexpr int FIELD_ID_DATA = 1;

// Fields of GaugeMetricData.
constexpr int FIELD_ID_DIMENSION = 1;
constexpr int FIELD_ID_BUCKET_INFO = 2;

// Fields of GaugeBucketInfo.
constexpr int FIELD_ID_START_BUCKET_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
60
61GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
62                                         const int conditionIndex,
63                                         const sp<ConditionWizard>& wizard, const int atomTagId,
64                                         const int pullTagId, const uint64_t startTimeNs,
65                                         shared_ptr<StatsPullerManager> statsPullerManager)
66    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
67      mStatsPullerManager(statsPullerManager),
68      mPullTagId(pullTagId),
69      mAtomTagId(atomTagId) {
70    mCurrentSlicedBucket = std::make_shared<DimToGaugeFieldsMap>();
71    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
72    int64_t bucketSizeMills = 0;
73    if (metric.has_bucket()) {
74        bucketSizeMills = TimeUnitToBucketSizeInMillis(metric.bucket());
75    } else {
76        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
77    }
78    mBucketSizeNs = bucketSizeMills * 1000000;
79
80    mFieldFilter = metric.gauge_fields_filter();
81
82    // TODO: use UidMap if uid->pkg_name is required
83    mDimensions = metric.dimensions();
84
85    if (metric.links().size() > 0) {
86        mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
87                               metric.links().end());
88        mConditionSliced = true;
89    }
90
91    // Kicks off the puller immediately.
92    if (mPullTagId != -1) {
93        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
94    }
95
96    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
97         (long long)mBucketSizeNs, (long long)mStartTimeNs);
98}
99
100// for testing
101GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
102                                         const int conditionIndex,
103                                         const sp<ConditionWizard>& wizard, const int pullTagId,
104                                         const int atomTagId, const int64_t startTimeNs)
105    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs,
106                          make_shared<StatsPullerManager>()) {
107}
108
109GaugeMetricProducer::~GaugeMetricProducer() {
110    VLOG("~GaugeMetricProducer() called");
111    if (mPullTagId != -1) {
112        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
113    }
114}
115
116void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, StatsLogReport* report) {
117    flushIfNeededLocked(dumpTimeNs);
118}
119
120void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
121                                             ProtoOutputStream* protoOutput) {
122    VLOG("gauge metric %lld report now...", (long long)mMetricId);
123
124    flushIfNeededLocked(dumpTimeNs);
125
126    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
127    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
128    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
129
130    for (const auto& pair : mPastBuckets) {
131        const HashableDimensionKey& hashableKey = pair.first;
132
133        VLOG("  dimension key %s", hashableKey.c_str());
134        long long wrapperToken =
135                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
136
137        // First fill dimension.
138        long long dimensionToken = protoOutput->start(
139                FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
140        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
141        protoOutput->end(dimensionToken);
142
143        // Then fill bucket_info (GaugeBucketInfo).
144        for (const auto& bucket : pair.second) {
145            long long bucketInfoToken = protoOutput->start(
146                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
147            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
148                               (long long)bucket.mBucketStartNs);
149            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
150                               (long long)bucket.mBucketEndNs);
151            long long atomToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM);
152            writeFieldValueTreeToStream(*bucket.mGaugeFields, protoOutput);
153            protoOutput->end(atomToken);
154            protoOutput->end(bucketInfoToken);
155            VLOG("\t bucket [%lld - %lld] includes %d gauge fields.", (long long)bucket.mBucketStartNs,
156                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeFields->size());
157        }
158        protoOutput->end(wrapperToken);
159    }
160    protoOutput->end(protoToken);
161    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
162
163    mPastBuckets.clear();
164    mStartTimeNs = mCurrentBucketStartTimeNs;
165    // TODO: Clear mDimensionKeyMap once the report is dumped.
166}
167
168void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
169                                                   const uint64_t eventTime) {
170    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
171    flushIfNeededLocked(eventTime);
172    mCondition = conditionMet;
173
174    // Push mode. No need to proactively pull the gauge data.
175    if (mPullTagId == -1) {
176        return;
177    }
178    // No need to pull again. Either scheduled pull or condition on true happened
179    if (!mCondition) {
180        return;
181    }
182    // Already have gauge metric for the current bucket, do not do it again.
183    if (mCurrentSlicedBucket->size() > 0) {
184        return;
185    }
186    vector<std::shared_ptr<LogEvent>> allData;
187    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
188        ALOGE("Stats puller failed for tag: %d", mPullTagId);
189        return;
190    }
191    for (const auto& data : allData) {
192        onMatchedLogEventLocked(0, *data);
193    }
194    flushIfNeededLocked(eventTime);
195}
196
197void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
198    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
199}
200
201std::shared_ptr<FieldValueMap> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
202    std::shared_ptr<FieldValueMap> gaugeFields =
203        std::make_shared<FieldValueMap>(event.getFieldValueMap());
204    if (!mFieldFilter.include_all()) {
205        filterFields(mFieldFilter.fields(), gaugeFields.get());
206    }
207    return gaugeFields;
208}
209
210void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
211    std::lock_guard<std::mutex> lock(mMutex);
212    if (allData.size() == 0) {
213        return;
214    }
215    for (const auto& data : allData) {
216        onMatchedLogEventLocked(0, *data);
217    }
218}
219
220bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
221    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
222        return false;
223    }
224    // 1. Report the tuple count if the tuple count > soft limit
225    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
226        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
227        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
228        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
229        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
230            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
231                (long long)mMetricId, newKey.c_str());
232            return true;
233        }
234    }
235
236    return false;
237}
238
239void GaugeMetricProducer::onMatchedLogEventInternalLocked(
240        const size_t matcherIndex, const HashableDimensionKey& eventKey,
241        const ConditionKey& conditionKey, bool condition,
242        const LogEvent& event) {
243    if (condition == false) {
244        return;
245    }
246    uint64_t eventTimeNs = event.GetTimestampNs();
247    if (eventTimeNs < mCurrentBucketStartTimeNs) {
248        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
249             (long long)mCurrentBucketStartTimeNs);
250        return;
251    }
252    flushIfNeededLocked(eventTimeNs);
253
254    // For gauge metric, we just simply use the first gauge in the given bucket.
255    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end()) {
256        return;
257    }
258    std::shared_ptr<FieldValueMap> gaugeFields = getGaugeFields(event);
259    if (hitGuardRailLocked(eventKey)) {
260        return;
261    }
262    (*mCurrentSlicedBucket)[eventKey] = gaugeFields;
263    // Anomaly detection on gauge metric only works when there is one numeric
264    // field specified.
265    if (mAnomalyTrackers.size() > 0) {
266        if (gaugeFields->size() == 1) {
267            const DimensionsValue& dimensionsValue = gaugeFields->begin()->second;
268            long gaugeVal = 0;
269            if (dimensionsValue.has_value_int()) {
270                gaugeVal = (long)dimensionsValue.value_int();
271            } else if (dimensionsValue.has_value_long()) {
272                gaugeVal = dimensionsValue.value_long();
273            }
274            for (auto& tracker : mAnomalyTrackers) {
275                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
276                                                 gaugeVal);
277            }
278        }
279    }
280}
281
282void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
283    mCurrentSlicedBucketForAnomaly->clear();
284    status_t err = NO_ERROR;
285    for (const auto& slice : *mCurrentSlicedBucket) {
286        const DimensionsValue& dimensionsValue = slice.second->begin()->second;
287        long gaugeVal = 0;
288        if (dimensionsValue.has_value_int()) {
289            gaugeVal = (long)dimensionsValue.value_int();
290        } else if (dimensionsValue.has_value_long()) {
291            gaugeVal = dimensionsValue.value_long();
292        }
293        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
294    }
295}
296
297// When a new matched event comes in, we check if event falls into the current
298// bucket. If not, flush the old counter to past buckets and initialize the new
299// bucket.
300// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
301// the GaugeMetricProducer while holding the lock.
302void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
303    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
304        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
305             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
306        return;
307    }
308
309    GaugeBucket info;
310    info.mBucketStartNs = mCurrentBucketStartTimeNs;
311    info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
312    info.mBucketNum = mCurrentBucketNum;
313
314    for (const auto& slice : *mCurrentSlicedBucket) {
315        info.mGaugeFields = slice.second;
316        auto& bucketList = mPastBuckets[slice.first];
317        bucketList.push_back(info);
318        VLOG("gauge metric %lld, dump key value: %s",
319            (long long)mMetricId, slice.first.c_str());
320    }
321
322    // Reset counters
323    if (mAnomalyTrackers.size() > 0) {
324        updateCurrentSlicedBucketForAnomaly();
325        for (auto& tracker : mAnomalyTrackers) {
326            tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
327        }
328    }
329
330    mCurrentSlicedBucket = std::make_shared<DimToGaugeFieldsMap>();
331
332    // Adjusts the bucket start time
333    int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
334    mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
335    mCurrentBucketNum += numBucketsForward;
336    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
337         (long long)mCurrentBucketStartTimeNs);
338}
339
340size_t GaugeMetricProducer::byteSizeLocked() const {
341    size_t totalSize = 0;
342    for (const auto& pair : mPastBuckets) {
343        totalSize += pair.second.size() * kBucketSize;
344    }
345    return totalSize;
346}
347
348}  // namespace statsd
349}  // namespace os
350}  // namespace android
351