Source code for iniesta.sns.message

from typing import Union, Any

import botocore
import ujson as json

from insanic.conf import settings

from iniesta.log import logger, error_logger
from iniesta.sessions import BotoSession
from iniesta.messages import MessageAttributes

#: A constant for the max body size SNS can publish.
MAX_BODY_SIZE: int = 1024 * 256


[docs]class SNSMessage(MessageAttributes): """ A SNS Message object that will be serialized to send. """ def __init__(self, message: str = ""): super().__init__() self["Message"] = message self["MessageStructure"] = "string" self["MessageAttributes"] = {} @property def event(self) -> Union[str, None]: """ The event this message is attached to. Returns :code:`None` if event is not set. """ try: return self.message_attributes[settings.INIESTA_SNS_EVENT_KEY][ "StringValue" ] except KeyError: return None @property def message(self) -> Any: """ The message body. """ return self["Message"] @message.setter def message(self, value: str) -> None: """ Setter for the message body of the request. :param value: Any json serializable value. :raises TypeError: If the value is not able to be json dumpable. :raises ValueError: If the message is too long to be published. """ if not isinstance(value, str): try: value = json.dumps(value) except Exception: raise TypeError("Message must be a string.") if len(value.encode("utf8")) > MAX_BODY_SIZE: raise ValueError( f"Message is too long! Max is {MAX_BODY_SIZE} bytes. " f"{len(value.encode('utf8'))} bytes calculated." ) self["Message"] = value @property def size(self) -> int: """ The size of this message. """ return len(json.dumps(self.data).encode("utf8")) @property def subject(self) -> str: """ Subject to be used when sending to email subscription. """ return self["Subject"] @subject.setter def subject(self, value: str) -> None: """ :raises TypeError: If the value is not a string. """ if not isinstance(value, str): raise TypeError("Subject must be a string.") self["Subject"] = value @property def message_structure(self) -> str: """ The message structure. """ return self["MessageStructure"] @message_structure.setter def message_structure(self, value: str) -> None: """ Setter for message structure. :param value: Must be the literal value :code:`"json"` or :code:`"string"`. :raises ValueError: If the value is not either "json" or "string" """ if value not in ["json", "string"]: raise ValueError( "MessageStructure must either be 'json' or 'string'." ) self["MessageStructure"] = value
[docs] async def publish(self) -> dict: """ Serializes this message and publishes this message to SNS. :return: The response of the publish request to SNS. """ session = BotoSession.get_session() try: async with session.create_client( "sns", region_name=BotoSession.aws_default_region, endpoint_url=self.client.endpoint_url, aws_access_key_id=BotoSession.aws_access_key_id, aws_secret_access_key=BotoSession.aws_secret_access_key, ) as client: message = await client.publish( TopicArn=self.client.topic_arn, **self ) logger.debug( f"[INIESTA] Published ({self.event}) with " f"the following attributes: {self}" ) return message except botocore.exceptions.ClientError as e: error_logger.critical( f"[{e.response['Error']['Code']}]: {e.response['Error']['Message']}" ) raise except Exception: error_logger.exception("Publishing SNS Message Failed!") raise
[docs] @classmethod def create_message( cls, client, *, event: str, message: Any, version: int = 1, raw_event: bool = False, **message_attributes, ): """ A factory method to initialize an event message. :param client: The initialized SNSClient :type client: :code:`SNSClient` :param event: The event this message will publish. :param message: The message body to publish. :param version: Version of the message. :param raw_event: If the event should be passed in as itself. :param message_attributes: Any message attributes :return: Instantiated instance of self. :rtype: :code:`SNSMessage` """ message_object = cls(message) message_object.message = message for ma, mv in message_attributes.items(): message_object.add_attribute(ma, mv) message_object.add_event(event, raw=raw_event) message_object.add_number_attribute("version", version) message_object.client = client return message_object