pypz.abstracts.channel_ports module

class pypz.abstracts.channel_ports.ChannelInputPort(name: str = None, *args, **kwargs)

Bases: InputPortPlugin, ResourceHandlerPlugin, ExtendedPlugin, ABC

This class represents an abstract InputPortPlugin, which utilizes the channels to realize transfer functionalities. It means that once there is a proper channel implementation, a fully functional InputPortPlugin can be made by providing that implementation in the abstract new_input_channel() method.

Parameters:
  • name – name of the instance, if not provided, it will be attempted to deduce from the variable’s name

  • schema – the schema of the port plugin, which will be used to send/retrieve data

  • group_mode – if set to True, the all the input ports in the group shall receive all messages

  • channel_reader_type – the type of the channel reader to be used

_abc_impl = <_abc._abc_data object>
_channel_config

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_channel_location

Convenience class to represent a required parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_channel_reader: ChannelReader | None

The only channel reader maintained by this plugin.

_delete_resources: bool

This flag signalizes, if resource deletion can be performed. It will be cleared, if there is an error on OperationRunning state to allow process restart.

_expected_output_count: int | None

During _on_port_open the expected number of connected outputs will be calculated. It then will be used in cases, where we need to wait for all outputs.

_interrupted: bool

Helper flag to register, if the execution has been interrupted.

_need_to_check_connections_opened: bool

Helper flag to signalize, if the channel shall be opened. This is necessary to maintain state in _on_port_open().

_on_error(source: Any, exception: Exception) None

This method can be implemented to react to error events during execution. The error itself may come from arbitrary sources.

_on_interrupt(system_signal: int = None) None

This method can be implemented to react to interrupt signals like SIGINT, SIGTERM etc. The specs implementation can then execute interrupt logic e.g., early termination of loops.

Parameters:

system_signal – id of the system signal that causes interrupt

_on_port_close() bool

This method shall implement the logic to shut down the i/o port functionalities.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_port_open() bool

This method shall implement the logic to initialize the i/o port functionalities.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_resource_creation() bool

This method shall implement the logic to create an arbitrary resource of any type.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_resource_deletion() bool

This method shall implement the logic to destroy the created resource.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_port_open_timeout_ms

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_post_execution() None

This method is called after the executor and its state machine exited, but before the program exits. It can be used to perform finalization/shutdown logic that is outside the execution context.

_pre_execution() None

This method will be called before the executor state machine starts. It can be used to perform initialization that is required before the execution.

_sequential_mode_enabled

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_sync_connections_open

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

can_retrieve() bool

This method shall implement the logic to signalize, whether the InputPort is still able to retrieve. Unable can mean for example that the OutputPort finished writing. This can be then used to terminate reading.

Returns:

True if port can retrieve, False if not

channel_reader_type: Type[ChannelReader]

Type of the channel reader to be created by this plugin.

commit_current_read_offset()

This method shall implement the logic of committing the current read offset based on the technology used.

retrieve() Any

This method shall implement the logic to retrieve data through the port.

Returns:

tbd by the implementation

class pypz.abstracts.channel_ports.ChannelOutputPort(name: str = None, *args, **kwargs)

Bases: OutputPortPlugin, ResourceHandlerPlugin, ExtendedPlugin, ABC

This class represents an abstract OutputPortPlugin, which utilizes the channels to realize transfer functionalities. It means that once there is a proper channel implementation, a fully functional OutputPortPlugin can be made by providing that implementation in the abstract new_output_channel() method.

Parameters:
  • name – name of the instance, if not provided, it will be attempted to deduce from the variable’s name

  • schema – the schema of the port plugin, which will be used to send/retrieve data

  • channel_writer_type – the type of the channel writer to be used

_abc_impl = <_abc._abc_data object>
_channel_config

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_channel_location

Convenience class to represent a required parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_channel_writers: set[ChannelWriter] | None

The channel writers to be maintained by this plugin. Note that a channel writer will be created for each connection from this output port plugin.

_delete_resources: bool

This flag signalizes, if resource deletion can be performed. It will be cleared, if there is an error on OperationRunning state to allow process restart.

_interrupted: bool

Helper flag to register, if the execution has been interrupted.

_on_error(source: Any, exception: Exception) None

This method can be implemented to react to error events during execution. The error itself may come from arbitrary sources.

_on_interrupt(system_signal: int = None) None

This method can be implemented to react to interrupt signals like SIGINT, SIGTERM etc. The specs implementation can then execute interrupt logic e.g., early termination of loops.

Parameters:

system_signal – id of the system signal that causes interrupt

_on_port_close() bool

This method shall implement the logic to shut down the i/o port functionalities.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_port_open() bool

This method shall implement the logic to initialize the i/o port functionalities.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_resource_creation() bool

This method shall implement the logic to create an arbitrary resource of any type.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_on_resource_deletion() bool

This method shall implement the logic to destroy the created resource.

Returns:

True succeeded, False if more iteration required (to not block the execution)

_port_close_errors: set[ChannelWriter]

Set to store the channels that had errors on port closure. It is necessary to be able not to reschedule those channels in re-execution of the state.

_port_open_timeout_ms

Convenience class to represent an optional parameter Usage:

class TestImpl(Instance):
    required_param = RequiredParameter(str)
    optional_param = OptionalParameter(str)
    def __init__(self):
        self.required_param = None
        self.optional_param = "defaultValue"
Parameters:
  • parameter_type – (str, int, float, set, list, dict, type(None))

  • alt_name – alternative name for the parameter, if specified it acts as reference to the parameter

  • on_update – callback to react on value update

_post_execution() None

This method is called after the executor and its state machine exited, but before the program exits. It can be used to perform finalization/shutdown logic that is outside the execution context.

_pre_execution() None

This method will be called before the executor state machine starts. It can be used to perform initialization that is required before the execution.

_resource_deletion_errors: set[ChannelWriter]

Set to store the channels that had errors on resource deletion. It is necessary to be able not to reschedule those channels in re-execution of the state.

channel_writer_type: Type[ChannelWriter]

Type of the channel writer to be created by this plugin.

send(data: Any) Any

This method shall implement the logic to send data provided as argument. The implementation shall specify the type of the data and the return value.

Parameters:

data – data to be sent

Returns:

tbd by the implementation