from typing import Any
import hashlib
import ujson as json
from botocore.exceptions import ClientError
from insanic.conf import settings
from iniesta.log import error_logger
from iniesta.messages import MessageAttributes
from iniesta.sessions import BotoSession
empty = object()
VALID_SEND_MESSAGE_ARGS = [
"MessageBody",
"DelaySeconds",
"MessageAttributes",
"MessageDeduplicationId",
"MessageGroupId",
]
ERROR_MESSAGES = {
"delay_seconds_out_of_bounds": "Delay Seconds must be between 0 and 900 inclusive. Got {value}.",
"delay_seconds_type_error": "Delay Seconds must be an integer. Got {value}.",
}
[docs]class SQSMessage(MessageAttributes):
"""
The Message object that will be used to send a message to SQS.
:param client: The client that will be sending this message.
:type client: :code:`SQSClient`
:param message: The message to send. A json serializable value.
"""
def __init__(self, client, message: Any) -> None:
super().__init__()
self.client = client
self["MessageBody"] = message
self.message_id = None
self.original_message = None
self.receipt_handle = None
self.md5_of_body = None
self.attributes = None
[docs] @classmethod
def from_sqs(cls, client, message: Any):
"""
A helper method that unpacks everything from receive_message
:param client: SQSClient instance from which the message came from
:type client: :code:`SQSClient`
:param message: The message from receive_message when polling SQS.
:return: A initialized SQSMessage instance.
:rtype: :code:`SQSMessage`
"""
try:
message_object = cls(client, message["Body"])
message_object.original_message = message
message_object.message_id = message["MessageId"]
message_object.receipt_handle = message["ReceiptHandle"]
message_object.md5_of_body = message["MD5OfBody"]
message_object.attributes = message["Attributes"]
message_object["MessageAttributes"] = message.get(
"MessageAttributes", {}
)
except KeyError as e: # pragma: no cover
raise ValueError(f"SQS Message is invalid: {e.args[0]}")
else:
return message_object
def __eq__(self, other):
if self.message_id is not None:
return self.message_id == other.message_id
else:
return False
@property
def delay_seconds(self) -> int:
"""
The length of time in seconds to delay the message.
"""
return self.get("DelaySeconds", 0)
@delay_seconds.setter
def delay_seconds(self, value: int) -> None:
"""
To set the length of time in seconds to delay the message.
:raises TypeError: If the value is not an int.
:raises ValueError: If the value is not between 0 and 900.
"""
if not isinstance(value, int):
raise TypeError(
ERROR_MESSAGES["delay_seconds_type_error"].format(value=value)
)
elif value < 0 or value > 900:
raise ValueError(
ERROR_MESSAGES["delay_seconds_out_of_bounds"].format(
value=value
)
)
self["DelaySeconds"] = value
@property
def raw_body(self):
"""
The raw body of the message.
"""
return self["MessageBody"]
@property
def body(self):
"""
The body as a python object.
"""
try:
return json.loads(self.raw_body)
except ValueError:
return self.raw_body
@property
def event(self) -> str:
"""
The event that this message was received as.
"""
return self.message_attributes.get(settings.INIESTA_SNS_EVENT_KEY, None)
[docs] def checksum_body(self) -> bool:
"""
Verifies the body was properly received.
"""
return (
hashlib.md5(self["MessageBody"].encode("utf-8")).hexdigest()
== self.md5_of_body
)
@property
def message_attributes(self) -> dict:
"""
Any message attributes attached to this body.
Refer to https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
"""
_message_attributes = {}
for attribute, attribute_value in self["MessageAttributes"].items():
data_type = attribute_value["DataType"].split(".", 1)[0]
if data_type == "Number":
data_type = "String"
_message_attributes.update(
{attribute: attribute_value[f"{data_type}Value"]}
)
return _message_attributes
[docs] async def send(self):
"""
Sends this message to the queue defined in client.
:rtype: :code:`SQSMessage`
:raises botocore.exceptions.ClientError: If there was an issue when sending the message to SQS.
"""
session = BotoSession.get_session()
try:
async with session.create_client(
"sqs",
region_name=BotoSession.aws_default_region,
endpoint_url=self.client.endpoint_url,
aws_access_key_id=BotoSession.aws_access_key_id,
aws_secret_access_key=BotoSession.aws_secret_access_key,
) as client:
message = await client.send_message(
QueueUrl=self.client.queue_url,
**{
k: v
for k, v in self.items()
if k in VALID_SEND_MESSAGE_ARGS
},
)
self.message_id = message["MessageId"]
self.md5_of_body = message["MD5OfMessageBody"]
return self
except ClientError as e:
error_logger.critical(
f"[{e.response['Error']['Code']}]: {e.response['Error']['Message']}"
)
raise
except Exception: # pragma: no cover
error_logger.exception("Sending SQS message failed.")
raise