Source code for intergov.processors.channel_poller

import time
import requests
from http import HTTPStatus

from intergov.conf import env_queue_config
from intergov.repos.channel_pending_message import ChannelPendingMessageRepo
from intergov.repos.message_updates import MessageUpdatesRepo
from intergov.channels.discrete_generic_memory import DiscreteGenericMemoryChannel
from intergov.domain.wire_protocols import generic_discrete as gd
from intergov.loggers import logging


logger = logging.getLogger('channel_poller_worker')


class ChannelPollerWorker:
    """Worker that polls channels for the outcome of pending messages.

    Each iteration step takes one job from the channel pending message
    queue, asks the channel for the current status of the message and,
    once the channel reports a final status, posts a status patch job to
    the message updates queue and deletes the pending job.

    The worker is an endless iterator: ``__next__`` never raises
    ``StopIteration``.
    """

    # Timeout (seconds) handed to ``requests.get`` when asking a channel for
    # a status. Without it a stalled endpoint would block the worker forever.
    REQUEST_TIMEOUT_SECONDS = 30

    # Value passed as ``delay_seconds`` when posting a status patch job.
    MESSAGE_UPDATE_DELAY_SECONDS = 10

    def __init__(
        self,
        channel_pending_message_repo_conf=None,
        message_updates_repo_conf=None
    ):
        """Create the two queue repos used by the worker.

        Args:
            channel_pending_message_repo_conf: optional dict of overrides
                applied on top of the environment configuration of the
                pending message queue.
            message_updates_repo_conf: optional dict of overrides applied on
                top of the environment configuration of the message updates
                queue.
        """
        self._prepare_channel_pending_message_repo(channel_pending_message_repo_conf)
        self._prepare_message_updates_repo(message_updates_repo_conf)

    def _prepare_channel_pending_message_repo(self, conf):
        """Build the pending message repo from env config updated with ``conf``."""
        channel_pending_message_repo_conf = env_queue_config('PROC_BCH_CHANNEL_PENDING_MESSAGE')
        if conf:
            channel_pending_message_repo_conf.update(conf)
        self.channel_pending_message_repo = ChannelPendingMessageRepo(channel_pending_message_repo_conf)

    def _prepare_message_updates_repo(self, conf):
        """Build the message updates repo from env config updated with ``conf``."""
        message_updates_repo_conf = env_queue_config('BCH_MESSAGE_UPDATES')
        if conf:
            message_updates_repo_conf.update(conf)
        self.message_updates_repo = MessageUpdatesRepo(message_updates_repo_conf)

    def _poll_message(self, queue_job):
        """Poll the channel named in the job and delete the job when done.

        Args:
            queue_job: ``(queue_job_id, payload)`` pair; ``payload`` must
                contain a ``'channel_id'`` key.

        Returns:
            The result of deleting the job from the pending queue when the
            poll produced a truthy result, otherwise ``False``. Jobs for
            channels other than ``DiscreteGenericMemoryChannel`` are not
            polled and therefore always yield ``False``.
        """
        result = None
        queue_job_id, payload = queue_job
        channel_id = payload['channel_id']
        logger.info(f'Received pending message job for channel:{channel_id}')
        if channel_id == DiscreteGenericMemoryChannel.ID:
            result = self._poll_memory_channel(payload)
        if result:
            logger.info('Job executed successfully. Deleting from queue.')
            return self.channel_pending_message_repo.delete(queue_job_id)
        return False

    def _poll_memory_channel(self, payload):
        """Fetch the batch status from the memory channel and act on it.

        Args:
            payload: job payload; reads ``payload['channel_response']['link']``
                and ``payload['message']['sender']`` / ``['sender_ref']``.

        Returns:
            The result of posting the status patch job when the status is
            ``'COMMITTED'`` or ``'INVALID'``, otherwise ``False``.

        Raises:
            RuntimeError: if the channel does not answer with HTTP 200.
            requests.exceptions.RequestException: on connection problems,
                including a request that exceeds ``REQUEST_TIMEOUT_SECONDS``.
        """
        resp = requests.get(
            payload['channel_response']['link'],
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        if resp.status_code != HTTPStatus.OK:
            raise RuntimeError("Can't get batch status")
        data = resp.json()
        # getting last data element
        # assuming it's a latest update
        status = data['data'][-1]['status']
        sender_ref = payload['message']['sender_ref']
        sender = payload['message']['sender']
        logger.info(f'Memory channel batch status:{status}')
        if status == 'COMMITTED':
            return self._patch_message_status(sender, sender_ref, 'accepted')
        elif status == 'INVALID':
            return self._patch_message_status(sender, sender_ref, 'rejected')
        else:
            # Any other status is treated as not final: keep the job queued.
            return False

    def _patch_message_status(self, sender, sender_ref, status):
        """Post a job asking the message updater to set a new status.

        Args:
            sender: value stored under ``gd.SENDER_KEY``.
            sender_ref: value stored under ``gd.SENDER_REF_KEY``.
            status: value stored under ``gd.STATUS_KEY`` of the patch.

        Returns:
            Whatever ``message_updates_repo.post_job`` returns.
        """
        job = {
            'message': {
                gd.SENDER_KEY: sender,
                gd.SENDER_REF_KEY: sender_ref
            },
            'patch': {
                gd.STATUS_KEY: status
            }
        }
        logger.info(f'Sending job to channel message updater:{job}')
        return self.message_updates_repo.post_job(
            job,
            delay_seconds=self.MESSAGE_UPDATE_DELAY_SECONDS
        )

    def __iter__(self):
        """Log the start of the worker and return it as its own iterator."""
        logger.info('Starting channel poller worker')
        return self

    def __next__(self):
        """Process at most one pending job.

        Returns:
            ``None`` when the queue returned no job or when processing
            raised an exception (which is logged, not propagated);
            otherwise the result of ``_poll_message``.
        """
        try:
            queue_job = self.channel_pending_message_repo.get_job()
            if not queue_job:
                return None
            return self._poll_message(queue_job)
        except Exception as e:  # pragma: no cover
            # Deliberately broad: one failing job must not stop the worker.
            logger.exception(e)
            return None
if __name__ == '__main__':  # pragma: no cover
    # Run the poller forever when the module is executed as a script.
    worker = ChannelPollerWorker()
    for outcome in worker:
        if outcome is not None:
            continue
        # None means there was no job to process or the step failed;
        # pause for a second before asking the queue again.
        time.sleep(1)