Coverage for anaconda_opentelemetry / common.py: 100%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-17 15:45 +0000

1# -*- coding: utf-8 -*- 

2# SPDX-FileCopyrightText: 2025 Anaconda, Inc 

3# SPDX-License-Identifier: Apache-2.0 

4 

5# common.py 

6""" 

7Anaconda Telemetry - Common base class and exceptions for signal classes. 

8""" 

9 

10import logging, hashlib, json 

11from typing import Dict 

12from dataclasses import fields 

13 

14from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION 

15 

16from .config import Configuration as Config 

17from .attributes import ResourceAttributes as Attributes 

18from .__version__ import __SDK_VERSION__, __TELEMETRY_SCHEMA_VERSION__ 

19from .formatting import AttrDict 

20 

21 

22class MetricsNotInitialized(RuntimeError): 

23 pass 

24 

25 

26class _AnacondaCommon: 

27 # Base class for common attributes and methods (internal only) 

28 def __init__(self, config: Config, attributes: Attributes): 

29 self._config = config 

30 # Init resource_attributes 

31 self._resource_attributes = {} 

32 self.resource = None 

33 # session id 

34 self._session_id = None 

35 # user id 

36 self._user_id = None 

37 

38 # Make self._resource_attributes and self.resource 

39 self.make_otel_resource(attributes) 

40 

41 self.logger = logging.getLogger(__package__) 

42 

43 # assemble config and attribute values 

44 # default endpoint 

45 self.default_endpoint = config._get_default_endpoint() 

46 # export options 

47 self.use_console_exporters = config._get_console_exporter() 

48 

49 def make_otel_resource(self, attributes: Attributes): 

50 # Read resource attributes 

51 resource_attrs = attributes._get_attributes() 

52 # Required parameters 

53 self.service_name = resource_attrs["service_name"] 

54 self.service_version = resource_attrs["service_version"] 

55 # prepare to use `_process_attributes` 

56 self._user_id = resource_attrs["user_id"] 

57 del resource_attrs["service_name"], resource_attrs["service_version"], resource_attrs["user_id"] 

58 

59 # convert parameters value to stringified JSON 

60 resource_attrs["parameters"] = json.dumps(resource_attrs["parameters"]) 

61 # Init resource_attributes 

62 self._resource_attributes = { 

63 SERVICE_NAME: self.service_name, 

64 SERVICE_VERSION: self.service_version 

65 } 

66 self._resource_attributes.update(resource_attrs) 

67 # convert to otel names 

68 for attr in fields(attributes): 

69 otel_name = attr.metadata.get('otel_name', None) 

70 if otel_name: 

71 self._resource_attributes[attr.metadata['otel_name']] = self._resource_attributes.pop(attr.name) 

72 self._session_id = self._hash_session_id(self._config._get_tracing_session_entropy()) 

73 self._resource_attributes['session.id'] = self._session_id 

74 self.resource = Resource.create(self._resource_attributes) 

75 

76 def _hash_session_id(self, entropy): 

77 # Hashes a session id for common attributes based on timestamp and user_id 

78 # entropy value ensures unique session_ids 

79 if entropy is None: 

80 raise KeyError("The entropy key has been removed.") 

81 

82 user_id = self._resource_attributes.get('user.id', '') 

83 combined = f"{entropy}|{user_id}|{self.service_name}" 

84 hashed = hashlib.sha256(combined.encode("utf-8")).hexdigest() 

85 

86 return hashed 

87 

88 def _process_attributes(self, attributes: AttrDict={}): 

89 # ensure attributes are of type AttrDict 

90 if not isinstance(attributes, Dict): 

91 self.logger.error(f"Attributes `{attributes}` are not a dictionary, they are not valid. They will be converted to an empty one.") 

92 attributes = {} 

93 # check attributes for invalid keys 

94 if any(not isinstance(key, str) or not key for key in attributes): 

95 self.logger.error(f"Attributes `{attributes}` passed with non empty str type key. Invalid attributes.") 

96 attributes = {} 

97 

98 # pulls a user id initially passed to ResourceAttributes and adds it to event specific events 

99 # for backwards compatability if people have been setting user.id with ResourceAttributes 

100 if not self._user_id: 

101 return attributes # no op 

102 elif 'user.id' in attributes: 

103 return attributes # key already exists 

104 else: 

105 attributes['user.id'] = self._user_id 

106 return attributes