GaugeMetricProducer.cpp revision b814481ad1f8d0e429d799b1571a6272e1a7f6c5
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG false // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "guardrail/StatsdStats.h" 22#include "dimension.h" 23#include "stats_log_util.h" 24 25#include <cutils/log.h> 26 27using android::util::FIELD_COUNT_REPEATED; 28using android::util::FIELD_TYPE_BOOL; 29using android::util::FIELD_TYPE_FLOAT; 30using android::util::FIELD_TYPE_INT32; 31using android::util::FIELD_TYPE_INT64; 32using android::util::FIELD_TYPE_MESSAGE; 33using android::util::FIELD_TYPE_STRING; 34using android::util::ProtoOutputStream; 35using std::map; 36using std::string; 37using std::unordered_map; 38using std::vector; 39using std::make_shared; 40using std::shared_ptr; 41 42namespace android { 43namespace os { 44namespace statsd { 45 46// for StatsLogReport 47const int FIELD_ID_ID = 1; 48const int FIELD_ID_START_REPORT_NANOS = 2; 49const int FIELD_ID_END_REPORT_NANOS = 3; 50const int FIELD_ID_GAUGE_METRICS = 8; 51// for GaugeMetricDataWrapper 52const int FIELD_ID_DATA = 1; 53// for GaugeMetricData 54const int FIELD_ID_DIMENSION = 1; 55const int FIELD_ID_BUCKET_INFO = 2; 56// for GaugeBucketInfo 57const int FIELD_ID_START_BUCKET_NANOS = 1; 58const int FIELD_ID_END_BUCKET_NANOS = 2; 59const int FIELD_ID_ATOM = 3; 60 61GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 62 const int conditionIndex, 63 const sp<ConditionWizard>& wizard, const int atomTagId, 64 const int pullTagId, const uint64_t startTimeNs, 65 shared_ptr<StatsPullerManager> statsPullerManager) 66 : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard), 67 mStatsPullerManager(statsPullerManager), 68 mPullTagId(pullTagId), 69 mAtomTagId(atomTagId) { 70 mCurrentSlicedBucket = std::make_shared<DimToGaugeFieldsMap>(); 71 mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>(); 72 int64_t bucketSizeMills = 0; 73 if (metric.has_bucket()) { 74 bucketSizeMills = TimeUnitToBucketSizeInMillis(metric.bucket()); 75 } else { 76 bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR); 77 } 78 mBucketSizeNs = bucketSizeMills * 1000000; 79 80 mFieldFilter = metric.gauge_fields_filter(); 81 82 // TODO: use UidMap if uid->pkg_name is required 83 mDimensions = metric.dimensions(); 84 85 if (metric.links().size() > 0) { 86 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 87 metric.links().end()); 88 mConditionSliced = true; 89 } 90 91 // Kicks off the puller immediately. 92 if (mPullTagId != -1) { 93 mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills); 94 } 95 96 VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), 97 (long long)mBucketSizeNs, (long long)mStartTimeNs); 98} 99 100// for testing 101GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 102 const int conditionIndex, 103 const sp<ConditionWizard>& wizard, const int pullTagId, 104 const int atomTagId, const int64_t startTimeNs) 105 : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs, 106 make_shared<StatsPullerManager>()) { 107} 108 109GaugeMetricProducer::~GaugeMetricProducer() { 110 VLOG("~GaugeMetricProducer() called"); 111 if (mPullTagId != -1) { 112 mStatsPullerManager->UnRegisterReceiver(mPullTagId, this); 113 } 114} 115 116void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, StatsLogReport* report) { 117 flushIfNeededLocked(dumpTimeNs); 118} 119 120void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, 121 ProtoOutputStream* protoOutput) { 122 VLOG("gauge metric %lld report now...", (long long)mMetricId); 123 124 flushIfNeededLocked(dumpTimeNs); 125 126 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); 127 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); 128 long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 129 130 for (const auto& pair : mPastBuckets) { 131 const HashableDimensionKey& hashableKey = pair.first; 132 133 VLOG(" dimension key %s", hashableKey.c_str()); 134 long long wrapperToken = 135 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 136 137 // First fill dimension. 138 long long dimensionToken = protoOutput->start( 139 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); 140 writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput); 141 protoOutput->end(dimensionToken); 142 143 // Then fill bucket_info (GaugeBucketInfo). 144 for (const auto& bucket : pair.second) { 145 long long bucketInfoToken = protoOutput->start( 146 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 147 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 148 (long long)bucket.mBucketStartNs); 149 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 150 (long long)bucket.mBucketEndNs); 151 long long atomToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM); 152 writeFieldValueTreeToStream(*bucket.mGaugeFields, protoOutput); 153 protoOutput->end(atomToken); 154 protoOutput->end(bucketInfoToken); 155 VLOG("\t bucket [%lld - %lld] includes %d gauge fields.", (long long)bucket.mBucketStartNs, 156 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeFields->size()); 157 } 158 protoOutput->end(wrapperToken); 159 } 160 protoOutput->end(protoToken); 161 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); 162 163 mPastBuckets.clear(); 164 mStartTimeNs = mCurrentBucketStartTimeNs; 165 // TODO: Clear mDimensionKeyMap once the report is dumped. 166} 167 168void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, 169 const uint64_t eventTime) { 170 VLOG("Metric %lld onConditionChanged", (long long)mMetricId); 171 flushIfNeededLocked(eventTime); 172 mCondition = conditionMet; 173 174 // Push mode. No need to proactively pull the gauge data. 175 if (mPullTagId == -1) { 176 return; 177 } 178 // No need to pull again. Either scheduled pull or condition on true happened 179 if (!mCondition) { 180 return; 181 } 182 // Already have gauge metric for the current bucket, do not do it again. 183 if (mCurrentSlicedBucket->size() > 0) { 184 return; 185 } 186 vector<std::shared_ptr<LogEvent>> allData; 187 if (!mStatsPullerManager->Pull(mPullTagId, &allData)) { 188 ALOGE("Stats puller failed for tag: %d", mPullTagId); 189 return; 190 } 191 for (const auto& data : allData) { 192 onMatchedLogEventLocked(0, *data); 193 } 194 flushIfNeededLocked(eventTime); 195} 196 197void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { 198 VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId); 199} 200 201std::shared_ptr<FieldValueMap> GaugeMetricProducer::getGaugeFields(const LogEvent& event) { 202 std::shared_ptr<FieldValueMap> gaugeFields = 203 std::make_shared<FieldValueMap>(event.getFieldValueMap()); 204 if (!mFieldFilter.include_all()) { 205 filterFields(mFieldFilter.fields(), gaugeFields.get()); 206 } 207 return gaugeFields; 208} 209 210void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 211 std::lock_guard<std::mutex> lock(mMutex); 212 if (allData.size() == 0) { 213 return; 214 } 215 for (const auto& data : allData) { 216 onMatchedLogEventLocked(0, *data); 217 } 218} 219 220bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { 221 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 222 return false; 223 } 224 // 1. Report the tuple count if the tuple count > soft limit 225 if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) { 226 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 227 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); 228 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 229 if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) { 230 ALOGE("GaugeMetric %lld dropping data for dimension key %s", 231 (long long)mMetricId, newKey.c_str()); 232 return true; 233 } 234 } 235 236 return false; 237} 238 239void GaugeMetricProducer::onMatchedLogEventInternalLocked( 240 const size_t matcherIndex, const HashableDimensionKey& eventKey, 241 const ConditionKey& conditionKey, bool condition, 242 const LogEvent& event) { 243 if (condition == false) { 244 return; 245 } 246 uint64_t eventTimeNs = event.GetTimestampNs(); 247 if (eventTimeNs < mCurrentBucketStartTimeNs) { 248 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 249 (long long)mCurrentBucketStartTimeNs); 250 return; 251 } 252 flushIfNeededLocked(eventTimeNs); 253 254 // For gauge metric, we just simply use the first gauge in the given bucket. 255 if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end()) { 256 return; 257 } 258 std::shared_ptr<FieldValueMap> gaugeFields = getGaugeFields(event); 259 if (hitGuardRailLocked(eventKey)) { 260 return; 261 } 262 (*mCurrentSlicedBucket)[eventKey] = gaugeFields; 263 // Anomaly detection on gauge metric only works when there is one numeric 264 // field specified. 265 if (mAnomalyTrackers.size() > 0) { 266 if (gaugeFields->size() == 1) { 267 const DimensionsValue& dimensionsValue = gaugeFields->begin()->second; 268 long gaugeVal = 0; 269 if (dimensionsValue.has_value_int()) { 270 gaugeVal = (long)dimensionsValue.value_int(); 271 } else if (dimensionsValue.has_value_long()) { 272 gaugeVal = dimensionsValue.value_long(); 273 } 274 for (auto& tracker : mAnomalyTrackers) { 275 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, 276 gaugeVal); 277 } 278 } 279 } 280} 281 282void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() { 283 mCurrentSlicedBucketForAnomaly->clear(); 284 status_t err = NO_ERROR; 285 for (const auto& slice : *mCurrentSlicedBucket) { 286 const DimensionsValue& dimensionsValue = slice.second->begin()->second; 287 long gaugeVal = 0; 288 if (dimensionsValue.has_value_int()) { 289 gaugeVal = (long)dimensionsValue.value_int(); 290 } else if (dimensionsValue.has_value_long()) { 291 gaugeVal = dimensionsValue.value_long(); 292 } 293 (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal; 294 } 295} 296 297// When a new matched event comes in, we check if event falls into the current 298// bucket. If not, flush the old counter to past buckets and initialize the new 299// bucket. 300// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 301// the GaugeMetricProducer while holding the lock. 302void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { 303 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 304 VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, 305 (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs)); 306 return; 307 } 308 309 GaugeBucket info; 310 info.mBucketStartNs = mCurrentBucketStartTimeNs; 311 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 312 info.mBucketNum = mCurrentBucketNum; 313 314 for (const auto& slice : *mCurrentSlicedBucket) { 315 info.mGaugeFields = slice.second; 316 auto& bucketList = mPastBuckets[slice.first]; 317 bucketList.push_back(info); 318 VLOG("gauge metric %lld, dump key value: %s", 319 (long long)mMetricId, slice.first.c_str()); 320 } 321 322 // Reset counters 323 if (mAnomalyTrackers.size() > 0) { 324 updateCurrentSlicedBucketForAnomaly(); 325 for (auto& tracker : mAnomalyTrackers) { 326 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum); 327 } 328 } 329 330 mCurrentSlicedBucket = std::make_shared<DimToGaugeFieldsMap>(); 331 332 // Adjusts the bucket start time 333 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 334 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 335 mCurrentBucketNum += numBucketsForward; 336 VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, 337 (long long)mCurrentBucketStartTimeNs); 338} 339 340size_t GaugeMetricProducer::byteSizeLocked() const { 341 size_t totalSize = 0; 342 for (const auto& pair : mPastBuckets) { 343 totalSize += pair.second.size() * kBucketSize; 344 } 345 return totalSize; 346} 347 348} // namespace statsd 349} // namespace os 350} // namespace android 351