Coverage for anaconda_opentelemetry / exporter_shim.py: 98%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1import threading, logging 

2from enum import Enum 

3from opentelemetry.sdk.metrics.export import MetricExporter 

4from opentelemetry.sdk.trace.export import SpanExporter 

5from opentelemetry.sdk._logs.export import LogExporter 

6 

7 

class ExporterState(Enum):
    """Lifecycle marker recorded on an exporter shim."""

    # Set when a shim is constructed and again once an exporter swap finishes.
    READY = 1

    # Set by ``change_signal_endpoint`` just before the exporter is swapped.
    UPDATING = 2

11 

12 

class _OTLPExporterMixin:
    """Mixin that provides common functionality for all OTLP exporter shims.

    The shim builds a real exporter from ``exporter_class`` and forwards
    ``export``, ``shutdown`` and ``force_flush`` to it.  The class and its
    keyword arguments are kept so the exporter can be rebuilt later with a
    different ``endpoint``.

    Subclasses must define a ``_signal`` class attribute; it is passed to the
    config object by ``change_signal_endpoint``.
    """

    def __init__(self, exporter_class, **kwargs):
        """Create the wrapped exporter.

        Args:
            exporter_class: Callable invoked as ``exporter_class(**kwargs)``
                to build the wrapped exporter, now and on every later swap.
            **kwargs: Keyword arguments for ``exporter_class``.  The dict is
                stored and its ``'endpoint'`` entry is overwritten by
                ``change_signal_endpoint``.
        """
        self._logger = logging.getLogger('exporter_shim_logger')
        self._lock = threading.Lock()
        self._exporter_class = exporter_class
        self._init_kwargs = kwargs
        self._exporter = exporter_class(**kwargs)
        self._state = ExporterState.READY

    def _swap_exporter(self, batch_access=None):
        """Replace the wrapped exporter with one built from the stored kwargs.

        Args:
            batch_access: Optional object exposing ``force_flush()``; when
                given, it is flushed before the old exporter is shut down.

        Returns:
            ``True`` when the new exporter was installed, ``False`` when the
            replacement could not be constructed (the old exporter is kept).

        Raises:
            Exception: Whatever ``batch_access.force_flush()`` or the old
                exporter's ``shutdown()`` raises.  In that case the old
                exporter stays installed.
        """
        # Build the replacement outside the lock so a slow or failing
        # constructor never blocks other users of the lock.
        try:
            new_exporter = self._exporter_class(**self._init_kwargs)
        except Exception as exception:
            # Report the failure instead of dropping it silently; the caller
            # only sees ``False`` and would otherwise have no diagnostics.
            self._logger.error(f"Failed to create replacement exporter: {exception}")
            with self._lock:
                self._state = ExporterState.READY
            return False

        with self._lock:
            try:
                old_exporter = self._exporter
                if batch_access is not None:
                    batch_access.force_flush()
                old_exporter.shutdown()
                self._exporter = new_exporter
            finally:
                # Always leave UPDATING, even when flush/shutdown raised;
                # otherwise the shim would report UPDATING forever.
                self._state = ExporterState.READY

        return True

    def change_signal_endpoint(self, batch_access, config, new_endpoint, auth_token=None):
        """Point this shim's signal at a new endpoint and rebuild the exporter.

        Args:
            batch_access: Forwarded to ``_swap_exporter``; may be ``None``.
            config: Object providing ``_change_signal_endpoint(signal,
                new_endpoint, auth_token=...)``.  Its return value becomes the
                ``'endpoint'`` keyword argument for the new exporter.
            new_endpoint: Endpoint value handed to ``config``.
            auth_token: Optional token handed to ``config``.

        Returns:
            The result of ``_swap_exporter``: ``True`` on success, ``False``
            when the replacement exporter could not be constructed.
        """
        endpoint = config._change_signal_endpoint(
            self._signal,
            new_endpoint,
            auth_token=auth_token
        )
        self._init_kwargs['endpoint'] = endpoint

        with self._lock:
            self._state = ExporterState.UPDATING

        return self._swap_exporter(batch_access=batch_access)

    def export(self, *args, **kwargs):
        """Forward to the wrapped exporter's ``export``.

        Returns:
            The wrapped exporter's result, or ``False`` when it raised; the
            exception is logged and not propagated.
        """
        try:
            return self._exporter.export(*args, **kwargs)
        except Exception as exception:
            self._logger.error(f"Failed to export: {exception}")
            return False

    def shutdown(self, *args, **kwargs):
        """Forward to the wrapped exporter's ``shutdown`` and return its result."""
        return self._exporter.shutdown(*args, **kwargs)

    def force_flush(self, *args, **kwargs):
        """Forward to the wrapped exporter's ``force_flush`` and return its result."""
        # this function doesn't need tests, it must be implemented but exporter method is stub
        return self._exporter.force_flush(*args, **kwargs)

69 

70 

class OTLPMetricExporterShim(_OTLPExporterMixin, MetricExporter):
    """Exporter shim for the metrics signal."""

    # Signal name handed to the config object when the endpoint is changed.
    _signal = 'metrics'

    @property
    def _preferred_temporality(self):
        """Return the wrapped exporter's ``_preferred_temporality``."""
        # Read through ``self._exporter`` on every access so the value
        # always comes from whichever exporter is currently installed.
        wrapped = self._exporter
        return wrapped._preferred_temporality

    @property
    def _preferred_aggregation(self):
        """Return the wrapped exporter's ``_preferred_aggregation``."""
        wrapped = self._exporter
        return wrapped._preferred_aggregation

81 

82 

class OTLPSpanExporterShim(_OTLPExporterMixin, SpanExporter):
    """Exporter shim for the tracing signal."""

    # Signal name handed to the config object when the endpoint is changed.
    _signal = 'tracing'

85 

86 

class OTLPLogExporterShim(_OTLPExporterMixin, LogExporter):
    """Exporter shim for the logging signal."""

    # Signal name handed to the config object when the endpoint is changed.
    _signal = 'logging'