Coverage for anaconda_opentelemetry / signals.py: 94%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1# -*- coding: utf-8 -*- 

2# SPDX-FileCopyrightText: 2025 Anaconda, Inc 

3# SPDX-License-Identifier: Apache-2.0 

4 

5# signals.py 

6""" 

7Anaconda Telemetry - Metrics Module 

8 

9This module provides functionality for logging, metrics, and tracing (together called 

10signals) using OpenTelemetry. It includes classes for handling logging, metrics, and 

11tracing, as well as functions for initializing the telemetry system and recording metrics. 

12""" 

13 

14import logging, socket 

15from typing import Dict, Iterator, List 

16from contextlib import contextmanager 

17 

18from opentelemetry.sdk._logs import LoggingHandler 

19 

20from .config import Configuration as Config 

21from .attributes import ResourceAttributes as Attributes 

22from .formatting import AttrDict 

23 

24from .common import _AnacondaCommon, MetricsNotInitialized 

25from .logging import _AnacondaLogger 

26from .metrics import _AnacondaMetrics 

27from .tracing import _AnacondaTrace, ASpan, _ASpan 

28 

29 

30# Internet and endpoint access check method 

def __check_internet_status(config: Config, timeout: float = 5.0) -> tuple[bool,bool]:  # seconds max to pause....
    """
    Probe general internet connectivity and reachability of the default endpoint.

    Two TCP connection attempts are made, each allowed half of ``timeout``:
    one to 8.8.8.8:53 and one to the host/port of the configured default
    endpoint. Failures are logged, never raised (only ``OSError`` is handled).

    Args:
        config (Configuration): Supplies the skip flag, the default endpoint (used
            for log messages) and the host/port used for the endpoint probe.
        timeout (float): Total number of seconds the two probes may take together.

    Returns:
        tuple[bool, bool]: ``(internet, access)``. Both are True when the
        configuration asks for the check to be skipped.
    """
    # Relies on Configuration to validate the endpoint...
    if config._get_skip_internet_check():
        return True, True

    endpoint = config._get_default_endpoint()
    logger = logging.getLogger(__package__)
    per_probe_timeout = timeout / 2

    try:
        # Access to a highly available DNS site...
        with socket.create_connection(('8.8.8.8', 53), timeout=per_probe_timeout):
            pass
    except OSError:
        logger.warning("Anaconda OpenTelemetry: No Internet was detected!")
        # No internet, but internet is not an absolute requirement for on-prem solutions.
        has_internet = False
    else:
        has_internet = True

    try:
        target = config._endpoints['default_endpoint']
        with socket.create_connection((target.host, target._internet_check_port), timeout=per_probe_timeout):
            pass
    except OSError:
        logger.fatal(f"Anaconda OpenTelemetry: No access to the endpoint '{endpoint}'!")
        # This could be fatal, no endpoint for telemetry.
        has_access = False
    else:
        logger.info(f"Anaconda OpenTelemetry: Successful access to the endpoint '{endpoint}'!")
        has_access = True

    return has_internet, has_access

52 

# Module-level telemetry state, written by initialize_telemetry().
__ANACONDA_TELEMETRY_INITIALIZED = False  # Set to True at the end of initialize_telemetry().
__SIGNALS = None  # Holds the signal_types argument given to initialize_telemetry().
__CONFIG = None  # Holds the Configuration given to initialize_telemetry().

56 

57 

58################################################################################ 

59# Exposed APIs 

def initialize_telemetry(config: Config,
                         attributes: Attributes = None,
                         signal_types: List[str] = ['metrics']):  # Follows what the backend has implemented.
    """
    Initializes the telemetry system.

    Only the first successful call has an effect; once the module is marked as
    initialized, later calls return immediately without changing anything.

    Args:
        config (Configuration): The configuration for the telemetry. At a minimum, the Configuration must have a default endpoint
            for connection to the collector.
        attributes (ResourceAttributes): A class containing common attributes. Required even though the
            signature defaults it to None; its ``parameters`` attribute must be a dictionary.
        signal_types (list, optional): List of signal types to initialize. Defaults to ['metrics'].
            Supported values are 'logging', 'metrics', and 'tracing'. If none of these are present
            (e.g. an empty list), nothing is initialized and a warning is logged.

    Raises:
        ValueError: If the config passed is None, the attributes passed are None, or
            ``attributes.parameters`` is not a dictionary.
    """
    global __ANACONDA_TELEMETRY_INITIALIZED
    global __SIGNALS
    global __CONFIG

    if __ANACONDA_TELEMETRY_INITIALIZED is True:
        return  # Already initialized

    # Validate every argument before touching module state so that a rejected
    # call leaves no half-recorded configuration behind.
    if config is None:
        raise ValueError("The config argument is required but was None")
    if attributes is None:
        raise ValueError("The attributes argument is required but was None")
    if not isinstance(attributes.parameters, dict):
        raise ValueError("The parameters attribute in ResourceAttributes must be a dictionary")

    __CONFIG = config
    # Store a copy: the default argument is a shared list object and the caller's
    # list may be mutated later.
    __SIGNALS = list(signal_types)

    # Right now, no action is taken but it is possible to disable telemetry with no access to the endpoint...
    _, _ = __check_internet_status(config, timeout=4)  # Max wait 4 seconds...

    # all params are the same currently so only write them once
    init_params = (config, attributes)

    # Initialize logging here...
    signal_type_count = 0
    if 'logging' in signal_types:
        _AnacondaLogger._instance = _AnacondaLogger(*init_params)
        signal_type_count += 1

    # Initialize the telemetry system here
    if 'metrics' in signal_types:
        _AnacondaMetrics._instance = _AnacondaMetrics(*init_params)
        signal_type_count += 1

    # Initialize tracing here...
    if 'tracing' in signal_types:
        _AnacondaTrace._instance = _AnacondaTrace(*init_params)
        signal_type_count += 1

    if signal_type_count == 0:
        logging.getLogger(__package__).warning(
            "No signal types were initialized. Was this intended? If not please check the " +
            "'metrics' section in the configuration file and/or the list of " +
            "metric types in the parameter 'signal_types'."
        )
    __ANACONDA_TELEMETRY_INITIALIZED = True

128 

def change_signal_endpoint(signal_type: str, new_endpoint: str, auth_token: str = None):
    """
    Updates the endpoint for the passed signal

    Args:
        signal_type (str): signal type to update endpoint for. Supported values are 'logging', 'metrics', and 'tracing'
            (matched case-insensitively).
        new_endpoint (str): the endpoint handed to the signal's exporter as the replacement.
        auth_token (str, optional): forwarded to the exporter unchanged. Defaults to None.

    Returns:
        boolean: value indicating whether the update was successful or not. False is also
        returned (with a warning logged) for an unknown signal type or for a signal that
        has no initialized instance.
    """
    logger = logging.getLogger(__package__)
    requested = signal_type.lower()

    # Pick the signal class and the name of the attribute holding its batching object.
    if requested == 'metrics':
        signal_cls, batch_attr = _AnacondaMetrics, 'metric_reader'
    elif requested == 'tracing':
        signal_cls, batch_attr = _AnacondaTrace, '_processor'
    elif requested == 'logging':
        signal_cls, batch_attr = _AnacondaLogger, '_processor'
    else:
        logger.warning(f"{signal_type} not a valid signal type.")
        return False

    # A signal left out of initialize_telemetry() has no instance; report failure
    # instead of raising AttributeError on None.
    instance = getattr(signal_cls, '_instance', None)
    if instance is None:
        logger.warning(f"Endpoint for {signal_type} failed to update: the signal is not initialized.")
        return False

    batch_access = getattr(instance, batch_attr)
    updated_endpoint = instance.exporter.change_signal_endpoint(
        batch_access,
        instance._config,
        new_endpoint,
        auth_token=auth_token
    )

    if not updated_endpoint:
        logger.warning(f"Endpoint for {signal_type} failed to update.")
        return False

    logger.info(f"Endpoint for {signal_type} was successfully updated.")
    return True

165 

def record_histogram(metric_name, value, attributes: AttrDict={}) -> bool:
    """
    Record a value on the histogram metric called ``metric_name``.

    The call is forwarded to the metrics instance after the attributes have been
    run through its ``_process_attributes``. No exception derived from
    ``Exception`` escapes this function; failures are logged and reported as False.

    Args:
        metric_name (str): The name of the metric.
        value (float): The value of the metric.
        attributes (dict, optional): Additional attributes for the metric. Defaults to {}.

    Returns:
        bool: The result of the underlying histogram call, or False when telemetry is not
        initialized, metrics are not configured, or any other error occurred (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # Since init didn't happen this is not exported in OTel!!!
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        return False

    try:
        metrics = _AnacondaMetrics._instance
        record = metrics.record_histogram
        return record(metric_name, value, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning(
            "An attempt was made to record a histogram metric when metrics were not configured."
        )
    except Exception as err:
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{err}")
    return False

193 

def increment_counter(counter_name, by=1, attributes: AttrDict={}) -> bool:
    """
    Increase the counter (or up/down counter) called ``counter_name`` by ``by``.

    The call is forwarded to the metrics instance after the attributes have been
    run through its ``_process_attributes``. No exception derived from
    ``Exception`` escapes this function; failures are logged and reported as False.

    Args:
        counter_name (str): The name of the counter.
        by (int, optional): The value to increment by. Defaults to 1. Negative values are
            expected to be normalized with abs(by) by the metrics implementation.
        attributes (dict, optional): Additional attributes for the counter. Defaults to {}.

    Returns:
        bool: The result of the underlying counter call, or False when telemetry is not
        initialized, metrics are not configured, or any other error occurred (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # Since init didn't happen this is not exported in OTel!!!
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        return False

    try:
        metrics = _AnacondaMetrics._instance
        increment = metrics.increment_counter
        return increment(counter_name, by, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning(
            "An attempt was made to change/create a counter metric when metrics were not configured."
        )
    except Exception as err:
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{err}")
    return False

219 

def decrement_counter(counter_name, by=1, attributes: AttrDict={}) -> bool:
    """
    Decrease the up/down counter called ``counter_name`` by ``by``.

    The call is forwarded to the metrics instance after the attributes have been
    run through its ``_process_attributes``. Applying this to a regular counter is
    expected to log a warning and fail silently. No exception derived from
    ``Exception`` escapes this function; failures are logged and reported as False.

    Args:
        counter_name (str): The name of the counter.
        by (int, optional): The value to decrement by. Defaults to 1. Negative values are
            expected to be normalized with abs(by) by the metrics implementation.
        attributes (dict, optional): Additional attributes for the counter. Defaults to {}.

    Returns:
        bool: The result of the underlying counter call, or False when telemetry is not
        initialized, metrics are not configured, or any other error occurred (logging the error).
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # Since init didn't happen this is not exported in OTel!!!
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        return False

    try:
        metrics = _AnacondaMetrics._instance
        decrement = metrics.decrement_counter
        return decrement(counter_name, by, metrics._process_attributes(attributes))
    except MetricsNotInitialized:
        logging.getLogger(__package__).warning(
            "An attempt was made to change/create a counter metric when metrics were not configured."
        )
    except Exception as err:
        logging.getLogger(__package__).error(f"UNCAUGHT EXCEPTION:\n{err}")
    return False

245 

@contextmanager
def get_trace(name: str, attributes: AttrDict = {}, carrier: Dict[str,str] = None) -> Iterator[_ASpan]:
    """
    Create or continue a named trace (based on the 'carrier' parameter).

    Use the function like a Python I/O object (keyword 'with') to ensure the span is closed properly.

    If the span cannot be created (for example tracing was not configured), a warning is
    logged and a no-op span is yielded instead so the with-block still runs.

    Exceptions derived from ``Exception`` that are raised inside the with-block are recorded
    on the span, the span is marked as errored, the error is logged, and the exception is
    NOT re-raised to the caller. The span is always closed on exit.

    Args:
        name (str): The name of the trace.
        attributes (dict, optional): Additional attributes for the trace. Defaults to {}.
        carrier (dict, optional): The carrier used to continue a trace context in the output data. Defaults to None.

    Example:
        with get_trace("my_trace_name", {"key": "value"}) as span:
            # Do some work here
            pass
        # The span will be closed automatically when exiting the 'with' block.

    Yields:
        _ASpan: The span for the with-block (a no-op span when tracing is unavailable).

    Raises:
        RuntimeError: if `initialize_telemetry` has not been called
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # Since init didn't happen this is not exported in OTel!!!
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        # Returning before the first yield makes contextlib raise an opaque
        # "generator didn't yield" RuntimeError; raise the same type with a clear message.
        raise RuntimeError("Anaconda telemetry system not initialized.")

    tracer = getattr(_AnacondaTrace, "_instance", None)
    try:
        aspan = tracer.get_span(name, tracer._process_attributes(attributes), carrier)
    except Exception:  # Trace is different than the other signals, there is no easy way to log and continue.
        logging.getLogger(__package__).warning("Attempt to trace a with-block when tracing was not configured.")
        aspan = _ASpan("UNKNOWN", span=None, noop=True)

    try:
        yield aspan
    except Exception as e:
        aspan.add_exception(e)
        aspan.set_error_status()
        # On the no-op path there may be no tracing instance, so fall back to the package logger.
        span_logger = getattr(tracer, "logger", None) or logging.getLogger(__package__)
        span_logger.error(f"Error in trace span {name}: {e}")
    finally:
        aspan._close()

286 

def get_telemetry_logger_handler() -> LoggingHandler:
    """
    Provide the telemetry logging handler so the application decides where to attach it.

    Add the handler to a named logger of your choice:

        log = logging.getLogger("my_logger")
        log.addHandler(get_telemetry_logger_handler())

    The handler used to be injected into the root logger automatically. That made it hard for
    applications to control their own logging configuration, so the injection is now disabled.
    Attach the handler to the root logger yourself if that is what you want; see the Python
    logging documentation for details.

    Returns:
        LoggingHandler: The telemetry logger handler if logging was enabled via signal_types in
        initialize_telemetry, otherwise None.
    Raises:
        RuntimeError: if `initialize_telemetry` has not been called
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        # Since init didn't happen this is not exported in OTel!!!
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        raise RuntimeError("Anaconda telemetry system not initialized.")

    anaconda_logger = _AnacondaLogger._instance
    if anaconda_logger is None:
        # No logger handler available, logging not initialized or not configured.
        return None
    return anaconda_logger._get_log_handler()

312 

def send_event(body: str, event_name: str, attributes: AttrDict={}) -> bool:
    """
    Emit a log event straight into the OpenTelemetry pipeline, bypassing Python's logging module.

    Handy when log telemetry should be exported without showing up in the application's
    own output or developer logs.

    Args:
        body (str): the log message body
        event_name (str): mandatory event name added to attributes
        attributes (AttrDict): optional attributes dict
    Returns:
        bool: True if the event was sent, False if logging was not initialized
    Raises:
        RuntimeError: if `initialize_telemetry` has not been called
    """
    if __ANACONDA_TELEMETRY_INITIALIZED is False:
        logging.getLogger(__package__).error("Anaconda telemetry system not initialized.")
        raise RuntimeError("Anaconda telemetry system not initialized.")

    anaconda_logger = _AnacondaLogger._instance
    if anaconda_logger is None:
        return False

    emitter = anaconda_logger._get_event_logger()
    emitter._send_event(body, event_name, anaconda_logger._process_attributes(attributes))
    return True