/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "ValueMetricProducer.h"
#include "../guardrail/StatsdStats.h"
#include "../stats_log_util.h"

#include <cutils/log.h>
#include <limits.h>
#include <stdlib.h>

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_FLOAT;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::list;
using std::make_pair;
using std::make_shared;
using std::map;
using std::shared_ptr;
using std::unique_ptr;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for ValueBucketInfo
const int FIELD_ID_VALUE = 3;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
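// Note: the FIELD_ID_* constants above mirror field numbers in statsd's report
// proto messages (StatsLogReport, ValueMetricDataWrapper, ValueMetricData,
// ValueBucketInfo); if those proto definitions change, these values must be
// kept in sync by hand.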
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const int64_t timeBaseNs, const int64_t startTimestampNs,
                                         shared_ptr<StatsPullerManager> statsPullerManager)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
      mValueField(metric.value_field()),
      mStatsPullerManager(statsPullerManager),
      mPullTagId(pullTagId),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) {
    // TODO: ValueMetric for pushed events may need unlimited bucket length.
    int64_t bucketSizeMillis = 0;
    if (metric.has_bucket()) {
        bucketSizeMillis = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMillis = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    mBucketSizeNs = bucketSizeMillis * 1000000;
    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.has_dimensions_in_condition()) {
        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    }

    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
    }

    if (mValueField.child_size() > 0) {
        mField = mValueField.child(0).field();
    }
    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
    mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                          HasPositionALL(metric.dimensions_in_condition());

    // Kicks off the puller immediately.
    flushIfNeededLocked(startTimestampNs);
    if (mPullTagId != -1) {
        mStatsPullerManager->RegisterReceiver(
                mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
    }

    VLOG("value metric %lld created. bucket size %lld start_time: %lld",
         (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
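// A note on the RegisterReceiver() call in the constructor: judging from its
// arguments, the third parameter is the first scheduled pull time (the end of
// the current bucket) and the fourth is the pull interval (one bucket), so for
// pulled atoms onDataPulled() is expected to fire once per bucket boundary in
// addition to any condition-triggered pulls.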
// for testing
ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric& metric,
                                         const int conditionIndex,
                                         const sp<ConditionWizard>& wizard, const int pullTagId,
                                         const int64_t timeBaseNs, const int64_t startTimeNs)
    : ValueMetricProducer(key, metric, conditionIndex, wizard, pullTagId, timeBaseNs, startTimeNs,
                          make_shared<StatsPullerManager>()) {
}

ValueMetricProducer::~ValueMetricProducer() {
    VLOG("~ValueMetricProducer() called");
    if (mPullTagId != -1) {
        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
    }
}

void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    flushIfNeededLocked(dropTimeNs);
    mPastBuckets.clear();
}

void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    flushIfNeededLocked(dumpTimeNs);
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}

void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             std::set<string> *str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        flushLocked(dumpTimeNs);
    } else {
        flushIfNeededLocked(dumpTimeNs);
    }
    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
        if (!mDimensionsInCondition.empty()) {
            uint64_t dimenPathToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
            writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(pair.first)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(pair.second)));
        protoOutput->end(wrapperToken);
    }
    mSkippedBuckets.clear();
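    // Each entry of mPastBuckets below becomes one repeated ValueMetricData
    // message: the dimension key is written first (full dimension trees when
    // slicing by position ALL, otherwise leaf values plus the shared dimension
    // path emitted above), followed by one ValueBucketInfo per past bucket.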
    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG(" dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                uint64_t dimensionInConditionToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
                writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
                                      str_set, protoOutput);
                protoOutput->end(dimensionInConditionToken);
            }
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
                                               FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
                                               str_set, protoOutput);
            }
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }

            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
            protoOutput->end(bucketInfoToken);
            VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
                 (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld dump report now...", (long long)mMetricId);
    mPastBuckets.clear();
}

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    mCondition = condition;

    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }

    flushIfNeededLocked(eventTimeNs);

    if (mPullTagId != -1) {
        vector<shared_ptr<LogEvent>> allData;
        if (mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) {
            if (allData.size() == 0) {
                return;
            }
            for (const auto& data : allData) {
                onMatchedLogEventLocked(0, *data);
            }
        }
        return;
    }
}
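// A note on the timestamp rewriting in onDataPulled() below: each scheduled
// pull is replayed twice through onMatchedLogEventLocked(). The first pass
// sets mCondition to false and stamps the events at (eventTime - 1) ns, which
// lands them in the closing bucket and turns each recorded interval start into
// a (start, end) diff; the second pass restores mCondition to true and stamps
// the events at the bucket boundary itself, recording the new baseline values
// for the next bucket.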
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
    std::lock_guard<std::mutex> lock(mMutex);

    if (mCondition == true || mConditionTrackerIndex < 0) {
        if (allData.size() == 0) {
            return;
        }
        // For scheduled pulled data, the effective event time is snapped to the
        // nearest bucket boundary so that the previous bucket can be finalized.
        int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
        int64_t eventTime = mTimeBaseNs +
                ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;

        mCondition = false;
        for (const auto& data : allData) {
            data->setElapsedTimestampNs(eventTime - 1);
            onMatchedLogEventLocked(0, *data);
        }

        mCondition = true;
        for (const auto& data : allData) {
            data->setElapsedTimestampNs(eventTime);
            onMatchedLogEventLocked(0, *data);
        }
    }
}

void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
        return;
    }

    fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& it : mCurrentSlicedBucket) {
            fprintf(out, "\t(what)%s\t(condition)%s (value)%lld\n",
                    it.first.getDimensionKeyInWhat().toString().c_str(),
                    it.first.getDimensionKeyInCondition().toString().c_str(),
                    (long long)it.second.sum);
        }
    }
}

bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
        return false;
    }
    if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("ValueMetric %lld dropping data for dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}
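// How pulled values are accumulated in onMatchedLogEventInternalLocked()
// below: pulled atoms report cumulative readings, so each Interval stores the
// reading taken while the condition is true (interval.start) and, once the
// condition flips to false, adds the delta (value - start) to interval.sum.
// For example, readings of 100 at the start and 130 at the end of a bucket
// contribute 30. A reading below the recorded start is treated as a counter
// reset: it is added as an absolute value or dropped, depending on
// use_absolute_value_on_reset.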
void ValueMetricProducer::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition,
        const LogEvent& event) {
    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }

    flushIfNeededLocked(eventTimeNs);

    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    Interval& interval = mCurrentSlicedBucket[eventKey];

    int error = 0;
    const int64_t value = event.GetLong(mField, &error);
    if (error < 0) {
        return;
    }

    if (mPullTagId != -1) {  // for pulled events
        if (mCondition == true) {
            if (!interval.startUpdated) {
                interval.start = value;
                interval.startUpdated = true;
            } else {
                // Skip it if a value is already recorded for the start of this interval.
                VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
            }
        } else {
            // Generally we expect value to be monotonically increasing.
            // If not, take the absolute value or drop it, based on the config.
            if (interval.startUpdated) {
                if (value >= interval.start) {
                    interval.sum += (value - interval.start);
                    interval.hasValue = true;
                } else {
                    if (mUseAbsoluteValueOnReset) {
                        interval.sum += value;
                        interval.hasValue = true;
                    } else {
                        VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId,
                             (long long)interval.start, (long long)value);
                    }
                }
                interval.startUpdated = false;
            } else {
                VLOG("No start for matching end %lld", (long long)value);
                interval.tainted += 1;
            }
        }
    } else {  // for pushed events, only accumulate when condition is true
        if (mCondition == true || mConditionTrackerIndex < 0) {
            interval.sum += value;
            interval.hasValue = true;
        }
    }

    int64_t wholeBucketVal = interval.sum;
    auto prev = mCurrentFullBucket.find(eventKey);
    if (prev != mCurrentFullBucket.end()) {
        wholeBucketVal += prev->second;
    }
    for (auto& tracker : mAnomalyTrackers) {
        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, wholeBucketVal);
    }
}

void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();

    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }

    flushCurrentBucketLocked(eventTimeNs);

    // Example: with 10-min buckets, an event arriving 25 min after the current
    // bucket's end skips forward 1 + 25/10 = 3 buckets, and the new bucket
    // start lands on the boundary immediately preceding the event.
    int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    mCurrentBucketNum += numBucketsForward;

    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
    }
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}
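// flushCurrentBucketLocked() below finalizes the in-progress bucket: buckets
// shorter than min_bucket_size_nanos are only recorded as skipped time ranges,
// and anomaly trackers receive either the per-slice sums directly (when the
// bucket closed on a full-bucket boundary with nothing previously accumulated)
// or the sums aggregated across partial buckets in mCurrentFullBucket.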
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();

    ValueBucket info;
    info.mBucketStartNs = mCurrentBucketStartTimeNs;
    if (eventTimeNs < fullBucketEndTimeNs) {
        info.mBucketEndNs = eventTimeNs;
    } else {
        info.mBucketEndNs = fullBucketEndTimeNs;
    }

    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
        // The current bucket is large enough to keep.
        int tainted = 0;
        for (const auto& slice : mCurrentSlicedBucket) {
            tainted += slice.second.tainted;
            // An interval with a dangling start also counts as tainted.
            tainted += slice.second.startUpdated;
            if (slice.second.hasValue) {
                info.mValue = slice.second.sum;
                // operator[] auto-creates a new vector of ValueBucketInfo if the key is not found.
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(info);
            }
        }
        VLOG("%d tainted pairs in the bucket", tainted);
    } else {
        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
    }

    if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with the current value and then send to the anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                mCurrentFullBucket[slice.first] += slice.second.sum;
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum);
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            mCurrentFullBucket[slice.first] += slice.second.sum;
        }
    }

    // Reset counters.
    mCurrentSlicedBucket.clear();
}

size_t ValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}

}  // namespace statsd
}  // namespace os
}  // namespace android