Coverage for anaconda_opentelemetry / tracing.py: 92%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1# -*- coding: utf-8 -*- 

2# SPDX-FileCopyrightText: 2025 Anaconda, Inc 

3# SPDX-License-Identifier: Apache-2.0 

4 

5# tracing.py 

6""" 

7Anaconda Telemetry - Tracing signal class and span classes. 

8""" 

9 

10import logging 

11from typing import Dict, Optional 

12from abc import ABC 

13 

14from opentelemetry import trace 

15from opentelemetry.sdk.trace import TracerProvider 

16from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter 

17from opentelemetry.propagate import get_global_textmap 

18from opentelemetry.trace.status import StatusCode 

19 

20from .common import _AnacondaCommon 

21from .config import Configuration as Config 

22from .attributes import ResourceAttributes as Attributes 

23from .exporter_shim import OTLPSpanExporterShim 

24from .formatting import AttrDict 

25 

26 

class ASpan(ABC):
    """
    Base interface for the spans handed out by the tracing system.

    Every method defined here is a placeholder that does nothing and returns None;
    subclasses supply the real behavior. This class should not be instantiated
    directly. Use the get_trace function to obtain an instance.
    """

    def add_event(self, name: str, attributes: AttrDict = None) -> None:
        """
        Attach a named event, with optional attributes, to the span.

        Args:
            name (str): The name of the event.
            attributes (dict, optional): Additional attributes describing the event. Defaults to None.
        """

    def add_exception(self, exception: Exception) -> None:
        """
        Record an exception on the span. When the exception is None, a generic exception is recorded.

        Args:
            exception (Exception): The exception to record on the span.
        """

    def set_error_status(self, msg: Optional[str] = None) -> None:
        """
        Mark the span status as ERROR, signalling that something failed while the span was active.

        Args:
            msg (str, optional): An optional message to attach to the error status. Defaults to None.
        """

    def add_attributes(self, attributes: AttrDict) -> None:
        """
        Add attributes to the span, on top of the original attributes given when the span was created.

        Args:
            attributes (dict): A dictionary of attributes to add to the span.
        """

68 

69 

class _ASpan(ASpan):
    """
    A single class for the tracing yielded return value.

    Wraps an OpenTelemetry span. Event names are prefixed with the span name, and
    attributes added after creation are accumulated on this wrapper before being
    written to the wrapped span. When ``noop`` is True every public method returns
    immediately without touching the wrapped span.
    """

    def __init__(self, name: str, span: trace.Span, attributes: Optional[AttrDict] = None, noop: bool = False) -> None:
        """
        Args:
            name (str): The span name; also used as the prefix of every event name.
            span (trace.Span): The wrapped OpenTelemetry span.
            attributes (dict, optional): Attributes the span was created with. The dictionary
                is copied, so later add_attributes calls never modify the caller's object.
                Defaults to None, meaning no initial attributes.
            noop (bool): When True, all operations on this span do nothing. Defaults to False.
        """
        self._noop = noop
        self._name = name
        # Keep a private copy: add_attributes() updates this dictionary in place, so
        # aliasing the argument (or a shared default) would leak attributes between spans.
        self._attributes: AttrDict = dict(attributes) if attributes else {}
        self._span: trace.Span = span

    def add_event(self, name: str, attributes: AttrDict = None) -> None:
        """
        Add an event named "<span name>.<name>" to the wrapped span.

        Args:
            name (str): The name of the event (the span name is prepended).
            attributes (dict, optional): Additional attributes for the event. None is
                treated as an empty dictionary. Defaults to None.
        """
        if self._noop:
            return
        if attributes is None:
            attributes = {}
        self._span.add_event(f"{self._name}.{name}", attributes=attributes)

    def add_exception(self, exception: Exception) -> None:
        """
        Record an exception on the wrapped span, with its type name and message as attributes.

        Args:
            exception (Exception): The exception to record. If None, a generic
                placeholder exception is recorded instead.
        """
        if self._noop:
            return
        if exception is None:
            exception = Exception("Generic exception because the exception passed was None.")

        self._span.record_exception(
            exception,
            attributes={
                "exception.type": type(exception).__name__,
                "exception.message": str(exception),
            },
        )

    def set_error_status(self, msg: Optional[str] = None) -> None:
        """
        Set the status of the wrapped span to ERROR.

        Args:
            msg (str, optional): Description for the error status. When missing or
                empty, a default description is used. Defaults to None.
        """
        if self._noop:
            return
        self._span.set_status(StatusCode.ERROR, msg if msg else "An error occurred during the span's execution.")

    def add_attributes(self, attributes: AttrDict) -> None:
        """
        Merge new attributes into the span's accumulated attributes and write them to the wrapped span.

        Args:
            attributes (dict): A dictionary of attributes to add for the span.

        Raises:
            TypeError: If ``attributes`` is not a dictionary (not checked for no-op spans).
        """
        if self._noop:
            return
        if not isinstance(attributes, dict):
            raise TypeError("Attributes must be a dictionary of string key and string values.")
        self._attributes.update(attributes)
        # The full accumulated set is sent, not just the new entries.
        self._span.set_attributes(self._attributes)

    def _close(self) -> None:
        """End the wrapped span (does nothing for no-op spans)."""
        if self._noop:
            return
        self._span.end()

109 

110 

class _AnacondaTrace(_AnacondaCommon):
    """
    Internal tracing signal class: configures an OpenTelemetry tracer and creates spans from it.

    Attributes such as ``resource``, ``use_console_exporters``, ``logger``, ``service_name``
    and ``service_version`` are read here but not set in this class; presumably they are
    provided by _AnacondaCommon.__init__ (verify against .common).
    """
    # Singleton instance (internal only); provide a single instance of the tracing class
    _instance = None

    def __init__(self, config: Config, attributes: Attributes):
        """
        Read the tracing settings from the configuration and build the tracer.

        Args:
            config (Config): Package configuration; supplies the export interval and endpoint.
            attributes (Attributes): Resource attributes, forwarded to the base class.
        """
        super().__init__(config, attributes)
        self.telemetry_export_interval_millis = config._get_tracing_export_interval_ms()
        self.tracing_endpoint = config._get_tracing_endpoint()

        self.tracer = self._setup_tracing(config)

    def _setup_tracing(self, config: Config) -> trace.Tracer:
        """
        Create the tracer provider, attach a batching exporter and return a tracer.

        The exporter is a console exporter when ``use_console_exporters`` is set; otherwise
        an OTLP exporter (gRPC or HTTP, depending on the configured request protocol)
        wrapped in OTLPSpanExporterShim. The exporter and processor are kept on the
        instance as ``self.exporter`` and ``self._processor``.

        Args:
            config (Config): Package configuration; supplies auth token, protocol, TLS and CA certificate settings.

        Returns:
            trace.Tracer: Tracer obtained from the global provider for this service name and version.
        """
        # Create tracer provider
        tracer_provider = TracerProvider(resource=self.resource)

        # Choose the exporter
        if self.use_console_exporters:
            exporter = ConsoleSpanExporter()
        else:
            auth_token = config._get_auth_token_tracing()
            headers: Dict[str, str] = {}
            if auth_token is not None:
                headers['authorization'] = f'Bearer {auth_token}'
            if config._get_request_protocol_tracing() in ('grpc', 'grpcs'):  # gRPC
                insecure = not config._get_TLS_tracing()
                # Imported lazily so the gRPC exporter package is only needed when gRPC is selected
                from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as OTLPSpanExportergRPC
                exporter = OTLPSpanExporterShim(
                    OTLPSpanExportergRPC,
                    endpoint=self.tracing_endpoint,
                    insecure=insecure,
                    # The CA certificate is only passed when TLS is enabled
                    credentials=config._get_ca_cert_tracing() if not insecure else None,
                    headers=headers
                )
            else:  # HTTP
                # Imported lazily so the HTTP exporter package is only needed when HTTP is selected
                from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPSpanExporterHTTP
                exporter = OTLPSpanExporterShim(
                    OTLPSpanExporterHTTP,
                    endpoint=self.tracing_endpoint,
                    certificate_file=config._get_ca_cert_tracing(),
                    headers=headers
                )

        self.exporter = exporter
        self._processor = BatchSpanProcessor(self.exporter, schedule_delay_millis=self.telemetry_export_interval_millis)
        tracer_provider.add_span_processor(self._processor)

        # Set as global provider
        # NOTE(review): the OpenTelemetry API appears to log a warning rather than raise when a
        # provider is already set -- confirm whether this except branch is reachable.
        try:
            trace.set_tracer_provider(tracer_provider)
        except Exception:
            self.logger.warning("The tracer provider was previously set, and will take precidence over the set in this package: anaconda_opentelemetry.")

        # Get tracer for this service (from the global provider)
        return trace.get_tracer(self.service_name, self.service_version)

    def get_span(self, name: str, attributes: Optional[AttrDict] = None, carrier: Optional[Dict[str, str]] = None) -> ASpan:
        """
        Start a new span and return it wrapped in an _ASpan.

        Args:
            name (str): The span name.
            attributes (dict, optional): Initial span attributes. The dictionary is copied, so
                the caller's object is never modified. Defaults to None (no attributes).
            carrier (dict, optional): Propagation carrier. When given, the parent context is
                extracted from it before the span starts, and the new span's context is
                injected back into it afterwards (the carrier is modified in place).
                Defaults to None.

        Returns:
            ASpan: The wrapper around the started span; the caller is responsible for ending it.
        """
        # Fresh dictionary per call: the returned span updates its attributes in place,
        # so a shared default or the caller's own dictionary must not be handed over.
        span_attributes: AttrDict = dict(attributes) if attributes else {}

        # Extract context if applicable
        if carrier is None:
            context = None
        else:
            context = get_global_textmap().extract(carrier)

        span = self.tracer.start_span(name, context=context, attributes=span_attributes)

        # Inject context if applicable
        if carrier is not None:
            context = trace.set_span_in_context(span, context)
            get_global_textmap().inject(carrier, context=context)

        return _ASpan(name, span, attributes=span_attributes)