pypz.abstracts.channel_ports module
- class pypz.abstracts.channel_ports.ChannelInputPort(name: str = None, *args, **kwargs)
Bases: InputPortPlugin, ResourceHandlerPlugin, ExtendedPlugin, ABC
This class represents an abstract InputPortPlugin that uses channels to transfer data. Given a suitable channel implementation, a fully functional InputPortPlugin can be created by providing that implementation in the abstract new_input_channel() method.
- Parameters:
name – name of the instance; if not provided, an attempt is made to deduce it from the variable’s name
schema – the schema of the port plugin, which will be used to send/retrieve data
group_mode – if set to True, all the input ports in the group shall receive all messages
channel_reader_type – the type of the channel reader to be used
- _channel_config
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _channel_location
Convenience class to represent a required parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _channel_reader: ChannelReader | None
The only channel reader maintained by this plugin.
- _delete_resources: bool
This flag signals whether resource deletion may be performed. It is cleared if an error occurs in the OperationRunning state, so that the process can be restarted.
- _expected_output_count: int | None
The expected number of connected outputs is calculated during _on_port_open(). It is then used in cases where all outputs must be waited for.
- _interrupted: bool
Helper flag that records whether the execution has been interrupted.
- _need_to_check_connections_opened: bool
Helper flag that signals whether the channel shall be opened. It is necessary to maintain state in _on_port_open().
- _on_error(source: Any, exception: Exception) None
This method can be implemented to react to error events during execution. The error itself may come from arbitrary sources.
- _on_interrupt(system_signal: int = None) None
This method can be implemented to react to interrupt signals such as SIGINT or SIGTERM. The specs implementation can then execute interrupt logic, e.g., early termination of loops.
- Parameters:
system_signal – id of the system signal that caused the interrupt
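The interrupt handling described above can be sketched as follows. This is a minimal illustration and not pypz code: InterruptibleWorker is a hypothetical stand-in that only borrows the names _on_interrupt and _interrupted from this page, and the interrupt is simulated by calling the hook directly instead of delivering a real signal.

```python
import signal


class InterruptibleWorker:
    """Hypothetical stand-in (not part of pypz) with an interrupt flag."""

    def __init__(self):
        self._interrupted = False
        self.last_signal = None

    def _on_interrupt(self, system_signal: int = None) -> None:
        # Only record the event; the running loop decides when to stop.
        self._interrupted = True
        self.last_signal = system_signal

    def process(self, items, interrupt_at=None):
        """Process items until done or until the interrupt flag is set."""
        processed = []
        for item in items:
            if self._interrupted:
                break  # early termination of the loop
            processed.append(item)
            if item == interrupt_at:
                # Simulated interrupt; a real one would come from the runtime.
                self._on_interrupt(signal.SIGINT)
        return processed
```

Keeping the handler limited to setting a flag means the loop itself chooses a safe point at which to stop.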
- _on_port_close() bool
This method shall implement the logic to shut down the i/o port functionalities.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _on_port_open() bool
This method shall implement the logic to initialize the i/o port functionalities.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
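The True/False return convention can be illustrated with a small, self-contained sketch. Both FakePort and drive are hypothetical names invented for this example; the real pypz executor is not shown here and may schedule re-invocations differently.

```python
class FakePort:
    """Hypothetical stand-in (not part of pypz) that needs several polls."""

    def __init__(self, polls_until_ready: int):
        self._remaining = polls_until_ready

    def _on_port_open(self) -> bool:
        # Do one small unit of work per call instead of blocking.
        if self._remaining > 0:
            self._remaining -= 1
            return False  # not finished, ask to be called again
        return True  # finished


def drive(step, max_iterations: int = 100) -> int:
    """Call step until it returns True; return the number of calls made."""
    for calls in range(1, max_iterations + 1):
        if step():
            return calls
    raise TimeoutError("step did not complete within max_iterations")
```

Because each call returns quickly, the caller remains free to react to other events, such as interrupts, between iterations.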
- _on_resource_creation() bool
This method shall implement the logic to create an arbitrary resource of any type.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _on_resource_deletion() bool
This method shall implement the logic to destroy the created resource.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _port_open_timeout_ms
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _post_execution() None
This method is called after the executor and its state machine exited, but before the program exits. It can be used to perform finalization/shutdown logic that is outside the execution context.
- _pre_execution() None
This method will be called before the executor state machine starts. It can be used to perform initialization that is required before the execution.
- _sequential_mode_enabled
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _sync_connections_open
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- can_retrieve() bool
This method shall implement the logic to signal whether the InputPort is still able to retrieve data. Being unable to retrieve can mean, for example, that the OutputPort has finished writing. This can then be used to terminate reading.
- Returns:
True if port can retrieve, False if not
- channel_reader_type: Type[ChannelReader]
Type of the channel reader to be created by this plugin.
- commit_current_read_offset()
This method shall implement the logic of committing the current read offset based on the technology used.
- retrieve() Any
This method shall implement the logic to retrieve data through the port.
- Returns:
to be defined by the implementation
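A typical read loop built on can_retrieve(), retrieve() and commit_current_read_offset() might look like the sketch below. InMemoryInputPort and consume_all are hypothetical and exist only for illustration; a real port reads from a channel, and the type returned by retrieve() is defined by the implementation.

```python
from typing import Any


class InMemoryInputPort:
    """Hypothetical stand-in (not part of pypz) backed by a list."""

    def __init__(self, records):
        self._records = list(records)
        self._position = 0
        self.committed_offset = 0

    def can_retrieve(self) -> bool:
        # Here "unable to retrieve" simply means the list is exhausted.
        return self._position < len(self._records)

    def retrieve(self) -> Any:
        record = self._records[self._position]
        self._position += 1
        return record

    def commit_current_read_offset(self) -> None:
        self.committed_offset = self._position


def consume_all(port) -> list:
    """Read until the port reports that nothing more can be retrieved."""
    processed = []
    while port.can_retrieve():
        processed.append(port.retrieve())
        # Commit only after the record has been handled.
        port.commit_current_read_offset()
    return processed
```

Committing after processing, rather than before, is an assumption made for this sketch; the appropriate commit point depends on the delivery guarantees required.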
- class pypz.abstracts.channel_ports.ChannelOutputPort(name: str = None, *args, **kwargs)
Bases: OutputPortPlugin, ResourceHandlerPlugin, ExtendedPlugin, ABC
This class represents an abstract OutputPortPlugin that uses channels to transfer data. Given a suitable channel implementation, a fully functional OutputPortPlugin can be created by providing that implementation in the abstract new_output_channel() method.
- Parameters:
name – name of the instance; if not provided, an attempt is made to deduce it from the variable’s name
schema – the schema of the port plugin, which will be used to send/retrieve data
channel_writer_type – the type of the channel writer to be used
- _channel_config
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _channel_location
Convenience class to represent a required parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _channel_writers: set[ChannelWriter] | None
The channel writers to be maintained by this plugin. Note that a channel writer will be created for each connection from this output port plugin.
- _delete_resources: bool
This flag signals whether resource deletion may be performed. It is cleared if an error occurs in the OperationRunning state, so that the process can be restarted.
- _interrupted: bool
Helper flag that records whether the execution has been interrupted.
- _on_error(source: Any, exception: Exception) None
This method can be implemented to react to error events during execution. The error itself may come from arbitrary sources.
- _on_interrupt(system_signal: int = None) None
This method can be implemented to react to interrupt signals such as SIGINT or SIGTERM. The specs implementation can then execute interrupt logic, e.g., early termination of loops.
- Parameters:
system_signal – id of the system signal that caused the interrupt
- _on_port_close() bool
This method shall implement the logic to shut down the i/o port functionalities.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _on_port_open() bool
This method shall implement the logic to initialize the i/o port functionalities.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _on_resource_creation() bool
This method shall implement the logic to create an arbitrary resource of any type.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _on_resource_deletion() bool
This method shall implement the logic to destroy the created resource.
- Returns:
True if succeeded, False if more iterations are required (so as not to block the execution)
- _port_close_errors: set[ChannelWriter]
Set that stores the channels that had errors on port closure. It makes it possible to avoid rescheduling those channels when the state is re-executed.
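The idea of remembering failed channels so that they are not retried can be sketched as follows. FakeWriter and close_pass are hypothetical names made up for this example, and the close() method on the writer is an assumption; the actual closing logic of ChannelOutputPort is not documented on this page.

```python
class FakeWriter:
    """Hypothetical stand-in for a channel writer; not part of pypz."""

    def __init__(self, polls_needed: int = 1, fail: bool = False):
        self.polls_needed = polls_needed
        self.fail = fail
        self.attempts = 0
        self.closed = False

    def close(self) -> bool:
        self.attempts += 1
        if self.fail:
            raise RuntimeError("close failed")
        self.closed = self.attempts >= self.polls_needed
        return self.closed


def close_pass(writers, errored: set) -> bool:
    """One non-blocking pass; True once no writer needs another attempt."""
    all_done = True
    for writer in writers:
        # Skip writers that are finished or that already failed earlier.
        if writer.closed or writer in errored:
            continue
        try:
            if not writer.close():
                all_done = False
        except RuntimeError:
            errored.add(writer)  # remember the failure, do not retry
    return all_done
```

Without the error set, a writer that always fails would be attempted again on every re-execution of the state.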
- _port_open_timeout_ms
Convenience class to represent an optional parameter. Usage:
    class TestImpl(Instance):
        required_param = RequiredParameter(str)
        optional_param = OptionalParameter(str)

        def __init__(self):
            self.required_param = None
            self.optional_param = "defaultValue"
- Parameters:
parameter_type – (str, int, float, set, list, dict, type(None))
alt_name – alternative name for the parameter, if specified it acts as reference to the parameter
on_update – callback to react on value update
- _post_execution() None
This method is called after the executor and its state machine exited, but before the program exits. It can be used to perform finalization/shutdown logic that is outside the execution context.
- _pre_execution() None
This method will be called before the executor state machine starts. It can be used to perform initialization that is required before the execution.
- _resource_deletion_errors: set[ChannelWriter]
Set that stores the channels that had errors on resource deletion. It makes it possible to avoid rescheduling those channels when the state is re-executed.
- channel_writer_type: Type[ChannelWriter]
Type of the channel writer to be created by this plugin.
- send(data: Any) Any
This method shall implement the logic to send data provided as argument. The implementation shall specify the type of the data and the return value.
- Parameters:
data – data to be sent
- Returns:
to be defined by the implementation
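Since one channel writer is created per connection, send() plausibly forwards the data to every writer. The sketch below illustrates that fan-out under this assumption. ListWriter, FanOutPort, the write() method and the returned writer count are all hypothetical and are not taken from pypz.

```python
from typing import Any


class ListWriter:
    """Hypothetical stand-in for a channel writer; not part of pypz."""

    def __init__(self):
        self.records = []

    def write(self, data: Any) -> None:
        self.records.append(data)


class FanOutPort:
    """Hypothetical output port holding one writer per connection."""

    def __init__(self, connection_count: int):
        self._channel_writers = {ListWriter() for _ in range(connection_count)}

    def send(self, data: Any) -> int:
        # Forward the same data to every connected writer.
        for writer in self._channel_writers:
            writer.write(data)
        # Assumed return value for this sketch: number of writers reached.
        return len(self._channel_writers)
```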