Coverage for anaconda_opentelemetry / logging.py: 97%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1# -*- coding: utf-8 -*- 

2# SPDX-FileCopyrightText: 2025 Anaconda, Inc 

3# SPDX-License-Identifier: Apache-2.0 

4 

5# logging.py 

6""" 

7Anaconda Telemetry - Logging signal class and EventLogger. 

8""" 

9 

10import warnings 

11from opentelemetry.sdk._logs import LogDeprecatedInitWarning 

# Install the filter at import time, ahead of the remaining imports, so the
# SDK deprecation warning whose message matches "LogRecord will be removed"
# is suppressed for everything this module does afterwards.
warnings.filterwarnings(
    action="ignore",
    message=".*LogRecord will be removed.*",
    category=LogDeprecatedInitWarning,
)

17 

18import json 

19import logging 

20from typing import Dict 

21 

22from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogRecord 

23from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter 

24 

25from .common import _AnacondaCommon 

26from .config import Configuration as Config 

27from .attributes import ResourceAttributes as Attributes 

28from .exporter_shim import OTLPLogExporterShim 

29from .formatting import AttrDict, EventPayload, log_event_name_key 

30 

31 

class EventLogger:
    """
    Emit events purely as OpenTelemetry log telemetry.

    Records are handed straight to an OTel logger obtained from the supplied
    provider, bypassing Python's logging hierarchy so they never appear in
    console/file handlers or interfere with developer log levels. This is an
    optional way to export logs; the Python logging module is supported as
    well.
    """

    def __init__(
        self,
        provider: LoggerProvider,
        logger_name: str = "event_logger",
    ):
        """
        Bind this event logger to an OTel logger.

        Args:
            provider: Logger provider used to obtain the underlying logger.
            logger_name: Name passed to ``provider.get_logger``.
        """
        self._logger = provider.get_logger(logger_name)

    def _send_event(
        self,
        body: EventPayload,
        event_name: str,
        attributes: AttrDict = None,
    ):
        """
        Build a log record for one event and emit it.

        Args:
            body: Event payload. A value that is not already a ``str`` is
                serialized with ``json.dumps``.
            event_name: Event name; stored in the record attributes under
                ``log_event_name_key`` (mandatory for event logs).
            attributes: Optional extra attributes for the record. The mapping
                is copied and never modified, so callers can safely reuse it.
                ``None`` (the default) or an empty mapping means no extras.
        """
        if not isinstance(body, str):
            body = json.dumps(body)
        # Work on a fresh dict per call. The previous `attributes={}` default
        # was a single shared object that `.update()` mutated, so attributes
        # from one call could leak into later calls, and a dict supplied by
        # the caller was modified in place.
        record_attributes = dict(attributes) if attributes else {}
        # The event name is mandatory for event logs and takes precedence
        # over any caller-supplied value under the same key.
        record_attributes[log_event_name_key] = event_name
        record = LogRecord(
            body=body,
            attributes=record_attributes,
        )
        self._logger.emit(record)

62 

63 

class _AnacondaLogger(_AnacondaCommon):
    """
    Internal logging signal: owns the OTel logger provider and its exporter.

    Provides a logging handler for OpenTelemetry log instrumentation and a
    factory for ``EventLogger`` instances bound to the same provider.
    """

    # Singleton instance (internal only); this class never assigns it itself.
    _instance = None

    # Attributes stamped onto stdlib log records that do not already carry them.
    _default_log_attributes = {log_event_name_key: "__LOG__"}

    def __init__(self, config: Config, attributes: Attributes):
        """
        Create the logger provider and attach a batching exporter to it.

        Args:
            config: Source of the logging level, endpoint, auth token,
                protocol, TLS flag and CA certificate.
            attributes: Resource attributes forwarded to the base class.
        """
        super().__init__(config, attributes)
        self.log_level = self._get_log_level(config._get_logging_level())
        self.logger_endpoint = config._get_logging_endpoint()

        # Provider shared by the logging handler and every event logger.
        self._provider = LoggerProvider(resource=self.resource)

        # Stays None unless the console exporter is selected below.
        self._console_exporter: ConsoleLogExporter | None = None
        if self.use_console_exporters:
            exporter = ConsoleLogExporter()
            self._console_exporter = exporter
        else:
            exporter = self._build_otlp_exporter(config)

        self.exporter = exporter
        self._processor = BatchLogRecordProcessor(self.exporter)
        self._provider.add_log_record_processor(self._processor)

    def _build_otlp_exporter(self, config: Config):
        """
        Build the shimmed OTLP log exporter (gRPC or HTTP) from ``config``.

        The concrete exporter class is imported lazily so that only the
        selected protocol's package is loaded.
        """
        auth_token = config._get_auth_token_logging()
        headers: Dict[str, str] = (
            {'authorization': f'Bearer {auth_token}'} if auth_token is not None else {}
        )

        if config._get_request_protocol_logging() in ('grpc', 'grpcs'):  # gRPC
            from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as OTLPLogExportergRPC
            insecure = not config._get_TLS_logging()
            # The CA certificate is only looked up when TLS is in use.
            credentials = None if insecure else config._get_ca_cert_logging()
            return OTLPLogExporterShim(
                OTLPLogExportergRPC,
                endpoint=self.logger_endpoint,
                insecure=insecure,
                credentials=credentials,
                headers=headers,
            )

        # HTTP
        from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as OTLPLogExporterHTTP
        return OTLPLogExporterShim(
            OTLPLogExporterHTTP,
            endpoint=self.logger_endpoint,
            certificate_file=config._get_ca_cert_logging(),
            headers=headers,
        )

    def _get_log_handler(self) -> LoggingHandler:
        """Return a ``LoggingHandler`` on this provider with the default-attribute filter attached."""
        otel_handler = LoggingHandler(level=self.log_level, logger_provider=self._provider)
        default_filter = self._set_default_attribute_filter()
        otel_handler.addFilter(default_filter)
        return otel_handler

    def _set_default_attribute_filter(self) -> logging.Filter:
        """
        Build a filter that fills in missing default attributes on each record.

        The filter never rejects a record; it only sets attributes the record
        does not already have.
        """
        defaults = self._default_log_attributes

        def _apply_defaults(record):
            for key, value in defaults.items():
                if not hasattr(record, key):
                    setattr(record, key, value)
            return True

        default_filter = logging.Filter()
        # Override `filter` on this one instance rather than subclassing.
        default_filter.filter = _apply_defaults
        return default_filter

    def _get_event_logger(self, logger_name: str = None) -> EventLogger:
        """
        Return an ``EventLogger`` bound to this provider.

        Args:
            logger_name: Logger name; when ``None`` it is derived from the
                service name.
        """
        name = f'{self.service_name}_event_logger' if logger_name is None else logger_name
        return EventLogger(self._provider, logger_name=name)

    def _get_log_level(self, str_level: str) -> int:
        """
        Convert a level name from the config file to a ``logging`` level.

        Matching is case-insensitive; unrecognized names map to
        ``logging.DEBUG``.
        """
        known_levels = {
            "debug": logging.DEBUG,
            "info": logging.INFO,
            "warning": logging.WARNING,
            "warn": logging.WARNING,
            "error": logging.ERROR,
            "critical": logging.CRITICAL,
            "fatal": logging.CRITICAL,
        }
        normalized = str_level.lower()
        if normalized in known_levels:
            return known_levels[normalized]
        return logging.DEBUG

    def _test_set_console_mock(self, new_out):  # For testing only...
        """
        Swap the console exporter's output stream and return the previous one.

        Returns ``None`` without changing anything when no console exporter
        is in use or ``new_out`` is ``None``.
        """
        if self._console_exporter is None or new_out is None:
            return None
        previous_out = self._console_exporter.out
        self._console_exporter.out = new_out
        return previous_out