Coverage for anaconda_opentelemetry / metrics.py: 87%
98 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
1# -*- coding: utf-8 -*-
2# SPDX-FileCopyrightText: 2025 Anaconda, Inc
3# SPDX-License-Identifier: Apache-2.0
5# metrics.py
6"""
7Anaconda Telemetry - Metrics signal class.
8"""
10import logging, re
11from typing import Dict, Any
13from opentelemetry import metrics
14from opentelemetry.sdk.metrics import MeterProvider, Counter, UpDownCounter, Histogram, ObservableCounter, ObservableUpDownCounter
15from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter, AggregationTemporality
17from .common import _AnacondaCommon, MetricsNotInitialized
18from .config import Configuration as Config
19from .attributes import ResourceAttributes as Attributes
20from .exporter_shim import OTLPMetricExporterShim
21from .formatting import AttrDict
class _AnacondaMetrics(_AnacondaCommon):
    """Metrics signal handler for Anaconda telemetry.

    On construction this class builds a metric exporter (console, OTLP over
    gRPC, or OTLP over HTTP), wraps it in a ``PeriodicExportingMetricReader``,
    registers a ``MeterProvider`` and obtains a meter. Counters, up/down
    counters and histograms are then created lazily, by name, the first time
    they are recorded, and cached per instrument type.

    Attributes such as ``logger``, ``resource``, ``service_name``,
    ``service_version``, ``use_console_exporters`` and ``_config`` are read
    here but not assigned here; presumably they are provided by
    ``_AnacondaCommon.__init__`` -- confirm against ``common.py``.
    """

    # Singleton instance (internal only); provide a single instance of the metrics class.
    # NOTE(review): this class only declares the slot; whatever manages it lives elsewhere.
    _instance = None

    # Temporality preferences used unless cumulative metrics are requested.
    # NOTE(review): UpDownCounter is DELTA and Histogram is CUMULATIVE here --
    # confirm this mapping is intentional.
    _default_temporality: dict[type, AggregationTemporality] = {
        Counter: AggregationTemporality.DELTA,
        ObservableCounter: AggregationTemporality.DELTA,
        Histogram: AggregationTemporality.CUMULATIVE,
        UpDownCounter: AggregationTemporality.DELTA,
        ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    }

    # Temporality preferences used when the configuration asks for cumulative metrics.
    _cumulative_temporality: dict[type, AggregationTemporality] = {
        Counter: AggregationTemporality.CUMULATIVE,
        ObservableCounter: AggregationTemporality.CUMULATIVE,
        Histogram: AggregationTemporality.CUMULATIVE,
        UpDownCounter: AggregationTemporality.CUMULATIVE,
        ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
    }

    # Maps the "use cumulative" flag to a temporality name (not referenced in this class).
    _temporalityValue: dict[bool, str] = {
        False: "DELTA",
        True: "CUMULATIVE",
    }

    # Compiled once: a letter followed by one or more letters, digits or underscores.
    _METRIC_NAME_PATTERN = re.compile(r"^[A-Za-z][A-Za-z_0-9]+$")

    def __init__(self, config: Config, attributes: Attributes):
        """Read metrics settings from ``config`` and set up the meter.

        Args:
            config: Configuration providing the metrics endpoint, export
                interval, auth token, protocol and TLS settings.
            attributes: Resource attributes, forwarded to the base class.
        """
        super().__init__(config, attributes)

        self.metrics_endpoint = config._get_metrics_endpoint()
        self.telemetry_export_interval_millis = config._get_metrics_export_interval_ms()

        # One name -> instrument cache per supported instrument type.
        self.counter_objects: Dict[str, Any] = {}
        self.up_down_counter_objects: Dict[str, Any] = {}
        self.histogram_objects: Dict[str, Any] = {}

        self.meter = self._setup_metrics(config)

        # Metric type name -> meter factory method.
        self.create_dispatcher = {
            'simple_counter': self.meter.create_counter,
            'simple_up_down_counter': self.meter.create_up_down_counter,
            'histogram': self.meter.create_histogram,
        }
        # Metric type name -> cache holding instruments of that type.
        self.type_list = {
            'simple_counter': self.counter_objects,
            'simple_up_down_counter': self.up_down_counter_objects,
            'histogram': self.histogram_objects,
        }

    def _setup_metrics(self, config: Config) -> metrics.Meter:
        """Build the exporter, reader and meter provider, and return a meter.

        The exporter is a console exporter when ``use_console_exporters`` is
        set; otherwise an OTLP exporter (gRPC when the configured protocol is
        ``grpc``/``grpcs``, HTTP for anything else) wrapped in
        ``OTLPMetricExporterShim``.

        Args:
            config: Configuration used for auth, protocol and TLS settings.

        Returns:
            The meter obtained for this service name and version.
        """
        if self.use_console_exporters:
            exporter = ConsoleMetricExporter(preferred_temporality=self._get_temporality())
        else:
            auth_token = config._get_auth_token_metrics()
            headers: Dict[str, str] = {}
            if auth_token is not None:
                headers['authorization'] = f'Bearer {auth_token}'

            if config._get_request_protocol_metrics() in ['grpc', 'grpcs']:  # gRPC
                # Imported lazily so the gRPC exporter is only loaded when selected.
                from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPMetricExportergRPC
                insecure = not config._get_TLS_metrics()
                exporter = OTLPMetricExporterShim(
                    OTLPMetricExportergRPC,
                    endpoint=self.metrics_endpoint,
                    insecure=insecure,
                    # The CA cert is only consulted for secure connections.
                    credentials=config._get_ca_cert_metrics() if not insecure else None,
                    headers=headers,
                    preferred_temporality=self._get_temporality()
                )
            else:  # HTTP
                # Imported lazily so the HTTP exporter is only loaded when selected.
                from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPMetricExporterHTTP
                exporter = OTLPMetricExporterShim(
                    OTLPMetricExporterHTTP,
                    endpoint=self.metrics_endpoint,
                    certificate_file=config._get_ca_cert_metrics(),
                    headers=headers,
                    preferred_temporality=self._get_temporality()
                )

        self.exporter = exporter
        self.metric_reader = PeriodicExportingMetricReader(
            self.exporter,
            export_interval_millis=self.telemetry_export_interval_millis
        )

        # Create and set meter provider
        meter_provider = MeterProvider(
            resource=self.resource,
            metric_readers=[self.metric_reader]
        )
        try:
            metrics.set_meter_provider(meter_provider)
        except Exception:
            # Best effort: a failure to register must not break telemetry setup.
            self.logger.warning("The metrics provider was previously set and will take precedence over this call.")

        # Get meter for this service
        return metrics.get_meter(self.service_name, self.service_version)

    def _get_temporality(self) -> dict[type, AggregationTemporality]:
        """Return the temporality mapping selected by the configuration."""
        # Explicit comparison kept on purpose: the getter's return type is not
        # visible here, and a truthiness test would accept more values.
        if self._config._get_use_cumulative_metrics() == True:
            return _AnacondaMetrics._cumulative_temporality
        return _AnacondaMetrics._default_temporality

    def _check_for_metric(self, metric_name: str, metric_type: str) -> bool:
        """Return True if ``metric_name`` is already cached under ``metric_type``.

        An unknown ``metric_type`` yields False rather than raising.
        """
        bucket = self.type_list.get(metric_type)
        if bucket is None:
            return False
        return bucket.get(metric_name) is not None

    def _get_or_create_metric(self, metric_name: str, metric_type: str = 'simple_up_down_counter', units: str = '#', description: str = 'No description.') -> Any:
        """Return the cached instrument for ``metric_name``, creating it if needed.

        Args:
            metric_name: Instrument name; must start with a letter and continue
                with at least one letter, digit or underscore.
            metric_type: One of ``'simple_counter'``,
                ``'simple_up_down_counter'`` or ``'histogram'``.
            units: Unit string passed to the meter when creating the instrument.
            description: Description passed to the meter when creating the instrument.

        Returns:
            The instrument, or None when the name is invalid or creation fails.

        Raises:
            MetricsNotInitialized: If ``metric_type`` is not a known type.
        """
        bucket = self.type_list.get(metric_type)
        if bucket is None:
            raise MetricsNotInitialized(f"Metric type '{metric_type}' is unknown!")

        cached = bucket.get(metric_name)
        if cached is not None:
            return cached

        if not self._METRIC_NAME_PATTERN.fullmatch(metric_name):
            self.logger.warning(f"Metric {metric_name} does not match valid regex: r\"^[A-Za-z][A-Za-z_0-9]+$\"")
            return None

        create = self.create_dispatcher.get(metric_type)
        if create is None:
            self.logger.warning(f"Metric '{metric_name}' has an invalid type '{metric_type}'; cannot create metric.")
            return None

        metric = create(
            metric_name,
            unit=units,
            description=description
        )
        if metric is None:
            # Do not cache a failed creation; a later call may retry.
            self.logger.error(f"Failed to create metric '{metric_name}'!")
            return None

        bucket[metric_name] = metric
        return metric

    def record_histogram(self, metric_name, value, attributes: "AttrDict | None" = None) -> bool:
        """Record ``value`` on the histogram named ``metric_name``.

        The histogram is created on first use. ``attributes`` defaults to a
        fresh empty mapping on every call.

        Returns:
            True if the value was recorded, False if the histogram could not
            be obtained.
        """
        attributes = {} if attributes is None else attributes
        metric = self._get_or_create_metric(metric_name, metric_type='histogram', units='#', description='Dynamically create histogram metric.')
        if metric is None:
            self.logger.error(f"Metric '{metric_name}' failed to be created.")
            return False
        metric.record(value, attributes)
        return True

    def increment_counter(self, counter_name, by=1, attributes: "AttrDict | None" = None) -> bool:
        """Increment the counter named ``counter_name`` by ``abs(by)``.

        An existing ``'simple_counter'`` with that name is used when present;
        otherwise an up/down counter is fetched or created. ``attributes``
        defaults to a fresh empty mapping on every call.

        Returns:
            True if the increment was applied, False if no counter could be
            obtained.
        """
        attributes = {} if attributes is None else attributes
        metric = None
        if self._check_for_metric(metric_name=counter_name, metric_type='simple_counter'):
            metric = self._get_or_create_metric(counter_name, metric_type='simple_counter')
        if metric is None:
            metric = self._get_or_create_metric(counter_name, metric_type='simple_up_down_counter')
        if metric is None:
            self.logger.error(f"Metric '{counter_name}' failed to be created.")
            return False
        metric.add(abs(by), attributes)
        return True

    def decrement_counter(self, counter_name, by=1, attributes: "AttrDict | None" = None) -> bool:
        """Decrement the up/down counter named ``counter_name`` by ``abs(by)``.

        The up/down counter is created on first use. ``attributes`` defaults
        to a fresh empty mapping on every call.

        Returns:
            True if the decrement was applied, False if the counter could not
            be obtained.
        """
        attributes = {} if attributes is None else attributes
        metric = self._get_or_create_metric(counter_name)
        if metric is None:
            self.logger.error(f"Metric '{counter_name}' failed to be created.")
            return False
        metric.add(-abs(by), attributes)
        return True