Intergov Ledger¶
Purpose¶
The TrustBridge InterGov codebase is a Proof of Concept (POC) implementation of the edi3 Inter Government Ledger (IGL) Specification.
The specific goal of this codebase is to create infrastructure for an independent IGL “Node”, as it would be operated by a country.
The node provides the interfaces required by the regulated community (i.e. B2G interfaces), and it interfaces with “Channels”, which are used for communicating with other jurisdictions (i.e. G2G interfaces). It provides a suite of microservices that reliably route and deliver messages between the two.
Prototype edi3 standards¶
See https://edi3.org/icl/ for details of the interfaces that are implemented here.
This software is organised using a microservice architecture. The file DEPLOYMENT.rst contains instructions for running these components together on a local workstation.
There are three basic types of deployable component:
Microservices, which provide REST web services and depend on stateful backing services.
Backing Services, which are responsible for shared state between API Microservices and Worker Processes.
Worker Processes, which perform asynchronous tasks.
A very high level description of the design looks like this: each jurisdiction operates a suite of services that provides the following types of integration surface:
Government to Government (G2G) “channels”. These may use distributed ledger technology, but the details are hidden behind a “Channel API”.
Regulated Community APIs. These are used by members of the regulated community to interact with their Government (B2G/G2B). These interactions are either direct with the API, or indirect, through some community systems and an identity provider.
The Document API. This is accessed by the regulated community, but also (as policy allows) by the counterparty of associated messages on the G2G channels. This also implies the use of an identity provider.
Support the UN process¶
The business case and background of the edi3 work is published at https://uncefact.unece.org/display/uncefactpublic/Cross+border+Inter-ledger+exchange+for+Preferential+CoO+using+Blockchain
Open Source Reference Implementation¶
The purpose of the POC is to use a real working system to evaluate the edi3 specification design decisions. We believe this will lead to a better design than developing the specifications on a purely theoretical basis before trying to apply them.
This implementation is tracking the specifications on the edi3 web site. As the specifications change, we intend to modify this software to keep up. This software will remain at POC status as long as the specifications are considered a working draft; however, the software’s microservice architecture should support future large scale deployment and long term maintainability.
The status of the software will be updated to BETA when it is considered appropriate for pilot implementation. In the meantime, contributions are welcome at https://github.com/trustbridge
Terminology¶
Glossary¶
Node Users
    Business Layer Systems that communicate with one or more Nodes to send and receive messages to and from other Jurisdictions.
Node API
    A set of APIs used to send and receive messages over channels.
Node
    A service that provides the Node API, sends messages (after validating them) to a channel on behalf of a jurisdiction, and receives and disburses messages sent to the jurisdiction over the channels. The Node is authorised (Node Accreditation) to send and receive messages on behalf of the Jurisdiction. The Node validates each message to ensure compliance with the channel policy (in the jurisdictional context); in other words, Nodes are trusted by the accrediting body to apply local knowledge to the interpretation of channel policy in that jurisdiction. The Node is also a router: it decides which channel (Node Routing Policy) should be used to send each valid message to the recipient Jurisdiction. Nodes also receive inbound messages, retrieve the associated documents (objects) and store them for Node Users, as well as notifying the appropriate Node Users when new messages have been received.
Channel
    An implementation of an agreement between jurisdictions to exchange particular types of messages.
Channel Policy
    Rules, expressed in a common business language, that describe the acceptable use of the channel.
Channel Media
    An append-only database where channel messages are written. Presumably a distributed database.
Channel Endpoint
    A deployed system that can read and write to the channel media.
Channel API
    An abstraction over the channel media. A feature of the Channel Endpoint used by the IGL Node to send messages.
Channel Keys
    A notional mechanism for restricting access (in particular write access) to legitimate channel endpoints.
Channel Operator
    The party with access to the Channel Keys, who provides the Channel Endpoint.
Channel Manager
    The party responsible for the channel. May have the ability to grant/revoke Channel Keys.
Channel Authentication
    A mechanism to restrict access to the Channel API, so that authorised Nodes can access the Channel Endpoint. Not to be confused with Node Authentication (restricting access to the Node to Node Users), Channel Keys (used to restrict access to Channel Media, to Channel Operators) or document issuer/verification mechanisms.
Notes¶
Architecture¶
High level overview:
![digraph d {
node [shape=component style=filled fillcolor=grey];
subgraph cluster_app {
label="application";
ucp_doc [label="manage\ndocuments" shape=ellipse];
ucp_sub [label="manage\nsubscriptions" shape=ellipse];
ucp_msg [label="manage\nmessages" shape=ellipse];
uc_rx_callback [label="receive\ncallback" shape=ellipse];
}
doc_api [label="document\nAPI" shape=component];
ucp_doc -> doc_api;
msg_api [label="message\nAPI" shape=component];
ucp_msg -> msg_api;
repo_api_outbox [label="<<rdbms>>\nAPI\noutbox" fillcolor=green];
repo_api_inbox [label="<<sqs>>\nAPI\ninbox" fillcolor=green];
repo_bc_inbox [label="<<sqs>>\nblockchain\ninbox" fillcolor=green];
subgraph cluster_ws {
websub [label="subscription\nAPI (websub)" fillcolor=purple];
}
repo_message_lake [label="<<s3>>\nmessage\nlake" fillcolor=orange];
repo_object_lake [label="<<s3>>\nobject\nlake" fillcolor=orange];
repo_object_acl [label="<<s3>>\nobject\nACL" fillcolor=orange];
repo_object_ret_q [
label="<<sqs>>\nobject\nretreival"
fillcolor=green
];
ucp_sub -> websub;
repo_foreign_objects [label="foreign\nobject\nproxy"];
uc_get_objects -> repo_foreign_objects;
uc_authenticated_object_access [
label="authenticated\nobject access"
shape=ellipse
fillcolor=orange
];
doc_api -> uc_authenticated_object_access -> repo_object_acl;
uc_authenticated_object_access -> repo_object_lake;
uc_record_object [
label="record\nobject"
shape=ellipse
fillcolor=orange
];
doc_api -> uc_record_object -> repo_object_lake;
uc_submit_message [
label="post message\nto api inbox"
shape=ellipse
fillcolor=orange
];
uc_check_message [
label="get message\nby reference id"
shape=ellipse
fillcolor=orange
];
msg_api -> uc_check_message -> repo_message_lake;
msg_api -> uc_submit_message -> repo_api_inbox;
uc_api_inout [
label="enqueue\ninbound\nmessages"
shape=ellipse fillcolor=green
];
repo_api_inbox -> uc_api_inout [dir=back];
repo_api_outbox -> uc_api_inout [dir=back];
w_api_inout [label="<<docker>>\nmessage\nreception\nworker"];
uc_api_inout -> w_api_inout;
mcu [label="<<docker>>\nmulti-channel\nblockchain\nrouter"];
uc_bc_tx_submit [
label="submit\nblockchain\ntransactions"
shape=ellipse
fillcolor=orange
];
repo_rejected_messages [
label="<<sqs>>\nrejected\nmessages"
fillcolor=green
];
mcu -> uc_bc_tx_submit;
uc_bc_tx_submit -> repo_api_outbox;
uc_bc_tx_submit -> repo_bc_channel;
uc_bc_tx_submit -> repo_rejected_messages;
uc_update_status_rejected [
label="update status\nof rejected\nmessages"
shape=ellipse
fillcolor=orange
];
repo_rejected_messages -> uc_update_status_rejected [dir=back];
repo_message_lake -> uc_update_status_rejected [dir=back];
w_status_updater [label="<<docker>>\nrejected\nstatus\nupdater"]
uc_update_status_rejected -> w_status_updater [dir=back];
subgraph cluster_chan {
label="channel specific";
repo_bc_channel [
label="<<blockchain>>\nchannels"
fillcolor=orange
shape=folder
];
scbcep [
label="<<docker>>\nsingle-channel\nblockchain\nevent processor"
];
uc_bc_rx_events [
label="receive\ninbound\nblockchain\nevents"
shape=ellipse
fillcolor=orange
];
uc_bc_rx_events -> scbcep [dir=back];
}
repo_bc_channel -> uc_bc_rx_events [dir=back];
mp [label="<<docker>>\ninbound\nmessage\nprocessor"]
uc_bc_proc_events [
label="initiate\ninbound message\nprocessing tasks"
shape=ellipse
fillcolor=green
];
repo_bc_inbox -> uc_bc_proc_events [dir=back];
uc_bc_proc_events -> mp [dir=back];
websub -> uc_bc_proc_events [dir=back];
repo_object_acl -> uc_bc_proc_events [dir=back];
repo_message_lake -> uc_bc_proc_events [dir=back];
repo_object_ret_q -> uc_bc_proc_events [dir=back];
uc_rx_callback -> websub [dir=back];
uc_get_objects [
label="retrieve and store\nforeign documents"
shape=ellipse
fillcolor=green
];
repo_object_lake -> uc_get_objects [dir=back];
repo_object_ret_q -> uc_get_objects [dir=back];
uc_enqueue_received_message [
label="enqueue\nreceived\nmessage"
shape=ellipse
fillcolor=green
];
repo_bc_inbox -> uc_enqueue_received_message [dir=back];
//uc_enqueue_received_message -> uc_synth_hidden_messages [dir=back];
reception_api [label="message\nreception\nAPI"]
uc_enqueue_received_message -> reception_api [dir=back];
post_msg_from_bc [
label="POST\nmessage\n(from blockchain)"
shape=ellipse
];
post_msg_from_bc -> reception_api;
uc_bc_rx_events -> post_msg_from_bc;
spider [label="<<docker>>\ndocument\nspider"];
uc_get_objects -> spider [dir=back];
}](_images/graphviz-4ac4f8750976066826787c5b62075fb7eeca93fa.png)
Document API and components:
![digraph d {
node [shape=component style=filled fillcolor=grey];
subgraph cluster_app {
label="application";
ucp_doc [label="manage\ndocuments" shape=ellipse];
}
doc_api [label="document\nAPI" shape=component];
ucp_doc -> doc_api;
repo_object_lake [label="<<s3>>\nobject\nlake" fillcolor=orange];
repo_object_acl [label="<<s3>>\nobject\nACL" fillcolor=orange];
repo_object_ret_q [
label="<<sqs>>\nobject\nretreival"
fillcolor=green
];
repo_foreign_objects [label="foreign\nobject\nproxy"];
uc_get_objects -> repo_foreign_objects;
uc_authenticated_object_access [
label="authenticated\nobject access"
shape=ellipse
fillcolor=orange
];
doc_api -> uc_authenticated_object_access -> repo_object_acl;
uc_authenticated_object_access -> repo_object_lake;
uc_record_object [
label="record\nobject"
shape=ellipse
fillcolor=orange
];
doc_api -> uc_record_object -> repo_object_lake;
uc_bc_proc_events [
label="initiate\ninbound message\nprocessing tasks"
shape=ellipse
fillcolor=green
];
repo_object_acl -> uc_bc_proc_events [dir=back];
repo_object_ret_q -> uc_bc_proc_events [dir=back];
uc_get_objects [
label="retrieve and store\nforeign documents"
shape=ellipse
fillcolor=green
];
repo_object_lake -> uc_get_objects [dir=back];
repo_object_ret_q -> uc_get_objects [dir=back];
spider [label="<<docker>>\ndocument\nspider"];
uc_get_objects -> spider [dir=back];
}](_images/graphviz-eaf5120fee0b41c24a5ec516f740ae42a6d51b3c.png)
Message API and components:
![digraph d {
node [shape=component style=filled fillcolor=grey];
subgraph cluster_app {
label="application";
ucp_sub [label="manage\nsubscriptions" shape=ellipse];
ucp_msg [label="manage\nmessages" shape=ellipse];
uc_rx_callback [label="receive\ncallback" shape=ellipse];
}
msg_api [label="message\nAPI" shape=component];
ucp_msg -> msg_api;
repo_api_outbox [label="<<rdbms>>\nAPI\noutbox" fillcolor=green];
repo_api_inbox [label="<<sqs>>\nAPI\ninbox" fillcolor=green];
repo_bc_inbox [label="<<sqs>>\nblockchain\ninbox" fillcolor=green];
subgraph cluster_ws {
websub [label="subscription\nAPI (websub)" fillcolor=purple];
}
repo_message_lake [label="<<s3>>\nmessage\nlake" fillcolor=orange];
ucp_sub -> websub;
uc_submit_message [
label="post message\nto api inbox"
shape=ellipse
fillcolor=orange
];
uc_check_message [
label="get message\nby reference id"
shape=ellipse
fillcolor=orange
];
msg_api -> uc_check_message -> repo_message_lake;
msg_api -> uc_submit_message -> repo_api_inbox;
uc_api_inout [
label="enqueue\ninbound\nmessages"
shape=ellipse fillcolor=green
];
repo_api_inbox -> uc_api_inout [dir=back];
repo_api_outbox -> uc_api_inout [dir=back];
w_api_inout [label="<<docker>>\nmessage\nreception\nworker"];
uc_api_inout -> w_api_inout;
mcu [label="<<docker>>\nmulti-channel\nblockchain\nrouter"];
uc_bc_tx_submit [
label="submit\nblockchain\ntransactions"
shape=ellipse
fillcolor=orange
];
repo_rejected_messages [
label="<<sqs>>\nrejected\nmessages"
fillcolor=green
];
mcu -> uc_bc_tx_submit;
uc_bc_tx_submit -> repo_api_outbox;
uc_bc_tx_submit -> channel;
uc_bc_tx_submit -> repo_rejected_messages;
uc_update_status_rejected [
label="update status\nof rejected\nmessages"
shape=ellipse
fillcolor=orange
];
repo_rejected_messages -> uc_update_status_rejected [dir=back];
repo_message_lake -> uc_update_status_rejected [dir=back];
w_status_updater [label="<<docker>>\nrejected\nstatus\nupdater"]
uc_update_status_rejected -> w_status_updater [dir=back];
mp [label="<<docker>>\ninbound\nmessage\nprocessor"]
uc_bc_proc_events [
label="initiate\ninbound message\nprocessing tasks"
shape=ellipse
fillcolor=green
];
repo_bc_inbox -> uc_bc_proc_events [dir=back];
uc_bc_proc_events -> mp [dir=back];
websub -> uc_bc_proc_events [dir=back];
repo_message_lake -> uc_bc_proc_events [dir=back];
uc_rx_callback -> websub [dir=back];
uc_enqueue_received_message [
label="enqueue\nreceived\nmessage"
shape=ellipse
fillcolor=green
];
repo_bc_inbox -> uc_enqueue_received_message [dir=back];
reception_api [label="message\nreception\nAPI"]
uc_enqueue_received_message -> reception_api [dir=back];
channel [
label="channel"
shape=cloud
];
channel -> reception_api;
}](_images/graphviz-0839ca06a808cd3c2f83cdbdc7cad2d5f40cd8c6.png)
Channel API and components:
![digraph d {
node [shape=component style=filled fillcolor=grey];
local_node [label="<<Intergov>>\nNode"];
channel_endpoint [label="<<Flask API>>\nChannel Endpoint\nand\nSubscription Hub", fillcolor=green];
sending_db [label="<<DB>>\nSending DB", fillcolor=lightblue];
receiving_db [label="<<DB>>\nReceiving DB", fillcolor=lightblue];
channel_media [label="<<blockchain>>\nChannel Media", fillcolor=purple];
subscription_processor [label="<<lambda>>\nSubscription\nProcessor", fillcolor=green];
subscription_store [label="<<s3>>\nSubscription\nStore", fillcolor=lightblue];
callback_spreader [label="<<lambda>>\nCallback\nSpreader", fillcolor=green];
subscription_event_queue [label="<<sqs>>\nSubscription\nEvent Queue", fillcolor=lightblue];
# grouping for nicer display
subgraph node_channel_interface{
rank="same"
channel_endpoint
callback_spreader
}
subgraph channel_stores{
rank="same"
sending_db
receiving_db
subscription_store
}
# Efferent messages
# node posts to channel
local_node -> channel_endpoint;
channel_endpoint -> sending_db;
channel_endpoint -> channel_media;
# node subscribe to updates about that message
// local_node -> channel_endpoint;
channel_endpoint -> subscription_store;
# subscription processor delivers events
subscription_processor -> sending_db;
subscription_processor -> subscription_event_queue;
callback_spreader -> subscription_event_queue;
callback_spreader -> local_node;
# Afferent messages
# channel observes new messages
// channel_endpoint -> channel_media;
channel_endpoint -> receiving_db;
# node subscribes to new messages
// local_node -> channel_endpoint;
// channel_endpoint -> subscription_store;
# subscription processor delivers events
subscription_processor -> receiving_db;
// subscription_processor -> subscription_event_queue;
// callback_spreader -> subscription_event_queue;
// callback_spreader -> local_node;
}](_images/graphviz-21ccdb8baeecbe32e6541536efc904ef46b2144b.png)
With processes:
![digraph d {
node [shape=component style=filled fillcolor=grey];
local_node [label="<<Intergov>>\nNode"];
uc_node_post_message [label="post\nmessage" shape=ellipse];
uc_subscribe_to_message_by_id [label="subscribe to\nmessage by id" shape=ellipse];
uc_subscribe_to_messages_by_jurisdiction [label="subscribe to\nmessage by\njurisdiction" shape=ellipse];
uc_deliver_subscription_event [label="deliver\nsubscription\nevent" shape=ellipse];
channel_endpoint [label="<<Flask API>>\nChannel Endpoint\nand\nSubscription Hub", fillcolor=green];
uc_channel_post_message [label="CHAN post\nmessage" shape=ellipse];
sending_db [label="<<DB>>\nSending DB", fillcolor=lightblue];
uc_get_new_messages [label="get new\nmessages" shape=ellipse];
receiving_db [label="<<DB>>\nReceiving DB", fillcolor=lightblue];
uc_write_subscription [label="write\nsubscription" shape=ellipse];
channel_media [label="<<blockchain>>\nChannel Media", fillcolor=purple];
subscription_processor [label="<<lambda>>\nSubscription\nProcessor", fillcolor=green];
subscription_store [label="<<s3>>\nSubscription\nStore", fillcolor=lightblue];
uc_notify_subscribers [label="notify\nsubscribers" shape=ellipse];
callback_spreader [label="<<lambda>>\nCallback\nSpreader", fillcolor=green];
uc_get_subscription_event [label="get\nsubscription\nevents" shape=ellipse];
subscription_event_queue [label="<<sqs>>\nSubscription\nEvent Queue", fillcolor=lightblue];
# Grouping for nicer display
subgraph node_channel_interface{
rank="same"
channel_endpoint
callback_spreader
}
subgraph node_ucs{
rank="same"
uc_node_post_message
uc_subscribe_to_message_by_id
uc_subscribe_to_messages_by_jurisdiction
uc_deliver_subscription_event
}
subgraph channel_components{
rank="same"
channel_endpoint
subscription_processor
callback_spreader
}
subgraph channel_ucs{
rank="same"
uc_channel_post_message
uc_get_new_messages
uc_write_subscription
uc_get_subscription_event
}
subgraph channel_stores{
rank="same"
sending_db
receiving_db
subscription_store
subscription_event_queue
}
# Efferent messages
# node posts to channel
local_node -> uc_node_post_message;
uc_node_post_message -> channel_endpoint;
uc_channel_post_message -> channel_endpoint [dir=back];
uc_channel_post_message -> sending_db;
uc_channel_post_message -> channel_media;
# node subscribe to updates about that message
local_node -> uc_subscribe_to_message_by_id;
uc_subscribe_to_message_by_id -> channel_endpoint;#subscription_hub
uc_write_subscription -> channel_endpoint [dir=back];
uc_write_subscription -> subscription_store;
# Afferent messages
# channel observes new messages
channel_endpoint -> uc_get_new_messages;
uc_get_new_messages -> channel_media;
uc_get_new_messages -> receiving_db;
# node subscribes to new messages
local_node -> uc_subscribe_to_messages_by_jurisdiction;
uc_subscribe_to_messages_by_jurisdiction -> channel_endpoint;#subscription_hub
# Subscriptions
# subscription processor delivers events
subscription_processor -> uc_notify_subscribers;
uc_notify_subscribers -> receiving_db;
uc_notify_subscribers -> sending_db;
uc_notify_subscribers -> subscription_event_queue;
# callback spreader makes callbacks
callback_spreader -> uc_get_subscription_event;
uc_get_subscription_event -> subscription_event_queue;
callback_spreader -> uc_deliver_subscription_event;
uc_deliver_subscription_event -> local_node;
}](_images/graphviz-d1bae2cff89676634bc89c17137593e4a001280e.png)
Public Interfaces¶
These are the interfaces (REST APIs) that are used for B2G interactions.
They map to the description on https://edi3.org/specs/edi3-icl/master/#architecture-overview
Note that there is also a generic (general purpose) event subscription API that is public. It is documented in the chapter following this one, as part of the event subsystem.
General Message API¶
This component provides the main API used by members of the regulated community to send messages to the Government, which may or may not be forwarded to other Governments (depending on the inter-government channels, the policy of the regulator, etc.).
It is also used to check the status of messages (e.g. have they been delivered to foreign governments yet?), and to update the status of messages.
The implementation is /intergov/apis/message_api/
The specific business logic is in these classes:
GetMessageBySenderRefUseCase (in /intergov/use_cases/get_message_by_sender_ref.py)
PatchMessageMettadataUseCase (in /intergov/use_cases/patch_message_metadata.py)
EnqueueMessageUseCase (in /intergov/use_cases/enqueue_message.py), which is the same business logic as in the Message Receiving API.
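The message shape itself is defined by the edi3 specification. As an illustration only, the sketch below assumes the commonly cited five-part message model (sender, receiver, subject, obj, predicate); the field names are taken from the spec, but the validation helper and the field values are hypothetical:

```python
# Hypothetical sketch of validating a message posted to the Message API.
# "status" and similar metadata are assumed to be set server-side.
REQUIRED_FIELDS = {"sender", "receiver", "subject", "obj", "predicate"}

def validate_message(message: dict) -> list:
    """Return a list of validation errors (empty when the message is acceptable)."""
    return [f"missing field: {f}" for f in sorted(REQUIRED_FIELDS - message.keys())]

# Illustrative values only, not real identifiers:
msg = {
    "sender": "AU",
    "receiver": "SG",
    "subject": "AU.business.some-consignment-ref",
    "obj": "some-object-multihash",
    "predicate": "UN.CEFACT.Trade.CertificateOfOrigin.created",
}
assert validate_message(msg) == []
```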
Document API¶
This is how people save and access documents, which are the subject of G2G messages.
The implementation is /intergov/apis/document_api.py and the logic is in the AuthenticatedObjectAccessUseCase (in /intergov/use_cases/authenticated_object_access.py) and StoreObjectUseCase (in /intergov/use_cases/store_object.py)
Event Subsystem¶
The event subsystem provides a mechanism that allows 3rd parties to remain well informed about the state of the system without having to poll it.
It is entirely optional; the system should work in a “fire and forget” manner, meaning that the B2G interactions do not require further action on behalf of the Business. However, because the system operates with eventual consistency and best-effort semantics (i.e. not guaranteed-delivery semantics), the event subsystem may help applications orchestrate their distributed processes.
Subscriptions API¶
This is essentially an implementation of WebSub (https://en.wikipedia.org/wiki/WebSub). It allows Message API clients to discover (be notified of) message changes without polling.
The implementation is /intergov/apis/subscriptions_api
The business logic is in these classes:
SubscriptionDeregistrationUseCase (in /intergov/use_cases/subscription_deregister.py)
SubscriptionRegisterUseCase (in /intergov/use_cases/subscription_register.py)
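Registration follows the standard WebSub handshake: the subscriber POSTs a form-encoded body carrying hub.callback, hub.mode and hub.topic fields. A minimal sketch of building that body (the callback URL and topic pattern are hypothetical):

```python
from urllib.parse import urlencode

def build_subscription_request(callback_url: str, topic: str, mode: str = "subscribe") -> str:
    # WebSub registration is a form-encoded POST to the hub with these fields.
    return urlencode({
        "hub.callback": callback_url,
        "hub.mode": mode,
        "hub.topic": topic,
    })

body = build_subscription_request(
    "https://business.example.com/callbacks",  # hypothetical subscriber endpoint
    "UN.CEFACT.Trade.CertificateOfOrigin.created",  # hypothetical topic (a message predicate)
)
assert "hub.mode=subscribe" in body
```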
Callbacks Spreader¶
This is part of the WebSub infrastructure that processes each event once.
class intergov.processors.callbacks_spreader.CallbacksSpreaderProcessor(notifications_repo_conf=None, delivery_outbox_repo_conf=None, subscriptions_repo_conf=None)
    Converts each incoming message to a set of messages containing (websub_url, message), so that they may be sent and fail separately.
class intergov.use_cases.dispatch_message_to_subscribers.DispatchMessageToSubscribersUseCase(notifications_repo: libtrustbridge.websub.repos.NotificationsRepo, delivery_outbox_repo: libtrustbridge.websub.repos.DeliveryOutboxRepo, subscriptions_repo: libtrustbridge.websub.repos.SubscriptionsRepo)
    Used by the callbacks spreader worker.
This is the “fan-out” part of WebSub, where each event is dispatched to all the relevant subscribers. For each event (notification), it looks up the relevant subscribers and dispatches a callback task for each, so that they will be notified.
There is a downstream delivery processor that actually makes the callback; it is insulated from this process by the delivery outbox message queue.
Note: In this application the subscription signature is based on the message predicate.
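The fan-out step can be sketched with in-memory stand-ins for the subscriptions repo and delivery outbox (the names and shapes here are illustrative, not the real libtrustbridge repos):

```python
from collections import defaultdict

class InMemoryFanOut:
    """Toy stand-in for the subscriptions repo plus delivery outbox pair."""
    def __init__(self):
        self.subscriptions = defaultdict(set)  # predicate -> callback URLs
        self.delivery_outbox = []              # (url, message) tasks

    def subscribe(self, predicate, url):
        self.subscriptions[predicate].add(url)

    def dispatch(self, notification):
        # Fan-out: one (url, message) task per matching subscriber, so each
        # delivery can fail and be retried independently of the others.
        for url in sorted(self.subscriptions[notification["predicate"]]):
            self.delivery_outbox.append((url, notification))

fan = InMemoryFanOut()
fan.subscribe("UN.CEFACT.Trade.CoO.created", "https://a.example/cb")
fan.subscribe("UN.CEFACT.Trade.CoO.created", "https://b.example/cb")
fan.dispatch({"predicate": "UN.CEFACT.Trade.CoO.created", "sender_ref": "xyz"})
assert len(fan.delivery_outbox) == 2
```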
Callback Deliverer¶
This is the part of the WebSub infrastructure that processes each message once for every relevant subscriber. It defers to an external message queue to implement best-effort delivery semantics.
class intergov.use_cases.deliver_callback.DeliverCallbackUseCase(delivery_outbox_repo: libtrustbridge.websub.repos.DeliveryOutboxRepo)
    Used by a callback deliverer worker.
    Reads the delivery_outbox_repo queue, which consists of tasks in the format (url, message). Each message should be sent to its URL; on success the task is deleted, and on any error it is not deleted but retried again later (up to MAX_RETRIES times).
TODO: rate limits, no more than 100 messages to a single url per 10 seconds?
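A minimal sketch of that retry discipline, with an in-memory list standing in for the delivery outbox queue (MAX_RETRIES and the task shape are assumptions):

```python
MAX_RETRIES = 3  # assumption; the real limit is configuration-driven

def deliver_once(task, send, queue):
    """Attempt one (url, message, attempt) task; re-queue it on failure."""
    url, message, attempt = task
    try:
        send(url, message)
        return True                       # success: the task is simply dropped
    except Exception:
        if attempt + 1 < MAX_RETRIES:
            queue.append((url, message, attempt + 1))  # retry later
        return False                      # failure: re-queued or given up

queue = []
failures = {"https://down.example/cb"}
def send(url, message):
    if url in failures:
        raise ConnectionError(url)

assert deliver_once(("https://up.example/cb", {"m": 1}, 0), send, queue) is True
assert deliver_once(("https://down.example/cb", {"m": 2}, 0), send, queue) is False
assert queue == [("https://down.example/cb", {"m": 2}, 1)]
```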
Outbound Message Flow¶
These components are involved in the flow of messages from one Government to another. Specifically, the sending side of the equation.
Multichannel Router¶
This is a core component. It is responsible for routing messages between Governments (using the appropriate channel).
class intergov.processors.multichannel_router.MultichannelWorker(outbox_repo_conf=None, channel_pending_message_repo_conf=None, message_updates_repo_conf=None, config=None)
    Iterates over the RouteToChannelUseCase.
Note: channels abstract over topology, technology and wire protocols. This means that jurisdictions are free to determine bilaterally or multilaterally agreeable channels. This component will be configured to use the channels as per the operating jurisdictions agreements.
class intergov.use_cases.route_to_channel.RouteToChannelUseCase(routing_table)
    This code makes a routing decision: “which channel should I use to send this message?”. It then pushes the message to that channel.
As it currently stands, the channel_config object (passed in at construction) is a kind of routing table. It is an ordered list of channels. The router works through the list until it finds the first channel that does not “screen” the message, and uses that channel to deliver the message.
The channel config is a prototype, with hardcoded logic. Post POC versions will need a version with a configuration system that is more friendly to administrators.
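The first-match walk over the ordered channel list can be sketched as follows (the Channel class and the screen predicates are illustrative, not the real channel objects):

```python
# Minimal sketch of first-match routing over an ordered channel list.
# "screen" returns True when a channel refuses to carry the message.
class Channel:
    def __init__(self, name, screen):
        self.name = name
        self._screen = screen
        self.sent = []
    def screen_message(self, message):
        return self._screen(message)
    def post_message(self, message):
        self.sent.append(message)
        return self.name

def route_to_channel(routing_table, message):
    # Walk the ordered list; use the first channel that does not screen the message.
    for channel in routing_table:
        if not channel.screen_message(message):
            return channel.post_message(message)
    return None  # no channel is willing to carry the message

blockchain = Channel("bc", screen=lambda m: m["receiver"] != "SG")
fallback = Channel("http", screen=lambda m: False)
table = [blockchain, fallback]

assert route_to_channel(table, {"receiver": "SG"}) == "bc"
assert route_to_channel(table, {"receiver": "NZ"}) == "http"
```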
This process needs to be slightly more complicated than it might seem at first. Channels are potentially asynchronous. For example, with a blockchain channel, messages are “written” only to the extent of the consensus. It is technically possible for blockchains to fork, meaning that the consensus “changes its mind” about the shared view of history.
This means that, in addition to routing the message to the channel, the router must also dispatch a couple of jobs (asynchronous processing tasks):
“Channel Pending” jobs are used to keep track of messages that may not yet have been successfully delivered by the channel. Depending on the outcome of channel processing, the appropriate steps for processing these messages may not yet be known.
A “Delivery Status” journal is updated to keep track of the channel delivery status, so that stakeholder processes can remain apprised of important delivery/non-delivery events.
Channel Poller¶
Not to be confused with the Channel Observer: This worker checks on the status of messages that have been sent to a channel BY US (the other one discovers new messages that have been sent on a channel TO US).
class intergov.processors.channel_poller.ChannelPollerWorker(channel_pending_message_repo_conf=None, message_updates_repo_conf=None)
This worker deletes jobs from the “pending messages” queue when a change is detected (by polling the channel). If no change is detected, the job is not deleted from the pending messages queue, but neither is it returned to the queue: the worker holds a lock on the job until it goes stale. This way, the worker polls the queue sequentially, at most once per the task timeout period configured on the queue.
It’s a bit of a cheap trick but it seems to work quite well.
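The trick can be sketched with a toy queue that models the per-job visibility lock (timestamps are passed in explicitly to keep the example deterministic; real deployments would rely on the queue service's own visibility timeout):

```python
class PendingQueue:
    """Toy queue with a per-job visibility lock (like an SQS visibility timeout)."""
    def __init__(self, timeout):
        self.timeout = timeout
        self.jobs = {}          # job_id -> (job, locked_until)

    def put(self, job_id, job):
        self.jobs[job_id] = (job, 0.0)

    def get(self, now):
        # Return the first job whose lock has gone stale, and re-lock it.
        for job_id, (job, locked_until) in self.jobs.items():
            if locked_until <= now:
                self.jobs[job_id] = (job, now + self.timeout)
                return job_id, job
        return None

    def delete(self, job_id):
        self.jobs.pop(job_id, None)

q = PendingQueue(timeout=30.0)
q.put("msg-1", {"status": "pending"})
assert q.get(now=0.0) is not None   # first poll takes the lock
assert q.get(now=10.0) is None      # lock still held: at most one poll per timeout
assert q.get(now=31.0) is not None  # lock went stale, the job is visible again
```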
Rejected Message Processor¶
When the multi-channel router tries to send a message to a channel, there are various reasons why the attempt might fail. Because the process is asynchronous, the sending component (the multi-channel router) does not wait to learn the status; it just carries on sending.
That is why the channel poller component manages data in the “pending deliveries” database and posts update jobs to the “message updates” queue. Thus, the message updates queue contains jobs for updating the delivery status of messages.
The task of this component (the rejected message processor) is to process those jobs.
class intergov.processors.rejected_status_updater.RejectedStatusUpdater(rejected_message_repo_conf=None, message_lake_repo_conf=None)
    Iterates over the RejectPendingMessageUseCase.
class intergov.use_cases.reject_pending_message.RejectPendingMessageUseCase(rejected_message_repo, message_lake_repo)
    Gets a single message from the rejected message repo. If the rejected message payload is valid (i.e. has sender and sender_ref fields), updates the message metadata by changing its status to “rejected”, via the message lake repo’s update_metadata method.
    Fails if:
    unable to update the message status
    the rejected message payload is invalid
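A minimal sketch of that flow, with toy stand-ins for the two repos (the reference format and the repo method names here are assumptions):

```python
def reject_pending_message(rejected_repo, message_lake):
    """Process one job: mark the referenced message as rejected in the lake."""
    payload = rejected_repo.pop()
    if payload is None:
        return None                            # queue is empty
    if not {"sender", "sender_ref"} <= payload.keys():
        raise ValueError("rejected message payload is invalid")
    ref = f"{payload['sender']}:{payload['sender_ref']}"   # hypothetical key format
    message_lake.update_metadata(ref, {"status": "rejected"})
    return True

class FakeRepo(list):
    def pop(self):
        return super().pop(0) if self else None

class FakeLake(dict):
    def update_metadata(self, ref, patch):
        self.setdefault(ref, {}).update(patch)

repo = FakeRepo([{"sender": "AU", "sender_ref": "xxxx"}])
lake = FakeLake()
assert reject_pending_message(repo, lake) is True
assert lake["AU:xxxx"]["status"] == "rejected"
```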
Inbound Message Flow¶
These components are involved in the flow of messages from one Government to another. Specifically, the receiving side of the equation.
Channel Observer¶
There is one of these workers for each distinct channel.
They are like listening posts. They observe every message on the channel, filter out the ones that originate from this jurisdiction, and send the rest to the Private Message API for processing.
This way the rest of the system can know about new messages, and does not need to know about the configuration of channels.
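The filtering behaviour can be sketched as follows (jurisdiction codes and the callable standing in for the Private Message API are illustrative):

```python
def observe(channel_messages, our_jurisdiction, post_to_private_api):
    """Forward every channel message that did not originate here."""
    forwarded = 0
    for message in channel_messages:
        if message["sender"] == our_jurisdiction:
            continue                 # we sent this one; ignore our own echo
        post_to_private_api(message)
        forwarded += 1
    return forwarded

received = []
count = observe(
    [{"sender": "AU"}, {"sender": "SG"}, {"sender": "AU"}],
    our_jurisdiction="AU",
    post_to_private_api=received.append,
)
assert count == 1
assert received == [{"sender": "SG"}]
```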
Private Message API¶
When a message is sent by another jurisdiction to this one, the Channel Observer component associated with that channel uses this private API to process the messages.
This is where many channels funnel into the one system.
The implementation is /intergov/apis/message_rx_api
The specific business logic code is in the class EnqueueMessageUseCase in the file /intergov/use_cases/enqueue_messages.py.
Object Spider¶
This worker process is a “spider” in the same sense as the crawlers that search engines use to maintain their index of web pages.
Unlike a search-index spider, it does not fetch web pages. It fetches the objects (files, documents, etc.) that are the subject of G2G messages.
This component assumes the other Government is providing an interface that conforms with the edi3 Document API spec.
class intergov.processors.obj_spider.ObjectSpider
    Iterates over the RetrieveAndStoreForeignDocumentUseCase.
class intergov.use_cases.retrieve_and_store_foreign_documents.RetrieveAndStoreForeignDocumentsUseCase(jurisdiction, object_retrieval_repo, object_lake_repo, object_acl_repo)
    Processes a single request from the queue to download a remote document.
    The process is recursive: if an object has sub-objects, more jobs are added to download them later.
    Note:
    returns None if the object has already been downloaded
    returns True in case of success
    raises exceptions for errors
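The recursive retrieval can be sketched with an in-process queue standing in for the object-retrieval queue (object references and the fetch interface are hypothetical):

```python
from collections import deque

def spider(root, fetch, store):
    """Breadth-first retrieval of an object and its sub-objects.

    The real worker enqueues follow-up jobs on the retrieval queue instead
    of recursing in-process; a deque stands in for that queue here.
    """
    queue = deque([root])
    seen = set()
    while queue:
        ref = queue.popleft()
        if ref in seen:
            continue                 # already downloaded
        obj = fetch(ref)
        store[ref] = obj["body"]
        seen.add(ref)
        queue.extend(obj.get("links", []))   # sub-objects become later jobs
    return seen

objects = {
    "doc-1": {"body": b"root", "links": ["doc-2", "doc-3"]},
    "doc-2": {"body": b"child", "links": []},
    "doc-3": {"body": b"child", "links": ["doc-2"]},  # shared sub-object
}
store = {}
seen = spider("doc-1", objects.__getitem__, store)
assert seen == {"doc-1", "doc-2", "doc-3"}
```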
Platform Services¶
These components are involved in message processing regardless of the origin or destination of the message.
Inbound Message Processor¶
This worker processes new messages regardless of whether they came from a B2G route (i.e. the Public Message API) or a G2G route (i.e. from a G2G Channel).
The code in /intergov/processors/message_processor/ instantiates and runs an InboundMessageProcessor.
class intergov.processors.message_processor.InboundMessageProcessor(bc_inbox_repo_conf=None, message_lake_repo_conf=None, object_acl_repo_conf=None, object_retrieval_repo_conf=None, notifications_repo_conf=None, blockchain_outbox_repo_conf=None)
    Efficiently iterates over the ProcessMessageUseCase.
class intergov.use_cases.ProcessMessageUseCase(jurisdiction, bc_inbox_repo, message_lake_repo, object_acl_repo, object_retreval_repo, notifications_repo, blockchain_outbox_repo)
    Used by the message processing background worker.
Gets one message from the channel inbox and does a number of things with it:
dispatch document retrieval job (if the message is from a foreign source)
dispatch message sending task to channel-outbox (if the message is from a domestic source)
ensure the message is stored in the message lake
ensure the access control lists are updated for this message
dispatch any WebSub events required for this message
Note: the inbound message may have come from one of two sources: it may be a message from within this jurisdiction, or it may be a message sent from another jurisdiction. This use-case works with either message, however it needs to know which jurisdiction it is working as to get the logic right (that is why it takes a jurisdiction parameter when it is instantiated).
The message processing task touches quite a few backing services.
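The dispatch logic listed above can be summarised in a short sketch. The parameter names echo the repos the use case is constructed with, but the bodies are simplified stand-ins (plain dicts and lists rather than real backing services), and the `("message.received", …)` event name is an assumption.

```python
def process_message(message, jurisdiction,
                    object_retrieval_jobs, channel_outbox,
                    message_lake, object_acl, notifications):
    """Route one inbound message based on where it came from."""
    if message["sender"] != jurisdiction:
        # foreign source: dispatch a job to retrieve the referenced document
        object_retrieval_jobs.append(message["obj"])
    else:
        # domestic source: dispatch a sending task to the channel outbox
        channel_outbox.append(message)
    message_lake[message["obj"]] = message                       # message lake
    object_acl.setdefault(message["obj"], set()).add(message["receiver"])
    notifications.append(("message.received", message["obj"]))   # WebSub event
```

Note how the same function serves both directions: only the first branch differs, which is exactly why the use case needs the `jurisdiction` parameter.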
Message Updater¶
This worker updates the metadata of existing messages, regardless of the source of the change.
Rather than updating messages directly, other workers dispatch a “message update job” to a queue, and this worker then performs the deed in the message lake (using a patch call on the message API).
class intergov.processors.message_updater.MessageUpdater(message_updates_repo_conf=None)[source]¶
Iterate over message update jobs:
get a job from the queue
after some job validation, update the message using the API
if successful, delete the job from the queue
if unsuccessful, increment the retry counter and reschedule the attempt
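The job loop above can be sketched as follows. This is illustrative only: the queue and API are stand-ins, the `sender_ref`/`payload` job fields and the `MAX_RETRIES` value are assumptions, and "delete from the queue" is modelled simply by not re-queuing the job.

```python
MAX_RETRIES = 3  # assumed config value

def handle_update_job(job, patch_message, queue):
    """Apply one metadata update via a patch call on the message API."""
    if "sender_ref" not in job:        # minimal job validation
        return                         # drop malformed jobs (sketch-level policy)
    try:
        patch_message(job["sender_ref"], job["payload"])
    except Exception:
        # unsuccessful: increment the retry counter and reschedule
        job["retries"] = job.get("retries", 0) + 1
        if job["retries"] < MAX_RETRIES:
            queue.append(job)
        return
    # successful: the job is simply not re-queued (i.e. deleted)
```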
Backing Services¶
The various backing services have to be implemented somehow. For the purpose of this POC implementation, we are using three services (PostgreSQL, ElasticMQ and Minio). These were chosen because they are convenient to run locally (see the docker compose configurations in the GitHub repository). They also have directly compatible cloud-hosted alternatives, which greatly simplifies the “cloud engineering” required to run the service in a highly scalable and highly available way.
What matters about these backing services is that they are hidden behind abstractions. These are shown as boundaries in the API and worker process diagrams (e.g. object_lake.get_body, object_lake.post_file, object_acl.search, object_acl.allow_access).
These boundaries deal only in objects from the domain model, i.e. the code in intergov/domain. That domain code is independent of any implementation (backing service).
The intent of this technical detail is to simplify the implementation of backing services using whatever technology is most appropriate for the jurisdiction operating the IGL node.
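The boundary idea can be sketched as abstract interfaces. The method names (`get_body`, `post_file`, `search`, `allow_access`) come from the text above; the abstract-base-class framing and the in-memory implementation are illustrative choices, not the actual `intergov` classes.

```python
import abc

class ObjectLake(abc.ABC):
    """Stores document bodies; could be backed by Minio, S3, or a filesystem."""
    @abc.abstractmethod
    def get_body(self, multihash): ...
    @abc.abstractmethod
    def post_file(self, multihash, body): ...

class ObjectACL(abc.ABC):
    """Tracks which jurisdictions may read which objects."""
    @abc.abstractmethod
    def search(self, multihash): ...
    @abc.abstractmethod
    def allow_access(self, multihash, jurisdiction): ...

class DictObjectLake(ObjectLake):
    """In-memory implementation; workers only ever see the ObjectLake interface."""
    def __init__(self):
        self._objects = {}
    def get_body(self, multihash):
        return self._objects[multihash]
    def post_file(self, multihash, body):
        self._objects[multihash] = body
```

Because workers and APIs depend only on the abstract interface, swapping Minio for a cloud object store means writing one new subclass, not touching the domain code.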
Channel API¶
A node may use multiple channel implementations, therefore we have decoupled the node from the channel by defining this channel API. The reasons behind this are documented in the design note on multichannel architectures.
A channel consists of two or more endpoints and a shared medium in the middle, e.g. a blockchain.
An endpoint consists of two parts, each of which is optional, but the endpoint is only useful if at least one part is implemented.
The Posting API - A node uses this to send a message through a channel
The Receiving API - A node uses this to receive messages addressed to its jurisdiction from a channel
These two APIs handle the complexities of the channel medium and are authorised to send messages on the channel (on behalf of an entity). Therefore, access to these APIs must be controlled.
Channel Auth
A channel posting endpoint is posting messages AS the jurisdiction and therefore must ensure that only nodes that are permitted to send messages AS the jurisdiction are allowed to post.
It is the channel endpoint operator’s business to determine access requirements for the channel.
For example, if a node operator is operating private channel APIs for its own use, and not allowing any other nodes to use their channel APIs, then network level security may be sufficient. Similarly, a developer may use docker networking connections to restrict access without implementing any explicit access controls. However, if a channel operator wanted to support multiple nodes, then they would need to develop a satisfactory access control regime, sufficient for the requirements of that channel.
A channel may have many nodes using it, but on the order of tens, not thousands.
Note
The current reference implementation at TODO-testnet assumes that the node operator is also the channel endpoint operator, therefore manual devops-style auth configuration is fine (e.g. subnet-only networking, IP address whitelisting, API Gateway SigV4 signing, etc…).
Channel Posting API¶
POST /messages
GET /messages/<id>
GET /messages/<id>?fields=status
States:
Received: The message either hasn’t been written to the channel (perhaps the first attempt errored and will be attempted again) or has been written but is awaiting confirmation.
Confirmed: The message has passed through the channel. Effectively the end state for most successful messages.
On a blockchain, this means that there are sufficient blocks on top.
On a DB this means that the message was committed to the table.
Undeliverable: The channel was unable to write the message and has stopped trying
Revoked: Confirmation was erroneously issued on a fork. We expect this to be extremely rare; it is a theoretical possibility.
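The four states above form a small state machine. The sketch below encodes one illustrative reading of the legal transitions: Received may repeat (retries), Revoked can only follow an erroneous Confirmed, and Undeliverable and Revoked are terminal. The transition table is an assumption drawn from the descriptions, not a published part of the Channel API.

```python
# Assumed transition rules for the posting-side message states.
VALID_TRANSITIONS = {
    "Received": {"Received", "Confirmed", "Undeliverable"},
    "Confirmed": {"Revoked"},   # only a fork can undo a confirmation
    "Undeliverable": set(),     # terminal
    "Revoked": set(),           # terminal
}

def transition(state, new_state):
    """Move a posted message between states, rejecting illegal moves."""
    if new_state not in VALID_TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state
```

A client polling `GET /messages/<id>?fields=status` can treat anything other than Received as a final answer for most practical purposes, while still handling the rare Confirmed → Revoked case.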
A typical BlockchainChannel:
receives a message and writes it to a DB, returning an ID
writes to the blockchain
waits (forever; stays in Received) and observes until:
multiple blocks are written on top of the chain (Confirmed)
OR observes that it was on a fork and the chain has moved from a previous block and the message was never written (Undeliverable)
It is the channel API’s business to decide if it fails as Undeliverable on the first attempt, or whether it tries a few times (config value) before being marked as Undeliverable.
Channel Receiving API¶
POST /subscriptions
- follows the WebSub standard
GET /messages?updated_since=2020-01-12Z123456&receiver=AU
- includes new messages
States:
Observed: The message has been seen on the channel medium, but we haven’t confirmed that it is really there.
Confirmed: Means that the message is definitely on the channel medium. This is the point at which we publish the message.
On a blockchain, this means that there are sufficient blocks on top.
On a DB this means that the message was committed to the table, i.e. the first time we observe the message it will also become confirmed.
False_Alarm: The message was seen on the channel medium but it has now disappeared. If the message had previously been Confirmed, the channel must publish an update about the message. If the message had only been Observed but not Confirmed we don’t need to take any further action beyond changing the status of the message.
On a blockchain, this means we observed the message on a fork. We expect this to be extremely rare; it is a theoretical possibility.
On a DB, this shouldn’t happen unless a message is deleted from the table.
A typical BlockchainChannel:
observes the blockchain and records new messages into a DB to keep track of which messages it has seen and which it hasn’t
tells the subscription engine that a new message has arrived once a certain number of blocks are on top
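The observer behaviour above can be sketched as a confirmation-depth check: a message moves from Observed to Confirmed (and is published to the subscription engine) once enough blocks sit on top of the block containing it. The `CONFIRMATION_DEPTH` value and the function shape are assumptions for illustration.

```python
CONFIRMATION_DEPTH = 6  # assumed config value: blocks required on top

def observe(seen, message_id, message_block, chain_height, publish):
    """Track one observed message; publish it once sufficiently buried."""
    state = seen.get(message_id, "Observed")
    if state == "Confirmed":
        return state                     # already published; nothing to do
    if chain_height - message_block >= CONFIRMATION_DEPTH:
        state = "Confirmed"
        publish(message_id)              # notify the subscription engine once
    seen[message_id] = state
    return state
```

The False_Alarm case (the message vanishes on a fork) would be a third branch that compares the recorded block against the current chain; it is omitted here because it depends on chain-reorg details the sketch doesn't model.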
Deploying a channel¶
Process of setting up a channel:
spin up channel medium (optional)
spin up channel endpoint and configure with medium details, auth, …
spin up second channel endpoint, same way
spin up new channel medium
spin up new endpoint pointing at new medium
Example integration test node setup
Architecture Decision Notes¶
Multi-Channel Architecture¶
This page explains the advantages of a multi-channel architecture facilitating cross-border trade by enabling Government to Government (G2G) document exchange.
Hubs models are obvious, but possibly wrong¶
First, let’s start by considering the alternative to multi-channel architecture; a single channel (or “Hub”) model.
In this model, there is a single logical Hub that all messages pass through. This logical Hub could be a distributed ledger, a traditional database, a paper clearinghouse, or some other technology. The basic idea is that jurisdictions send their messages to this hub and receive their messages from it too.
Hub models require participants to adopt a common technology platform. This platform must meet the needs of all participants both in the moment and in the future.
Hub architectures have been built many times before. Some people find this sort of design intuitively appealing, perhaps because the idea of standardising on a single solution seems like it should minimise interoperability challenges.
But standardising on a single implementation solves interoperability the wrong way. Interoperability comes from standard interfaces, not from common implementation. If two systems have an effective way to interoperate, then there is no reason for them to have the same implementation. Individual participants should be free to implement their parts of the system in the way that makes the most sense to them.
What is a multi-channel architecture?¶
As an alternative to the Hub model, consider the following:
The above illustration shows a multi-channel scenario where:
Country A and Country E have a bilateral arrangement for exchanging messages on any topic
There is a multilateral arrangement between Jurisdictions B, E and F that supports messages on any topic
There is a multilateral arrangement between Jurisdictions A, B, C, F and G that supports messages on a specific topic
There is a multilateral arrangement between Jurisdictions B, C, D, F, G and others (…) that supports messages on any topic
There is an arrangement between Country D and others supporting messages on some specific topic.
On first impression, the above scenario might seem overcomplicated. However, the reality of international trade is vastly more complex than this diagram!
There are three distinct reasons why a multi-channel architecture is necessary.
Support for Variable Topology¶
Agreements between Jurisdictions are inherently bespoke. Some are bilateral (links), others are multilateral (networks). The scope and details are customised and optimised through a process of negotiation. They change over time, as existing arrangements are refined or adjusted and new arrangements are made.
Even if a hub model is theoretically better (no such theory is offered here), the idea of asking almost 200 jurisdictions to agree on a precise scope and details for sharing cross-border trade documents seems like it would be slow, difficult and unlikely to succeed.
There are examples of universal hubs, but they have narrow scope (for example, ePhyto Certification).
It seems more pragmatic to assume that cooperative sharing arrangements involving cross-border trade documentation will involve a similar process of negotiation to other international agreements.
While technical standardisation may reduce waste, free jurisdictions will always ultimately determine who they share what with, when and how; and those arrangements will change over time with policy and circumstance.
Any design that does not support variable topologies seems likely to result in a sub-optimal compromise.
Support for Variable Technology¶
Technical solutions for cross-border document exchange have existed for many centuries. Emerging technologies (such as distributed ledgers) have different characteristics which may confer some advantages, make new things possible or make previously difficult things easier. No doubt technology will continue to evolve and as-yet unimagined solutions will emerge with even more favourable characteristics.
Sometimes, the best technology choice in a given situation would not be the best choice in a different situation. The asset lifecycle of existing systems, infrastructure, organisational capacities and technology strategies of different groups can create a predilection (or an aversion) for specific technologies.
Even if it were possible to determine a universal “best technology” to implement cross-border trade document sharing, that would be a fleeting anomaly.
Any design that does not allow jurisdictions to negotiate technology choices (and mutually agree to update or upgrade technology) seems incongruent with the other negotiated details of international arrangements. An attempt to unilaterally impose a single, unchanging technology choice would not only require impractically challenging negotiation, it would also pit the fate of the system against the march of technological progress.
Support for Variable Protocols¶
The current proof of concept supports a wire protocol that we called “Discrete Generic Message” (DGM). Each communication packet between jurisdictions contains a single (“discrete”) message, and there is no limit to the taxonomy of message types that could be sent (generic).
This protocol was adequate and sufficient for the first stage of our Proof Of Concept. It may yet prove to be a useful protocol in a wide range of situations. However, there are also situations where a different protocol design may be more appropriate.
If there are very high message volumes, or a technology is used with a low bandwidth (or high cost per transmission), then a batched protocol design may be more appropriate. Rather than sending “discrete” messages (one at a time) a batch protocol could send a compressed collection of messages in each packet. This would involve trade-offs, especially with all-or-nothing validation semantics (such as blockchain consensus), but there may be situations where a batch protocol is the most practical choice.
Some distributed ledger technologies support a feature called “Smart Contracts”. These are sometimes known by other names, such as “Transaction Families” or “Chain Code”, but what they all have in common is that they allow the channel to enforce mutually agreed policies in a trustworthy way. Smart contracts allow a distributed ledger to operate like an “independent umpire”, which is potentially useful in a wide variety of situations that require adversarial trust. However, this has the downside of tightly coupling policies to the message transport mechanism. This means the channel can only be used for the purposes that correspond exactly to the policies implemented in the smart contract.
Given the bespoke nature of international trade agreements, developing a channel that fits them all well could be very difficult or perhaps impossible. The strategy of allowing multiple channels might make the solution seem more complicated from some perspectives, but if jurisdictions can route messages over multiple channels then it should be possible for a jurisdiction to maintain integration with the collection of channels that best fit their needs.
Interoperability requires standard interfaces¶
The multi-channel architecture theory needs to be tested.
This Proof of Concept software includes a “channel router” component, with a mechanism for deciding which channel should be used for each message (i.e. an “outbound message routing” mechanism). It also includes a “channel observer” component, which is a mechanism for accepting messages from different channels and funneling them all into the same process regardless of how they are transmitted.
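An outbound routing mechanism of the kind the “channel router” provides can be sketched as a first-match routing table. The rule shape (predicate functions over a message dict) and the example channel names are illustrative assumptions, not the actual intergov configuration format.

```python
def route(message, channels):
    """Return the first channel whose predicate accepts the message."""
    for rule, channel in channels:
        if rule(message):
            return channel
    raise LookupError("no channel can carry this message")

# Example routing table mirroring the diagram's ideas: a topic-specific
# multilateral channel is tried first, then a catch-all bilateral channel.
routing_table = [
    (lambda m: m["topic"] == "certificate-of-origin", "coo-channel"),
    (lambda m: m["receiver"] == "SG", "au-sg-channel"),
]
```

Because routing is data, a jurisdiction can add, remove or reorder channels as its arrangements change, without touching the senders, which is the multi-channel argument in miniature.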
The code is designed in a way that assumes a standardised “Channel API” exists; however, an actual Channel API has not been developed yet.
This requires active research, which would benefit greatly from integrating one (or preferably more) existing G2G message channels.
If a standard Channel API is developed that can successfully be applied to existing G2G message channels, then it should be possible to provide an abstraction over the existing channels such that:
Business to Government (B2G) transactions operate against standard APIs, which hide the details of which actual channel is used.
Governments should be able to modify their channel implementations in a way that insulates their regulated community from the change. In other words, without impacting their users.
It should be possible to integrate additional, new channels without modifying the standard Channel API design.