GaugeMetricProducer.cpp revision 032fefc3b547726e675a112baa8d1b6c550ac192
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG true // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "guardrail/StatsdStats.h" 22#include "stats_util.h" 23 24#include <cutils/log.h> 25#include <limits.h> 26#include <stdlib.h> 27 28using android::util::FIELD_COUNT_REPEATED; 29using android::util::FIELD_TYPE_BOOL; 30using android::util::FIELD_TYPE_FLOAT; 31using android::util::FIELD_TYPE_INT32; 32using android::util::FIELD_TYPE_INT64; 33using android::util::FIELD_TYPE_MESSAGE; 34using android::util::FIELD_TYPE_STRING; 35using android::util::ProtoOutputStream; 36using std::map; 37using std::string; 38using std::unordered_map; 39using std::vector; 40 41namespace android { 42namespace os { 43namespace statsd { 44 45// for StatsLogReport 46const int FIELD_ID_NAME = 1; 47const int FIELD_ID_START_REPORT_NANOS = 2; 48const int FIELD_ID_END_REPORT_NANOS = 3; 49const int FIELD_ID_GAUGE_METRICS = 8; 50// for GaugeMetricDataWrapper 51const int FIELD_ID_DATA = 1; 52// for GaugeMetricData 53const int FIELD_ID_DIMENSION = 1; 54const int FIELD_ID_BUCKET_INFO = 2; 55// for KeyValuePair 56const int FIELD_ID_KEY = 1; 57const int FIELD_ID_VALUE_STR = 2; 58const int FIELD_ID_VALUE_INT = 3; 59const int FIELD_ID_VALUE_BOOL = 4; 60const int FIELD_ID_VALUE_FLOAT = 5; 61// for GaugeBucketInfo 62const int FIELD_ID_START_BUCKET_NANOS = 1; 63const int FIELD_ID_END_BUCKET_NANOS = 2; 64const int FIELD_ID_GAUGE = 3; 65 66GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 67 const int conditionIndex, 68 const sp<ConditionWizard>& wizard, const int pullTagId, 69 const int64_t startTimeNs) 70 : MetricProducer(key, startTimeNs, conditionIndex, wizard), 71 mMetric(metric), 72 mPullTagId(pullTagId) { 73 if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) { 74 mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000; 75 } else { 76 mBucketSizeNs = kDefaultGaugemBucketSizeNs; 77 } 78 79 // TODO: use UidMap if uid->pkg_name is required 80 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end()); 81 82 if (metric.links().size() > 0) { 83 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 84 metric.links().end()); 85 mConditionSliced = true; 86 } 87 88 // Kicks off the puller immediately. 89 if (mPullTagId != -1) { 90 mStatsPullerManager.RegisterReceiver(mPullTagId, this, 91 metric.bucket().bucket_size_millis()); 92 } 93 94 startNewProtoOutputStream(mStartTimeNs); 95 96 VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), 97 (long long)mBucketSizeNs, (long long)mStartTimeNs); 98} 99 100GaugeMetricProducer::~GaugeMetricProducer() { 101 VLOG("~GaugeMetricProducer() called"); 102 if (mPullTagId != -1) { 103 mStatsPullerManager.UnRegisterReceiver(mPullTagId, this); 104 } 105} 106 107void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) { 108 mProto = std::make_unique<ProtoOutputStream>(); 109 mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); 110 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); 111 mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 112} 113 114void GaugeMetricProducer::finish() { 115} 116 117std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() { 118 VLOG("gauge metric %s dump report now...", mMetric.name().c_str()); 119 120 // Dump current bucket if it's stale. 121 // If current bucket is still on-going, don't force dump current bucket. 122 // In finish(), We can force dump current bucket. 123 flushIfNeeded(time(nullptr) * NS_PER_SEC); 124 125 for (const auto& pair : mPastBuckets) { 126 const HashableDimensionKey& hashableKey = pair.first; 127 auto it = mDimensionKeyMap.find(hashableKey); 128 if (it == mDimensionKeyMap.end()) { 129 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str()); 130 continue; 131 } 132 133 VLOG(" dimension key %s", hashableKey.c_str()); 134 long long wrapperToken = 135 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 136 137 // First fill dimension (KeyValuePairs). 138 for (const auto& kv : it->second) { 139 long long dimensionToken = 140 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); 141 mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); 142 if (kv.has_value_str()) { 143 mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); 144 } else if (kv.has_value_int()) { 145 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); 146 } else if (kv.has_value_bool()) { 147 mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); 148 } else if (kv.has_value_float()) { 149 mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); 150 } 151 mProto->end(dimensionToken); 152 } 153 154 // Then fill bucket_info (GaugeBucketInfo). 155 for (const auto& bucket : pair.second) { 156 long long bucketInfoToken = 157 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 158 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 159 (long long)bucket.mBucketStartNs); 160 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 161 (long long)bucket.mBucketEndNs); 162 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge); 163 mProto->end(bucketInfoToken); 164 VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, 165 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge); 166 } 167 mProto->end(wrapperToken); 168 } 169 mProto->end(mProtoToken); 170 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, 171 (long long)mCurrentBucketStartTimeNs); 172 173 std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); 174 175 startNewProtoOutputStream(time(nullptr) * NS_PER_SEC); 176 mPastBuckets.clear(); 177 178 return buffer; 179 180 // TODO: Clear mDimensionKeyMap once the report is dumped. 181} 182 183void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { 184 AutoMutex _l(mLock); 185 VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); 186 flushIfNeeded(eventTime); 187 mCondition = conditionMet; 188 189 // Push mode. No need to proactively pull the gauge data. 190 if (mPullTagId == -1) { 191 return; 192 } 193 if (!mCondition) { 194 return; 195 } 196 // Already have gauge metric for the current bucket, do not do it again. 197 if (mCurrentSlicedBucket->size() > 0) { 198 return; 199 } 200 vector<std::shared_ptr<LogEvent>> allData; 201 if (!mStatsPullerManager.Pull(mPullTagId, &allData)) { 202 ALOGE("Stats puller failed for tag: %d", mPullTagId); 203 return; 204 } 205 for (const auto& data : allData) { 206 onMatchedLogEvent(0, *data, false /*scheduledPull*/); 207 } 208 flushIfNeeded(eventTime); 209} 210 211void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { 212 VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); 213} 214 215int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { 216 status_t err = NO_ERROR; 217 int64_t val = event.GetLong(mMetric.gauge_field(), &err); 218 if (err == NO_ERROR) { 219 return val; 220 } else { 221 VLOG("Can't find value in message."); 222 return -1; 223 } 224} 225 226void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 227 AutoMutex mutex(mLock); 228 for (const auto& data : allData) { 229 onMatchedLogEvent(0, *data, true /*scheduledPull*/); 230 } 231} 232 233bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { 234 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 235 return false; 236 } 237 // 1. Report the tuple count if the tuple count > soft limit 238 if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) { 239 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 240 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetric.name(), 241 newTupleCount); 242 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 243 if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) { 244 ALOGE("GaugeMetric %s dropping data for dimension key %s", mMetric.name().c_str(), 245 newKey.c_str()); 246 return true; 247 } 248 } 249 250 return false; 251} 252 253void GaugeMetricProducer::onMatchedLogEventInternal( 254 const size_t matcherIndex, const HashableDimensionKey& eventKey, 255 const map<string, HashableDimensionKey>& conditionKey, bool condition, 256 const LogEvent& event, bool scheduledPull) { 257 if (condition == false) { 258 return; 259 } 260 uint64_t eventTimeNs = event.GetTimestampNs(); 261 if (eventTimeNs < mCurrentBucketStartTimeNs) { 262 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 263 (long long)mCurrentBucketStartTimeNs); 264 return; 265 } 266 267 // When the event happens in a new bucket, flush the old buckets. 268 if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { 269 flushIfNeeded(eventTimeNs); 270 } 271 272 // For gauge metric, we just simply use the first gauge in the given bucket. 273 if (!mCurrentSlicedBucket->empty()) { 274 return; 275 } 276 const long gauge = getGauge(event); 277 if (gauge >= 0) { 278 if (hitGuardRail(eventKey)) { 279 return; 280 } 281 (*mCurrentSlicedBucket)[eventKey] = gauge; 282 } 283 for (auto& tracker : mAnomalyTrackers) { 284 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge); 285 } 286} 287 288// When a new matched event comes in, we check if event falls into the current 289// bucket. If not, flush the old counter to past buckets and initialize the new 290// bucket. 291// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 292// the GaugeMetricProducer while holding the lock. 293void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { 294 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 295 return; 296 } 297 298 GaugeBucket info; 299 info.mBucketStartNs = mCurrentBucketStartTimeNs; 300 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 301 info.mBucketNum = mCurrentBucketNum; 302 303 for (const auto& slice : *mCurrentSlicedBucket) { 304 info.mGauge = slice.second; 305 auto& bucketList = mPastBuckets[slice.first]; 306 bucketList.push_back(info); 307 VLOG("gauge metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), 308 slice.first.c_str(), (long long)slice.second); 309 } 310 311 // Reset counters 312 for (auto& tracker : mAnomalyTrackers) { 313 tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum); 314 } 315 316 mCurrentSlicedBucket = std::make_shared<DimToValMap>(); 317 318 // Adjusts the bucket start time 319 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 320 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 321 mCurrentBucketNum += numBucketsForward; 322 VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(), 323 (long long)mCurrentBucketStartTimeNs); 324} 325 326size_t GaugeMetricProducer::byteSize() const { 327 size_t totalSize = 0; 328 for (const auto& pair : mPastBuckets) { 329 totalSize += pair.second.size() * kBucketSize; 330 } 331 return totalSize; 332} 333 334} // namespace statsd 335} // namespace os 336} // namespace android 337