GaugeMetricProducer.cpp revision a7259abde4e89fd91404b14b4845113cd313d1ec
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG true // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "guardrail/StatsdStats.h" 22#include "stats_util.h" 23 24#include <cutils/log.h> 25#include <limits.h> 26#include <stdlib.h> 27 28using android::util::FIELD_COUNT_REPEATED; 29using android::util::FIELD_TYPE_BOOL; 30using android::util::FIELD_TYPE_FLOAT; 31using android::util::FIELD_TYPE_INT32; 32using android::util::FIELD_TYPE_INT64; 33using android::util::FIELD_TYPE_MESSAGE; 34using android::util::FIELD_TYPE_STRING; 35using android::util::ProtoOutputStream; 36using std::map; 37using std::string; 38using std::unordered_map; 39using std::vector; 40 41namespace android { 42namespace os { 43namespace statsd { 44 45// for StatsLogReport 46const int FIELD_ID_NAME = 1; 47const int FIELD_ID_START_REPORT_NANOS = 2; 48const int FIELD_ID_END_REPORT_NANOS = 3; 49const int FIELD_ID_GAUGE_METRICS = 8; 50// for GaugeMetricDataWrapper 51const int FIELD_ID_DATA = 1; 52// for GaugeMetricData 53const int FIELD_ID_DIMENSION = 1; 54const int FIELD_ID_BUCKET_INFO = 2; 55// for KeyValuePair 56const int FIELD_ID_KEY = 1; 57const int FIELD_ID_VALUE_STR = 2; 58const int FIELD_ID_VALUE_INT = 3; 59const int FIELD_ID_VALUE_BOOL = 4; 60const int FIELD_ID_VALUE_FLOAT = 5; 61// for GaugeBucketInfo 62const int FIELD_ID_START_BUCKET_NANOS = 1; 63const int FIELD_ID_END_BUCKET_NANOS = 2; 64const int FIELD_ID_GAUGE = 3; 65 66GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric, 67 const int conditionIndex, 68 const sp<ConditionWizard>& wizard, const int pullTagId, 69 const int64_t startTimeNs) 70 : MetricProducer(key, startTimeNs, conditionIndex, wizard), 71 mMetric(metric), 72 mPullTagId(pullTagId) { 73 if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) { 74 mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000; 75 } else { 76 mBucketSizeNs = kDefaultGaugemBucketSizeNs; 77 } 78 79 // TODO: use UidMap if uid->pkg_name is required 80 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end()); 81 82 if (metric.links().size() > 0) { 83 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 84 metric.links().end()); 85 mConditionSliced = true; 86 } 87 88 // Kicks off the puller immediately. 89 if (mPullTagId != -1) { 90 mStatsPullerManager.RegisterReceiver(mPullTagId, this, 91 metric.bucket().bucket_size_millis()); 92 } 93 94 VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), 95 (long long)mBucketSizeNs, (long long)mStartTimeNs); 96} 97 98GaugeMetricProducer::~GaugeMetricProducer() { 99 VLOG("~GaugeMetricProducer() called"); 100 if (mPullTagId != -1) { 101 mStatsPullerManager.UnRegisterReceiver(mPullTagId, this); 102 } 103} 104 105void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs, 106 ProtoOutputStream* protoOutput) { 107 VLOG("gauge metric %s dump report now...", mMetric.name().c_str()); 108 109 flushIfNeededLocked(dumpTimeNs); 110 111 protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); 112 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs); 113 long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 114 115 for (const auto& pair : mPastBuckets) { 116 const HashableDimensionKey& hashableKey = pair.first; 117 auto it = mDimensionKeyMap.find(hashableKey); 118 if (it == mDimensionKeyMap.end()) { 119 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str()); 120 continue; 121 } 122 123 VLOG(" dimension key %s", hashableKey.c_str()); 124 long long wrapperToken = 125 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 126 127 // First fill dimension (KeyValuePairs). 128 for (const auto& kv : it->second) { 129 long long dimensionToken = protoOutput->start( 130 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); 131 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); 132 if (kv.has_value_str()) { 133 protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); 134 } else if (kv.has_value_int()) { 135 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); 136 } else if (kv.has_value_bool()) { 137 protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); 138 } else if (kv.has_value_float()) { 139 protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); 140 } 141 protoOutput->end(dimensionToken); 142 } 143 144 // Then fill bucket_info (GaugeBucketInfo). 145 for (const auto& bucket : pair.second) { 146 long long bucketInfoToken = protoOutput->start( 147 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 148 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 149 (long long)bucket.mBucketStartNs); 150 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 151 (long long)bucket.mBucketEndNs); 152 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge); 153 protoOutput->end(bucketInfoToken); 154 VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, 155 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge); 156 } 157 protoOutput->end(wrapperToken); 158 } 159 protoOutput->end(protoToken); 160 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs); 161 162 mPastBuckets.clear(); 163 mStartTimeNs = mCurrentBucketStartTimeNs; 164 // TODO: Clear mDimensionKeyMap once the report is dumped. 165} 166 167void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, 168 const uint64_t eventTime) { 169 VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); 170 flushIfNeededLocked(eventTime); 171 mCondition = conditionMet; 172 173 // Push mode. No need to proactively pull the gauge data. 174 if (mPullTagId == -1) { 175 return; 176 } 177 if (!mCondition) { 178 return; 179 } 180 // Already have gauge metric for the current bucket, do not do it again. 181 if (mCurrentSlicedBucket->size() > 0) { 182 return; 183 } 184 vector<std::shared_ptr<LogEvent>> allData; 185 if (!mStatsPullerManager.Pull(mPullTagId, &allData)) { 186 ALOGE("Stats puller failed for tag: %d", mPullTagId); 187 return; 188 } 189 for (const auto& data : allData) { 190 onMatchedLogEventLocked(0, *data); 191 } 192 flushIfNeededLocked(eventTime); 193} 194 195void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { 196 VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); 197} 198 199int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { 200 status_t err = NO_ERROR; 201 int64_t val = event.GetLong(mMetric.gauge_field(), &err); 202 if (err == NO_ERROR) { 203 return val; 204 } else { 205 VLOG("Can't find value in message."); 206 return -1; 207 } 208} 209 210void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 211 std::lock_guard<std::mutex> lock(mMutex); 212 213 for (const auto& data : allData) { 214 onMatchedLogEventLocked(0, *data); 215 } 216} 217 218bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { 219 if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { 220 return false; 221 } 222 // 1. Report the tuple count if the tuple count > soft limit 223 if (mCurrentSlicedBucket->size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) { 224 size_t newTupleCount = mCurrentSlicedBucket->size() + 1; 225 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetric.name(), 226 newTupleCount); 227 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. 228 if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) { 229 ALOGE("GaugeMetric %s dropping data for dimension key %s", mMetric.name().c_str(), 230 newKey.c_str()); 231 return true; 232 } 233 } 234 235 return false; 236} 237 238void GaugeMetricProducer::onMatchedLogEventInternalLocked( 239 const size_t matcherIndex, const HashableDimensionKey& eventKey, 240 const map<string, HashableDimensionKey>& conditionKey, bool condition, 241 const LogEvent& event) { 242 if (condition == false) { 243 return; 244 } 245 uint64_t eventTimeNs = event.GetTimestampNs(); 246 if (eventTimeNs < mCurrentBucketStartTimeNs) { 247 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 248 (long long)mCurrentBucketStartTimeNs); 249 return; 250 } 251 252 // When the event happens in a new bucket, flush the old buckets. 253 if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { 254 flushIfNeededLocked(eventTimeNs); 255 } 256 257 // For gauge metric, we just simply use the first gauge in the given bucket. 258 if (!mCurrentSlicedBucket->empty()) { 259 return; 260 } 261 const long gauge = getGauge(event); 262 if (gauge >= 0) { 263 if (hitGuardRailLocked(eventKey)) { 264 return; 265 } 266 (*mCurrentSlicedBucket)[eventKey] = gauge; 267 } 268 for (auto& tracker : mAnomalyTrackers) { 269 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge); 270 } 271} 272 273// When a new matched event comes in, we check if event falls into the current 274// bucket. If not, flush the old counter to past buckets and initialize the new 275// bucket. 276// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 277// the GaugeMetricProducer while holding the lock. 278void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { 279 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 280 return; 281 } 282 283 GaugeBucket info; 284 info.mBucketStartNs = mCurrentBucketStartTimeNs; 285 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 286 info.mBucketNum = mCurrentBucketNum; 287 288 for (const auto& slice : *mCurrentSlicedBucket) { 289 info.mGauge = slice.second; 290 auto& bucketList = mPastBuckets[slice.first]; 291 bucketList.push_back(info); 292 VLOG("gauge metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), 293 slice.first.c_str(), (long long)slice.second); 294 } 295 296 // Reset counters 297 for (auto& tracker : mAnomalyTrackers) { 298 tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum); 299 } 300 301 mCurrentSlicedBucket = std::make_shared<DimToValMap>(); 302 303 // Adjusts the bucket start time 304 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 305 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 306 mCurrentBucketNum += numBucketsForward; 307 VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(), 308 (long long)mCurrentBucketStartTimeNs); 309} 310 311size_t GaugeMetricProducer::byteSizeLocked() const { 312 size_t totalSize = 0; 313 for (const auto& pair : mPastBuckets) { 314 totalSize += pair.second.size() * kBucketSize; 315 } 316 return totalSize; 317} 318 319} // namespace statsd 320} // namespace os 321} // namespace android 322