Source code for intergov.processors.multichannel_router

"""
* Get pending messages from the ApiOutboxRepo
* Mark them as 'sending' in the local storage
* Pick a channel
* Send them to the channel
* Change their status in the message_rx_api to "accepted" (?)
* Marks them as 'accepted' in the local storage

local storage - postgres for example (and for the demo)
"""
import random
import time

from intergov.conf import env_json, env_postgres_config, env_queue_config
from intergov.channels.http_api_channel import HttpApiChannel
from intergov.repos.api_outbox import ApiOutboxRepo
from intergov.repos.api_outbox.postgres_objects import Message as PostgresMessageRepr
from intergov.repos.message_updates import MessageUpdatesRepo
from intergov.loggers import logging
from intergov.domain.wire_protocols import generic_discrete as gd
from intergov.use_cases.route_to_channel import RouteToChannelUseCase

logger = logging.getLogger('multichannel_router')


[docs]class MultichannelWorker(object): """ Iterate over the RouteToChannelUseCase. """ ROUTING_TABLE = env_json("IGL_MCHR_ROUTING_TABLE", default=[]) def _prepare_outbox_repo(self, conf): outbox_repo_conf = env_postgres_config('PROC_BCH_OUTBOX') if conf: outbox_repo_conf.update(conf) self.outbox_repo = ApiOutboxRepo(outbox_repo_conf) def _prepare_message_updates_repo(self, conf): # This repo used to talk to the message updater microservice, # which just changes statuses in the message lake repo_conf = env_queue_config('MCHR_MESSAGE_UPDATES_REPO', use_default=False) if not repo_conf: repo_conf = env_queue_config('BCH_MESSAGE_UPDATES') if conf: repo_conf.update(conf) self.message_updates_repo = MessageUpdatesRepo(repo_conf) def _prepare_use_cases(self): self.uc = RouteToChannelUseCase(self.ROUTING_TABLE) def _prepare_channels(self): """ For each channel in the use-case we create channel object and put it into the route table; so underlying use-cases don't think about it at all and just use the object. """ for routing_rule in self.ROUTING_TABLE: routing_rule["ChannelInstance"] = HttpApiChannel(routing_rule.copy()) return def _update_message_status(self, msg, new_status, channel_id=None, channel_msg_id=None): # In the message lake # if channel_id == DiscreteGenericMemoryChannel.ID: # channel_response = json.loads(channel_response) # channel_txn_id = channel_response['link'].split('=')[1] # else: # return False patch_data = { gd.STATUS_KEY: new_status, } if channel_id and channel_msg_id: patch_data.update({ gd.CHANNEL_ID_KEY: channel_id, gd.CHANNEL_TXN_ID_KEY: channel_msg_id, }) return self.message_updates_repo.post_job( { 'message': msg.to_dict(), 'patch': patch_data }, delay_seconds=random.randint(2, 7) ) def __init__( self, outbox_repo_conf=None, channel_pending_message_repo_conf=None, message_updates_repo_conf=None, config=None ): # self._prepare_config(config) self._prepare_outbox_repo(outbox_repo_conf) # self._prepare_channel_pending_message_repo(channel_pending_message_repo_conf) self._prepare_message_updates_repo(message_updates_repo_conf) self._prepare_use_cases() self._prepare_channels() def __iter__(self): logger.info( "Starting the multichannel worker with channels %s", [ch["Name"] for ch in self.ROUTING_TABLE] ) return self def __next__(self): try: pg_msg = self.outbox_repo.get_next_pending_message() if not pg_msg: return None logger.info("Processing message %s (%s)", pg_msg, pg_msg.id) self.outbox_repo.patch(pg_msg.id, {'status': 'sending'}) # If not result message wasn't posted to channel # it looks like ok situation from the use case point of view # therefore we just silently return None # BUT we probably want to change status of the message in # outbox_repo # first we convert message from the # intergov.repos.api_outbox.postgres_objects.Message # to # intergov.domain.wire_protocolsgeneric_discrete.Message # (actual while we use postgres as a storage for outbox repo) assert isinstance(pg_msg, PostgresMessageRepr) gd_msg = gd.Message.from_dict( pg_msg.to_dict() ) try: result = self.uc.execute(gd_msg) except Exception as e: # sleep some seconds after fails logger.error( "[%s] Rejecting due to use-case exception %s", gd_msg.sender_ref, str(e) ) self.outbox_repo.patch(pg_msg.id, {'status': 'rejected'}) for i in range(random.randint(30, 100)): time.sleep(0.1) return False if result: # message has been sent somewhere recipient_channel_id, recipient_channel_message_id = result logger.info( "[%s] The message has been sent to channel %s", gd_msg.sender_ref, recipient_channel_id ) self._update_message_status( gd_msg, new_status="accepted", channel_id=recipient_channel_id, channel_msg_id=recipient_channel_message_id ) if not self.outbox_repo.patch(pg_msg.id, {'status': 'accepted'}): logger.warning("[%s] Failed to update msg in outbox", gd_msg.sender_ref) result = False else: result = True else: # no channel accepted the message or there was other error logger.warning("[%s] Message has NOT been sent", gd_msg.sender_ref) self._update_message_status(gd_msg, "rejected") self.outbox_repo.patch(pg_msg.id, {'status': 'rejected'}) result = False return result except Exception as e: logger.exception(e) return None return True
if __name__ == '__main__': # pragma: no cover for result in MultichannelWorker(): if result is None: for i in range(10): time.sleep(0.1)