pypz.core.channels.base module
- class pypz.core.channels.base.ChannelBase(channel_name: str, context: PortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases:
ABC
The abstract base for the ChannelReader and the ChannelWriter classes. It contains the common logic used by both classes’ implementations.
- Parameters:
channel_name – name of the channel
context – the PortPlugin that operates this channel
executor – an external ThreadPoolExecutor; if not provided, one will be created internally
- ControlLoopExceptionTimeoutInMs = 60000
This value defines how long, in ms, unhandled exceptions in the control loop are tolerated. After the timeout expires, the control loop is terminated.
- DefaultStatusThreadIntervalInMs = 2000
This value defines the interval, in ms, at which status messages (health checks) are sent to the counterpart channel.
- ParamKeyLogLevel = 'logLevel'
- ParamKeyMetricsEnabled = 'metricsEnabled'
- __executor_thread_handler()
- _abc_impl = <_abc._abc_data object>
- _channel_name: str
This member stores the name of the channel, which normally reflects the resource’s name.
- _channel_opened: bool
This flag signals whether the channel is open and ready to use.
- _channel_started: bool
Status flag signaling whether the channel has been started.
- _channel_state_update_lock
This lock is used to synchronize the logic that updates channel states
- _channel_stopped: bool
Status flag signaling whether the channel has been stopped. This additional flag is necessary because _channel_started being False does not necessarily mean that the channel was ever started and has since finished.
- abstract _close_channel() bool
This method shall implement the logic to close a channel. Closing a channel is normally the last step, so any used resources shall be cleaned up here as well.
- Returns:
True, if done, False if it is still in progress
- _configuration: dict
This member stores the configuration of the channel. It acts as a kind of serialized storage, since the configuration can have an arbitrary type and schema as long as its interpretation is known.
- abstract _configure_channel(channel_configuration: dict) None
This method shall implement the logic to interpret the provided configuration.
- Parameters:
channel_configuration – the channel configuration as a dict
- _context: PortPlugin
The context of this channel, which shall be an object of PortPlugin type
- _control_loop_exception_timer: int
The control loop is where control information is sent to the counterpart channel. Should an exception occur, the channel is given some time to recover before it is terminated. This timer defines that grace period.
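The grace period described above can be illustrated with a minimal sketch. The GracePeriod class, its method names, and the injectable clock are assumptions made for this illustration and are not part of pypz; only the 60000 ms default is taken from ControlLoopExceptionTimeoutInMs.

```python
import time
from typing import Callable, Optional


class GracePeriod:
    """Hypothetical helper: tracks how long a loop has been failing."""

    def __init__(self, timeout_ms: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_ms = timeout_ms
        self._clock = clock  # injectable so the behavior can be tested
        self._first_failure: Optional[float] = None

    def record_success(self) -> None:
        # A successful iteration ends the current failure streak.
        self._first_failure = None

    def record_failure(self) -> bool:
        """Return True once failures have persisted for the whole timeout."""
        now = self._clock()
        if self._first_failure is None:
            self._first_failure = now
        return (now - self._first_failure) * 1000 >= self._timeout_ms
```

A control loop built this way would call record_success() after each clean iteration and terminate once record_failure() returns True.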
- abstract _create_resources() bool
This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before open_channel() to make sure that the resources exist beforehand.
- Returns:
True, if done, False if it is still in progress
- abstract _delete_resources() bool
This method shall implement the logic of deleting resources of the channel. IMPORTANT NOTE - this method shall be invoked after the close_channel()
- Returns:
True, if done, False if it is still in progress
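The ordering constraints and the "True if done, False if still in progress" convention of the four abstract lifecycle methods can be sketched as a polling driver. This is an illustration under assumptions: FakeChannel, run_until_done, and run_lifecycle are hypothetical names, and the real pypz runtime may drive these steps differently.

```python
import time
from typing import Callable


def run_until_done(step: Callable[[], bool], poll_interval_s: float = 0.0,
                   max_attempts: int = 100) -> int:
    """Call `step` until it returns True; return the number of calls made."""
    for attempt in range(1, max_attempts + 1):
        if step():
            return attempt
        time.sleep(poll_interval_s)
    raise TimeoutError(f"step did not finish after {max_attempts} attempts")


class FakeChannel:
    """Hypothetical stand-in for a channel implementation (not a pypz class)."""

    def __init__(self) -> None:
        self.calls: list = []
        self._create_polls = 0

    def create_resources(self) -> bool:
        self.calls.append("create")
        self._create_polls += 1
        return self._create_polls >= 3  # reports "in progress" twice

    def open_channel(self) -> bool:
        self.calls.append("open")
        return True

    def close_channel(self) -> bool:
        self.calls.append("close")
        return True

    def delete_resources(self) -> bool:
        self.calls.append("delete")
        return True


def run_lifecycle(channel: FakeChannel) -> list:
    # Resources are created before opening and deleted after closing.
    for step in (channel.create_resources, channel.open_channel,
                 channel.close_channel, channel.delete_resources):
        run_until_done(step)
    return channel.calls
```

Returning False instead of blocking lets the caller decide how to wait, which is why the driver, not the step, owns the retry loop.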
- _executor: concurrent.futures.ThreadPoolExecutor | None
This executor is used to execute the background thread, which will invoke the sendHealthStatus method
- _executor_started: bool
Flag signaling whether the executor thread has started.
- _executor_stopped: SynchronizedReference[bool]
Flag signaling that the executor thread has been terminated. This flag is necessary along with _stopping_executor to synchronize the termination. Otherwise, it can happen that the thread has not terminated yet but the channel has already been closed, resulting in exceptions.
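The need for two flags, one requesting termination and one confirming it, can be shown with a small sketch. It uses threading.Event rather than the SynchronizedReference type named above, and the StatusLoop class is a hypothetical stand-in, not the pypz implementation.

```python
import threading


class StatusLoop:
    """Hypothetical background loop with a two-flag shutdown handshake."""

    def __init__(self, interval_s: float = 0.01) -> None:
        self._interval_s = interval_s
        self._stopping = threading.Event()  # termination was requested
        self._stopped = threading.Event()   # the loop has really exited
        self.ticks = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        try:
            while not self._stopping.is_set():
                self.ticks += 1  # placeholder for sending a status message
                self._stopping.wait(self._interval_s)
        finally:
            self._stopped.set()  # confirm termination even on errors

    def stop(self, timeout_s: float = 1.0) -> bool:
        """Request termination and wait until the loop confirms it."""
        self._stopping.set()
        return self._stopped.wait(timeout_s)
```

Only after stop() returns True is it safe to release whatever the loop was using, which mirrors the reason given above for keeping both flags.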
- _health_check_payload: dict[str, Any]
It is possible to send additional information along with health check events. This member serves as the storage of this additional payload.
- _location: str | None
This member stores the location string of the channel. The location depends on the technology. It can refer to either a remote or a local URL.
- _log_level: str
Default log level. It is fine to set it here: if the Operator’s context logger is present, its log level cannot be overridden, but if it is not, the DefaultContextLogger’s can be.
- _logger: ContextLogger
Channel logger
- _metrics_enabled: bool
Enables metric calculation, i.e., the calculation of additional I/O-related information.
- _on_status_message_received_callbacks: set[Callable[[list[ChannelStatusMessage]], None]]
Stores the callbacks that are executed when status messages are received.
- abstract _open_channel() bool
This method shall implement the logic to open a channel. The meaning of ‘open’, however, is defined by the actual implementation. One developer may define it as an opened connection, another as a created file, etc.
- Returns:
True, if done, False if it is still in progress
- _resources_created: bool
Status flag to signalize that the channel’s resources are created
- _resources_deleted: bool
Status flag to signalize that the channel’s resources are deleted
- abstract _retrieve_status_messages() list | None
This method shall implement the logic that retrieves the status messages published by the counterpart channel. Note that a ChannelWriter can have multiple ChannelReaders sending messages, and a ChannelReader can receive multiple messages from the ChannelWriter; therefore, this method shall return a list of messages. Note as well that if your technology uses callbacks, you can use the method on_status_message_received directly. In this case, simply return None from this method to ensure that it will be ignored.
- Returns:
list of retrieved status messages or None if the method is not used
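The distinction between returning a list (possibly empty) and returning None can be sketched as follows. The poll_status function and its parameters are hypothetical and only illustrate how a caller might treat the two cases.

```python
from typing import Callable, Optional


def poll_status(retrieve: Callable[[], Optional[list]],
                handle: Callable[[list], None]) -> bool:
    """Return True if polling is in use, False if retrieval is callback-driven."""
    messages = retrieve()
    if messages is None:
        # The implementation delivers messages through callbacks instead,
        # so the polled result is ignored.
        return False
    handle(messages)  # an empty list is still a valid polling result
    return True
```

An empty list therefore means "polled, nothing new", while None means "this method is not used at all".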
- abstract _send_status_message(message: str) None
This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for what the channel provides in this string; however, you can append your own custom information as a string, separated by StateMessageSeparatorChar.
- Parameters:
message – message that shall be sent
- _silent_mode: bool
If this flag is set, this channel will not send status messages. One use case is a channelRW created only to sniff the status of other channels.
- _status_map: dict[str, ChannelStatusMonitor]
This map stores the health statuses of all connected channels
- _stopping_executor: bool
Flag to signalize the termination attempt of the executor thread
- _unique_name: str
This name identifies the channel in its context. It is necessary because the channel name on its own is not unique and can be reused by different owners.
- can_close() bool
This method can implement the logic to determine whether the channel can be closed.
- Returns:
True if the channel can be closed, False otherwise
- get_channel_name()
- get_configuration() dict
- get_context() PortPlugin
- get_location()
- get_logger()
- get_unique_name()
- invoke_close_channel() bool
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- Returns:
True if operation done, False if still in progress
- invoke_configure_channel(channel_configuration: dict) None
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- invoke_open_channel() bool
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- Returns:
True if operation done, False if still in progress
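Why an invoker should be called instead of the implemented method can be illustrated with a template-method sketch. The bookkeeping shown here (setting _channel_opened and skipping repeated calls) is an assumption about what such an invoker might do; the actual pypz invokers may perform additional or different steps.

```python
from abc import ABC, abstractmethod


class SketchChannel(ABC):
    """Hypothetical base class illustrating the invoker pattern."""

    def __init__(self) -> None:
        self._channel_opened = False

    @abstractmethod
    def _open_channel(self) -> bool:
        """Implementation hook: True if done, False if still in progress."""

    def invoke_open_channel(self) -> bool:
        if self._channel_opened:
            return True  # already open, do not call the hook again
        if self._open_channel():
            self._channel_opened = True  # state is updated only by the invoker
        return self._channel_opened


class SlowChannel(SketchChannel):
    """Needs three attempts before it reports that opening is done."""

    def __init__(self) -> None:
        super().__init__()
        self.attempts = 0

    def _open_channel(self) -> bool:
        self.attempts += 1
        return self.attempts >= 3
```

Calling _open_channel() directly would bypass the flag update, leaving the channel's reported state out of sync with reality.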
- invoke_resource_creation() bool
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- Returns:
True if operation done, False if still in progress
- invoke_resource_deletion() bool
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- Returns:
True if operation done, False if still in progress
- invoke_sync_send_status_message(status: ChannelStatus, payload: Any = None) None
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- invoke_sync_status_update()
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- is_any_connected_channel_healthy_and_not_closed() bool
This method answers the question “Is there any connected channel that is healthy, but not closed?”. This can be seen as the opposite of “Are all connected channels closed or unhealthy?”, with the strong difference that the latter does not give a proper answer if there are no connected channels at all.
- Returns:
True if there is any channel healthy, but not closed, False if not or no connected channel
- is_any_connected_channel_healthy_and_not_stopped() bool
This method answers the question “Is there any connected channel that is healthy, but not stopped?”. This can be seen as the opposite of “Are all connected channels stopped or unhealthy?”, with the strong difference that the latter does not give a proper answer if there are no connected channels at all.
- Returns:
True if there is any channel healthy, but not stopped, False if not or no connected channel
- is_any_connected_channel_healthy_and_not_stopped_and_not_closed() bool
This method answers the question “Is there any connected channel that is healthy, but not stopped and not closed?”. This can be seen as the opposite of “Are all connected channels closed or stopped or unhealthy?”, with the strong difference that the latter does not give a proper answer if there are no connected channels at all.
- Returns:
True if there is any channel healthy, but not stopped and not closed, False if not or no connected channel
- is_any_connected_channels_healthy() bool
This method answers the question “Is there any healthy channel?”. This can be seen as the opposite of “Are all channels unhealthy?”, with the strong difference that the latter does not give a proper answer if there are no connected channels at all.
- Returns:
True if there is any channel healthy, False if not or no connected channel
- is_any_connected_channels_unhealthy() bool
This method answers the question “Is there any unhealthy connected channel?”. This can be seen as the opposite of “Are all connected channels healthy?”, with the strong difference that the latter does not give a proper answer if there are no connected channels at all.
- Returns:
True if there is any channel unhealthy, False if not or no connected channels
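The remark about the case with no connected channels comes down to how "any" and "all" behave on an empty collection. The sketch below uses a hypothetical Monitor dataclass in place of ChannelStatusMonitor, whose real interface is not shown here.

```python
from dataclasses import dataclass


@dataclass
class Monitor:
    """Hypothetical stand-in for a connected channel's status."""
    healthy: bool
    closed: bool = False


def any_healthy(monitors: list) -> bool:
    # False for an empty list: there is no channel that could be healthy.
    return any(m.healthy for m in monitors)


def all_unhealthy(monitors: list) -> bool:
    # True for an empty list (vacuous truth), which is easy to misread
    # as "channels exist and every one of them is unhealthy".
    return all(not m.healthy for m in monitors)


def any_healthy_and_not_closed(monitors: list) -> bool:
    return any(m.healthy and not m.closed for m in monitors)
```

With no connected channels, any_healthy([]) is False while all_unhealthy([]) is True; the "is any" formulation therefore states explicitly that an empty set yields False.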
- is_channel_open()
- is_channel_started()
- is_channel_stopped()
- is_resource_created()
- is_resource_deleted()
- on_new_channel_status_monitor(status_monitor: ChannelStatusMonitor) None
This method can be overridden to handle the creation of the new ChannelStatusMonitor e.g. registering callbacks.
- Parameters:
status_monitor – ChannelStatusMonitor object newly created
- on_status_message_received(callback: Callable[[list[ChannelStatusMessage]], None]) None
Adds a callback to the set of callbacks that are executed when status messages are received.
- Parameters:
callback – a Callable[[list[ChannelStatusMessage]], None]
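Because the callbacks are stored in a set, registering the same callable twice has no additional effect. The following sketch illustrates this with a hypothetical StatusCallbacks class; the dispatch method is an assumption added for the example and does not correspond to a documented pypz method.

```python
from typing import Callable


class StatusCallbacks:
    """Hypothetical holder mimicking a set-based callback registry."""

    def __init__(self) -> None:
        self._callbacks: set = set()

    def on_status_message_received(self,
                                   callback: Callable[[list], None]) -> None:
        self._callbacks.add(callback)  # duplicates are ignored by the set

    def dispatch(self, messages: list) -> None:
        # Assumed helper: hand the same batch to every registered callback.
        for callback in self._callbacks:
            callback(messages)
```

Note that a set gives no ordering guarantee, so callbacks should not depend on being invoked in registration order.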
- on_status_message_send()
This method can be implemented to hook into the event of sending a status message. For example, one can use it to calculate aggregated metrics between two sends and attach the metrics to _health_check_payload. VERY IMPORTANT NOTE: keep the runtime as low as possible, because the higher the runtime, the more load on the status message sender thread, which might cause double activation.
- retrieve_all_connected_channel_count() int
Returns the number of tracked input channels. This can be used to synchronize some activities like starting the output.
- Returns:
number of tracked input channels
- retrieve_connected_channel_unique_names(filter_function: Callable[[ChannelFilter], bool] | None = None) set
This method returns a set of connected channel names given the evaluation criteria passed as argument.
- Parameters:
filter_function – the filter function; see ChannelFilter for more details (optional, may be None)
- Returns:
Set of channel names that pass the given evaluation
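The optional filter argument can be sketched as below. PeerState and retrieve_unique_names are hypothetical; in particular, the attributes of the real ChannelFilter are not documented here, so the sketch assumes a simple object with healthy and stopped fields, and assumes that omitting the filter selects every connected channel.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class PeerState:
    """Hypothetical view of a connected channel passed to the filter."""
    healthy: bool
    stopped: bool = False


def retrieve_unique_names(
        status_map: dict,
        filter_function: Optional[Callable[[PeerState], bool]] = None) -> set:
    # Assumption: with no filter, every connected channel is returned.
    return {name for name, state in status_map.items()
            if filter_function is None or filter_function(state)}
```

For example, passing `lambda s: s.healthy and not s.stopped` would select only the channels that are healthy and still running.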
- retrieve_healthy_connected_channel_count() int
Returns the number of healthy tracked input channels. This can be used to synchronize some activities like starting the output.
- Returns:
number of healthy tracked input channels
- set_location(channel_location: str)
- start_channel(send_status_message: bool = True) None
This method sets the corresponding flags and sends the corresponding status message to signalize that the channel has been started.
- Parameters:
send_status_message – if True the status message sending will be called
- stop_channel(send_status_message: bool = True) None
This method sets the corresponding flags and sends the corresponding status message to signalize that the channel has been stopped.
- Parameters:
send_status_message – if True the status message sending will be called
- class pypz.core.channels.base.ChannelMetric(elapsedTimeSinceLastIO, recordCountInLastIO)
Bases:
object