Coverage for anaconda_opentelemetry / exporter_shim.py: 98%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 15:45 +0000
1import threading, logging
2from enum import Enum
3from opentelemetry.sdk.metrics.export import MetricExporter
4from opentelemetry.sdk.trace.export import SpanExporter
5from opentelemetry.sdk._logs.export import LogExporter
8class ExporterState(Enum):
9 READY = 1
10 UPDATING = 2
13class _OTLPExporterMixin:
14 """Mixin that provides common functionality for all OTLP exporter shims"""
16 def __init__(self, exporter_class, **kwargs):
17 self._logger = logging.getLogger('exporter_shim_logger')
18 self._lock = threading.Lock()
19 self._exporter_class = exporter_class
20 self._init_kwargs = kwargs
21 self._exporter = exporter_class(**kwargs)
22 self._state = ExporterState.READY
24 def _swap_exporter(self, batch_access=None):
25 try:
26 new_exporter = self._exporter_class(**self._init_kwargs)
27 except Exception:
28 with self._lock:
29 self._state = ExporterState.READY
30 return False
32 with self._lock:
33 old_exporter = self._exporter
34 if batch_access is not None:
35 batch_access.force_flush()
36 old_exporter.shutdown()
37 self._exporter = new_exporter
38 self._state = ExporterState.READY
40 return True
42 def change_signal_endpoint(self, batch_access, config, new_endpoint, auth_token=None):
44 endpoint = config._change_signal_endpoint(
45 self._signal,
46 new_endpoint,
47 auth_token=auth_token
48 )
49 self._init_kwargs['endpoint'] = endpoint
51 with self._lock:
52 self._state = ExporterState.UPDATING
54 return self._swap_exporter(batch_access=batch_access)
56 def export(self, *args, **kwargs):
57 try:
58 return self._exporter.export(*args, **kwargs)
59 except Exception as exception:
60 self._logger.error(f"Failed to export: {exception}")
61 return False
63 def shutdown(self, *args, **kwargs):
64 return self._exporter.shutdown(*args, **kwargs)
66 def force_flush(self, *args, **kwargs):
67 # this function doesn't need tests, it must be implemented but exporter method is stub
68 return self._exporter.force_flush(*args, **kwargs)
71class OTLPMetricExporterShim(_OTLPExporterMixin, MetricExporter):
72 _signal = 'metrics'
74 @property
75 def _preferred_temporality(self):
76 return self._exporter._preferred_temporality
78 @property
79 def _preferred_aggregation(self):
80 return self._exporter._preferred_aggregation
83class OTLPSpanExporterShim(_OTLPExporterMixin, SpanExporter):
84 _signal = 'tracing'
87class OTLPLogExporterShim(_OTLPExporterMixin, LogExporter):
88 _signal = 'logging'