GaugeMetricProducer.cpp revision 34ea1103a0faa0e01df0e2b0ac1ce98e7ec3e3f1
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG false // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "guardrail/StatsdStats.h" 22#include "dimension.h" 23#include "stats_log_util.h" 24 25#include <cutils/log.h> 26 27using android::util::FIELD_COUNT_REPEATED; 28using android::util::FIELD_TYPE_BOOL; 29using android::util::FIELD_TYPE_FLOAT; 30using android::util::FIELD_TYPE_INT32; 31using android::util::FIELD_TYPE_INT64; 32using android::util::FIELD_TYPE_MESSAGE; 33using android::util::FIELD_TYPE_STRING; 34using android::util::ProtoOutputStream; 35using std::map; 36using std::string; 37using std::unordered_map; 38using std::vector; 39using std::make_shared; 40using std::shared_ptr; 41 42namespace android { 43namespace os { 44namespace statsd { 45 46// for StatsLogReport 47const int FIELD_ID_ID = 1; 48const int FIELD_ID_START_REPORT_NANOS = 2; 49const int FIELD_ID_END_REPORT_NANOS = 3; 50const int FIELD_ID_GAUGE_METRICS = 8; 51// for GaugeMetricDataWrapper 52const int FIELD_ID_DATA = 1; 53// for GaugeMetricData 54const int FIELD_ID_DIMENSION_IN_WHAT = 1; 55const int FIELD_ID_DIMENSION_IN_CONDITION = 2; 56const int FIELD_ID_BUCKET_INFO = 3; 57// for GaugeBucketInfo 58const int FIELD_ID_START_BUCKET_NANOS = 1; 59const int FIELD_ID_END_BUCKET_NANOS = 2; 60const int FIELD_ID_ATOM = 3; 61const int FIELD_ID_TIMESTAMP = 4; 62 63GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 64 const int conditionIndex, 65 const sp<ConditionWizard>& wizard, 66 const int pullTagId, const uint64_t startTimeNs, 67 shared_ptr<StatsPullerManager> statsPullerManager) 68 : MetricProducer(metric.id(), key, startTimeNs, conditionIndex, wizard), 69 mStatsPullerManager(statsPullerManager), 70 mPullTagId(pullTagId) { 71 mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>(); 72 mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>(); 73 int64_t bucketSizeMills = 0; 74 if (metric.has_bucket()) { 75 bucketSizeMills = TimeUnitToBucketSizeInMillis(metric.bucket()); 76 } else { 77 bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR); 78 } 79 mBucketSizeNs = bucketSizeMills * 1000000; 80 81 mSamplingType = metric.sampling_type(); 82 mFieldFilter = metric.gauge_fields_filter(); 83 84 // TODO: use UidMap if uid->pkg_name is required 85 mDimensions = metric.dimensions_in_what(); 86 87 if (metric.links().size() > 0) { 88 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 89 metric.links().end()); 90 mConditionSliced = true; 91 } 92 93 // Kicks off the puller immediately. 94 if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 95 mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills); 96 } 97 98 VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), 99 (long long)mBucketSizeNs, (long long)mStartTimeNs); 100} 101 102// for testing 103GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 104 const int conditionIndex, 105 const sp<ConditionWizard>& wizard, const int pullTagId, 106 const int64_t startTimeNs) 107 : GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, startTimeNs, 108 make_shared<StatsPullerManager>()) { 109} 110 111GaugeMetricProducer::~GaugeMetricProducer() { 112 VLOG("~GaugeMetricProducer() called"); 113 if (mPullTagId != -1) { 114 mStatsPullerManager->UnRegisterReceiver(mPullTagId, this); 115 } 116} 117 118void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, StatsLogReport* report) { 119 flushIfNeededLocked(dumpTimeNs); 120 ProtoOutputStream pbOutput; 121 onDumpReportLocked(dumpTimeNs, &pbOutput); 122 parseProtoOutputStream(pbOutput, report); 123} 124 125void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, 126 ProtoOutputStream* protoOutput) { 127 VLOG("gauge metric %lld report now...", (long long)mMetricId); 128 129 flushIfNeededLocked(dumpTimeNs); 130 if (mPastBuckets.empty()) { 131 return; 132 } 133 134 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); 135 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); 136 long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 137 138 for (const auto& pair : mPastBuckets) { 139 const HashableDimensionKey& hashableKey = pair.first; 140 141 VLOG(" dimension key %s", hashableKey.c_str()); 142 long long wrapperToken = 143 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 144 145 // First fill dimension. 146 long long dimensionToken = protoOutput->start( 147 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); 148 writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput); 149 protoOutput->end(dimensionToken); 150 151 // Then fill bucket_info (GaugeBucketInfo). 152 for (const auto& bucket : pair.second) { 153 long long bucketInfoToken = protoOutput->start( 154 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 155 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 156 (long long)bucket.mBucketStartNs); 157 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 158 (long long)bucket.mBucketEndNs); 159 160 if (!bucket.mGaugeAtoms.empty()) { 161 long long atomsToken = 162 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_ATOM); 163 for (const auto& atom : bucket.mGaugeAtoms) { 164 writeFieldValueTreeToStream(*atom.mFields, protoOutput); 165 } 166 protoOutput->end(atomsToken); 167 168 for (const auto& atom : bucket.mGaugeAtoms) { 169 protoOutput->write(FIELD_TYPE_INT64 | FIELD_COUNT_REPEATED | FIELD_ID_TIMESTAMP, 170 (long long)atom.mTimestamps); 171 } 172 } 173 protoOutput->end(bucketInfoToken); 174 VLOG("\t bucket [%lld - %lld] includes %d atoms.", (long long)bucket.mBucketStartNs, 175 (long long)bucket.mBucketEndNs, (int)bucket.mGaugeAtoms.size()); 176 } 177 protoOutput->end(wrapperToken); 178 } 179 protoOutput->end(protoToken); 180 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); 181 182 mPastBuckets.clear(); 183 mStartTimeNs = mCurrentBucketStartTimeNs; 184 // TODO: Clear mDimensionKeyMap once the report is dumped. 185} 186 187void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, 188 const uint64_t eventTime) { 189 VLOG("Metric %lld onConditionChanged", (long long)mMetricId); 190 flushIfNeededLocked(eventTime); 191 mCondition = conditionMet; 192 193 // Push mode. No need to proactively pull the gauge data. 194 if (mPullTagId == -1) { 195 return; 196 } 197 198 bool triggerPuller = false; 199 switch(mSamplingType) { 200 // When the metric wants to do random sampling and there is already one gauge atom for the 201 // current bucket, do not do it again. 202 case GaugeMetric::RANDOM_ONE_SAMPLE: { 203 triggerPuller = mCondition && mCurrentSlicedBucket->empty(); 204 break; 205 } 206 case GaugeMetric::ALL_CONDITION_CHANGES: { 207 triggerPuller = true; 208 break; 209 } 210 default: 211 break; 212 } 213 if (!triggerPuller) { 214 return; 215 } 216 217 vector<std::shared_ptr<LogEvent>> allData; 218 if (!mStatsPullerManager->Pull(mPullTagId, &allData)) { 219 ALOGE("Stats puller failed for tag: %d", mPullTagId); 220 return; 221 } 222 for (const auto& data : allData) { 223 onMatchedLogEventLocked(0, *data); 224 } 225 flushIfNeededLocked(eventTime); 226} 227 228void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { 229 VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId); 230} 231 232std::shared_ptr<FieldValueMap> GaugeMetricProducer::getGaugeFields(const LogEvent& event) { 233 std::shared_ptr<FieldValueMap> gaugeFields = 234 std::make_shared<FieldValueMap>(event.getFieldValueMap()); 235 if (!mFieldFilter.include_all()) { 236 filterFields(mFieldFilter.fields(), gaugeFields.get()); 237 } 238 return gaugeFields; 239} 240 241void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 242 std::lock_guard<std::mutex> lock(mMutex); 243 if (allData.size() == 0) { 244 return; 245 } 246 for (const auto& data : allData) { 247 onMatchedLogEventLocked(0, *data); 248 } 249} 250 251bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { 252 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 253 return false; 254 } 255 // 1. Report the tuple count if the tuple count > soft limit 256 if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) { 257 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 258 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); 259 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 260 if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) { 261 ALOGE("GaugeMetric %lld dropping data for dimension key %s", 262 (long long)mMetricId, newKey.c_str()); 263 return true; 264 } 265 } 266 267 return false; 268} 269 270void GaugeMetricProducer::onMatchedLogEventInternalLocked( 271 const size_t matcherIndex, const HashableDimensionKey& eventKey, 272 const ConditionKey& conditionKey, bool condition, 273 const LogEvent& event) { 274 if (condition == false) { 275 return; 276 } 277 uint64_t eventTimeNs = event.GetTimestampNs(); 278 if (eventTimeNs < mCurrentBucketStartTimeNs) { 279 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 280 (long long)mCurrentBucketStartTimeNs); 281 return; 282 } 283 flushIfNeededLocked(eventTimeNs); 284 285 // When gauge metric wants to randomly sample the output atom, we just simply use the first 286 // gauge in the given bucket. 287 if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end() && 288 mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { 289 return; 290 } 291 if (hitGuardRailLocked(eventKey)) { 292 return; 293 } 294 GaugeAtom gaugeAtom; 295 gaugeAtom.mFields = getGaugeFields(event); 296 gaugeAtom.mTimestamps = eventTimeNs; 297 (*mCurrentSlicedBucket)[eventKey].push_back(gaugeAtom); 298 // Anomaly detection on gauge metric only works when there is one numeric 299 // field specified. 300 if (mAnomalyTrackers.size() > 0) { 301 if (gaugeAtom.mFields->size() == 1) { 302 const DimensionsValue& dimensionsValue = gaugeAtom.mFields->begin()->second; 303 long gaugeVal = 0; 304 if (dimensionsValue.has_value_int()) { 305 gaugeVal = (long)dimensionsValue.value_int(); 306 } else if (dimensionsValue.has_value_long()) { 307 gaugeVal = dimensionsValue.value_long(); 308 } 309 for (auto& tracker : mAnomalyTrackers) { 310 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, 311 gaugeVal); 312 } 313 } 314 } 315} 316 317void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() { 318 mCurrentSlicedBucketForAnomaly->clear(); 319 status_t err = NO_ERROR; 320 for (const auto& slice : *mCurrentSlicedBucket) { 321 if (slice.second.empty() || slice.second.front().mFields->empty()) { 322 continue; 323 } 324 const DimensionsValue& dimensionsValue = slice.second.front().mFields->begin()->second; 325 long gaugeVal = 0; 326 if (dimensionsValue.has_value_int()) { 327 gaugeVal = (long)dimensionsValue.value_int(); 328 } else if (dimensionsValue.has_value_long()) { 329 gaugeVal = dimensionsValue.value_long(); 330 } 331 (*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal; 332 } 333} 334 335// When a new matched event comes in, we check if event falls into the current 336// bucket. If not, flush the old counter to past buckets and initialize the new 337// bucket. 338// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 339// the GaugeMetricProducer while holding the lock. 340void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { 341 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 342 VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, 343 (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs)); 344 return; 345 } 346 347 GaugeBucket info; 348 info.mBucketStartNs = mCurrentBucketStartTimeNs; 349 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 350 info.mBucketNum = mCurrentBucketNum; 351 352 for (const auto& slice : *mCurrentSlicedBucket) { 353 info.mGaugeAtoms = slice.second; 354 auto& bucketList = mPastBuckets[slice.first]; 355 bucketList.push_back(info); 356 VLOG("gauge metric %lld, dump key value: %s", 357 (long long)mMetricId, slice.first.c_str()); 358 } 359 360 // Reset counters 361 if (mAnomalyTrackers.size() > 0) { 362 updateCurrentSlicedBucketForAnomaly(); 363 for (auto& tracker : mAnomalyTrackers) { 364 tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum); 365 } 366 } 367 368 mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>(); 369 mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>(); 370 371 // Adjusts the bucket start time 372 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 373 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 374 mCurrentBucketNum += numBucketsForward; 375 VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, 376 (long long)mCurrentBucketStartTimeNs); 377} 378 379size_t GaugeMetricProducer::byteSizeLocked() const { 380 size_t totalSize = 0; 381 for (const auto& pair : mPastBuckets) { 382 totalSize += pair.second.size() * kBucketSize; 383 } 384 return totalSize; 385} 386 387} // namespace statsd 388} // namespace os 389} // namespace android 390