# Source code for intergov.processors.message_updater

import time
import requests
from http import HTTPStatus
from intergov.apis.common.interapi_auth import AuthMixin
from intergov.conf import env_queue_config
from intergov.domain.wire_protocols import generic_discrete as gd
from intergov.repos.message_updates import MessageUpdatesRepo
from intergov.loggers import logging
from intergov.processors.common.env import (
    MESSAGE_PATCH_API_ENDPOINT,
    MESSAGE_PATCH_API_ENDPOINT_AUTH,
)
from intergov.processors.common.utils import get_message_patch_api_endpoint_auth_params

logger = logging.getLogger('message_updater')


class MessageUpdater(AuthMixin, object):
    """
    Iterate over message update jobs:

    * get a job from the queue
    * after some job validation, update the message using the API
    * if successful, delete the job from the queue
    * if unsuccessful, increment retry counter and reschedule attempt

    The object is its own iterator and never raises ``StopIteration``.
    Each step returns whatever ``message_updates_repo.delete`` returned for
    the processed job, or ``None`` when no job was available or when
    processing raised an exception (which is logged, not propagated).
    """

    # TODO: FIXME: push business logic into a testable use_case object
    # maybe also put the "update job" into a request model

    # TODO: tar-pit algorithm on retries?
    # (prevent thundering herd after outage)

    # Retry budget used when the job does not carry its own 'retry_max'.
    DEFAULT_RETRY_MAX = 5
    # Value passed as ``delay_seconds`` when a job is re-posted for retry.
    RETRY_DELAY_SECONDS = 30
    # Timeout (seconds) for the PATCH request. Without one, requests waits
    # forever on an unresponsive endpoint and the whole worker loop stalls.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(
        self,
        message_updates_repo_conf=None
    ):
        """
        :param message_updates_repo_conf: optional dict of overrides applied
            on top of the ``BCH_MESSAGE_UPDATES`` queue configuration.
        """
        self._prepare_message_updates_repo(message_updates_repo_conf)

    def _prepare_message_updates_repo(self, conf):
        """Build ``self.message_updates_repo`` from env config plus ``conf``."""
        message_updates_repo_conf = env_queue_config('BCH_MESSAGE_UPDATES')
        if conf:
            message_updates_repo_conf.update(conf)
        self.message_updates_repo = MessageUpdatesRepo(message_updates_repo_conf)

    def _get_headers_for_message_api(self):
        """
        Return the HTTP headers for the message PATCH API call.

        JSON content negotiation headers are always present; auth headers
        are added according to ``MESSAGE_PATCH_API_ENDPOINT_AUTH``.

        :raises Exception: if the configured auth method is not supported.
        """
        result = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        if MESSAGE_PATCH_API_ENDPOINT_AUTH == "none":
            auth_parameters = None
        elif MESSAGE_PATCH_API_ENDPOINT_AUTH == "Cognito/JWT":
            auth_parameters = get_message_patch_api_endpoint_auth_params()
        else:
            raise Exception(f"Unsupported endpoint auth {MESSAGE_PATCH_API_ENDPOINT_AUTH}")
        result.update(self._get_auth_headers(
            auth_method=MESSAGE_PATCH_API_ENDPOINT_AUTH,
            # auth well known urls and other configuration
            auth_parameters=auth_parameters
        ))
        return result

    def _schedule_retry(self, job, retry_number):
        """Record ``retry_number`` on the job and re-post it with a delay."""
        job['retry'] = retry_number
        self.message_updates_repo.post_job(
            job, delay_seconds=self.RETRY_DELAY_SECONDS
        )

    def _patch_message(self, job):
        """
        Send the job's patch to the message API and handle the response.

        :param job: dict with ``'message'`` and ``'patch'`` keys and optional
            ``'retry'`` / ``'retry_max'`` counters.
        :return: always ``True`` - in every handled outcome the current queue
            entry is finished with (a retry is re-posted as a new job).
        :raises: whatever ``requests.patch`` raises (including timeouts) and
            ``KeyError`` for a job missing required keys; the caller logs
            these and leaves the job on the queue.
        """
        msg = job['message']
        patch_payload = job['patch']
        retry = job.get('retry', 0)
        retry_max = job.get('retry_max', self.DEFAULT_RETRY_MAX)
        sender = msg[gd.SENDER_KEY]
        sender_ref = msg[gd.SENDER_REF_KEY]

        logger.info(
            'Patching message[sender:%s, sender_ref:%s, patch:%s]',
            sender,
            sender_ref,
            patch_payload
        )

        resp = requests.patch(
            MESSAGE_PATCH_API_ENDPOINT.format(
                sender=sender,
                sender_ref=sender_ref
            ),
            json=patch_payload,
            headers=self._get_headers_for_message_api(),
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        retry_number = retry + 1

        if resp.status_code == HTTPStatus.NOT_FOUND:
            if retry_number > retry_max:
                logger.warning('[%s] Message not found. Max retries reached.', sender_ref)
                return True
            logger.warning('[%s] Message not found. Schedule retry', sender_ref)
            self._schedule_retry(job, retry_number)
            return True

        if resp.status_code == HTTPStatus.CONFLICT:
            # Retrying cannot help here, so the job is simply dropped.
            logger.warning('[%s] Patch causing conflict with the current message state.', sender_ref)
            return True

        if resp.status_code != HTTPStatus.OK:
            if retry_number <= retry_max:
                logger.error(
                    "[%s] Can't patch the message: %s; sheduling retry %s of %s",
                    sender_ref, resp.text, retry_number, retry_max,
                )
                self._schedule_retry(job, retry_number)
            else:
                logger.error(
                    "[%s] Can't patch the message: %s; dropping - max retries reached",
                    sender_ref, resp.text
                )
            return True

        logger.info('[%s] Message patched successfully.', sender_ref)
        return True

    def __iter__(self):
        logger.info('Starting channel message updater')
        return self

    def __next__(self):
        """
        Process at most one job from the queue.

        :return: the result of deleting the processed job, or ``None`` when
            the queue returned nothing or any step raised.
        """
        # Top-level worker boundary: log every failure and keep iterating.
        try:
            result = self.message_updates_repo.get_job()
            if not result:
                return None
            job_queue_id, job_payload = result
            if self._patch_message(job_payload):
                logger.info('Deleting the job.')
                return self.message_updates_repo.delete(job_queue_id)
        except Exception as e:
            logger.exception(e)
        return None
if __name__ == '__main__':
    # Run the updater as an endless worker loop. An iteration that yields
    # None is followed by a one-second pause before the next attempt.
    updater = MessageUpdater()
    for outcome in updater:
        if outcome is not None:
            continue
        time.sleep(1)