Models

pypz has a layered design with the following layers, from top to bottom: pipeline, operator and plugin. The pipeline is on the top layer and contains the operators, which implement the actual business logic. The functionality of the operators can be extended via plugins, e.g., port plugins, through which operators transfer data to each other, or logger plugins, which enable an operator to send logs to specified targets. Plugins are on the lowest layer.

Basic pipeline

Instances

Every entity in pypz (pipelines, operators, plugins) is represented as an instance at runtime. The blueprints of the instances are called specs and are represented as classes in the code. In other words, specs are the classes and instances are the objects created from them. This design enables pypz to model pipelines as code. The Instance class is the base of every other spec; it contains all the logic and features that this design requires.

Inheritance diagram of pypz.core.specs.pipeline.Pipeline, pypz.core.specs.operator.Operator, pypz.core.specs.plugin.LoggerPlugin, pypz.core.specs.plugin.ExtendedPlugin, pypz.core.specs.plugin.ServicePlugin, pypz.core.specs.plugin.InputPortPlugin, pypz.core.specs.plugin.OutputPortPlugin

The Instance class is designed so that it can be used on its own. However, the Pipeline, Operator and Plugin classes specialize and, to some extent, restrict the base Instance class as required by pypz.

Attributes

An instance has the following attributes:

  • name (required), which identifies the instance at runtime

  • parameters

  • dependencies on other instances

  • nested instances, i.e., other instances attached to the current instance’s context (e.g., an operator is a nested instance of a pipeline)

  • context instance, i.e., the parent instance (e.g., the pipeline is the context of an operator)

Note

The name of the instance can be provided as a constructor argument. If the argument is omitted and the instance has a context, the name of the variable the instance is assigned to becomes its name.

Generic Type

The Instance class is a generic class, whose type parameter describes the expected type of the nested instances. This matters because at construction time all attributes are scanned, and any attribute whose type matches the expected nested instance type is automatically identified as a nested instance.
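The following sketch illustrates how attribute scanning driven by a generic type argument can work in plain Python. ToyInstance, ToyOperator and ToyPlugin are hypothetical stand-ins, not pypz classes, and the sketch assumes the nested type is resolved from the class's parametrized base via typing.get_origin and typing.get_args; pypz's actual mechanism may differ.

```python
from typing import Generic, TypeVar, get_args, get_origin

NestedT = TypeVar("NestedT")


class ToyInstance(Generic[NestedT]):
    """Hypothetical stand-in for pypz's Instance; not its real implementation."""

    def __init__(self, name=None):
        self.name = name

    @classmethod
    def nested_type(cls):
        """Return the concrete type argument, e.g. ToyInstance[X] -> X."""
        for klass in cls.__mro__:
            # __orig_bases__ keeps the parametrized bases as written in the class statement
            for base in klass.__dict__.get("__orig_bases__", ()):
                if get_origin(base) is ToyInstance:
                    args = get_args(base)
                    if args and not isinstance(args[0], TypeVar):
                        return args[0]
        return None

    def nested_instances(self):
        """Scan all attributes and keep those whose type matches the nested type."""
        expected = self.nested_type()
        if expected is None:
            return {}
        return {attr: value for attr, value in vars(self).items()
                if isinstance(value, expected)}


class ToyPlugin(ToyInstance):  # no type argument: plugins have no nested instances
    pass


class ToyOperator(ToyInstance[ToyPlugin]):
    def __init__(self, name=None):
        super().__init__(name)
        self.logger = ToyPlugin()  # matches the nested type -> nested instance
        self.batch_size = 10       # ordinary attribute -> ignored


operator = ToyOperator("op")
print(list(operator.nested_instances()))  # ['logger']
```

Only attributes whose value is an instance of the declared nested type are picked up; everything else on the object is treated as ordinary state.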

Metaclass

Notice that a custom metaclass (pypz.core.specs.instance.InterceptedInstance) is used to intercept the creation of Instance objects. This is necessary to deduce the instance name from the variable name, where applicable, and to perform some other checks.
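A minimal sketch of intercepting object creation with a metaclass, assuming that name deduction happens in a post-construction step. InterceptingMeta and the Toy* classes are hypothetical and only demonstrate the general technique (overriding type.__call__); they are not the implementation of InterceptedInstance, and the name check is an illustrative example of "other checks".

```python
class InterceptingMeta(type):
    """Hypothetical metaclass illustrating the idea behind InterceptedInstance."""

    def __call__(cls, *args, **kwargs):
        # Run the normal construction (__new__ + __init__) first.
        obj = super().__call__(*args, **kwargs)
        # Post-construction hook: any nested instance created without a name
        # takes the name of the attribute (variable) it was assigned to.
        for attr, value in vars(obj).items():
            if isinstance(value, ToyBase) and value.name is None:
                value.name = attr
        # Other checks could run here, e.g. validating the name type.
        if obj.name is not None and not isinstance(obj.name, str):
            raise TypeError("Instance name must be a string")
        return obj


class ToyBase(metaclass=InterceptingMeta):
    def __init__(self, name=None):
        self.name = name


class ToyPlugin(ToyBase):
    pass


class ToyOperator(ToyBase):
    def __init__(self, name=None):
        super().__init__(name)
        self.input_port = ToyPlugin()         # no name given -> "input_port"
        self.logger = ToyPlugin("my_logger")  # explicit name is kept


operator = ToyOperator("reader")
print(operator.input_port.name, operator.logger.name)  # input_port my_logger
```

Overriding __call__ on the metaclass runs the hook after __init__ has finished for every subclass, so individual specs do not need to call it explicitly.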

Dependencies

It is possible to define dependencies between instances at runtime. Be aware, however, that the meaning of a dependency is defined by the runtime context, i.e., defining dependencies has no effect by itself. For example, the operator executor uses the dependency definitions of the plugins to build a call order list, which then determines when each plugin runs.
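One common way to turn dependency definitions into a call order is a topological sort. The sketch below uses Python's standard graphlib module (Python 3.9+) on hypothetical plugin names; it illustrates the idea only and is not taken from pypz's executor.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical plugins of one operator, each mapped to the plugins it depends on.
plugin_dependencies = {
    "input_port": {"secret_loader"},
    "output_port": {"secret_loader"},
    "secret_loader": set(),
    "logger": set(),
}


def build_call_order(dependencies):
    """Return an order in which every plugin comes after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = build_call_order(plugin_dependencies)
print(order)

# Circular dependencies cannot be ordered and are rejected.
try:
    build_call_order({"a": {"b"}, "b": {"a"}})
except CycleError as err:
    print("cycle detected:", err.args[1])
```

The exact order of independent plugins (here, logger relative to the others) is not meaningful; only the "dependency first" constraint is guaranteed.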

Parameters

You can define the expected parameters of your Instance spec, which can be either required or optional. Check the parameters section for a more detailed explanation.
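Purely as an illustration of required versus optional parameters, the sketch below uses Python descriptors. ToyParameter, ToySpec and set_parameters are hypothetical names; pypz's own parameter API is described in the parameters section.

```python
class ToyParameter:
    """Hypothetical descriptor for an expected parameter (not pypz's API)."""

    def __init__(self, expected_type, required=True, default=None):
        self.expected_type = expected_type
        self.required = required
        self.default = default

    def __set_name__(self, owner, attr_name):
        self.attr_name = attr_name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # accessed on the class: return the descriptor itself
        return obj.__dict__.get(self.attr_name, self.default)

    def __set__(self, obj, value):
        if not isinstance(value, self.expected_type):
            raise TypeError(f"'{self.attr_name}' expects {self.expected_type.__name__}")
        obj.__dict__[self.attr_name] = value


class ToySpec:
    def set_parameters(self, parameters):
        """Assign provided values; fail if a required parameter is missing."""
        for attr in dir(type(self)):
            descriptor = getattr(type(self), attr)
            if not isinstance(descriptor, ToyParameter):
                continue
            if attr in parameters:
                setattr(self, attr, parameters[attr])
            elif descriptor.required:
                raise ValueError(f"Missing required parameter: '{attr}'")


class ToyReader(ToySpec):
    file_path = ToyParameter(str)                                # required
    batch_size = ToyParameter(int, required=False, default=100)  # optional


reader = ToyReader()
reader.set_parameters({"file_path": "/data/input"})
print(reader.file_path, reader.batch_size)  # /data/input 100
```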

Data Transfer Object (DTO)

If you want to transfer an instance outside the current process, you need to convert it into a representation that can be serialized. This representation is an additional model called the DTO. Having an intermediate model ensures that only the information actually required to reconstruct the instance object is transmitted.

To get the DTO of an Instance, you can use:

pypz.core.specs.instance.Instance.get_dto()

To construct an Instance object from a DTO, you can use:

pypz.core.specs.instance.Instance.create_from_dto()
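The sketch below shows the DTO idea with a dataclass: only the name, the type and the parameters go into the DTO, while runtime-only state is left out. ToyInstanceDTO, ToyOperator and their fields are hypothetical; the real get_dto() and create_from_dto() work on pypz's own DTO model.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class ToyInstanceDTO:
    """Hypothetical DTO holding only what is needed to rebuild an instance."""
    name: str
    spec_type: str  # class of the instance
    parameters: Dict[str, Any] = field(default_factory=dict)


class ToyOperator:
    def __init__(self, name, parameters=None):
        self.name = name
        self.parameters = dict(parameters or {})
        self.open_connections = []  # runtime-only state, deliberately left out of the DTO

    def get_dto(self):
        spec_type = f"{type(self).__module__}.{type(self).__qualname__}"
        return ToyInstanceDTO(self.name, spec_type, dict(self.parameters))

    @classmethod
    def create_from_dto(cls, dto):
        # In this sketch the caller already knows the class to instantiate.
        return cls(dto.name, dto.parameters)


original = ToyOperator("reader", {"batch_size": 100})
dto = original.get_dto()
payload = asdict(dto)  # plain dict, ready to be serialized
restored = ToyOperator.create_from_dto(ToyInstanceDTO(**payload))
print(restored.name, restored.parameters)  # reader {'batch_size': 100}
```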

Working with YAML

Currently, pypz serializes DTOs into YAML, since it is human-readable and supports features out of the box that JSON lacks (e.g., sets).

To convert an instance into a YAML string, you can use:

yaml_string = str(instance)

In the background, the instance is converted into a DTO, which is then converted into a YAML string.

To construct an instance from a YAML string, you can use:

pypz.core.specs.instance.Instance.create_from_string()

You can find a usage example here
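The JSON limitation mentioned above can be reproduced with Python's standard json module, which refuses to encode a set. The DTO-like dict is hypothetical. For comparison, PyYAML represents Python sets with YAML's !!set tag.

```python
import json

# A DTO-like dict that contains a set, e.g. the names of dependency instances.
dto = {"name": "reader", "dependencies": {"writer", "logger"}}


def to_json(data):
    """Return the JSON text, or None if the data cannot be encoded."""
    try:
        return json.dumps(data)
    except TypeError:
        return None  # the json module has no encoding for Python sets


print(to_json(dto))  # None
# The usual workaround turns the set into a list, so the "set" meaning is lost.
print(to_json({**dto, "dependencies": sorted(dto["dependencies"])}))
# {"name": "reader", "dependencies": ["logger", "writer"]}
```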

Instance Groups

Instances can be grouped together. For example, replicating an operator creates a group of operators. The original instance in this case is called the “principal”.

The class pypz.core.specs.instance.InstanceGroup provides methods to access group-related information.

Note

Notice that this interface is meant to be implemented by the instance types that can form groups, e.g., Operator and Plugin.

Pipeline

As its name suggests, the Pipeline class represents a pipeline in pypz. Its nested type is the Operator class. It does not alter or extend the functionality of the Instance class, since, unlike operators and plugins, a pipeline is only a virtual organization of operators.

Operator

The Operator class represents the operator in pypz. Its nested type is the Plugin class. The Operator is the most important model, since it contains the implementation of the business logic. In addition to what the Instance class offers, the Operator class provides the following features:

Expected Parameters

A more detailed explanation of the expected parameters can be found in the parameters section. This section presents only the expected parameters of Operator instances.

operatorImageName (default: None)

The name of the Docker image that contains the implemented Operator. This is mainly used by container-related deployers.

replicationFactor (default: 0)

This value determines how many replicas are created from the operator.

Check the Replication section for more information.

Connections

You can connect operators via the so-called port plugins (InputPortPlugin, OutputPortPlugin).

Connections are defined at pipeline level, where you specify which port plugin of which operator is connected to which port plugin of another operator.

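from pypz.core.specs.operator import Operator
from pypz.core.specs.pipeline import Pipeline
from pypz.core.specs.plugin import InputPortPlugin, OutputPortPlugin

# InputPortPlugin/OutputPortPlugin stand in for concrete port plugin implementations here;
# the Operator methods implementing the business logic are omitted for brevity.

class WriterOperator(Operator):
    def __init__(self, name: str = None, *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        # Port through which this operator sends data
        self.output_port = OutputPortPlugin()

class ReaderOperator(Operator):
    def __init__(self, name: str = None, *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        # Port through which this operator receives data
        self.input_port = InputPortPlugin()

class CustomPipeline(Pipeline):
    def __init__(self, name: str, *args, **kwargs):
        super().__init__(name, *args, **kwargs)

        # No names are passed, so the operator names are deduced from the variable names
        self.reader_operator = ReaderOperator()
        self.writer_operator = WriterOperator()

        # Connect the reader's input port to the writer's output port
        self.reader_operator.input_port.connect(self.writer_operator.output_port)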

Check the Data Transfer section for more details.

Replication

Let’s take the following pipeline as an example:

Without replication

The first operator reads files from a file share, the second operator performs KPI extraction, and the third operator aggregates the results and stores them in a database.

If there are 1 million files and the KPI extraction takes 1 second per file, it takes ~11.6 days (1,000,000 s / 86,400 s per day) to process all of them. This is why pypz provides operator replication, which enables selective scaling within the pipeline. For example, replicating the KPI extractor operator 99 times results in 100 operators in total (original + replicas):

With replication

The output of the FileReader operator distributes the files evenly across all the processor operators, so they share the load, which results in approximately 100x faster execution: ~2.8 hours (10,000 s) for the entire set instead of ~11.6 days.

Depending on your available capacity, you can increase the replication factor further to reduce the processing time even more.
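The arithmetic behind these figures, as a small sketch. It assumes an ideal, even distribution of the load and ignores the reader and aggregator operators as well as any communication overhead; the exact values are about 11.57 days and 2.78 hours. processing_time_seconds is a hypothetical helper.

```python
SECONDS_PER_HOUR = 60 * 60
SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR


def processing_time_seconds(num_files, seconds_per_file, replication_factor):
    """Ideal processing time if the files are spread evenly over all instances.

    The number of instances is the original plus its replicas.
    """
    instances = replication_factor + 1
    return num_files * seconds_per_file / instances


single = processing_time_seconds(1_000_000, 1.0, replication_factor=0)
replicated = processing_time_seconds(1_000_000, 1.0, replication_factor=99)
print(f"{single / SECONDS_PER_DAY:.1f} days")        # 11.6 days
print(f"{replicated / SECONDS_PER_HOUR:.1f} hours")  # 2.8 hours
```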

At its core, replication means that pypz creates instances based on the following rules:

  • the replica instance has the same type (class) as the original

  • the original and the replica instances form a group, where the original is called the principal

  • each replica has a permanent index attached to it, which is visible in the name of the instance, e.g., if the original operator’s name is ‘inst’, then the first replica’s name is ‘inst_0’

  • to ensure consistency across instances, all attributes of the original operator, like parameters and connections, are shared with the replicas
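A minimal sketch of the replication rules listed above. ToyOperator, replicate(), is_principal() and get_group_size() are hypothetical and only mimic the described behavior (same class, a group with a principal, indexed names, shared configuration); they are not pypz's API. Parameters are deep-copied here, an assumed design choice that keeps the replicas' configuration identical to the original while preventing one instance from mutating another's.

```python
import copy


class ToyOperator:
    """Hypothetical operator that only illustrates the replication rules."""

    def __init__(self, name, parameters=None):
        self.name = name
        self.parameters = dict(parameters or {})
        self.principal = None  # None means this instance is the principal
        self.replicas = []

    def replicate(self, replication_factor):
        """Create replicas that form a group with this (principal) instance."""
        for index in range(replication_factor):
            # Same class as the original; the index is part of the name.
            replica = type(self)(f"{self.name}_{index}")
            # Configuration is copied from the original to keep instances consistent.
            replica.parameters = copy.deepcopy(self.parameters)
            replica.principal = self
            self.replicas.append(replica)
        return self.replicas

    def is_principal(self):
        return self.principal is None

    def get_group_size(self):
        principal = self if self.principal is None else self.principal
        return 1 + len(principal.replicas)


extractor = ToyOperator("inst", {"model": "kpi-v1"})
extractor.replicate(3)
print([replica.name for replica in extractor.replicas])  # ['inst_0', 'inst_1', 'inst_2']
print(extractor.get_group_size())                        # 4
```

With a replication factor of 0, no replicas are created and the principal forms a group of size 1.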

Note

Note that the concept of replication differs from scaling in that, by setting the replication factor, you create copies of the original. A replication factor of 0 does not delete the original; it just does not create any additional copies.

Methods

The Operator class provides methods to implement your business logic. Each method is called in a specific order determined by the executor implementation.
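To illustrate what "called in a specific order" means, here is a toy executor that drives three hooks: initialization, a repeated running step and shutdown. The hook names (on_init, on_running, on_shutdown) and the executor are hypothetical; the actual method names and call order are defined by pypz's Operator class and executor implementation.

```python
class ToyOperator:
    """Hypothetical operator; hook names are illustrative, not pypz's API."""

    def __init__(self, items):
        self.items = list(items)
        self.processed = []
        self.calls = []

    def on_init(self):
        self.calls.append("init")
        return True  # False would signal that initialization failed

    def on_running(self):
        # Process one item per call; return True once all work is done.
        if not self.items:
            return True
        self.processed.append(self.items.pop(0).upper())
        return False

    def on_shutdown(self):
        self.calls.append("shutdown")


def run(operator):
    """Toy executor: init once, call running until it reports done, then shut down."""
    if not operator.on_init():
        return
    try:
        while not operator.on_running():
            pass
    finally:
        operator.on_shutdown()  # runs even if on_running raises


op = ToyOperator(["a", "b"])
run(op)
print(op.processed, op.calls)  # ['A', 'B'] ['init', 'shutdown']
```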

Logging

Although the logging functionality is provided by the LoggerPlugin class, the Operator class provides an aggregated logging interface: if there are multiple LoggerPlugins in an operator’s context, a single method call invokes the corresponding method of every LoggerPlugin in the background:

self.get_logger().info("Text to log")

The aggregated logger is realized by the class pypz.core.specs.operator.Operator.Logger.

Check the Logging section for more information.
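The aggregation pattern described above, where one call is fanned out to every logger plugin, can be sketched as follows. ToyLoggerPlugin and AggregatedLogger are hypothetical and only capture the idea behind Operator.Logger, not its implementation.

```python
class ToyLoggerPlugin:
    """Hypothetical logger plugin that keeps records in memory."""

    def __init__(self, name):
        self.name = name
        self.records = []

    def info(self, message):
        self.records.append(("INFO", message))

    def error(self, message):
        self.records.append(("ERROR", message))


class AggregatedLogger:
    """Forwards every call to all registered logger plugins."""

    def __init__(self, loggers):
        self._loggers = list(loggers)

    def info(self, message):
        for logger in self._loggers:
            logger.info(message)

    def error(self, message):
        for logger in self._loggers:
            logger.error(message)


console = ToyLoggerPlugin("console")
file_logger = ToyLoggerPlugin("file")
logger = AggregatedLogger([console, file_logger])
logger.info("Text to log")
print(console.records, file_logger.records)
# [('INFO', 'Text to log')] [('INFO', 'Text to log')]
```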

Plugins

Plugins are the lowest-level entities in pypz. They allow you to extend and enhance the functionality of an operator. As the inheritance diagram shows, there are several plugin interfaces with different purposes: