GaugeMetricProducer.cpp revision 34ea1103a0faa0e01df0e2b0ac1ce98e7ec3e3f1
1/*
2* Copyright (C) 2017 The Android Open Source Project
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8*      http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17#define DEBUG false  // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "guardrail/StatsdStats.h"
22#include "dimension.h"
23#include "stats_log_util.h"
24
25#include <cutils/log.h>
26
27using android::util::FIELD_COUNT_REPEATED;
28using android::util::FIELD_TYPE_BOOL;
29using android::util::FIELD_TYPE_FLOAT;
30using android::util::FIELD_TYPE_INT32;
31using android::util::FIELD_TYPE_INT64;
32using android::util::FIELD_TYPE_MESSAGE;
33using android::util::FIELD_TYPE_STRING;
34using android::util::ProtoOutputStream;
35using std::map;
36using std::string;
37using std::unordered_map;
38using std::vector;
39using std::make_shared;
40using std::shared_ptr;
41
42namespace android {
43namespace os {
44namespace statsd {
45
// Field ids used when writing a StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_START_REPORT_NANOS = 2;
constexpr int FIELD_ID_END_REPORT_NANOS = 3;
constexpr int FIELD_ID_GAUGE_METRICS = 8;

// Field ids inside GaugeMetricDataWrapper.
constexpr int FIELD_ID_DATA = 1;

// Field ids inside GaugeMetricData.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;

// Field ids inside GaugeBucketInfo.
constexpr int FIELD_ID_START_BUCKET_NANOS = 1;
constexpr int FIELD_ID_END_BUCKET_NANOS = 2;
constexpr int FIELD_ID_ATOM = 3;
constexpr int FIELD_ID_TIMESTAMP = 4;
62
63GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
64                                         const int conditionIndex,
65                                         const sp<ConditionWizard>& wizard,
66                                         const int pullTagId, const uint64_t startTimeNs,
67                                         shared_ptr<StatsPullerManager> statsPullerManager)
68    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
69      mStatsPullerManager(statsPullerManager),
70      mPullTagId(pullTagId) {
71    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
72    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
73    int64_t bucketSizeMills = 0;
74    if (metric.has_bucket()) {
75        bucketSizeMills = TimeUnitToBucketSizeInMillis(metric.bucket());
76    } else {
77        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
78    }
79    mBucketSizeNs = bucketSizeMills * 1000000;
80
81    mSamplingType = metric.sampling_type();
82    mFieldFilter = metric.gauge_fields_filter();
83
84    // TODO: use UidMap if uid->pkg_name is required
85    mDimensions = metric.dimensions_in_what();
86
87    if (metric.links().size() > 0) {
88        mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
89                               metric.links().end());
90        mConditionSliced = true;
91    }
92
93    // Kicks off the puller immediately.
94    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
95        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
96    }
97
98    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
99         (long long)mBucketSizeNs, (long long)mStartTimeNs);
100}
101
102// for testing
103GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
104                                         const int conditionIndex,
105                                         const sp<ConditionWizard>& wizard, const int pullTagId,
106                                         const int64_t startTimeNs)
107    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
108                          make_shared<StatsPullerManager>()) {
109}
110
111GaugeMetricProducer::~GaugeMetricProducer() {
112    VLOG("~GaugeMetricProducer() called");
113    if (mPullTagId != -1) {
114        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
115    }
116}
117
118void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, StatsLogReport* report) {
119    flushIfNeededLocked(dumpTimeNs);
120    ProtoOutputStream pbOutput;
121    onDumpReportLocked(dumpTimeNs, &pbOutput);
122    parseProtoOutputStream(pbOutput, report);
123}
124
125void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
126                                             ProtoOutputStream* protoOutput) {
127    VLOG("gauge metric %lld report now...", (long long)mMetricId);
128
129    flushIfNeededLocked(dumpTimeNs);
130    if (mPastBuckets.empty()) {
131        return;
132    }
133
134    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
135    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
136    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
137
138    for (const auto& pair : mPastBuckets) {
139        const HashableDimensionKey& hashableKey = pair.first;
140
141        VLOG("  dimension key %s", hashableKey.c_str());
142        long long wrapperToken =
143                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
144
145        // First fill dimension.
146        long long dimensionToken = protoOutput->start(
147                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
148        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
149        protoOutput->end(dimensionToken);
150
151        // Then fill bucket_info (GaugeBucketInfo).
152        for (const auto& bucket : pair.second) {
153            long long bucketInfoToken = protoOutput->start(
154                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
155            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
156                               (long long)bucket.mBucketStartNs);
157            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
158                               (long long)bucket.mBucketEndNs);
159
160            if (!bucket.mGaugeAtoms.empty()) {
161                long long atomsToken =
162                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
163                for (const auto& atom : bucket.mGaugeAtoms) {
164                    writeFieldValueTreeToStream(*atom.mFields, protoOutput);
165                }
166                protoOutput->end(atomsToken);
167
168                for (const auto& atom : bucket.mGaugeAtoms) {
169                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_TIMESTAMP,
170                                       (long long)atom.mTimestamps);
171                }
172            }
173            protoOutput->end(bucketInfoToken);
174            VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs,
175                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size());
176        }
177        protoOutput->end(wrapperToken);
178    }
179    protoOutput->end(protoToken);
180    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
181
182    mPastBuckets.clear();
183    mStartTimeNs = mCurrentBucketStartTimeNs;
184    // TODO: Clear mDimensionKeyMap once the report is dumped.
185}
186
187void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
188                                                   const uint64_t eventTime) {
189    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
190    flushIfNeededLocked(eventTime);
191    mCondition = conditionMet;
192
193    // Push mode. No need to proactively pull the gauge data.
194    if (mPullTagId == -1) {
195        return;
196    }
197
198    bool triggerPuller = false;
199    switch(mSamplingType) {
200        // When the metric wants to do random sampling and there is already one gauge atom for the
201        // current bucket, do not do it again.
202        case GaugeMetric::RANDOM_ONE_SAMPLE: {
203            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
204            break;
205        }
206        case GaugeMetric::ALL_CONDITION_CHANGES: {
207            triggerPuller = true;
208            break;
209        }
210        default:
211            break;
212    }
213    if (!triggerPuller) {
214        return;
215    }
216
217    vector<std::shared_ptr<LogEvent>> allData;
218    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
219        ALOGE("Stats puller failed for tag: %d", mPullTagId);
220        return;
221    }
222    for (const auto& data : allData) {
223        onMatchedLogEventLocked(0, *data);
224    }
225    flushIfNeededLocked(eventTime);
226}
227
228void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
229    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
230}
231
232std::shared_ptr<FieldValueMap> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
233    std::shared_ptr<FieldValueMap> gaugeFields =
234        std::make_shared<FieldValueMap>(event.getFieldValueMap());
235    if (!mFieldFilter.include_all()) {
236        filterFields(mFieldFilter.fields(), gaugeFields.get());
237    }
238    return gaugeFields;
239}
240
241void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
242    std::lock_guard<std::mutex> lock(mMutex);
243    if (allData.size() == 0) {
244        return;
245    }
246    for (const auto& data : allData) {
247        onMatchedLogEventLocked(0, *data);
248    }
249}
250
251bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
252    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
253        return false;
254    }
255    // 1. Report the tuple count if the tuple count > soft limit
256    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
257        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
258        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
259        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
260        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
261            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
262                (long long)mMetricId, newKey.c_str());
263            return true;
264        }
265    }
266
267    return false;
268}
269
270void GaugeMetricProducer::onMatchedLogEventInternalLocked(
271        const size_t matcherIndex, const HashableDimensionKey& eventKey,
272        const ConditionKey& conditionKey, bool condition,
273        const LogEvent& event) {
274    if (condition == false) {
275        return;
276    }
277    uint64_t eventTimeNs = event.GetTimestampNs();
278    if (eventTimeNs < mCurrentBucketStartTimeNs) {
279        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
280             (long long)mCurrentBucketStartTimeNs);
281        return;
282    }
283    flushIfNeededLocked(eventTimeNs);
284
285    // When gauge metric wants to randomly sample the output atom, we just simply use the first
286    // gauge in the given bucket.
287    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
288        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
289        return;
290    }
291    if (hitGuardRailLocked(eventKey)) {
292        return;
293    }
294    GaugeAtom gaugeAtom;
295    gaugeAtom.mFields = getGaugeFields(event);
296    gaugeAtom.mTimestamps = eventTimeNs;
297    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
298    // Anomaly detection on gauge metric only works when there is one numeric
299    // field specified.
300    if (mAnomalyTrackers.size() > 0) {
301        if (gaugeAtom.mFields->size() == 1) {
302            const DimensionsValue& dimensionsValue = gaugeAtom.mFields->begin()->second;
303            long gaugeVal = 0;
304            if (dimensionsValue.has_value_int()) {
305                gaugeVal = (long)dimensionsValue.value_int();
306            } else if (dimensionsValue.has_value_long()) {
307                gaugeVal = dimensionsValue.value_long();
308            }
309            for (auto& tracker : mAnomalyTrackers) {
310                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
311                                                 gaugeVal);
312            }
313        }
314    }
315}
316
317void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
318    mCurrentSlicedBucketForAnomaly->clear();
319    status_t err = NO_ERROR;
320    for (const auto& slice : *mCurrentSlicedBucket) {
321        if (slice.second.empty() || slice.second.front().mFields->empty()) {
322            continue;
323        }
324        const DimensionsValue& dimensionsValue = slice.second.front().mFields->begin()->second;
325        long gaugeVal = 0;
326        if (dimensionsValue.has_value_int()) {
327            gaugeVal = (long)dimensionsValue.value_int();
328        } else if (dimensionsValue.has_value_long()) {
329            gaugeVal = dimensionsValue.value_long();
330        }
331        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
332    }
333}
334
335// When a new matched event comes in, we check if event falls into the current
336// bucket. If not, flush the old counter to past buckets and initialize the new
337// bucket.
338// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
339// the GaugeMetricProducer while holding the lock.
340void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
341    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
342        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
343             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
344        return;
345    }
346
347    GaugeBucket info;
348    info.mBucketStartNs = mCurrentBucketStartTimeNs;
349    info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
350    info.mBucketNum = mCurrentBucketNum;
351
352    for (const auto& slice : *mCurrentSlicedBucket) {
353        info.mGaugeAtoms = slice.second;
354        auto& bucketList = mPastBuckets[slice.first];
355        bucketList.push_back(info);
356        VLOG("gauge metric %lld, dump key value: %s",
357            (long long)mMetricId, slice.first.c_str());
358    }
359
360    // Reset counters
361    if (mAnomalyTrackers.size() > 0) {
362        updateCurrentSlicedBucketForAnomaly();
363        for (auto& tracker : mAnomalyTrackers) {
364            tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
365        }
366    }
367
368    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
369    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
370
371    // Adjusts the bucket start time
372    int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
373    mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
374    mCurrentBucketNum += numBucketsForward;
375    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
376         (long long)mCurrentBucketStartTimeNs);
377}
378
379size_t GaugeMetricProducer::byteSizeLocked() const {
380    size_t totalSize = 0;
381    for (const auto& pair : mPastBuckets) {
382        totalSize += pair.second.size() * kBucketSize;
383    }
384    return totalSize;
385}
386
387}  // namespace statsd
388}  // namespace os
389}  // namespace android
390