GaugeMetricProducer.cpp revision d9afdee26f01d1e0277749959094d9f396799a69
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "../guardrail/StatsdStats.h"
#include "GaugeMetricProducer.h"
#include "../stats_log_util.h"

#include <cutils/log.h>

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::string;
using std::unordered_map;
using std::vector;
using std::make_shared;
using std::shared_ptr;

namespace android {
namespace os {
namespace statsd {

// Proto field IDs used when serializing the report with ProtoOutputStream.
// They must stay in sync with the StatsLogReport / GaugeMetricData proto
// definitions (stats_log.proto).
// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
// for GaugeBucketInfo
const int FIELD_ID_START_BUCKET_ELAPSED_NANOS = 1;
const int FIELD_ID_END_BUCKET_ELAPSED_NANOS = 2;
const int FIELD_ID_ATOM = 3;
const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
const int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;

// Produces one gauge metric from a GaugeMetric config: buckets incoming (pushed
// or pulled) atoms by time and dimension, and serializes them on dump.
//
// pullTagId is -1 for pushed atoms; otherwise it identifies the pulled atom and
// this producer registers itself with the StatsPullerManager.
// Per-atom dimension guardrails (soft/hard key-count limits) come from
// StatsdStats::kAtomDimensionKeySizeLimitMap when the pulled atom has an entry,
// falling back to the global defaults otherwise.
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const uint64_t startTimeNs,
                                         shared_ptr<StatsPullerManager> statsPullerManager)
    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
      mStatsPullerManager(statsPullerManager),
      mPullTagId(pullTagId),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit) {
    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
    // Bucket duration: explicit from the config, otherwise one hour.
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }
    mBucketSizeNs = bucketSizeMills * 1000000;

    mSamplingType = metric.sampling_type();
    // Empty mFieldMatchers means "keep the whole atom" (include_all case);
    // otherwise only the configured gauge fields are retained.
    if (!metric.gauge_fields_filter().include_all()) {
        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
    }

    // TODO: use UidMap if uid->pkg_name is required
    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.has_dimensions_in_condition()) {
        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    }

    // Links tie fields of the "what" atom to fields of the condition predicate.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
    }
    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);

    // Kicks off the puller immediately. Only RANDOM_ONE_SAMPLE registers for
    // periodic (per-bucket) pulls; other sampling types pull on condition change.
    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
    }

    VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
         (long long)metric.id(), (long long)mBucketSizeNs, (long long)mStartTimeNs,
         mConditionSliced);
}

// for testing: delegates to the main constructor with a freshly created
// StatsPullerManager.
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const int64_t startTimeNs)
    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
                          make_shared<StatsPullerManager>()) {
}

GaugeMetricProducer::~GaugeMetricProducer() {
    VLOG("~GaugeMetricProducer() called");
    // Undo the RegisterReceiver done in the constructor for pulled metrics.
    if (mPullTagId != -1) {
        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
    }
}

// Debug dump of the in-flight (current bucket) state. Prints nothing when the
// current bucket is empty.
void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket == nullptr ||
        mCurrentSlicedBucket->size() == 0) {
        return;
    }

    fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket->size());
    if (verbose) {
        for (const auto& it : *mCurrentSlicedBucket) {
            fprintf(out, "\t(what)%s\t(condition)%s %d atoms\n",
                    it.first.getDimensionKeyInWhat().toString().c_str(),
                    it.first.getDimensionKeyInCondition().toString().c_str(),
                    (int)it.second.size());
        }
    }
}

// Serializes all finished buckets into the StatsLogReport proto and clears
// them. Flushes the current bucket first so data up to dumpTimeNs is included.
void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
                                             ProtoOutputStream* protoOutput) {
    VLOG("Gauge metric %lld report now...", (long long)mMetricId);

    flushIfNeededLocked(dumpTimeNs);
    if (mPastBuckets.empty()) {
        return;
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);

    // One GaugeMetricData entry per dimension key.
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;

        VLOG("Gauge dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        uint64_t dimensionToken = protoOutput->start(
                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
        writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
        protoOutput->end(dimensionToken);

        if (dimensionKey.hasDimensionKeyInCondition()) {
            uint64_t dimensionInConditionToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
            writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
            protoOutput->end(dimensionInConditionToken);
        }

        // Then fill bucket_info (GaugeBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_NANOS,
                               (long long)bucket.mBucketStartNs);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_NANOS,
                               (long long)bucket.mBucketEndNs);

            if (!bucket.mGaugeAtoms.empty()) {
                // Repeated atoms first, then the parallel repeated timestamp
                // fields (one elapsed + one wall-clock entry per atom, in the
                // same order as the atoms).
                for (const auto& atom : bucket.mGaugeAtoms) {
                    uint64_t atomsToken =
                            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                               FIELD_ID_ATOM);
                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
                    protoOutput->end(atomsToken);
                }
                // Timestamps are coarsened to 5-minute granularity unless the
                // atom is on the no-truncation whitelist (privacy measure).
                const bool truncateTimestamp =
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
                                mTagId) ==
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
                // NOTE(review): a single wall-clock value (taken at dump time)
                // is written for every atom in the bucket — per-atom wall-clock
                // times are not recorded.
                const int64_t wall_clock_ns = truncateTimestamp ?
                        truncateTimestampNsToFiveMinutes(getWallClockNs()) : getWallClockNs();
                for (const auto& atom : bucket.mGaugeAtoms) {
                    int64_t timestampNs = truncateTimestamp ?
                            truncateTimestampNsToFiveMinutes(atom.mTimestamps) : atom.mTimestamps;
                    protoOutput->write(
                            FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
                            (long long)timestampNs);
                    protoOutput->write(
                            FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
                            FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
                            (long long)wall_clock_ns);
                }
            }
            protoOutput->end(bucketInfoToken);
            VLOG("Gauge \t bucket [%lld - %lld] includes %d atoms.",
                 (long long)bucket.mBucketStartNs, (long long)bucket.mBucketEndNs,
                 (int)bucket.mGaugeAtoms.size());
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    // Reported data is dropped after serialization.
    mPastBuckets.clear();
    // TODO: Clear mDimensionKeyMap once the report is dumped.
}

// Pulls the gauge atom now, if the sampling type calls for it, and feeds the
// pulled events through the normal matched-event path.
void GaugeMetricProducer::pullLocked() {
    bool triggerPuller = false;
    switch(mSamplingType) {
        // When the metric wants to do random sampling and there is already one gauge atom for the
        // current bucket, do not do it again.
        case GaugeMetric::RANDOM_ONE_SAMPLE: {
            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
            break;
        }
        case GaugeMetric::ALL_CONDITION_CHANGES: {
            // Pull on every condition change, regardless of condition value.
            triggerPuller = true;
            break;
        }
        default:
            break;
    }
    if (!triggerPuller) {
        return;
    }

    vector<std::shared_ptr<LogEvent>> allData;
    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Gauge Stats puller failed for tag: %d", mPullTagId);
        return;
    }

    for (const auto& data : allData) {
        onMatchedLogEventLocked(0, *data);
    }
}

// Condition (unsliced) changed: close out the bucket if needed, record the new
// condition, and pull fresh data for pulled metrics.
void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
                                                   const uint64_t eventTime) {
    VLOG("GaugeMetric %lld onConditionChanged", (long long)mMetricId);
    flushIfNeededLocked(eventTime);
    mCondition = conditionMet;

    if (mPullTagId != -1) {
        pullLocked();
    }  // else: Push mode. No need to proactively pull the gauge data.
}

void GaugeMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const uint64_t eventTime) {
    VLOG("GaugeMetric %lld onSlicedConditionMayChange overall condition %d", (long long)mMetricId,
         overallCondition);
    flushIfNeededLocked(eventTime);
    // If the condition is sliced, mCondition is true if any of the dimensions is true. And we will
    // pull for every dimension.
    mCondition = overallCondition;
    if (mPullTagId != -1) {
        pullLocked();
    }  // else: Push mode. No need to proactively pull the gauge data.
}

// Extracts the gauge fields to store for this event: the filtered subset when
// gauge_fields_filter was configured, otherwise a copy of all field values.
std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
    if (mFieldMatchers.size() > 0) {
        std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
        return gaugeFields;
    } else {
        return std::make_shared<vector<FieldValue>>(event.getValues());
    }
}

// Callback from StatsPullerManager with periodically pulled data. Takes the
// lock itself (unlike the *Locked methods) and routes each event through the
// matched-event path.
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (allData.size() == 0) {
        return;
    }
    for (const auto& data : allData) {
        onMatchedLogEventLocked(0, *data);
    }
}

// Returns true if adding newKey to the current bucket would exceed the hard
// dimension-count guardrail (data should be dropped). Existing keys never hit
// the guardrail. Crossing the soft limit only reports the size to StatsdStats.
bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
        return false;
    }
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket->size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

// Core ingestion path for one matched event (pushed or pulled): buckets the
// gauge atom under its dimension key and feeds anomaly trackers.
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition,
        const LogEvent& event) {
    if (condition == false) {
        return;
    }
    uint64_t eventTimeNs = event.GetElapsedTimestampNs();
    // Remember the atom's tag for report serialization (writeFieldValueTreeToStream).
    mTagId = event.GetTagId();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Gauge Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    flushIfNeededLocked(eventTimeNs);

    // When gauge metric wants to randomly sample the output atom, we just simply use the first
    // gauge in the given bucket.
    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
        return;
    }
    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs);
    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
    // Anomaly detection on gauge metric only works when there is one numeric
    // field specified.
    if (mAnomalyTrackers.size() > 0) {
        if (gaugeAtom.mFields->size() == 1) {
            const Value& value = gaugeAtom.mFields->begin()->mValue;
            long gaugeVal = 0;
            if (value.getType() == INT) {
                gaugeVal = (long)value.int_value;
            } else if (value.getType() == LONG) {
                gaugeVal = value.long_value;
            }
            // Non-numeric single fields fall through with gaugeVal == 0.
            for (auto& tracker : mAnomalyTrackers) {
                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
                                                 gaugeVal);
            }
        }
    }
}

// Rebuilds the dimension -> numeric-value map used by anomaly trackers from
// the first atom's first field of each slice in the current bucket.
void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
    for (const auto& slice : *mCurrentSlicedBucket) {
        if (slice.second.empty()) {
            continue;
        }
        const Value& value = slice.second.front().mFields->front().mValue;
        long gaugeVal = 0;
        if (value.getType() == INT) {
            gaugeVal = (long)value.int_value;
        } else if (value.getType() == LONG) {
            gaugeVal = value.long_value;
        }
        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
    }
}

// Discards all accumulated data (e.g. when the config is removed). The flush
// first moves the current bucket into mPastBuckets, which is then cleared.
void GaugeMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    mPastBuckets.clear();
}

// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new
// bucket.
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("Gauge eventTime is %lld, less than next bucket start time %lld",
             (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
        return;
    }

    flushCurrentBucketLocked(eventTimeNs);

    // Adjusts the bucket start and end times. Skips over fully-elapsed empty
    // buckets so the new bucket start lands on a bucket boundary at or before
    // eventTimeNs.
    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;
    VLOG("Gauge metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

// Moves the current sliced bucket into mPastBuckets (one GaugeBucket per
// dimension key), notifies anomaly trackers for completed full buckets, and
// resets the current bucket. The bucket end is clamped to eventTimeNs for
// partial buckets (e.g. app upgrade / dump mid-bucket).
void GaugeMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();

    GaugeBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
    if (eventTimeNs < fullBucketEndTimeNs) {
        info.mBucketEndNs = eventTimeNs;
    } else {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }

    for (const auto& slice : *mCurrentSlicedBucket) {
        info.mGaugeAtoms = slice.second;
        auto& bucketList = mPastBuckets[slice.first];
        bucketList.push_back(info);
        VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
             slice.first.toString().c_str());
    }

    // If we have anomaly trackers, we need to update the partial bucket values.
    if (mAnomalyTrackers.size() > 0) {
        updateCurrentSlicedBucketForAnomaly();

        if (eventTimeNs > fullBucketEndTimeNs) {
            // This is known to be a full bucket, so send this data to the anomaly tracker.
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
            }
            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
        }
    }

    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
}

// Approximate in-memory footprint of the stored (past) buckets, used by the
// guardrail that caps per-metric memory. Counts buckets, not atom payloads.
size_t GaugeMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}

}  // namespace statsd
}  // namespace os
}  // namespace android