GaugeMetricProducer.cpp revision 7c334a129e93e405a72e8299a1cd928af079d14f
1/* 2* Copyright (C) 2017 The Android Open Source Project 3* 4* Licensed under the Apache License, Version 2.0 (the "License"); 5* you may not use this file except in compliance with the License. 6* You may obtain a copy of the License at 7* 8* http://www.apache.org/licenses/LICENSE-2.0 9* 10* Unless required by applicable law or agreed to in writing, software 11* distributed under the License is distributed on an "AS IS" BASIS, 12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13* See the License for the specific language governing permissions and 14* limitations under the License. 15*/ 16 17#define DEBUG true // STOPSHIP if true 18#include "Log.h" 19 20#include "GaugeMetricProducer.h" 21#include "stats_util.h" 22 23#include <cutils/log.h> 24#include <limits.h> 25#include <stdlib.h> 26 27using android::util::FIELD_COUNT_REPEATED; 28using android::util::FIELD_TYPE_BOOL; 29using android::util::FIELD_TYPE_FLOAT; 30using android::util::FIELD_TYPE_INT32; 31using android::util::FIELD_TYPE_INT64; 32using android::util::FIELD_TYPE_MESSAGE; 33using android::util::FIELD_TYPE_STRING; 34using android::util::ProtoOutputStream; 35using std::map; 36using std::string; 37using std::unordered_map; 38using std::vector; 39 40namespace android { 41namespace os { 42namespace statsd { 43 44// for StatsLogReport 45const int FIELD_ID_NAME = 1; 46const int FIELD_ID_START_REPORT_NANOS = 2; 47const int FIELD_ID_END_REPORT_NANOS = 3; 48const int FIELD_ID_GAUGE_METRICS = 8; 49// for GaugeMetricDataWrapper 50const int FIELD_ID_DATA = 1; 51// for GaugeMetricData 52const int FIELD_ID_DIMENSION = 1; 53const int FIELD_ID_BUCKET_INFO = 2; 54// for KeyValuePair 55const int FIELD_ID_KEY = 1; 56const int FIELD_ID_VALUE_STR = 2; 57const int FIELD_ID_VALUE_INT = 3; 58const int FIELD_ID_VALUE_BOOL = 4; 59const int FIELD_ID_VALUE_FLOAT = 5; 60// for GaugeBucketInfo 61const int FIELD_ID_START_BUCKET_NANOS = 1; 62const int FIELD_ID_END_BUCKET_NANOS = 2; 63const int FIELD_ID_GAUGE = 3; 64 65GaugeMetricProducer::GaugeMetricProducer(const GaugeMetric& metric, const int conditionIndex, 66 const sp<ConditionWizard>& wizard, const int pullTagId, 67 const int64_t startTimeNs) 68 : MetricProducer(startTimeNs, conditionIndex, wizard), 69 mMetric(metric), 70 mPullTagId(pullTagId) { 71 if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) { 72 mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000; 73 } else { 74 mBucketSizeNs = kDefaultGaugemBucketSizeNs; 75 } 76 77 // TODO: use UidMap if uid->pkg_name is required 78 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end()); 79 80 if (metric.links().size() > 0) { 81 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(), 82 metric.links().end()); 83 mConditionSliced = true; 84 } 85 86 // Kicks off the puller immediately. 87 if (mPullTagId != -1) { 88 mStatsPullerManager.RegisterReceiver(mPullTagId, this, 89 metric.bucket().bucket_size_millis()); 90 } 91 92 startNewProtoOutputStream(mStartTimeNs); 93 94 VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), 95 (long long)mBucketSizeNs, (long long)mStartTimeNs); 96} 97 98GaugeMetricProducer::~GaugeMetricProducer() { 99 VLOG("~GaugeMetricProducer() called"); 100} 101 102void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) { 103 mProto = std::make_unique<ProtoOutputStream>(); 104 mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); 105 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); 106 mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); 107} 108 109void GaugeMetricProducer::finish() { 110} 111 112std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() { 113 VLOG("gauge metric %s dump report now...", mMetric.name().c_str()); 114 115 // Dump current bucket if it's stale. 116 // If current bucket is still on-going, don't force dump current bucket. 117 // In finish(), We can force dump current bucket. 118 flushIfNeeded(time(nullptr) * NS_PER_SEC); 119 120 for (const auto& pair : mPastBuckets) { 121 const HashableDimensionKey& hashableKey = pair.first; 122 auto it = mDimensionKeyMap.find(hashableKey); 123 if (it == mDimensionKeyMap.end()) { 124 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str()); 125 continue; 126 } 127 128 VLOG(" dimension key %s", hashableKey.c_str()); 129 long long wrapperToken = 130 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA); 131 132 // First fill dimension (KeyValuePairs). 133 for (const auto& kv : it->second) { 134 long long dimensionToken = 135 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION); 136 mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key()); 137 if (kv.has_value_str()) { 138 mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str()); 139 } else if (kv.has_value_int()) { 140 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int()); 141 } else if (kv.has_value_bool()) { 142 mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool()); 143 } else if (kv.has_value_float()) { 144 mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float()); 145 } 146 mProto->end(dimensionToken); 147 } 148 149 // Then fill bucket_info (GaugeBucketInfo). 150 for (const auto& bucket : pair.second) { 151 long long bucketInfoToken = 152 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO); 153 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS, 154 (long long)bucket.mBucketStartNs); 155 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS, 156 (long long)bucket.mBucketEndNs); 157 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge); 158 mProto->end(bucketInfoToken); 159 VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, 160 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge); 161 } 162 mProto->end(wrapperToken); 163 } 164 mProto->end(mProtoToken); 165 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, 166 (long long)mCurrentBucketStartTimeNs); 167 168 std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto(); 169 170 startNewProtoOutputStream(time(nullptr) * NS_PER_SEC); 171 mPastBuckets.clear(); 172 173 return buffer; 174 175 // TODO: Clear mDimensionKeyMap once the report is dumped. 176} 177 178void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { 179 AutoMutex _l(mLock); 180 VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); 181 flushIfNeeded(eventTime); 182 mCondition = conditionMet; 183 184 // Push mode. No need to proactively pull the gauge data. 185 if (mPullTagId == -1) { 186 return; 187 } 188 if (!mCondition) { 189 return; 190 } 191 // Already have gauge metric for the current bucket, do not do it again. 192 if (mCurrentSlicedBucket->size() > 0) { 193 return; 194 } 195 vector<std::shared_ptr<LogEvent>> allData; 196 if (!mStatsPullerManager.Pull(mPullTagId, &allData)) { 197 ALOGE("Stats puller failed for tag: %d", mPullTagId); 198 return; 199 } 200 for (const auto& data : allData) { 201 onMatchedLogEvent(0, *data, false /*scheduledPull*/); 202 } 203 flushIfNeeded(eventTime); 204} 205 206void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { 207 VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); 208} 209 210int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { 211 status_t err = NO_ERROR; 212 int64_t val = event.GetLong(mMetric.gauge_field(), &err); 213 if (err == NO_ERROR) { 214 return val; 215 } else { 216 VLOG("Can't find value in message."); 217 return -1; 218 } 219} 220 221void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { 222 AutoMutex mutex(mLock); 223 for (const auto& data : allData) { 224 onMatchedLogEvent(0, *data, true /*scheduledPull*/); 225 } 226} 227 228void GaugeMetricProducer::onMatchedLogEventInternal( 229 const size_t matcherIndex, const HashableDimensionKey& eventKey, 230 const map<string, HashableDimensionKey>& conditionKey, bool condition, 231 const LogEvent& event, bool scheduledPull) { 232 if (condition == false) { 233 return; 234 } 235 uint64_t eventTimeNs = event.GetTimestampNs(); 236 if (eventTimeNs < mCurrentBucketStartTimeNs) { 237 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, 238 (long long)mCurrentBucketStartTimeNs); 239 return; 240 } 241 242 // When the event happens in a new bucket, flush the old buckets. 243 if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { 244 flushIfNeeded(eventTimeNs); 245 } 246 247 // For gauge metric, we just simply use the first guage in the given bucket. 248 if (!mCurrentSlicedBucket->empty()) { 249 return; 250 } 251 const long gauge = getGauge(event); 252 if (gauge >= 0) { 253 (*mCurrentSlicedBucket)[eventKey] = gauge; 254 } 255 for (auto& tracker : mAnomalyTrackers) { 256 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge); 257 } 258} 259 260// When a new matched event comes in, we check if event falls into the current 261// bucket. If not, flush the old counter to past buckets and initialize the new 262// bucket. 263// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside 264// the GaugeMetricProducer while holding the lock. 265void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { 266 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { 267 return; 268 } 269 270 GaugeBucket info; 271 info.mBucketStartNs = mCurrentBucketStartTimeNs; 272 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs; 273 info.mBucketNum = mCurrentBucketNum; 274 275 for (const auto& slice : *mCurrentSlicedBucket) { 276 info.mGauge = slice.second; 277 auto& bucketList = mPastBuckets[slice.first]; 278 bucketList.push_back(info); 279 VLOG("gauge metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), 280 slice.first.c_str(), (long long)slice.second); 281 } 282 283 // Reset counters 284 for (auto& tracker : mAnomalyTrackers) { 285 tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum); 286 } 287 288 mCurrentSlicedBucket = std::make_shared<DimToValMap>(); 289 290 // Adjusts the bucket start time 291 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs; 292 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs; 293 mCurrentBucketNum += numBucketsForward; 294 VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(), 295 (long long)mCurrentBucketStartTimeNs); 296} 297 298size_t GaugeMetricProducer::byteSize() const { 299 size_t totalSize = 0; 300 for (const auto& pair : mPastBuckets) { 301 totalSize += pair.second.size() * kBucketSize; 302 } 303 return totalSize; 304} 305 306} // namespace statsd 307} // namespace os 308} // namespace android 309