Simple Python Pipes and Filters

By | December 8, 2019

Inspired by the Enterprise Integration pattern of ‘pipes and filters’, I wanted to make a simple Python pipes and filters example. With some digging I found this GitHub repository, which does an excellent job of breaking down the message-based approach to Object Oriented Programming described in a series of blog posts: 1, 2, 3.

To explore the idea of ‘pipes and filters’ I wanted to work through an even simpler Python pipes and filters implementation. The implementation described here is probably over-simplified and doubtless has issues and limitations. I hope, though, that it can be a useful tool for understanding, and perhaps even usable in some simple projects.

Simple Python Pipes and Filters

In overview, we’ll set up a ‘Pipeline’ class which holds a list of filters and has a method for executing those filters, in order, against a supplied message.

Figure: the filters mutate the dictionary message.

Executing the pipeline applies each of the filters to the message.

class Pipeline:
    def __init__(self):
        self.filters = list()

    def add(self, filters):
        # 'filters' is a list of filter functions; extend() adds each one in order
        self.filters.extend(filters)

    def execute(self, message):
        print("Executing pipeline...")
        for message_filter in self.filters:
            message_filter(message)

def double_data(message):
    message['data'] = [x*2 for x in message['data']]

def update_header(message):
    message['header'] = 'this is the updated message'

pipeline = Pipeline()

pipeline.add([double_data,
              update_header]) # note we are adding the function object, not making a function call

message = {'header': 'this is the original message',
           'data': [1,2,3]}

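# print the message before the pipeline runs
for key, value in message.items():
    print(key + ' : ' + str(value))

# apply each filter to the message in turn (modifies the message in place)
pipeline.execute(message)

# print the message again to see the effect of the filters
for key, value in message.items():
    print(key + ' : ' + str(value))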

This prints the following output:

header : this is the original message
data : [1, 2, 3]
Executing pipeline...
header : this is the updated message
data : [2, 4, 6]

The Messages

The message itself is represented by a dictionary. Dictionaries are mutable and each of the filters is actually modifying the original message dictionary. In a sense this violates one of the common principles of working with messages: that they should be immutable.

We could think of the messages in this case as ‘document messages’, which are permitted to be mutable. It also conceptually seems a bit odd that the message is not passed through the pipeline, so much as stays in the same place while a series of filters act on it.

In an attempt to reduce the number of concepts needed to understand this example, I have decided to stick with dictionaries as the message, and not attempt to address this in the filter/pipeline. See the section ‘taking things further’ for more discussion about some possible ways of handling immutability as well as some other topics.
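For comparison, here is a minimal sketch of what a non-mutating version might look like. It is not the implementation used in this post: the names `CopyingPipeline`, `double_data_copy` and `update_header_copy` are hypothetical, and the sketch assumes that each filter returns a new dictionary instead of changing the one it receives.

```python
import copy


class CopyingPipeline:
    """Hypothetical variant of Pipeline whose filters return new messages."""

    def __init__(self):
        self.filters = []

    def add(self, filters):
        self.filters.extend(filters)

    def execute(self, message):
        # Work on a deep copy so the caller's message is never modified.
        result = copy.deepcopy(message)
        for message_filter in self.filters:
            # Each filter returns the message to hand to the next filter.
            result = message_filter(result)
        return result


def double_data_copy(message):
    # Build a new dictionary instead of assigning into the input.
    return {**message, 'data': [x * 2 for x in message['data']]}


def update_header_copy(message):
    return {**message, 'header': 'this is the updated message'}


pipeline = CopyingPipeline()
pipeline.add([double_data_copy, update_header_copy])

original = {'header': 'this is the original message', 'data': [1, 2, 3]}
updated = pipeline.execute(original)

print(original['data'])  # [1, 2, 3] (unchanged)
print(updated['data'])   # [2, 4, 6]
```

With this approach the message really does travel through the pipeline, at the cost of each filter having to remember to return something.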

The Filters

In this example filters are defined by functions. Each function takes a message as an argument and mutates the data in the message. Since the messages are mutable dictionaries there is no need to return the message itself in this implementation.

For more sophisticated filters, you may want to define the filters as classes rather than functions. See the ‘Taking Things Further’ section for more about this.
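As a rough illustration of a class-based filter, the sketch below defines a hypothetical `MultiplyData` class (not part of the example above). It assumes the same message layout, with a list stored under the 'data' key, and uses `__call__` so that an instance can be called exactly like one of the filter functions.

```python
class MultiplyData:
    """Hypothetical filter class: multiplies every item in message['data']."""

    def __init__(self, factor):
        # Configuration is stored on the instance when the filter is created.
        self.factor = factor

    def __call__(self, message):
        # Defining __call__ lets an instance be used like a filter function.
        message['data'] = [x * self.factor for x in message['data']]


triple_data = MultiplyData(3)

message = {'header': 'this is the original message', 'data': [1, 2, 3]}
triple_data(message)

print(message['data'])  # [3, 6, 9]
```

Because the instance is callable, it could be added to the pipeline in the same way as a function, for example with `pipeline.add([MultiplyData(3)])`.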

The Pipeline

The pipeline itself is defined as an object with methods:

  • add – adds filters to the pipeline. In our implementation each filter is a function object which expects to be passed a message dictionary as its single argument. The method has been implemented so that a list of function objects can easily be added, rather than having to ‘add’ filters one at a time. Because it uses the list ‘extend’ method, a single filter must still be wrapped in a list.
  • execute – executes the pipeline against a target message. This is achieved by passing the input message as an argument to each of the functions provided via the ‘add’ method, in the order they were added.

As mentioned elsewhere in this post, the use of a dictionary as the input message means that the effect of running the pipeline is to modify the message in place. The modified message can be read or used further once the pipeline has run.
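If wrapping a single filter in a list feels awkward, one possible tweak is sketched below. The `FlexiblePipeline` class is hypothetical and is not the class used in this post; it assumes that anything callable is a single filter and that anything else is an iterable of filters.

```python
class FlexiblePipeline:
    """Hypothetical variant of Pipeline whose add() takes one filter or many."""

    def __init__(self):
        self.filters = []

    def add(self, filters):
        if callable(filters):
            # A single filter: append it directly.
            self.filters.append(filters)
        else:
            # Any iterable of filters: add them in order.
            self.filters.extend(filters)

    def execute(self, message):
        for message_filter in self.filters:
            message_filter(message)


def double_data(message):
    message['data'] = [x * 2 for x in message['data']]


def update_header(message):
    message['header'] = 'this is the updated message'


pipeline = FlexiblePipeline()
pipeline.add(double_data)        # a single filter
pipeline.add([update_header])    # a list of filters

message = {'header': 'this is the original message', 'data': [1, 2, 3]}
pipeline.execute(message)

print(message['data'])    # [2, 4, 6]
print(message['header'])  # this is the updated message
```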

Taking Things Further

This simple python pipes and filters example has deliberately taken a bare-bones approach, and there are several areas that could be refined or extended. Below are some examples:

  • Immutability – this simple Python pipes and filters example relies on the mutability of dictionaries to allow the pipeline to do something. An alternative would be to enforce some degree of immutability in the message. This could be achieved by using frozen Data Classes or Named Tuples (neither of which allows its fields to be reassigned), or by changing how the filters are expected to work: each filter would modify and return a ‘deep copy’ of the input dictionary.
  • Define filters as classes rather than simple functions. Defining the filters as classes means that they can be more easily instantiated with other data or configuration information. For example you could have a filter which communicates with a database and needs connection details; or a filter which does processing by comparison against some reference data.
  • Adding more pipes – this example only has a single pipe, so it would be interesting to extend the functionality to chain pipes together. A pipe (a collection of filters) can itself be treated as a filter in a separate pipe, which helps make the pipes and filters pattern powerful and flexible.
  • Separating into multiple files. In this example the filters have been defined in the same file as the pipeline and the actual usage of the pipeline. An alternative would be to separate out the filters into a separate ‘filters library’, and the pipeline into its own pipeline definition file. This could be done whether the filters are functions or separate classes.
  • Using generators – it is possible to implement pipes and filters using generators, which you may wish to explore. There is an example of this here.
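To make the idea of chaining pipes a little more concrete, here is a minimal sketch of one pipeline being used as a filter inside another. It repeats the Pipeline class from earlier with one addition that is an assumption of this sketch, not part of the original: a `__call__` method, so that a Pipeline instance can be called like a filter function. The names `inner` and `outer` are only for illustration.

```python
class Pipeline:
    def __init__(self):
        self.filters = []

    def add(self, filters):
        self.filters.extend(filters)

    def execute(self, message):
        for message_filter in self.filters:
            message_filter(message)

    def __call__(self, message):
        # Assumption, not in the original class: calling a pipeline runs it,
        # which lets a whole pipeline act as a single filter.
        self.execute(message)


def double_data(message):
    message['data'] = [x * 2 for x in message['data']]


def update_header(message):
    message['header'] = 'this is the updated message'


# The inner pipeline doubles the data twice.
inner = Pipeline()
inner.add([double_data, double_data])

# The outer pipeline treats the inner pipeline as just another filter.
outer = Pipeline()
outer.add([inner, update_header])

message = {'header': 'this is the original message', 'data': [1, 2, 3]}
outer.execute(message)

print(message['data'])    # [4, 8, 12]
print(message['header'])  # this is the updated message
```

The same effect is available with the unmodified class by adding the bound method instead, as in `outer.add([inner.execute, update_header])`.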