Coverage for anaconda_opentelemetry / metrics.py: 87%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1# -*- coding: utf-8 -*- 

2# SPDX-FileCopyrightText: 2025 Anaconda, Inc 

3# SPDX-License-Identifier: Apache-2.0 

4 

5# metrics.py 

6""" 

7Anaconda Telemetry - Metrics signal class. 

8""" 

9 

import logging
import re
from typing import Any, Dict, Optional

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider, Counter, UpDownCounter, Histogram, ObservableCounter, ObservableUpDownCounter
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter, AggregationTemporality

from .common import _AnacondaCommon, MetricsNotInitialized
from .config import Configuration as Config
from .attributes import ResourceAttributes as Attributes
from .exporter_shim import OTLPMetricExporterShim
from .formatting import AttrDict

22 

23 

class _AnacondaMetrics(_AnacondaCommon):
    """Metrics signal class for Anaconda telemetry.

    Builds a metric exporter (console, OTLP over gRPC, or OTLP over HTTP)
    from the supplied configuration, registers a ``MeterProvider`` backed by
    a periodic exporting reader, and lazily creates named counters, up/down
    counters and histograms the first time they are recorded.
    """

    # Singleton instance (internal only); provide a single instance of the metrics class
    _instance = None

    # Temporality used unless cumulative metrics are requested: DELTA for
    # Counter, ObservableCounter and UpDownCounter; CUMULATIVE for Histogram
    # and ObservableUpDownCounter.
    _default_temporality: dict[type, AggregationTemporality] = {
        Counter: AggregationTemporality.DELTA,
        ObservableCounter: AggregationTemporality.DELTA,
        Histogram: AggregationTemporality.CUMULATIVE,
        UpDownCounter: AggregationTemporality.DELTA,
        ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    }

    # Temporality used when cumulative metrics are requested: every
    # instrument type reports CUMULATIVE.
    _cumulative_temporality: dict[type, AggregationTemporality] = {
        Counter: AggregationTemporality.CUMULATIVE,
        ObservableCounter: AggregationTemporality.CUMULATIVE,
        Histogram: AggregationTemporality.CUMULATIVE,
        UpDownCounter: AggregationTemporality.CUMULATIVE,
        ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    }

    # String labels keyed by a boolean (True -> "CUMULATIVE", False -> "DELTA").
    # Not referenced inside this class body.
    _temporalityValue: dict[bool, str] = {
        False: "DELTA",
        True: "CUMULATIVE"
    }

    # Metric names must start with a letter and be followed by at least one
    # letter, digit or underscore. Compiled once instead of on every lookup.
    _METRIC_NAME_PATTERN = re.compile(r"^[A-Za-z][A-Za-z_0-9]+$")

    def __init__(self, config: Config, attributes: Attributes):
        """Initialise the metrics pipeline and the per-type instrument caches.

        Args:
            config: Telemetry configuration; supplies the metrics endpoint,
                export interval and exporter settings.
            attributes: Resource attributes, forwarded to the base class.
        """
        super().__init__(config, attributes)

        self.metrics_endpoint = config._get_metrics_endpoint()
        self.telemetry_export_interval_millis = config._get_metrics_export_interval_ms()

        # One cache per instrument type, keyed by metric name.
        self.counter_objects: Dict[str, Any] = {}
        self.up_down_counter_objects: Dict[str, Any] = {}
        self.histogram_objects: Dict[str, Any] = {}

        self.meter = self._setup_metrics(config)

        # Metric type name -> meter factory used to create that instrument.
        self.create_dispatcher = {
            'simple_counter': self.meter.create_counter,
            'simple_up_down_counter': self.meter.create_up_down_counter,
            'histogram': self.meter.create_histogram
        }
        # Metric type name -> cache holding instruments of that type.
        self.type_list = {
            'simple_counter': self.counter_objects,
            'simple_up_down_counter': self.up_down_counter_objects,
            'histogram': self.histogram_objects
        }

    def _setup_metrics(self, config: Config) -> metrics.Meter:
        """Create the exporter, reader and meter provider; return a meter.

        Stores the exporter on ``self.exporter`` and the reader on
        ``self.metric_reader``.

        Args:
            config: Telemetry configuration used to select and configure
                the exporter.

        Returns:
            The meter obtained for ``self.service_name`` and
            ``self.service_version``.
        """
        if self.use_console_exporters:
            exporter = ConsoleMetricExporter(preferred_temporality=self._get_temporality())
        else:
            auth_token = config._get_auth_token_metrics()
            headers: Dict[str, str] = {}
            if auth_token is not None:
                headers['authorization'] = f'Bearer {auth_token}'
            if config._get_request_protocol_metrics() in ['grpc', 'grpcs']:  # gRPC
                # Imported lazily so the gRPC exporter is only loaded when selected.
                from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPMetricExportergRPC
                insecure = not config._get_TLS_metrics()
                exporter = OTLPMetricExporterShim(
                    OTLPMetricExportergRPC,
                    endpoint=self.metrics_endpoint,
                    insecure=insecure,
                    # The CA certificate is only looked up for TLS connections.
                    credentials=config._get_ca_cert_metrics() if not insecure else None,
                    headers=headers,
                    preferred_temporality=self._get_temporality()
                )
            else:  # HTTP
                # Imported lazily so the HTTP exporter is only loaded when selected.
                from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPMetricExporterHTTP
                exporter = OTLPMetricExporterShim(
                    OTLPMetricExporterHTTP,
                    endpoint=self.metrics_endpoint,
                    certificate_file=config._get_ca_cert_metrics(),
                    headers=headers,
                    preferred_temporality=self._get_temporality()
                )

        self.exporter = exporter
        self.metric_reader = PeriodicExportingMetricReader(
            self.exporter,
            export_interval_millis=self.telemetry_export_interval_millis
        )
        # Create and set meter provider
        meter_provider = MeterProvider(
            resource=self.resource,
            metric_readers=[self.metric_reader]
        )
        try:
            metrics.set_meter_provider(meter_provider)
        except Exception:
            # Best effort: a failure here is logged and the meter is still
            # requested from whatever provider is registered.
            self.logger.warning("The metrics provider was previously set and will take precidence over this call.")
        # Get meter for this service
        return metrics.get_meter(self.service_name, self.service_version)

    def _get_temporality(self) -> dict[type, AggregationTemporality]:
        """Return the temporality mapping selected by the configuration."""
        # NOTE(review): the strict ``== True`` comparison is kept on purpose;
        # switch to plain truthiness once the config getter is confirmed to
        # always return a bool.
        if self._config._get_use_cumulative_metrics() == True:
            return _AnacondaMetrics._cumulative_temporality
        return _AnacondaMetrics._default_temporality

    def _check_for_metric(self, metric_name: str, metric_type: str) -> bool:
        """Report whether a metric of the given type is already cached.

        Returns:
            True when ``metric_type`` is known and a non-None instrument is
            cached under ``metric_name``; False otherwise.
        """
        bucket_list = self.type_list.get(metric_type, None)
        if bucket_list is None:
            return False
        return bucket_list.get(metric_name, None) is not None

    def _get_or_create_metric(self, metric_name: str, metric_type: str = 'simple_up_down_counter', units: str = '#', description='No description.') -> Any:
        """Return the cached instrument, creating and caching it if needed.

        Args:
            metric_name: Instrument name; validated only when a new
                instrument has to be created.
            metric_type: One of the keys of ``self.type_list``.
            units: Unit string passed to the meter factory.
            description: Description passed to the meter factory.

        Returns:
            The instrument, or None when the name is invalid, no factory is
            registered for the type, or the factory returned None.

        Raises:
            MetricsNotInitialized: If ``metric_type`` is not a known type.
        """
        bucket_list = self.type_list.get(metric_type, None)
        if bucket_list is None:
            raise MetricsNotInitialized(f"Metric type '{metric_type}' is unknown!")

        metric = bucket_list.get(metric_name, None)
        if metric is not None:
            return metric

        if not self._METRIC_NAME_PATTERN.fullmatch(metric_name):
            self.logger.warning(f"Metric {metric_name} does not match valid regex: r\"^[A-Za-z][A-Za-z_0-9]+$\"")
            return None
        create = self.create_dispatcher.get(metric_type, None)
        if create is None:
            self.logger.warning(f"Metric '{metric_name}' has an invalid type '{metric_type}'; cannot create metric.")
            return None

        metric = create(
            metric_name,
            unit=units,
            description=description
        )
        if metric is None:
            # Do not cache a failed creation; a later call can retry.
            self.logger.error(f"Failed to create metric '{metric_name}'!")
            return None
        bucket_list[metric_name] = metric
        return metric

    def record_histogram(self, metric_name, value, attributes: Optional[AttrDict] = None) -> bool:
        """Record ``value`` on the histogram named ``metric_name``.

        The histogram is created on first use.

        Args:
            metric_name: Name of the histogram.
            value: Measurement to record.
            attributes: Attributes attached to the measurement; an empty
                mapping is used when omitted.

        Returns:
            True when the value was recorded, False when the histogram could
            not be obtained.
        """
        metric = self._get_or_create_metric(metric_name, metric_type='histogram', units='#', description='Dynamically create histogram metric.')
        if metric is None:
            self.logger.error(f"Metric '{metric_name}' failed to be created.")
            return False
        # A fresh dict per call avoids sharing one mutable default object.
        metric.record(value, {} if attributes is None else attributes)
        return True

    def increment_counter(self, counter_name, by=1, attributes: Optional[AttrDict] = None) -> bool:
        """Increment the counter named ``counter_name`` by ``abs(by)``.

        An already-cached 'simple_counter' of that name is used when present;
        otherwise an up/down counter of that name is fetched or created.

        Args:
            counter_name: Name of the counter.
            by: Amount to add; its absolute value is used.
            attributes: Attributes attached to the measurement; an empty
                mapping is used when omitted.

        Returns:
            True when the increment was applied, False when no instrument
            could be obtained.
        """
        metric = None
        # Only look up a monotonic counter if one already exists; this method
        # never creates a new 'simple_counter' itself.
        if self._check_for_metric(metric_name=counter_name, metric_type='simple_counter'):
            metric = self._get_or_create_metric(counter_name, metric_type='simple_counter')
        if metric is None:
            metric = self._get_or_create_metric(counter_name, metric_type='simple_up_down_counter')
        if metric is None:
            self.logger.error(f"Metric '{counter_name}' failed to be created.")
            return False
        metric.add(abs(by), {} if attributes is None else attributes)
        return True

    def decrement_counter(self, counter_name, by=1, attributes: Optional[AttrDict] = None) -> bool:
        """Decrement the up/down counter named ``counter_name`` by ``abs(by)``.

        The up/down counter is created on first use.

        Args:
            counter_name: Name of the up/down counter.
            by: Amount to subtract; its absolute value is used.
            attributes: Attributes attached to the measurement; an empty
                mapping is used when omitted.

        Returns:
            True when the decrement was applied, False when the counter could
            not be obtained.
        """
        metric = self._get_or_create_metric(counter_name)
        if metric is None:
            self.logger.error(f"Metric '{counter_name}' failed to be created.")
            return False
        metric.add(-abs(by), {} if attributes is None else attributes)
        return True