# Source code for intergov.use_cases.process_message
import os
from intergov.domain.wire_protocols.generic_discrete import Message
from intergov.loggers import logging # NOQA
from intergov.monitoring import statsd_timer
logger = logging.getLogger(__name__)

# Loopback sending is enabled only when the SEND_LOOPBACK_MESSAGES env var
# is literally "true" (case-insensitive); unset or any other value disables it.
ENV_SEND_LOOPBACK_MESSAGES = (
    str(os.environ.get('SEND_LOOPBACK_MESSAGES', None)).lower() == "true"
)
class ProcessMessageUseCase:
    """
    Used by the message processing background worker.

    Gets one message from the channel inbox and does a number of
    things with it:

    * dispatch a document retrieval job
      (if the message is from a foreign source)
    * dispatch a message sending task to the channel outbox
      (if the message is from a domestic source)
    * ensure the message is stored in the message lake
    * ensure the access control lists are updated for this message
    * dispatch any WebSub events required for this message

    Note: the inbound message may have come from one of two sources:
    it may be a message from within this jurisdiction, or it may be a
    message sent from another jurisdiction.  This use-case works with
    either message, however it needs to know which jurisdiction it is
    working as to get the logic right (that is why it takes a
    jurisdiction parameter when it is instantiated).
    """

    def __init__(
            self,
            jurisdiction,
            bc_inbox_repo,
            message_lake_repo,
            object_acl_repo,
            object_retreval_repo,
            notifications_repo,
            blockchain_outbox_repo):
        self.jurisdiction = jurisdiction
        self.bc_inbox_repo = bc_inbox_repo
        self.message_lake_repo = message_lake_repo
        self.object_acl_repo = object_acl_repo
        # NOTE(review): "retreval" is a historic typo kept for
        # backwards compatibility with existing callers.
        self.object_retreval_repo = object_retreval_repo
        self.notifications_repo = notifications_repo
        self.blockchain_outbox_repo = blockchain_outbox_repo

    def execute(self):
        """Fetch one message from the inbox queue and process it.

        Returns None when the queue is empty, otherwise the boolean
        result of :meth:`process`.
        """
        # Get the message from the bc_inbox_repo (which is an events queue)
        fetched_bc_inbox = self.bc_inbox_repo.get()
        if not fetched_bc_inbox:
            return None
        queue_message_id, message = fetched_bc_inbox
        return self.process(queue_message_id, message)

    @statsd_timer("usecase.ProcessMessageUseCase.process")
    def process(self, queue_message_id, message):
        """Run every processing step for a single inbox message.

        The message is deleted from the inbox queue only when all
        steps succeed; otherwise it stays queued and is retried later.
        Returns True on full success, False otherwise.
        """
        logger.info("Received message to process: %s", message)
        # TODO: if some steps succeed and others fail, the successful
        # steps are repeated on the retry - fine for object storage
        # (idempotent) but not for queues.
        ml_ok = self._post_quietly(self.message_lake_repo, message)
        acl_ok = self._post_quietly(self.object_acl_repo, message)
        pub_ok = self._publish_notifications(message)

        # Blockchain part - route the message depending on its origin.
        outbox_ok = True
        ret_ok = True
        if str(message.sender) == str(self.jurisdiction) and message.status == 'pending':
            # our jurisdiction -> remote
            outbox_ok = self._dispatch_to_channel(message)
        elif str(message.sender) != str(self.jurisdiction) and message.status == 'received':
            # incoming message from a remote jurisdiction; its remote
            # documents may need downloading by the Documents Spider
            ret_ok = self._schedule_object_download(message)
        else:
            # inconsistent sender/status combination - refuse to process
            logger.warning(
                "Message sender is %s and we are %s and the status is %s - strange",
                message.sender,
                self.jurisdiction,
                message.status
            )
            return False

        if ml_ok and acl_ok and ret_ok and pub_ok and outbox_ok:
            self.bc_inbox_repo.delete(queue_message_id)
            return True
        logger.error("Task processing failed, will try again later")
        # What to do with the failed ones?  The problem is that the
        # message has been submitted to some repos while others failed,
        # which means we must retry just the failed submissions; that
        # may introduce a tricky state where some external message
        # processors see the info from one source but not yet from
        # another - they should wait in that case.
        return False

    @staticmethod
    def _post_quietly(repo, message):
        """Post ``message`` to ``repo``, logging and swallowing any exception.

        Returns the repo's own result on success, False on failure.
        """
        try:
            return repo.post(message)
        except Exception as e:
            logger.exception(e)
            return False

    def _publish_notifications(self, message):
        """Send the WebSub notifications for ``message``; return success bool."""
        try:
            # Delay the full notification a little so the message has had
            # time to reach the message lake; drop the status field because
            # notification consumers do not need it.
            message_without_status = Message.from_dict(
                message.to_dict(exclude=['status'])
            )
            # fat ping for ones who understand
            pub_ok = self.notifications_repo.post(
                message_without_status,
                delay_seconds=3
            )
            # light ping for ones who want everything
            self.notifications_repo.post_job(
                {
                    "predicate": f'message.{message.sender_ref}.received',
                    "sender_ref": f"{message.sender}:{message.sender_ref}"
                }
            )
            return pub_ok
        except Exception as e:
            logger.exception(e)
            return False

    def _dispatch_to_channel(self, message):
        """Queue an outbound (domestic) message for the blockchain worker."""
        logger.info("Sending message to the channels: %s", message.subject)
        try:
            return self.blockchain_outbox_repo.post(message)
        except Exception as e:
            logger.exception(e)
            return False

    def _schedule_object_download(self, message):
        """Schedule download of the remote documents referenced by ``message``."""
        logger.info(
            "Received message from remote jurisdiction %s with subject %s",
            message.sender,
            message.subject
        )
        logger.info(
            "Scheduling download remote documents for: %s", message.subject
        )
        try:
            return self.object_retreval_repo.post_job({
                "action": "download-object",
                "sender": message.sender,
                "object": message.obj
            })
        except Exception as e:
            logger.exception(e)
            return False