GaugeMetricProducer.cpp revision c40a19d2e43d5de3e036e926bf070220c2c865e6
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "GaugeMetricProducer.h"
#include "guardrail/StatsdStats.h"
#include "stats_log_util.h"

#include <cutils/log.h>

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::string;
using std::unordered_map;
using std::vector;
using std::make_shared;
using std::shared_ptr;

namespace android {
namespace os {
namespace statsd {

// Proto field numbers used when serializing the report below. They must stay
// in sync with the StatsLogReport message definitions.
// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
// for GaugeBucketInfo
const int FIELD_ID_START_BUCKET_ELAPSED_NANOS = 1;
const int FIELD_ID_END_BUCKET_ELAPSED_NANOS = 2;
const int FIELD_ID_ATOM = 3;
const int FIELD_ID_ELAPSED_ATOM_TIMESTAMP = 4;
const int FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP = 5;

// Builds a gauge metric producer from the config proto.
// - Bucket size defaults to one hour when the config does not specify one.
// - For pulled atoms (pullTagId != -1) with RANDOM_ONE_SAMPLE sampling, a
//   periodic pull receiver is registered so each bucket gets one sample.
// The injectable statsPullerManager parameter exists so tests can substitute
// a fake puller (see the testing constructor below).
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard,
                                         const int pullTagId, const uint64_t startTimeNs,
                                         shared_ptr<StatsPullerManager> statsPullerManager)
    : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard),
      mStatsPullerManager(statsPullerManager),
      mPullTagId(pullTagId) {
    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
    mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
    int64_t bucketSizeMills = 0;
    if (metric.has_bucket()) {
        // Guardrailed: clamps per-uid bucket sizes to allowed values.
        bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }
    mBucketSizeNs = bucketSizeMills * 1000000;

    mSamplingType = metric.sampling_type();
    // When include_all() is set, mFieldMatchers stays empty and getGaugeFields()
    // copies every field of the event instead of filtering.
    if (!metric.gauge_fields_filter().include_all()) {
        translateFieldMatcher(metric.gauge_fields_filter().fields(), &mFieldMatchers);
    }

    // TODO: use UidMap if uid->pkg_name is required
    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.has_dimensions_in_condition()) {
        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    }

    // Metric-to-condition links let the condition be sliced by fields of the
    // matched event.
    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
    }
    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);

    // Kicks off the puller immediately.
    if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
        mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
    }

    VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mStartTimeNs);
}

// for testing
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const int64_t startTimeNs)
    : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs,
                          make_shared<StatsPullerManager>()) {
}

// Unregisters the pull receiver (if one was registered in the constructor) so
// the puller does not call back into a destroyed object.
GaugeMetricProducer::~GaugeMetricProducer() {
    VLOG("~GaugeMetricProducer() called");
    if (mPullTagId != -1) {
        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
    }
}

// Debug dump of the in-progress bucket: dimension count, and per-slice atom
// counts when verbose. No-op when the current bucket is empty.
void GaugeMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket == nullptr ||
        mCurrentSlicedBucket->size() == 0) {
        return;
    }

    fprintf(out, "GaugeMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket->size());
    if (verbose) {
        for (const auto& it : *mCurrentSlicedBucket) {
            fprintf(out, "\t(what)%s\t(condition)%s %d atoms\n",
                    it.first.getDimensionKeyInWhat().toString().c_str(),
                    it.first.getDimensionKeyInCondition().toString().c_str(),
                    (int)it.second.size());
        }
    }
}

// Serializes all finished buckets (mPastBuckets) into the StatsLogReport proto
// and then clears them. Flushes the current bucket first so data up to
// dumpTimeNs is included. Atom timestamps are truncated to 5-minute
// granularity unless the atom id is on the no-truncation whitelist.
void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
                                             ProtoOutputStream* protoOutput) {
    VLOG("gauge metric %lld report now...", (long long)mMetricId);

    flushIfNeededLocked(dumpTimeNs);
    if (mPastBuckets.empty()) {
        return;
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;

        VLOG(" dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        uint64_t dimensionToken = protoOutput->start(
                FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
        writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), protoOutput);
        protoOutput->end(dimensionToken);

        if (dimensionKey.hasDimensionKeyInCondition()) {
            uint64_t dimensionInConditionToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
            writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), protoOutput);
            protoOutput->end(dimensionInConditionToken);
        }

        // Then fill bucket_info (GaugeBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_NANOS,
                               (long long)bucket.mBucketStartNs);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_NANOS,
                               (long long)bucket.mBucketEndNs);

            if (!bucket.mGaugeAtoms.empty()) {
                // Atoms first, then the parallel repeated timestamp fields;
                // the i-th timestamp corresponds to the i-th atom.
                for (const auto& atom : bucket.mGaugeAtoms) {
                    uint64_t atomsToken =
                            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                               FIELD_ID_ATOM);
                    writeFieldValueTreeToStream(mTagId, *(atom.mFields), protoOutput);
                    protoOutput->end(atomsToken);
                }
                // Truncate unless this atom id is whitelisted for full-precision
                // timestamps (privacy measure).
                const bool truncateTimestamp =
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.find(
                                mTagId) ==
                        android::util::AtomsInfo::kNotTruncatingTimestampAtomWhiteList.end();
                // Wall clock is sampled once per bucket at dump time, not per atom.
                const int64_t wall_clock_ns = truncateTimestamp ?
                        truncateTimestampNsToFiveMinutes(getWallClockNs()) : getWallClockNs();
                for (const auto& atom : bucket.mGaugeAtoms) {
                    int64_t timestampNs = truncateTimestamp ?
                            truncateTimestampNsToFiveMinutes(atom.mTimestamps) : atom.mTimestamps;
                    protoOutput->write(
                            FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_ELAPSED_ATOM_TIMESTAMP,
                            (long long)timestampNs);
                    protoOutput->write(
                            FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED |
                                    FIELD_ID_WALL_CLOCK_ATOM_TIMESTAMP,
                            (long long)wall_clock_ns);
                }
            }
            protoOutput->end(bucketInfoToken);
            VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs,
                 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size());
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    mPastBuckets.clear();
    // TODO: Clear mDimensionKeyMap once the report is dumped.
}

// Pulls the gauge atom synchronously and feeds every returned event through
// the normal matched-event path. Caller must hold the lock.
void GaugeMetricProducer::pullLocked() {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Stats puller failed for tag: %d", mPullTagId);
        return;
    }
    for (const auto& data : allData) {
        onMatchedLogEventLocked(0, *data);
    }
}

// Records the new condition and, for pulled metrics, decides whether this
// transition should trigger an immediate pull (see the sampling-type switch).
void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
                                                   const uint64_t eventTime) {
    VLOG("Metric %lld onConditionChanged", (long long)mMetricId);
    flushIfNeededLocked(eventTime);
    mCondition = conditionMet;

    // Push mode. No need to proactively pull the gauge data.
    if (mPullTagId == -1) {
        return;
    }

    bool triggerPuller = false;
    switch(mSamplingType) {
        // When the metric wants to do random sampling and there is already one gauge atom for the
        // current bucket, do not do it again.
        case GaugeMetric::RANDOM_ONE_SAMPLE: {
            triggerPuller = mCondition && mCurrentSlicedBucket->empty();
            break;
        }
        case GaugeMetric::ALL_CONDITION_CHANGES: {
            triggerPuller = true;
            break;
        }
        default:
            break;
    }
    if (!triggerPuller) {
        return;
    }

    vector<std::shared_ptr<LogEvent>> allData;
    if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Stats puller failed for tag: %d", mPullTagId);
        return;
    }
    for (const auto& data : allData) {
        onMatchedLogEventLocked(0, *data);
    }
    // Flush again in case the pulled events' timestamps crossed a bucket boundary.
    flushIfNeededLocked(eventTime);
}

// Sliced-condition changes are intentionally ignored here (log only).
void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

// Returns the event's values to store in the bucket: filtered through
// mFieldMatchers when a gauge_fields_filter was configured, otherwise a full
// copy of all values (the include_all case — see the constructor).
std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const LogEvent& event) {
    if (mFieldMatchers.size() > 0) {
        std::shared_ptr<vector<FieldValue>> gaugeFields = std::make_shared<vector<FieldValue>>();
        filterGaugeValues(mFieldMatchers, event.getValues(), gaugeFields.get());
        return gaugeFields;
    } else {
        return std::make_shared<vector<FieldValue>>(event.getValues());
    }
}

// Callback from the pull receiver registered in the constructor. Takes the
// lock itself (unlike the *Locked methods) and routes each pulled event
// through the matched-event path.
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (allData.size() == 0) {
        return;
    }
    for (const auto& data : allData) {
        onMatchedLogEventLocked(0, *data);
    }
}

// Guardrail against unbounded dimension cardinality. Returns true (drop the
// data) only once the hard limit would be exceeded by a brand-new key; at the
// soft limit it just reports the size to StatsdStats. Existing keys are
// always accepted.
bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
        return false;
    }
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket->size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
            ALOGE("GaugeMetric %lld dropping data for dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

// Core ingestion path (both pushed and pulled events end up here). Drops the
// event when: the condition is false, it arrived before the current bucket
// started, RANDOM_ONE_SAMPLE already captured a gauge for this key this
// bucket, or the dimension guardrail is hit. Otherwise stores the (filtered)
// gauge fields and runs anomaly detection when exactly one numeric field is
// configured.
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition,
        const LogEvent& event) {
    if (condition == false) {
        return;
    }
    uint64_t eventTimeNs = event.GetElapsedTimestampNs();
    mTagId = event.GetTagId();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    flushIfNeededLocked(eventTimeNs);

    // When gauge metric wants to randomly sample the output atom, we just simply use the first
    // gauge in the given bucket.
    if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() &&
        mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
        return;
    }
    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    GaugeAtom gaugeAtom(getGaugeFields(event), eventTimeNs);
    (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom);
    // Anomaly detection on gauge metric only works when there is one numeric
    // field specified.
    if (mAnomalyTrackers.size() > 0) {
        if (gaugeAtom.mFields->size() == 1) {
            const Value& value = gaugeAtom.mFields->begin()->mValue;
            long gaugeVal = 0;
            if (value.getType() == INT) {
                gaugeVal = (long)value.int_value;
            } else if (value.getType() == LONG) {
                gaugeVal = value.long_value;
            }
            for (auto& tracker : mAnomalyTrackers) {
                tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
                                                 gaugeVal);
            }
        }
    }
}

// Copies the first gauge value of each slice into the numeric map the anomaly
// trackers consume. Only INT and LONG values are extracted; other types leave
// the value at 0.
void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
    for (const auto& slice : *mCurrentSlicedBucket) {
        if (slice.second.empty()) {
            continue;
        }
        const Value& value = slice.second.front().mFields->front().mValue;
        long gaugeVal = 0;
        if (value.getType() == INT) {
            gaugeVal = (long)value.int_value;
        } else if (value.getType() == LONG) {
            gaugeVal = value.long_value;
        }
        (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
    }
}

// Discards all accumulated report data. The flush first moves the current
// bucket into mPastBuckets so it is cleared along with the rest.
void GaugeMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    mPastBuckets.clear();
}

// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new
// bucket.
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
             (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
        return;
    }

    flushCurrentBucketLocked(eventTimeNs);

    // Adjusts the bucket start and end times. The event may be several bucket
    // lengths past the old end, so skip forward by however many whole buckets
    // elapsed and keep the start aligned to the bucket grid.
    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

// Closes out the current bucket: copies each slice's atoms into mPastBuckets
// (bucket end is capped at eventTimeNs for partial buckets), feeds full
// buckets to the anomaly trackers, and resets the current sliced bucket.
// Note: this does NOT advance the bucket start/number — the caller
// (flushIfNeededLocked) does that.
void GaugeMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();

    GaugeBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
    if (eventTimeNs < fullBucketEndTimeNs) {
        info.mBucketEndNs = eventTimeNs;
    } else {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }

    for (const auto& slice : *mCurrentSlicedBucket) {
        info.mGaugeAtoms = slice.second;
        auto& bucketList = mPastBuckets[slice.first];
        bucketList.push_back(info);
        VLOG("gauge metric %lld, dump key value: %s", (long long)mMetricId,
             slice.first.toString().c_str());
    }

    // If we have anomaly trackers, we need to update the partial bucket values.
    if (mAnomalyTrackers.size() > 0) {
        updateCurrentSlicedBucketForAnomaly();

        if (eventTimeNs > fullBucketEndTimeNs) {
            // This is known to be a full bucket, so send this data to the anomaly tracker.
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
            }
            mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
        }
    }

    mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
}

// Approximate in-memory footprint of the stored report data, used by the
// byte-size guardrail. Counts only past buckets, at a fixed size per bucket.
size_t GaugeMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}

}  // namespace statsd
}  // namespace os
}  // namespace android