# Copyright 2015-2017 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Aggregators are responsible for aggregating information
for further analysis. These aggregations can produce
both scalars and vectors, and each aggregator implementation
is expected to handle its own "aggregation" mechanism.
"""


from trappy.utils import listify
from trappy.stats.Indexer import MultiTriggerIndexer
from abc import ABCMeta, abstractmethod


class AbstractAggregator(object):
    """Abstract base class for all aggregators

    :param indexer: Indexer passed on by the child class
        for handling indices during correlation
    :type indexer: :mod:`trappy.stats.Indexer.Indexer`

    :param aggfunc: Function that accepts a :mod:`pandas.Series` and
        processes it for aggregation
    :type aggfunc: function
    """

    __metaclass__ = ABCMeta

    # The current implementation needs the index to
    # be unified across data frames to account for
    # variable sampling across data frames
    def __init__(self, indexer, aggfunc=None):

        self._result = {}
        self._aggregated = False
        self._aggfunc = aggfunc
        self.indexer = indexer

    def _add_result(self, pivot, series):
        """Add the result for the given pivot and trace

        :param pivot: The pivot for which the result is being generated
        :type pivot: hashable

        :param series: Series to be added to the result
        :type series: :mod:`pandas.Series`
        """

        if pivot not in self._result:
            self._result[pivot] = self.indexer.series()

        # Copy the samples onto the unified index maintained by the indexer
        for idx in series.index:
            self._result[pivot][idx] = series[idx]

    @abstractmethod
    def aggregate(self, trace_idx, **kwargs):
        """Abstract method for aggregating data for various
        pivots.

        :param trace_idx: Index of the trace to be aggregated
        :type trace_idx: int

        :return: The aggregated result
        """

        raise NotImplementedError("Method Not Implemented")


class MultiTriggerAggregator(AbstractAggregator):
    """This aggregator accepts a list of triggers, each of which has
    a value associated with it.
    """

    def __init__(self, triggers, topology, aggfunc=None):
        """
        :param triggers: A list of triggers or a single trigger object
        :type triggers: :mod:`trappy.stats.Trigger.Trigger`

        :param topology: A topology object that defines the aggregation
            levels
        :type topology: :mod:`trappy.stats.Topology`

        :param aggfunc: A function to be applied on each series being
            aggregated. For each topology node, a series is generated
            and then processed by the aggfunc
        :type aggfunc: function
        """
        self._triggers = triggers
        self.topology = topology
        super(
            MultiTriggerAggregator,
            self).__init__(MultiTriggerIndexer(triggers), aggfunc)

    def aggregate(self, **kwargs):
        """
        Aggregate the triggers for a given topological level.
        All keyword arguments passed to this method are forwarded to the
        aggregation function, except ``level`` (if present).

        :return: A scalar or a vector aggregated result. Each group in the
            level produces an element in the result list with a one to one
            index correspondence
            ::

                groups["level"] = [[1, 2], [3, 4]]
                result = [result_1, result_2]
        """

        level = kwargs.pop("level", "all")

        # This method is a hot spot in the code. It is worth considering
        # a memoize decorator to cache the results; the memoization could
        # also be maintained by the aggregator object. This would help the
        # code scale efficiently.
        level_groups = self.topology.get_level(level)
        result = []

        if not self._aggregated:
            self._aggregate_base()

        # For each group at the requested level, apply the aggregation
        # function (if any) to every node's base series and sum the
        # per-node results element-wise
        for group in level_groups:
            group = listify(group)
            if self._aggfunc is not None:
                level_res = self._aggfunc(self._result[group[0]], **kwargs)
            else:
                level_res = self._result[group[0]]

            for node in group[1:]:
                if self._aggfunc is not None:
                    node_res = self._aggfunc(self._result[node], **kwargs)
                else:
                    node_res = self._result[node]

                level_res += node_res

            result.append(level_res)

        return result

    def _aggregate_base(self):
        """Generate the base series for each node in the
        flattened topology. The results are computed only once and
        cached in :code:`_result`
        ::

            topo["level_1"] = [[1, 2], [3, 4]]

        This function generates the fundamental aggregations for
        the nodes 1, 2, 3 and 4 and stores them in :code:`_result`
        """

        for trigger in self._triggers:
            for node in self.topology.flatten():
                result_series = trigger.generate(node)
                self._add_result(node, result_series)

        self._aggregated = True
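

# The block below is an illustrative, hedged usage sketch and only runs when
# this file is executed directly. It shows one plausible way to wire a
# Trigger and a Topology into a MultiTriggerAggregator, assuming Trigger's
# (trace, template, filters, value, pivot) signature. The trace path
# ("trace.dat"), the event class (trappy.sched.SchedSwitch), the filter
# value ("important_task") and the pivot column ("__cpu") are assumptions
# made for the example, not requirements of this module.
if __name__ == "__main__":
    import trappy
    from trappy.stats.Topology import Topology
    from trappy.stats.Trigger import Trigger

    # Hypothetical trace file parsed with trappy
    trace = trappy.FTrace("trace.dat")

    # A trigger that emits the value 1 whenever the assumed task is switched
    # in, pivoted on the CPU the event occurred on
    trigger = Trigger(trace,
                      trappy.sched.SchedSwitch,
                      filters={"next_comm": "important_task"},
                      value=1,
                      pivot="__cpu")

    # Two clusters of two CPUs each; aggregating at the "cluster" level sums
    # the per-CPU series within each cluster
    topology = Topology(clusters=[[0, 1], [2, 3]])

    aggregator = MultiTriggerAggregator([trigger], topology)

    # One aggregated pandas.Series per cluster, in the same order as the
    # groups returned by topology.get_level("cluster")
    print(aggregator.aggregate(level="cluster"))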