Coverage for anaconda_opentelemetry / signals.py: 94%
148 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
1# -*- coding: utf-8 -*-
2# SPDX-FileCopyrightText: 2025 Anaconda, Inc
3# SPDX-License-Identifier: Apache-2.0
5# signals.py
6"""
7Anaconda Telemetry - Signals Module
9This module provides functionality for logging, metrics, and tracing (together called
10signals) using OpenTelemetry. It includes classes for handling logging, metrics, and
11tracing, as well as functions for initializing the telemetry system and recording metrics.
12"""
14import logging, socket
15from typing import Dict, Iterator, List
16from contextlib import contextmanager
18from opentelemetry.sdk._logs import LoggingHandler
20from .config import Configuration as Config
21from .attributes import ResourceAttributes as Attributes
22from .formatting import AttrDict
24from .common import _AnacondaCommon, MetricsNotInitialized
25from .logging import _AnacondaLogger
26from .metrics import _AnacondaMetrics
27from .tracing import _AnacondaTrace, ASpan, _ASpan
30# Internet and endpoint access check method
def __check_internet_status(config: Config, timeout: float = 5.0) -> tuple[bool,bool]:
    """
    Check general internet connectivity and access to the default endpoint.

    Unless the configuration asks to skip the check, two TCP connections are
    attempted and immediately closed, each using half of ``timeout`` as its
    connect timeout: one to 8.8.8.8:53 and one to the host and
    ``_internet_check_port`` of ``config._endpoints['default_endpoint']``.
    Connection failures (``OSError``) are logged and reported through the
    return value; they are never raised.

    Args:
        config (Configuration): The telemetry configuration. Validation of the
            endpoint is left to Configuration.
        timeout (float, optional): Total number of seconds the two probes may
            block, split evenly between them. Defaults to 5.0.

    Returns:
        tuple[bool, bool]: ``(internet, access)``. Both are True when the check
        is skipped.
    """
    if config._get_skip_internet_check():
        return True, True

    log = logging.getLogger(__package__)
    endpoint = config._get_default_endpoint()
    probe_timeout = timeout / 2

    # Probe 1: a highly available DNS server stands in for "the internet".
    internet = True
    try:
        with socket.create_connection(('8.8.8.8', 53), timeout=probe_timeout):
            pass
    except OSError:
        log.warning("Anaconda OpenTelemetry: No Internet was detected!")
        # Not an absolute requirement: on-prem deployments may have no internet.
        internet = False

    # Probe 2: the collector endpoint itself.
    access = True
    try:
        with socket.create_connection(
            (
                config._endpoints['default_endpoint'].host,
                config._endpoints['default_endpoint']._internet_check_port,
            ),
            timeout=probe_timeout,
        ):
            pass
    except OSError:
        log.fatal(f"Anaconda OpenTelemetry: No access to the endpoint '{endpoint}'!")
        # Potentially fatal: there is no endpoint to send telemetry to.
        access = False
    else:
        log.info(f"Anaconda OpenTelemetry: Successful access to the endpoint '{endpoint}'!")

    return internet, access
# Module-level telemetry state; all three names are written by initialize_telemetry().
__CONFIG = None  # Configuration object passed to initialize_telemetry()
__SIGNALS = None  # signal types requested in initialize_telemetry()
__ANACONDA_TELEMETRY_INITIALIZED = False  # becomes True once initialize_telemetry() completes
58################################################################################
59# Exposed APIs
def initialize_telemetry(config: Config,
                         attributes: Attributes = None,
                         signal_types: List[str] = None):  # Default follows what the backend has implemented.
    """
    Initializes the telemetry system.

    The first successful call wins: once the module is marked as initialized,
    further calls return immediately without touching any state.

    Args:
        config (Configuration): The configuration for the telemetry. At a minimum, the Configuration must have a default endpoint
            for connection to the collector.
        attributes (ResourceAttributes): A class containing common attributes. Required even though the
            signature defaults it to None; its ``parameters`` attribute must be a plain ``dict``.
            (The original documentation states that its values override any values shared with the
            configuration file.)
        signal_types (list, optional): List of signal types to initialize. Defaults to ['metrics'].
            Supported values are 'logging', 'metrics', and 'tracing'. If an empty list is provided,
            no signals will be initialized and a warning is logged.

    Raises:
        ValueError: If the config passed is None, the attributes passed are None, or
            ``attributes.parameters`` is not a dictionary.
    """
    global __ANACONDA_TELEMETRY_INITIALIZED
    global __SIGNALS
    global __CONFIG

    if __ANACONDA_TELEMETRY_INITIALIZED is True:
        return  # Already initialized

    # Validate every argument before any module state is written, so a failed
    # call leaves the module untouched.
    if config is None:
        raise ValueError("The config argument is required but was None")
    if attributes is None:
        raise ValueError("The attributes argument is required but was None")
    # Exact-type check kept on purpose: dict subclasses were rejected before as well.
    if type(attributes.parameters) is not dict:
        raise ValueError("The parameters attribute in ResourceAttributes must be a dictionary")

    # A fresh list per call: the stored value must not be a shared default object.
    if signal_types is None:
        signal_types = ['metrics']

    __CONFIG = config
    __SIGNALS = signal_types

    # Right now no action is taken on the result, but it would be possible to
    # disable telemetry when there is no access to the endpoint.
    __check_internet_status(config, timeout=4)  # Max wait 4 seconds...

    # All signal classes take the same constructor arguments; the order below
    # (logging, metrics, tracing) matches the original initialization order.
    signal_classes = (
        ('logging', _AnacondaLogger),
        ('metrics', _AnacondaMetrics),
        ('tracing', _AnacondaTrace),
    )
    signal_type_count = 0
    for signal_name, signal_cls in signal_classes:
        if signal_name in signal_types:
            signal_cls._instance = signal_cls(config, attributes)
            signal_type_count += 1

    if signal_type_count == 0:
        logging.getLogger(__package__).warning(
            "No signal types were initialized. Was this intended? If not please check the " +
            "'metrics' section in the configuration file and/or the list of " +
            "metric types in the parameter 'signal_types'."
        )
    __ANACONDA_TELEMETRY_INITIALIZED = True
def change_signal_endpoint(signal_type: str, new_endpoint: str, auth_token: str = None):
    """
    Updates the endpoint for the passed signal

    Args:
        signal_type (str): signal type to update endpoint for. Supported values are 'logging', 'metrics', and 'tracing'
            (case-insensitive).
        new_endpoint (str): the new endpoint, forwarded to the signal's exporter.
        auth_token (str, optional): forwarded to the signal's exporter as ``auth_token``. Defaults to None.

    Returns:
        boolean: value indicating whether the update was successful or not. False is also returned
        (with a warning logged) when the signal type is unknown or that signal was never initialized.
    """
    logger = logging.getLogger(__package__)

    # Signal name -> (singleton class, attribute holding the batch reader/processor).
    targets = {
        'metrics': (_AnacondaMetrics, 'metric_reader'),
        'tracing': (_AnacondaTrace, '_processor'),
        'logging': (_AnacondaLogger, '_processor'),
    }

    # Non-string input is treated like any other unknown signal type instead of
    # failing on .lower().
    signal_key = signal_type.lower() if isinstance(signal_type, str) else None
    target = targets.get(signal_key)
    if target is None:
        logger.warning(f"{signal_type} not a valid signal type.")
        return False

    signal_cls, batch_attr = target
    instance = getattr(signal_cls, '_instance', None)
    if instance is None:
        # The signal was not requested in initialize_telemetry(), so there is no
        # exporter to update; report failure rather than raising AttributeError.
        logger.warning(f"{signal_type} was not initialized; its endpoint cannot be changed.")
        return False

    batch_access = getattr(instance, batch_attr)
    updated_endpoint = instance.exporter.change_signal_endpoint(
        batch_access,
        instance._config,
        new_endpoint,
        auth_token=auth_token
    )

    if not updated_endpoint:
        logger.warning(f"Endpoint for {signal_type} failed to update.")
        return False

    logger.info(f"Endpoint for {signal_type} was successfully updated.")
    return True
def record_histogram(metric_name, value, attributes: AttrDict = None) -> bool:
    """
    Records a value for the histogram metric with the given name.

    The call is delegated to the metrics instance created by `initialize_telemetry`.
    Will catch any exceptions generated by metric usage.

    NOTE(review): the original documentation states that the value always appears in the
    attributes section of the raw OTLP output and that the timestamp is the histogram value;
    that behavior lives in the metrics implementation and is not visible in this function.

    Args:
        metric_name (str): The name of the metric.
        value (float): The value of the metric.
        attributes (dict, optional): Additional attributes for the metric. Defaults to None,
            which is treated as an empty dict.

    Returns:
        bool: True if the metric was recorded successfully, False otherwise (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")  # Since init didn't happen this is not exported in OTel!!!
        return False

    # Fresh dict per call; a shared `{}` default could leak state between calls.
    if attributes is None:
        attributes = {}

    try:
        metrics = _AnacondaMetrics._instance
        return metrics.record_histogram(metric_name, value, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning("An attempt was made to record a histogram metric when metrics were not configured.")
        return False
    except Exception as e:
        # Telemetry must never break the caller: log and report failure.
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{e}")
        return False
def increment_counter(counter_name, by=1, attributes: AttrDict = None) -> bool:
    """
    Increments a counter or up down counter by the given parameter 'by'.

    The call is delegated to the metrics instance created by `initialize_telemetry`.
    Will catch any exceptions generated by metric usage.

    Args:
        counter_name (str): The name of the counter.
        by (int, optional): The value to increment by. Defaults to 1.
            NOTE(review): the original documentation states abs(by) is used to protect from
            negative numbers; that is done by the metrics implementation, not in this function.
        attributes (dict, optional): Additional attributes for the counter. Defaults to None,
            which is treated as an empty dict.

    Returns:
        bool: True if the counter was incremented successfully, False otherwise (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")  # Since init didn't happen this is not exported in OTel!!!
        return False

    # Fresh dict per call; a shared `{}` default could leak state between calls.
    if attributes is None:
        attributes = {}

    try:
        metrics = _AnacondaMetrics._instance
        return metrics.increment_counter(counter_name, by, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning("An attempt was made to change/create a counter metric when metrics were not configured.")
        return False
    except Exception as e:
        # Telemetry must never break the caller: log and report failure.
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{e}")
        return False
def decrement_counter(counter_name, by=1, attributes: AttrDict = None) -> bool:
    """
    Decrements an up down counter with the given name and value.

    The call is delegated to the metrics instance created by `initialize_telemetry`.
    Will catch any exceptions generated by metric usage.

    NOTE(review): the original documentation states that applying this to a regular counter
    logs a warning and silently fails; that handling is not visible in this function.

    Args:
        counter_name (str): The name of the counter.
        by (int, optional): The value to decrement by. Defaults to 1.
            NOTE(review): the original documentation states abs(by) is used to protect from
            negative numbers; that is done by the metrics implementation, not in this function.
        attributes (dict, optional): Additional attributes for the counter. Defaults to None,
            which is treated as an empty dict.

    Returns:
        bool: True if the counter was decremented successfully, False otherwise (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")  # Since init didn't happen this is not exported in OTel!!!
        return False

    # Fresh dict per call; a shared `{}` default could leak state between calls.
    if attributes is None:
        attributes = {}

    try:
        metrics = _AnacondaMetrics._instance
        return metrics.decrement_counter(counter_name, by, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning("An attempt was made to change/create a counter metric when metrics were not configured.")
        return False
    except Exception as e:
        # Telemetry must never break the caller: log and report failure.
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{e}")
        return False
@contextmanager
def get_trace(name: str, attributes: AttrDict = None, carrier: Dict[str,str] = None) -> Iterator[_ASpan]:
    """
    Create or continue a named trace (based on the 'carrier' parameter).

    Use the function like a Python I/O object (keyword 'with') to ensure the span is closed properly.

    If telemetry has not been initialized, or the span cannot be created (for example because
    tracing was not configured), the problem is logged and a no-op span is yielded so that the
    body of the 'with' block still runs.

    Args:
        name (str): The name of the trace.
        attributes (dict, optional): Additional attributes for the trace. Defaults to None,
            which is treated as an empty dict.
        carrier (dict, optional): The carrier used to continue a trace context in the output data. Defaults to None.

    Example:
        with get_trace("my_trace_name", {"key": "value"}) as span:
            # Do some work here
            pass
        # The span will be closed automatically when exiting the 'with' block.

    Yields:
        _ASpan: The span for the 'with' block (a no-op span when tracing is unavailable).
    """
    logger = logging.getLogger(__package__)

    # Fresh dict per call; a shared `{}` default could leak state between calls.
    if attributes is None:
        attributes = {}

    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # A @contextmanager generator must yield exactly once; returning here
        # would make `with get_trace(...)` fail with "generator didn't yield".
        logger.error("Anaconda telemetry system not initialized.")  # Since init didn't happen this is not exported in OTel!!!
        aspan = _ASpan("UNKNOWN", span=None, noop=True)
    else:
        try:
            tracer = _AnacondaTrace._instance
            aspan = tracer.get_span(name, tracer._process_attributes(attributes), carrier)
        except Exception:  # Trace is different than the other signals, there is no easy way to log and continue.
            logger.warning("Attempt to trace a with-block when tracing was not configured.")
            aspan = _ASpan("UNKNOWN", span=None, noop=True)

    try:
        yield aspan
    except Exception as e:
        # NOTE(review): exceptions raised inside the 'with' body are recorded on
        # the span and then suppressed (not re-raised). Kept as-is because
        # callers may rely on it; confirm this is intended.
        aspan.add_exception(e)
        aspan.set_error_status()
        # On the no-op path there may be no trace instance, so fall back to the
        # package logger instead of failing inside this handler.
        trace_logger = getattr(getattr(_AnacondaTrace, "_instance", None), "logger", None)
        if trace_logger is None:
            trace_logger = logger
        trace_logger.error(f"Error in trace span {name}: {e}")
    finally:
        aspan._close()
def get_telemetry_logger_handler() -> LoggingHandler:
    """
    Return the telemetry logging handler so the application decides where it is attached.

    Add the returned handler to a named logger of your choice:

        log = logging.getLogger("my_logger")
        log.addHandler(get_telemetry_logger_handler())

    Earlier versions injected the handler into the root logger, which was problematic for
    applications that wanted precise control over their logging configuration. That injection
    is now disabled; attach the handler to the root logger manually if you want the old
    behavior (see the Python logging documentation).

    Returns:
        LoggingHandler: The telemetry handler if logging was enabled via signal_types in
        initialize_telemetry, otherwise None.
    Raises:
        RuntimeError: if `initialize_telemetry` has not been called
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        message = "Anaconda telemetry system not initialized."
        logging.getLogger(__package__).error(message)  # Since init didn't happen this is not exported in OTel!!!
        raise RuntimeError(message)

    logger_instance = _AnacondaLogger._instance
    if logger_instance is None:
        # Logging was not initialized or not configured: no handler available.
        return None
    return logger_instance._get_log_handler()
def send_event(body: str, event_name: str, attributes: AttrDict = None) -> bool:
    """
    Sends a log event directly to the OpenTelemetry pipeline without using Python's logging module.
    This is useful when you want to export log telemetry but don't want the output mixing with
    your application's output or developer logs.

    Args:
        body (str): the log message body
        event_name (str): mandatory event name, passed to the event logger together with the body
        attributes (AttrDict, optional): attributes dict. Defaults to None, which is treated as
            an empty dict.

    Returns:
        bool: True if the event was sent, False if logging was not initialized
    Raises:
        RuntimeError: if `initialize_telemetry` has not been called
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        raise RuntimeError("Anaconda telemetry system not initialized.")

    logger_instance = _AnacondaLogger._instance
    if logger_instance is None:
        # 'logging' was not among the initialized signal types.
        return False

    # Fresh dict per call; a shared `{}` default could leak state between calls.
    if attributes is None:
        attributes = {}

    event_logger = logger_instance._get_event_logger()
    event_logger._send_event(body, event_name, logger_instance._process_attributes(attributes))
    return True