Event Polling Initialization

If your application needs to receive events for consumption, set your INIESTA_INITIALIZATION_TYPE to include EVENT_POLLING.

Preparation

  • All necessary AWS resources have been created

    • An SNS Topic

    • An SQS Queue

    • Subscription with filter policies

    • Proper permissions

  • EVENT_POLLING is included in INIESTA_INITIALIZATION_TYPE

  • Set INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN

  • Set your INIESTA_SQS_CONSUMER_FILTERS

  • Set your INIESTA_SQS_QUEUE_NAME or fall back to INIESTA_SQS_QUEUE_NAME_TEMPLATE.

  • Set your cache connection details on INIESTA_CACHES that aioredlock will use. It is recommended to have at least 3 different cache connections.

To get an idea of what AWS Resources are needed, please refer to AWS Resources.
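
As a rough illustration, the settings from the checklist above might look like the following in a settings module. Every value here (the ARN, the queue name, the event names, the cache hosts) is a placeholder, and the exact shapes of INIESTA_SQS_CONSUMER_FILTERS and INIESTA_CACHES are assumptions rather than confirmed formats, so check them against the Iniesta settings reference.

```python
# Hypothetical settings sketch; all values are placeholders, not defaults.

# Include EVENT_POLLING so Iniesta prepares the queue polling machinery.
INIESTA_INITIALIZATION_TYPE = ["EVENT_POLLING"]

# ARN of the SNS topic the queue is subscribed to (placeholder account and topic).
INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN = (
    "arn:aws:sns:us-east-1:123456789012:example-global-topic"
)

# Events this service wants to consume (assumed here to be a list of event names).
INIESTA_SQS_CONSUMER_FILTERS = ["SomethingHappened.somewhere"]

# Explicit queue name; leave unset to rely on INIESTA_SQS_QUEUE_NAME_TEMPLATE.
INIESTA_SQS_QUEUE_NAME = "example-service-queue"

# Cache connections used by aioredlock. The key names and dict layout are
# assumptions; three separate connections follow the recommendation above.
INIESTA_CACHES = {
    "iniesta1": {"HOST": "redis-1.example.internal", "PORT": 6379, "DATABASE": 1},
    "iniesta2": {"HOST": "redis-2.example.internal", "PORT": 6379, "DATABASE": 1},
    "iniesta3": {"HOST": "redis-3.example.internal", "PORT": 6379, "DATABASE": 1},
}
```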

Execution

On initialization:

  1. Verifies that the global topic ARN defined in the settings (INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN) exists.

  2. Verifies that the filter policies defined in INIESTA_SQS_CONSUMER_FILTERS exist on AWS.

  3. Loads the Iniesta settings into Insanic’s settings.

  4. Attaches the appropriate listeners.

Then on start up:

  1. Initializes the SQSClient and sets it to app.messi.

  2. Checks if the queue exists.

  3. Checks if the queue is subscribed to the topic.

  4. Checks if the necessary permissions exist.

  5. Starts polling the queue for messages.

Now we should be able to receive messages from the queue. But what do we do with those messages? We need to attach handlers.

Handlers

We need to attach handlers for each event. SQSClient provides a class method to attach handlers to process the received messages.

from iniesta.sqs import SQSClient

@SQSClient.handler("SomethingHappened.somewhere")
def something_happened_handler(message):
    # .. do some logic
    return

Or on a method defined inside a class.

class Handlers:
    @SQSClient.handler(['SomethingHappened.elsewhere', "AnotherEvent.knowhere"])
    def something_happened_handler(self, message):
        # .. some logic
        pass

Or directly.

def something_else_happened_handler(message):
    # do some logic
    return

SQSClient.add_handler(something_else_happened_handler,
                      "SomethingElseHappened.nowhere")

Note that the callable must accept a message argument. The event can also be a list of events if you want to bind the same handler to multiple events.

To define a default handler, don’t set an event. If Iniesta receives a message whose event has no attached handler, it falls back to the default handler.

@SQSClient.handler
def default_handler(message):
    # do something
    pass
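
To make the three registration styles easier to picture, here is a minimal, self-contained sketch of a decorator-based registry. It is not Iniesta's SQSClient; the HandlerRegistry class and its internals are hypothetical and only mimic the behaviour described above: a single event, a list of events, or no event at all for the default handler.

```python
from typing import Callable, Dict, Iterable, Optional, Union

EventSpec = Union[str, Iterable[str], None]


class HandlerRegistry:
    """Toy stand-in for a handler registry; NOT Iniesta's implementation."""

    # Maps event name -> handler. The None key holds the default handler.
    handlers: Dict[Optional[str], Callable] = {}

    @classmethod
    def add_handler(cls, func: Callable, event: EventSpec = None) -> None:
        # Normalise a single event (or None) into a one-element list.
        if event is None or isinstance(event, str):
            names = [event]
        else:
            names = list(event)
        for name in names:
            cls.handlers[name] = func

    @classmethod
    def handler(cls, arg=None):
        # Bare usage (@HandlerRegistry.handler): arg is the function itself,
        # so register it as the default handler.
        if callable(arg):
            cls.add_handler(arg)
            return arg

        # Parameterised usage: arg is an event name or a list of event names.
        def decorator(func: Callable) -> Callable:
            cls.add_handler(func, arg)
            return func

        return decorator


@HandlerRegistry.handler("SomethingHappened.somewhere")
def on_something(message):
    return "specific"


@HandlerRegistry.handler(["SomethingHappened.elsewhere", "AnotherEvent.knowhere"])
def on_many(message):
    return "many"


@HandlerRegistry.handler
def fallback(message):
    return "default"
```

In a registry of this kind, registration happens as a side effect of the decorator running, which only occurs when the module that defines the handlers is actually imported.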

Polling

The typical flow for when Iniesta receives a message is as follows.

  1. Receives message from SQS.

  2. Acquires a lock with aioredlock using the message_id to enforce idempotency.

  3. Once acquired, look for a handler:

    1. Match the event found under INIESTA_SNS_EVENT_KEY in the received message body against the registered handlers.

    2. If not found, look for a default handler.

    3. If not found, raise KeyError.

  4. If a handler is found, execute it:

    1. If it runs without raising an exception, delete the message from SQS.

    2. If an exception is raised, do nothing, so the message can be consumed again after the visibility timeout.
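
The flow above can be sketched with in-memory stand-ins. This is an illustration under stated assumptions, not Iniesta's code: the queue dict replaces SQS, the locked_ids set replaces aioredlock, EVENT_KEY is a made-up value standing in for the INIESTA_SNS_EVENT_KEY setting, and process is a hypothetical function name.

```python
import asyncio
from typing import Callable, Dict, Optional, Set

EVENT_KEY = "event"  # hypothetical stand-in for the INIESTA_SNS_EVENT_KEY value

handlers: Dict[Optional[str], Callable[[dict], object]] = {}  # None = default
locked_ids: Set[str] = set()   # stand-in for aioredlock locks
queue: Dict[str, dict] = {}    # stand-in for SQS: message_id -> message body


async def process(message_id: str) -> bool:
    """Handle one message; return True if it was handled and deleted."""
    # Step 2: take a lock keyed by message_id so it is processed only once.
    if message_id in locked_ids:
        return False  # someone else is already working on this message
    locked_ids.add(message_id)
    try:
        body = queue[message_id]

        # Step 3: look up a handler for the event, then fall back to default.
        handler = handlers.get(body.get(EVENT_KEY))
        if handler is None:
            handler = handlers.get(None)
        if handler is None:
            raise KeyError(f"no handler for event {body.get(EVENT_KEY)!r}")

        # Step 4: run the handler; delete the message only on success.
        try:
            handler(body)
        except Exception:
            return False  # leave the message so it can be consumed again
        del queue[message_id]
        return True
    finally:
        locked_ids.discard(message_id)
```

With a handler registered for an event, `asyncio.run(process("some-id"))` removes the message from the stand-in queue on success and leaves it in place if the handler raises.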

Note

There is currently a known issue where, if the module containing the handlers is not imported on start up, the handlers do not get registered. As a quick fix, import the handler module(s) in the module where your Insanic app resides.

See Also