Coverage for anaconda_opentelemetry / logging.py: 97%
74 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
1# -*- coding: utf-8 -*-
2# SPDX-FileCopyrightText: 2025 Anaconda, Inc
3# SPDX-License-Identifier: Apache-2.0
5# logging.py
6"""
7Anaconda Telemetry - Logging signal class and EventLogger.
8"""
import warnings
from opentelemetry.sdk._logs import LogDeprecatedInitWarning
warnings.filterwarnings(
    "ignore",
    category=LogDeprecatedInitWarning,
    message=".*LogRecord will be removed.*",
)

import json
import logging
from typing import Dict, Optional

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler, LogRecord
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter

from .common import _AnacondaCommon
from .config import Configuration as Config
from .attributes import ResourceAttributes as Attributes
from .exporter_shim import OTLPLogExporterShim
from .formatting import AttrDict, EventPayload, log_event_name_key
class EventLogger:
    """
    Emits log records purely as OTel log telemetry, bypassing Python's
    logging hierarchy so they never appear in console/file handlers or
    interfere with developer log levels. Optional way to export logs,
    the Python logging module is supported as well.
    """

    def __init__(
        self,
        provider: LoggerProvider,
        logger_name: str = "event_logger",
    ):
        """
        Bind this event logger to an OTel logger obtained from ``provider``.

        Args:
            provider: Logger provider whose ``get_logger`` supplies the
                underlying logger.
            logger_name: Name passed to ``provider.get_logger``.
        """
        self._logger = provider.get_logger(logger_name)

    def _send_event(
        self,
        body: EventPayload,
        event_name: str,
        attributes: Optional[AttrDict] = None,
    ):
        """
        Build a ``LogRecord`` for one event and emit it.

        Args:
            body: Event payload. Anything that is not already a ``str`` is
                serialized with ``json.dumps``.
            event_name: Stored in the record attributes under
                ``log_event_name_key``; it replaces any value the caller
                supplied under that key.
            attributes: Optional extra attributes for the record. The mapping
                is copied, so the caller's object is never modified.
        """
        if not isinstance(body, str):
            body = json.dumps(body)
        # Work on a private copy. A `{}` default would be one dict shared by
        # every call, and updating the argument in place would also write the
        # event name into whatever dict the caller passed in.
        record_attributes = dict(attributes) if attributes else {}
        # update attributes with event name - mandatory for event logs
        record_attributes[log_event_name_key] = event_name
        record = LogRecord(
            body=body,
            attributes=record_attributes,
        )
        self._logger.emit(record)
class _AnacondaLogger(_AnacondaCommon):
    """
    Internal wiring for the OpenTelemetry logging signal.

    Creates a ``LoggerProvider`` with one exporter (console, OTLP over gRPC,
    or OTLP over HTTP, chosen from the configuration) behind a
    ``BatchLogRecordProcessor``, and hands out ``LoggingHandler`` and
    ``EventLogger`` objects bound to that provider.
    """

    # Singleton instance (internal only). Nothing in this class assigns it;
    # presumably it is managed by whoever constructs the logger - TODO confirm.
    _instance = None

    # Attributes added to stdlib log records that do not already carry them.
    _default_log_attributes = {log_event_name_key: "__LOG__"}

    def __init__(self, config: Config, attributes: Attributes):
        """
        Create the logger provider and attach the configured exporter.

        Args:
            config: Source of the logging level, endpoint, auth token,
                request protocol, TLS flag and CA certificate.
            attributes: Resource attributes, forwarded to the base class.
        """
        super().__init__(config, attributes)
        self.log_level = self._get_log_level(config._get_logging_level())
        self.logger_endpoint = config._get_logging_endpoint()

        # Create logger provider.
        # NOTE(review): `resource` and `use_console_exporters` are not set in
        # this class; presumably the base-class __init__ provides them.
        self._provider = LoggerProvider(resource=self.resource)
        self._console_exporter: Optional[ConsoleLogExporter] = None

        # Select the exporter: console, or OTLP over gRPC / HTTP
        if self.use_console_exporters:
            exporter = ConsoleLogExporter()
            # Kept so _test_set_console_mock can redirect its output stream
            self._console_exporter = exporter
        else:
            auth_token = config._get_auth_token_logging()
            headers: Dict[str, str] = {}
            if auth_token is not None:
                headers['authorization'] = f'Bearer {auth_token}'
            if config._get_request_protocol_logging() in ['grpc', 'grpcs']:  # gRPC
                # Imported lazily so the gRPC exporter module is only loaded
                # when this protocol is selected.
                from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter as OTLPLogExportergRPC
                insecure = not config._get_TLS_logging()
                exporter = OTLPLogExporterShim(
                    OTLPLogExportergRPC,
                    endpoint=self.logger_endpoint,
                    insecure=insecure,
                    # The CA cert is only looked up and passed when TLS is on
                    credentials=config._get_ca_cert_logging() if not insecure else None,
                    headers=headers
                )
            else:  # HTTP
                from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter as OTLPLogExporterHTTP
                exporter = OTLPLogExporterShim(
                    OTLPLogExporterHTTP,
                    endpoint=self.logger_endpoint,
                    certificate_file=config._get_ca_cert_logging(),
                    headers=headers
                )

        self.exporter = exporter
        self._processor = BatchLogRecordProcessor(self.exporter)
        self._provider.add_log_record_processor(self._processor)

    def _get_log_handler(self) -> LoggingHandler:
        """
        Return a new ``LoggingHandler`` bound to this provider.

        The handler uses the configured log level and carries the filter
        that fills in the default log attributes.
        """
        handler = LoggingHandler(level=self.log_level, logger_provider=self._provider)
        handler.addFilter(self._set_default_attribute_filter())
        return handler

    def _set_default_attribute_filter(self) -> logging.Filter:
        """
        Return a filter that adds missing default attributes to each record.

        The filter never rejects a record; it only sets every entry of
        ``_default_log_attributes`` that the record does not already have.
        """
        attrs = self._default_log_attributes
        f = logging.Filter()

        def _filter(record):
            for k, v in attrs.items():
                # Values already set on the record win over the defaults
                if not hasattr(record, k):
                    setattr(record, k, v)
            return True

        # Replace the instance's filter method with the closure above
        f.filter = _filter
        return f

    def _get_event_logger(self, logger_name: Optional[str] = None) -> EventLogger:
        """
        Return an ``EventLogger`` bound to this provider.

        Args:
            logger_name: Name for the underlying logger. When ``None``,
                ``'<service_name>_event_logger'`` is used.
        """
        if logger_name is None:
            logger_name = f'{self.service_name}_event_logger'
        return EventLogger(self._provider, logger_name=logger_name)

    def _get_log_level(self, str_level: str) -> int:
        """
        Convert a level name from the config file to a ``logging`` level.

        Matching ignores case and surrounding whitespace. An unknown name,
        or a value that is not a string (for example ``None``), yields
        ``logging.DEBUG``.
        """
        levels = {
            "debug": logging.DEBUG,
            "info": logging.INFO,
            "warning": logging.WARNING,
            "warn": logging.WARNING,
            "error": logging.ERROR,
            "critical": logging.CRITICAL,
            "fatal": logging.CRITICAL
        }
        # A missing or non-string level gets the same fallback as an unknown
        # name instead of raising AttributeError on `.lower()`.
        if not isinstance(str_level, str):
            return logging.DEBUG
        return levels.get(str_level.strip().lower(), logging.DEBUG)

    def _test_set_console_mock(self, new_out):  # For testing only...
        """
        Redirect the console exporter's output stream.

        Returns the previous stream, or ``None`` when there is no console
        exporter or ``new_out`` is ``None`` (nothing is changed then).
        """
        if self._console_exporter is not None and new_out is not None:
            saved = self._console_exporter.out
            self._console_exporter.out = new_out
            return saved
        return None