Source code for iniesta.app

from types import MappingProxyType

from insanic import Insanic
from insanic.exceptions import ImproperlyConfigured
from insanic.functional import empty
from sanic.config import Config

from . import config
from .choices import InitializationTypes
from .listeners import IniestaListener
from .utils import filter_list_to_filter_policies


[docs]class _Iniesta(object): """ Initializes Iniesta with different methods. You should probably not use this. You should import just Iniesta. :code:`from iniesta import Iniesta` """ def __init__(self) -> None: self.config_imported = False self._initialization_type = empty self.INITIALIZATION_MAPPING = MappingProxyType( { InitializationTypes.QUEUE_POLLING: self._init_queue_polling, InitializationTypes.EVENT_POLLING: self._init_event_polling, InitializationTypes.SNS_PRODUCER: self._init_producer, InitializationTypes.CUSTOM: self._init_custom, } ) @property def initialization_type(self) -> InitializationTypes: final = InitializationTypes(0) if self._initialization_type is empty: return final for it in self._initialization_type: final |= it return final @initialization_type.setter def initialization_type(self, value: InitializationTypes) -> None: if not isinstance(value, InitializationTypes): raise ValueError("Must be an InitializationTypes choice.") if self._initialization_type is empty: self._initialization_type = [] self._initialization_type.append(value)
[docs] def check_global_arn(self, settings_object: Config) -> None: if settings_object.INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN is None: raise ImproperlyConfigured( "INIESTA_SNS_PRODUCER_GLOBAL_TOPIC_ARN not set in settings!" )
[docs] def load_config(self, settings_object: Config) -> None: if not self.config_imported: for c in dir(config): if c.isupper(): conf = getattr(config, c) if c == "INIESTA_CACHES": settings_object.INSANIC_CACHES.update(conf) elif not hasattr(settings_object, c): setattr(settings_object, c, conf) self.config_imported = True
[docs] def unload_config(self, settings_object: Config) -> None: for c in dir(config): if c.isupper(): try: delattr(settings_object, c) except AttributeError: pass self.config_imported = False
def _order_initialization_type(self, settings_object: Config) -> list: try: init_types = settings_object.INIESTA_INITIALIZATION_TYPE return sorted([InitializationTypes[it] for it in init_types]) except (KeyError, TypeError): raise ImproperlyConfigured( f"{str(init_types)} is " f"an invalid initialization type. " f"Choices are {', '.join(str(i) for i in self.INITIALIZATION_MAPPING.keys())}" ) def _validate_initialization_type(self, settings_object: Config): if settings_object.INIESTA_INITIALIZATION_TYPE is None: raise ImproperlyConfigured( "Please configure INIESTA_INITIALIZATION_TYPE in your config!" ) if not isinstance( settings_object.INIESTA_INITIALIZATION_TYPE, (list, tuple) ): raise ImproperlyConfigured( "INIESTA_INITIALIZATION_TYPE type is invalid. Must be list or tuple!" )
[docs] def init_app(self, app: Insanic) -> None: """ Initializes the application depending on INIESTA_INITIALIZATION_TYPE set in settings """ if self._initialization_type is not empty: raise ImproperlyConfigured("Iniesta has already been initialized!") self.load_config(app.config) self._validate_initialization_type(app.config) initialization_types = self._order_initialization_type(app.config) for choice in initialization_types: initialization_method = self.INITIALIZATION_MAPPING[choice] initialization_method(app) self.initialization_type = choice
def _init_custom(self, app: Insanic) -> None: """ Initializes the application for custom use. Actions this takes: - Loads iniesta configs """ self.load_config(app.config) def _init_producer(self, app: Insanic) -> None: """ Initializes the application with only SNS producing capabilities. Checks if the global sns arn exists. If not fails running of application. Actions: - Checks if global arn is valid - Loads iniesta configs """ self.load_config(app.config) if not app.config.INIESTA_DRY_RUN: # check if global arn exists self.check_global_arn(app.config) listener = IniestaListener() app.register_listener( listener.after_server_start_producer_check, "after_server_start" ) def _init_queue_polling(self, app: Insanic) -> None: """ Basic sqs queue polling without the need for checking subscriptions. Actions: - Loads iniesta configs - Attaches listeners to check if queue exists """ self.load_config(app.config) if not app.config.INIESTA_DRY_RUN: listener = IniestaListener() app.register_listener( listener.after_server_start_start_queue_polling, "after_server_start", ) app.register_listener( listener.before_server_stop_stop_polling, "before_server_stop" ) def _init_event_polling(self, app: Insanic) -> None: """ Initializes for event polling. Actions: - Checks if global arn exists - Checks if filters are 0 to avoid receiving all messages - Loads iniesta configs - Attaches listeners - Checks if queue exists (initialize) - Checks subscriptions - Checks permissions """ self.load_config(app.config) if not app.config.INIESTA_DRY_RUN: # check if global arn exists self.check_global_arn(app.config) # check if filters are 0 if len(app.config.INIESTA_SQS_CONSUMER_FILTERS) == 0: raise ImproperlyConfigured( "INIESTA_SQS_CONSUMER_FILTERS is an empty list. " "Please specify events to receive!" ) listener = IniestaListener() app.register_listener( listener.after_server_start_event_polling, "after_server_start" ) app.register_listener( listener.before_server_stop_stop_polling, "before_server_stop" )
[docs] def filter_policies(self) -> dict: """ Serializes the defined filter policies that AWS apis can use. """ from insanic.conf import settings return filter_list_to_filter_policies( settings.INIESTA_SNS_EVENT_KEY, settings.INIESTA_SQS_CONSUMER_FILTERS, )
Iniesta = _Iniesta()