The fan-out / fan-in pattern is a staple of more advanced API integrations - let's check out in practice how, with Zato, it can simplify asynchronous communication across applications that do not always exhibit the same kind of availability or performance characteristics.
What is fan-out / fan-in?
The core idea is this:
- A source system requests some kind of information from your Zato service
- The source system understands that the process of gathering this information may take time. This may be because the backend systems are slow to reply or there are many of them and the person using the source system will not actively wait for the reply, e.g. no one will want to wait 30 or 60 seconds in such a case.
- Zato accepts the initial request and returns to the initial system a correlation ID (CID), a random string that tells the system "I received your request, here, save this CID somewhere, and when I have the result, I will give it to you along with that CID to make it possible for you to understand a reply to which request it is"
- At this point the initial system knows that the response will arrive at some point in the future and it stores the CID in a datastore of choice, it can be in Redis, MongoDB, an SQL database or simply in RAM, this is completely up to the system
- The system does not block the connection it made to Zato anymore now - this is important - it does not wait for the response to be sent as part of the same TCP exchange of messages
- What exactly the system does instead of blocking depends on the protocol used to connect with Zato. If it is HTTP, it can possibly close the connection in the knowledge that Zato will invoke one of its callback endpoints once the results are ready. If it is AMQP, IBM or Zero MQ, then the system already listens for data in some reply queues.
The above concludes the initial phase and Zato can now carry out the rest of the work:
- Your Zato service uses self.patterns.fanout to invoke as many data sources as required to collect the information originally request
- A data source may mean a database of any sort, an external application or perhaps another service that in turn may invoke other applications, systems, services or fan-out / fan-in patterns as well
- The service initially using self.patterns.fanout tells the pattern what callback service to invoke once all the data sources returned their responses. The pattern is also given the CID so that the callback service knows what it was too.
Now, the data-collecting services and data sources do their job. What it means is purely up to a given integration process, perhaps some SQL databases are consulted, maybe Cassandra queries run and maybe a list of REST endpoints is invoked, any protocol or data format can be used.
- What Zato does in the meantime is keeping track of which of the data sources already replied and when they all have, Zato will invoke the callback service
- The callback service is given on input all the responses along with the original CID
- What the callback service will process the responses, e.g. by transforming them into a common format combining them into a single message
- Finally, the callback service invokes the callback endpoint that the initial system awaits on - for instance, it may publish the response to an AMQP queue or it may invoke a REST endpoint. Note that the technology used here can be independent of the one used initially - as an example, we can very well have the very first system use WebSockets for requests and IBM MQ for responses.
Thus, we can see why it is called fan-out / fan-in:
- First, the flow of information fans out to applications and systems containing the data needed
- Next, the flow fans in through Zato to the application that requsted the data
When to use it?
The pattern can be considered each time there is a rather long-running integration process that requires communication with more than one system.
This raises a question of what is long-running? The answer lies in whether there is a human being waiting for the results or not. Generally, people can wait 1-2 seconds for a response before growing impatient. Hence, if the process takes 30 seconds or more, they should not be made to wait as this can only lead to dissatisfaction.
Yet, even if there is no real person for the result, it is usually not a good idea to wait for results too long as that is a blocking process and blocking may mean consuming up resources unnecessarily, which in turn may lead to wasted CPU and, ultimately, energy.
This kind of processes is found when the data sources are uneven in the sense that some or all of them may use different technologies or because they may each return responses with different delays. For instance:
- Some data sources may include reporting databases that generate reports on demand - such an act may require many seconds or minutes to complete
- Some data sources may belong to different organisational units, e.g. an association of independent business organisations may request data from some of them and each may use its own system, database or endpoint, each with different a response time
Another trait of such processes is that the parallel paths are independent - if they are separate and each potentially takes a significant amount of time then we can just as well run them in parallel rather than serially, saving in this way time from the caller's perspective.
How does it look like in Python?
An example service using the pattern may look like below.
- The service receives a user_id on input and is to return a credit scoring based on that
- Three services, each representing a single backend system, are invoked in parallel
- Once all the responses are available, a callback service is invoked - this is the service that needs to deliver the combined response to the original caller
Note that this particular code happens to use AMQP in the callback service but this is just for illustration purposes and any other technology can be used just as well.
from zato.server.service import Service
class GetUserScoringCredit(Service):
def handle(self):
user_id = self.request.input.user_id
# A dictionary of services to invoke along with requests they receive
targets = {
'crm.get-user': {'user_id':user_id},
'erp.get-user-information': {'USER_ID':user_id},
'scoring.get-score': {'uid':user_id},
}
# Invoke both services and get the CID
cid = self.patterns.fanout.invoke(targets, callbacks)
# Let the caller know what the CID is
self.response.payload = {'cid': cid}
class GetUserScoringCreditAsyncCallbac(Service):
def handle(self):
# AMQP connection to use
out_name = 'AsyncResponses'
# AMQP exchange and routing key to use
exchange = '/exchange'
route_key = 'zato.responses'
# The actual data to send - we assume that we send it
# just as we received, without any transformations.
data = self.request.raw_request
self.outgoing.amqp.send(data, out_name, exchange, route_key)
More options
Be sure to check details of the pattern in the documentation to understand what other options and metadata are available for your services.
In particular, note that there is another type callback not shown in this article. The code above has only one, final callback when all of the data is already available. But what if you need to report progress each of the sub-tasks completed? This is when per-invocation callbacks come in handy, check the samples in the documentation for more information.
More patterns
Fan-out / fan-in is just one of the patterns that Zato ships with. The most prominent ones are listed below - keep it mind that the
-
Publish/subscribe - Zato comes with a full-featured multi-protocol publish/subscribe message broker in Python. If you need a platform for all your message queues and topics, here it is.
-
Parallel execution - Similar to fan-out / fan-in in that it can be used for communication with multiple systems in parallel, the key difference being that this pattern is used when a final callback is needed, e.g. when there is a need to update many independent systems but there is no need for a combined response
-
Invoke/retry - used when there is a need to invoke a resource and handle potential failures in a transparent way, e.g. a system may be down temporarily and with this pattern it is possible to repeat the invocation at a later time, according to a specific schedule
Learn more
Are you interested in learning more about enterprise API integrations in Python? You can start out by visiting the Zato main page, familiarising yourself with the extensive documentation or by jumping straight into the first part of the tutorial.
Be sure to visit our Twitter, GitHub and Gitter communities too!
from Planet Python
via read more
No comments:
Post a Comment