from typing import Any, Union
import ujson as json
from collections import UserDict
from insanic.conf import settings
[docs]class MessageAttributes(UserDict):
"""
Base class for :code:`SQSMessage` and :code:`SNSMessage`
because both messages uses message attributes.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self["MessageAttributes"] = {}
@property
def message_attributes(self) -> dict:
return self.get("MessageAttributes", {})
[docs] def add_event(self, value: str, *, raw: bool = False):
"""
Adds the event to the message to be sent.
"""
if not isinstance(value, str):
raise ValueError("Event must be a string.")
if not raw and not value.endswith(f".{settings.get('SERVICE_NAME')}"):
value = ".".join([value, settings.SERVICE_NAME])
self.add_string_attribute(settings.INIESTA_SNS_EVENT_KEY, value)
[docs] def add_attribute(self, attribute_name: str, attribute_value: Any) -> None:
"""
Adds an attribute depending on value type.
:raises ValueError: If the value is not a supported type.
"""
if isinstance(attribute_value, str):
self.add_string_attribute(attribute_name, attribute_value)
elif isinstance(attribute_value, (int, float)):
self.add_number_attribute(attribute_name, attribute_value)
elif isinstance(attribute_value, (list, tuple)):
self.add_list_attribute(attribute_name, attribute_value)
elif isinstance(attribute_value, bytes):
self.add_binary_attribute(attribute_name, attribute_value)
else:
raise ValueError("Invalid type.")
[docs] def add_string_attribute(
self, attribute_name: str, attribute_value: str
) -> None:
"""
Adds a string attribute to the message.
:raises ValueError: If the value is not a string.
"""
if not isinstance(attribute_value, str):
raise ValueError("Value is not a string.")
self["MessageAttributes"].update(
{
attribute_name: {
"DataType": "String",
"StringValue": attribute_value,
}
}
)
[docs] def add_number_attribute(
self, attribute_name: str, attribute_value: Union[int, float]
) -> None:
"""
Adds a number attribute to the message.
:raises ValueError: If the value is not an int or a float.
"""
if not isinstance(attribute_value, (int, float)):
raise ValueError("Value is not a number.")
self["MessageAttributes"].update(
{
attribute_name: {
"DataType": "Number",
"StringValue": str(attribute_value),
}
}
)
[docs] def add_list_attribute(
self, attribute_name: str, attribute_value: Union[list, tuple]
) -> None:
"""
Adds a list attribute to the message.
:raises ValueError: If the value is not a list or tuple type.
"""
if not isinstance(attribute_value, (list, tuple)):
raise ValueError("Value is not a list or tuple.")
self["MessageAttributes"].update(
{
attribute_name: {
"DataType": "String.Array",
"StringValue": json.dumps(attribute_value),
}
}
)
[docs] def add_binary_attribute(self, attribute_name: str, attribute_value: bytes):
"""
Adds a binary attribute.
:raises ValueError: If the value is not bytes
"""
if not isinstance(attribute_value, bytes):
raise ValueError("Value is not bytes.")
self["MessageAttributes"].update(
{
attribute_name: {
"DataType": "Binary",
"BinaryValue": attribute_value,
}
}
)