import json
from io import BytesIO
from urllib.parse import urljoin
import requests
from intergov.apis.common.interapi_auth import AuthMixin
from intergov.conf import env_json
from intergov.domain.jurisdiction import Jurisdiction
from intergov.loggers import logging
from intergov.monitoring import statsd_timer
logger = logging.getLogger(__name__)
class RetrieveAndStoreForeignDocumentsUseCase(AuthMixin):
    """
    Processes a single request from the queue to download
    some remote document.

    The process is recursive: if a downloaded object is JSON and
    contains a ``links`` section, further jobs are posted to download
    the linked sub-objects later.

    .. admonition:: Note

        * ``execute()`` returns False when there is no job in the queue
        * returns True in case of success
        * raises exceptions for errors
    """

    def __init__(
            self,
            jurisdiction,
            object_retrieval_repo,
            object_lake_repo,
            object_acl_repo):
        # jurisdiction: the local (receiving) jurisdiction we act on behalf of
        self.jurisdiction = jurisdiction
        # queue of pending download jobs
        self.object_retrieval = object_retrieval_repo
        # storage for downloaded document bodies
        self.object_lake = object_lake_repo
        # access-control repo granting jurisdictions read access to objects
        self.object_acl_repo = object_acl_repo

    def execute(self):
        """Pop one retrieval job from the queue and process it.

        Returns:
            False when the queue is empty, otherwise the result of
            ``process()`` (True on success).
        """
        retrieval_task = self.object_retrieval.get_job()
        if not retrieval_task:
            return False
        (job_id, job) = retrieval_task
        return self.process(job_id, job)

    @statsd_timer("usecase.RetrieveAndStoreForeignDocumentsUseCase.process")
    def process(self, job_id, job):
        """Download the object named in ``job`` (if not already stored),
        grant the local jurisdiction access to it, and delete the job.

        Args:
            job_id: queue handle used to delete the job on completion.
            job: dict with at least ``object`` (multihash) and ``sender``
                (source jurisdiction name).

        Returns:
            True on success; exceptions propagate on failure.
        """
        logger.info(
            "[%s] Running the RetrieveAndStoreForeignDocumentsUseCase for job %s",
            self.jurisdiction,
            job
        )
        multihash = job['object']
        sender = Jurisdiction(job['sender'])
        # 1. check if this object is not in the object lake yet
        # 2. if not - download it to the object lake
        if not self._is_in_object_lake(multihash):
            self._download_remote_obj(sender, multihash)
        # 3. Give receiver access to the object
        self.object_acl_repo.allow_access_to(
            multihash,
            self.jurisdiction.name
        )
        # 4. Delete the job as completed
        #    (sub-document downloads were already scheduled inside
        #    _download_remote_obj when the object turned out to be JSON)
        self.object_retrieval.delete(job_id)
        return True

    def _is_in_object_lake(self, multihash):
        """Return True when ``multihash`` already exists in the object lake.

        The storage backend signals a missing object with an exception
        whose class is named ``NoSuchKey``; we match by class name to
        avoid a hard import dependency on the backend's exception
        hierarchy.
        """
        try:
            # TODO: replace by just exist check instead of reading the whole file
            # maybe create and use an '.exists(multihash)' method on object_lake
            self.object_lake.get_body(multihash)
        except Exception as e:
            if e.__class__.__name__ == 'NoSuchKey':
                return False
            # unexpected failure: re-raise preserving the original traceback
            raise
        return True

    def _get_docapi_auth_headers(self, source_jurisdiction, doc_api_url):
        """Build auth headers for a request to a remote document API.

        We support 2 auth methods:

        * dumb - no auth in fact, a static dict of demo data is passed
          around (used for plain-HTTP local/demo setups)
        * AWS OIDC/Cognito - when we have client credentials in the env
          variables and are able to retrieve a short-living JWT using
          them, and then use the API with that JWT.

        Returns an (possibly empty) dict of HTTP headers.

        TODO: it's probably worth configuring it the same way like we do with channels
        """
        source_jurisdiction = str(source_jurisdiction)
        if doc_api_url.startswith("http://"):
            # local/demo setup
            logger.info("For document API request to %s the dumb auth is used", doc_api_url)
            return {
                'Authorization': 'JWTBODY {}'.format(
                    json.dumps({
                        "sub": "documents-api",
                        "party": "spider",
                        "jurisdiction": self.jurisdiction.name,
                    })
                )
            }
        try:
            # first we try to determine the oauth credentials for that
            cognito_oauth_credentials = {
                "client_id": env_json("IGL_JURISDICTION_OAUTH_CLIENT_ID")[source_jurisdiction],
                "client_secret": env_json("IGL_JURISDICTION_OAUTH_CLIENT_SECRET")[source_jurisdiction],
                "scopes": env_json("IGL_JURISDICTION_OAUTH_SCOPES")[source_jurisdiction],
                "wellknown_url": env_json("IGL_JURISDICTION_OAUTH_WELLKNOWN_URL")[source_jurisdiction],
            }
        except (KeyError, TypeError) as e:
            # It seems that we don't have it configured - so use the demo auth
            logger.info("We don't have the only supported real auth method configured (%s)", str(e))
            cognito_oauth_credentials = {}
        if cognito_oauth_credentials:
            logger.info("Will try to retrieve JWT for %s", doc_api_url)
            # good, may be we try to request some JWT?
            return self._get_auth_headers(
                auth_method="Cognito/JWT",
                # auth well known urls and other configuration
                auth_parameters=cognito_oauth_credentials
            )
        return {}

    def _download_remote_obj(self, sender, multihash):
        """Download ``multihash`` from the sender's document API into the
        object lake; if the document is JSON with a ``links`` list, post
        follow-up jobs to retrieve each linked object.
        """
        logger.info("Downloading %s from %s as %s", multihash, sender, self.jurisdiction)
        remote_doc_api_url = sender.object_api_base_url()
        url = urljoin(remote_doc_api_url, multihash)
        doc_resp = requests.get(
            url,
            params={
                "as_jurisdiction": self.jurisdiction.name,
            },
            headers=self._get_docapi_auth_headers(sender, remote_doc_api_url)
        )
        logger.info("GET %s: status %s", url, doc_resp.status_code)
        # TODO: we should process various response codes differently:
        # e.g. if 5xx, reschedule for later
        # if 429, reschedule for later with increasing wait times
        # different 4xx, different strategies
        # (put thought into logging/monitoring)
        # NOTE(review): assert is stripped under `python -O`; kept because
        # callers may rely on AssertionError, but consider raising a
        # dedicated exception here instead.
        assert doc_resp.status_code in (200, 201), "{} {}".format(doc_resp, doc_resp.content)
        self.object_lake.post_from_file_obj(
            multihash,
            BytesIO(doc_resp.content)
        )
        # try to parse the downloaded documents for `links` section
        try:
            json_document = json.loads(doc_resp.content)
        except Exception:
            # not a json, which is fine - just nothing to recurse into
            logger.info("Downloaded object %s is not JSON file", multihash)
        else:
            # TODO: security: document spider will blindly download any multihash
            # (including links in links in links).
            # not sure what negative impact is has.
            links = json_document.get('links')
            if isinstance(links, list):
                for link in links:
                    # example link entry:
                    # {"TYPE1": "document", "TYPE2": "Exporters Information Form Update",
                    #  "name": "hmmm_6W4jRRG.png",
                    #  "ct": "binary/octet-stream",
                    #  "link": "QmZxJAJhq98T683RQSk3T2wkLBH2nFV4y43iCHRk3DZyWn"}
                    if 'link' in link:
                        link_qmhash = link['link']
                        # NOTE(review): stripped under -O; guards against
                        # path/URL injection via the multihash value
                        assert '/' not in link_qmhash
                        assert ':' not in link_qmhash
                        logger.info("Posting sub-job to retrieve %s", link_qmhash)
                        self.object_retrieval.post_job(
                            {
                                'action': 'download-object',
                                'sender': sender.name,
                                'object': link_qmhash,
                                'parent': multihash
                            }
                        )