GaugeMetricProducer.cpp revision 6a8c799d901cbd166aa6463d7dea231bcf594a1e
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG true // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "guardrail/StatsdStats.h" 22#include "stats_util.h" 23 24#include <cutils/log.h> 25#include <limits.h> 26#include <stdlib.h> 27 28using android::util::FIELD_COUNT_REPEATED; 29using android::util::FIELD_TYPE_BOOL; 30using android::util::FIELD_TYPE_FLOAT; 31using android::util::FIELD_TYPE_INT32; 32using android::util::FIELD_TYPE_INT64; 33using android::util::FIELD_TYPE_MESSAGE; 34using android::util::FIELD_TYPE_STRING; 35using android::util::ProtoOutputStream; 36using std::map; 37using std::string; 38using std::unordered_map; 39using std::vector; 40 41namespace android { 42namespace os { 43namespace statsd { 44 45// for StatsLogReport 46const int FIELD_ID_NAME = 1; 47const int FIELD_ID_START_REPORT_NANOS = 2; 48const int FIELD_ID_END_REPORT_NANOS = 3; 49const int FIELD_ID_GAUGE_METRICS = 8; 50// for GaugeMetricDataWrapper 51const int FIELD_ID_DATA = 1; 52// for GaugeMetricData 53const int FIELD_ID_DIMENSION = 1; 54const int FIELD_ID_BUCKET_INFO = 2; 55// for KeyValuePair 56const int FIELD_ID_KEY = 1; 57const int FIELD_ID_VALUE_STR = 2; 58const int FIELD_ID_VALUE_INT = 3; 59const int FIELD_ID_VALUE_BOOL = 4; 60const int FIELD_ID_VALUE_FLOAT = 5; 61// for GaugeBucketInfo 62const int FIELD_ID_START_BUCKET_NANOS = 1; 63const int FIELD_ID_END_BUCKET_NANOS = 2; 64const int FIELD_ID_GAUGE = 3; 65 66GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 67 const int conditionIndex, 68 const sp<ConditionWizard>& wizard, const int pullTagId, 69 const int64_t startTimeNs) 70 : MetricProducer(key, startTimeNs, conditionIndex, wizard), 71 mMetric(metric), 72 mPullTagId(pullTagId) { 73 if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) { 74 mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000; 75 } else { 76 mBucketSizeNs = kDefaultGaugemBucketSizeNs; 77 } 78 79 // TODO: use UidMap if uid->pkg_name is required 80 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end()); 81 82 if (metric.links().size() > 0) { 83 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 84 metric.links().end()); 85 mConditionSliced = true; 86 } 87 88 // Kicks off the puller immediately. 89 if (mPullTagId != -1) { 90 mStatsPullerManager.RegisterReceiver(mPullTagId, this, 91 metric.bucket().bucket_size_millis()); 92 } 93 94 startNewProtoOutputStream(mStartTimeNs); 95 96 VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), 97 (long long)mBucketSizeNs, (long long)mStartTimeNs); 98} 99 100GaugeMetricProducer::~GaugeMetricProducer() { 101 VLOG("~GaugeMetricProducer() called"); 102} 103 104void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) { 105 mProto = std::make_unique<ProtoOutputStream>(); 106 mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); 107 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); 108 mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 109} 110 111void GaugeMetricProducer::finish() { 112} 113 114std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() { 115 VLOG("gauge metric %s dump report now...", mMetric.name().c_str()); 116 117 // Dump current bucket if it's stale. 118 // If current bucket is still on-going, don't force dump current bucket. 119 // In finish(), We can force dump current bucket. 120 flushIfNeeded(time(nullptr) * NS_PER_SEC); 121 122 for (const auto& pair : mPastBuckets) { 123 const HashableDimensionKey& hashableKey = pair.first; 124 auto it = mDimensionKeyMap.find(hashableKey); 125 if (it == mDimensionKeyMap.end()) { 126 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str()); 127 continue; 128 } 129 130 VLOG(" dimension key %s", hashableKey.c_str()); 131 long long wrapperToken = 132 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 133 134 // First fill dimension (KeyValuePairs). 135 for (const auto& kv : it->second) { 136 long long dimensionToken = 137 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); 138 mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); 139 if (kv.has_value_str()) { 140 mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); 141 } else if (kv.has_value_int()) { 142 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); 143 } else if (kv.has_value_bool()) { 144 mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); 145 } else if (kv.has_value_float()) { 146 mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); 147 } 148 mProto->end(dimensionToken); 149 } 150 151 // Then fill bucket_info (GaugeBucketInfo). 152 for (const auto& bucket : pair.second) { 153 long long bucketInfoToken = 154 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 155 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 156 (long long)bucket.mBucketStartNs); 157 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 158 (long long)bucket.mBucketEndNs); 159 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge); 160 mProto->end(bucketInfoToken); 161 VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, 162 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge); 163 } 164 mProto->end(wrapperToken); 165 } 166 mProto->end(mProtoToken); 167 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, 168 (long long)mCurrentBucketStartTimeNs); 169 170 std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); 171 172 startNewProtoOutputStream(time(nullptr) * NS_PER_SEC); 173 mPastBuckets.clear(); 174 175 return buffer; 176 177 // TODO: Clear mDimensionKeyMap once the report is dumped. 178} 179 180void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { 181 AutoMutex _l(mLock); 182 VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); 183 flushIfNeeded(eventTime); 184 mCondition = conditionMet; 185 186 // Push mode. No need to proactively pull the gauge data. 187 if (mPullTagId == -1) { 188 return; 189 } 190 if (!mCondition) { 191 return; 192 } 193 // Already have gauge metric for the current bucket, do not do it again. 194 if (mCurrentSlicedBucket->size() > 0) { 195 return; 196 } 197 vector<std::shared_ptr<LogEvent>> allData; 198 if (!mStatsPullerManager.Pull(mPullTagId, &allData)) { 199 ALOGE("Stats puller failed for tag: %d", mPullTagId); 200 return; 201 } 202 for (const auto& data : allData) { 203 onMatchedLogEvent(0, *data, false /*scheduledPull*/); 204 } 205 flushIfNeeded(eventTime); 206} 207 208void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { 209 VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); 210} 211 212int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { 213 status_t err = NO_ERROR; 214 int64_t val = event.GetLong(mMetric.gauge_field(), &err); 215 if (err == NO_ERROR) { 216 return val; 217 } else { 218 VLOG("Can't find value in message."); 219 return -1; 220 } 221} 222 223void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 224 AutoMutex mutex(mLock); 225 for (const auto& data : allData) { 226 onMatchedLogEvent(0, *data, true /*scheduledPull*/); 227 } 228} 229 230bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { 231 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 232 return false; 233 } 234 // 1. Report the tuple count if the tuple count > soft limit 235 if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) { 236 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 237 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetric.name(), 238 newTupleCount); 239 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 240 if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) { 241 ALOGE("GaugeMetric %s dropping data for dimension key %s", mMetric.name().c_str(), 242 newKey.c_str()); 243 return true; 244 } 245 } 246 247 return false; 248} 249 250void GaugeMetricProducer::onMatchedLogEventInternal( 251 const size_t matcherIndex, const HashableDimensionKey& eventKey, 252 const map<string, HashableDimensionKey>& conditionKey, bool condition, 253 const LogEvent& event, bool scheduledPull) { 254 if (condition == false) { 255 return; 256 } 257 uint64_t eventTimeNs = event.GetTimestampNs(); 258 if (eventTimeNs < mCurrentBucketStartTimeNs) { 259 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 260 (long long)mCurrentBucketStartTimeNs); 261 return; 262 } 263 264 // When the event happens in a new bucket, flush the old buckets. 265 if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { 266 flushIfNeeded(eventTimeNs); 267 } 268 269 // For gauge metric, we just simply use the first gauge in the given bucket. 270 if (!mCurrentSlicedBucket->empty()) { 271 return; 272 } 273 const long gauge = getGauge(event); 274 if (gauge >= 0) { 275 if (hitGuardRail(eventKey)) { 276 return; 277 } 278 (*mCurrentSlicedBucket)[eventKey] = gauge; 279 } 280 for (auto& tracker : mAnomalyTrackers) { 281 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge); 282 } 283} 284 285// When a new matched event comes in, we check if event falls into the current 286// bucket. If not, flush the old counter to past buckets and initialize the new 287// bucket. 288// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 289// the GaugeMetricProducer while holding the lock. 290void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { 291 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 292 return; 293 } 294 295 GaugeBucket info; 296 info.mBucketStartNs = mCurrentBucketStartTimeNs; 297 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 298 info.mBucketNum = mCurrentBucketNum; 299 300 for (const auto& slice : *mCurrentSlicedBucket) { 301 info.mGauge = slice.second; 302 auto& bucketList = mPastBuckets[slice.first]; 303 bucketList.push_back(info); 304 VLOG("gauge metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), 305 slice.first.c_str(), (long long)slice.second); 306 } 307 308 // Reset counters 309 for (auto& tracker : mAnomalyTrackers) { 310 tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum); 311 } 312 313 mCurrentSlicedBucket = std::make_shared<DimToValMap>(); 314 315 // Adjusts the bucket start time 316 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 317 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 318 mCurrentBucketNum += numBucketsForward; 319 VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(), 320 (long long)mCurrentBucketStartTimeNs); 321} 322 323size_t GaugeMetricProducer::byteSize() const { 324 size_t totalSize = 0; 325 for (const auto& pair : mPastBuckets) { 326 totalSize += pair.second.size() * kBucketSize; 327 } 328 return totalSize; 329} 330 331} // namespace statsd 332} // namespace os 333} // namespace android 334