pypz.core.channels.io module
- class pypz.core.channels.io.ChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases: ChannelBase
This class is the base of the input channel classes. It provides a set of core, protected abstract methods that the developer must implement. These implementations are then called by invoker methods, which ensures that the additional logic required by the framework runs alongside the implemented logic.
- Parameters:
channel_name – name of the channel
context – the PortPlugin that operates this channel
executor – an external ThreadPoolExecutor; if not provided, one will be created internally
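The invoker pattern described above can be illustrated with a small standalone sketch. It does not import pypz. `SketchChannelReader` and `ListReader` are hypothetical classes that only mirror the documented names `_read_records()` and `invoke_read_records()`. The sketch assumes that the invoker validates the result and updates the record counter and offset.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchChannelReader(ABC):
    """Hypothetical, simplified stand-in for ChannelReader (not the pypz code)."""

    def __init__(self) -> None:
        self._read_record_count = 0   # records delivered by this instance
        self._read_record_offset = 0  # offset as seen by the framework

    @abstractmethod
    def _read_records(self) -> list[Any]:
        """Implemented by the developer; must return a list, never None."""

    def invoke_read_records(self) -> list[Any]:
        # The invoker wraps the implementation and performs the bookkeeping
        # that the implementation itself does not have to care about.
        records = self._read_records()
        if records is None:
            raise TypeError("_read_records() must return a list, not None")
        self._read_record_count += len(records)
        self._read_record_offset += len(records)
        return records


class ListReader(SketchChannelReader):
    """Hypothetical concrete reader that serves records from a list in batches."""

    def __init__(self, data: list[Any], batch_size: int = 2) -> None:
        super().__init__()
        self._data = data
        self._batch_size = batch_size
        self._pos = 0

    def _read_records(self) -> list[Any]:
        batch = self._data[self._pos:self._pos + self._batch_size]
        self._pos += len(batch)
        return batch


reader = ListReader(["a", "b", "c"])
print(reader.invoke_read_records())  # ['a', 'b']
print(reader.invoke_read_records())  # ['c']
print(reader._read_record_count)     # 3
```

Because `_read_records()` is declared with `abstractmethod`, the base class cannot be instantiated until a subclass provides it.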
- MetricsBufferLength = 10
This variable specifies the length of the metric buffer and therefore the size of the averaging window.
- NotInitialized = -9999
Helper constant used to initialize an offset or to detect that it has not been initialized yet.
- _aggregated_record_count: int
Metric value to store the summed up record count
- _aggregated_time_between_reads: int
Metric value to store the summed up times between reads
- abstract _commit_offset(offset: int) → None
This method shall implement the logic that commits the provided offset using the underlying technology.
- Parameters:
offset – provided offset to commit
- _current_read_record_count: int
Stores the record count of the most recent read.
- _current_read_timestamp: int
Stores the timestamp of the most recent read, which is used to calculate the elapsed time between reads.
- _initial_input_record_offset: int
This variable stores the initial offset, which can differ from 0 after a crash-restart. In that case, this value is the difference by which the framework-calculated offset must be compensated.
- _last_offset_committed: int
This value tracks the last committed offset. Its main use is to skip the commit if the new offset equals the last committed one.
- abstract _load_input_record_offset() → int
This method shall implement the logic to retrieve the stored read offset from the underlying technology. The retrieval mechanism is up to the implementation, but the method must return the stored offset.
- Returns:
stored offset
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements. Its only purpose is to identify the oldest element to discard, as in a circular buffer.
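The metric buffer and the aggregated values can work together as a sliding averaging window. In this sketch, each running sum is updated by subtracting the evicted element rather than re-summing the whole buffer. `SketchMetric`, its fields, `MetricsWindow` and the millisecond unit are hypothetical. Only the buffer length of 10 and the aggregated-value names come from the documentation.

```python
from dataclasses import dataclass


@dataclass
class SketchMetric:
    """Hypothetical stand-in for ChannelMetric: measurements of one read."""
    elapsed_time_ms: int  # assumed unit: milliseconds since the previous read
    record_count: int


class MetricsWindow:
    """Keeps running sums over the last `length` metrics (hypothetical helper)."""

    def __init__(self, length: int = 10) -> None:  # mirrors MetricsBufferLength
        self._length = length
        self._buffer: list[SketchMetric] = []
        self._aggregated_record_count = 0
        self._aggregated_time_between_reads = 0

    def add(self, metric: SketchMetric) -> None:
        if len(self._buffer) == self._length:
            # Buffer full: discard the oldest element and remove its
            # contribution from the running sums.
            oldest = self._buffer.pop(0)
            self._aggregated_record_count -= oldest.record_count
            self._aggregated_time_between_reads -= oldest.elapsed_time_ms
        self._buffer.append(metric)
        self._aggregated_record_count += metric.record_count
        self._aggregated_time_between_reads += metric.elapsed_time_ms

    def records_per_second(self) -> float:
        if self._aggregated_time_between_reads == 0:
            return 0.0
        return self._aggregated_record_count * 1000 / self._aggregated_time_between_reads


window = MetricsWindow(length=3)
for count in [10, 20, 30, 40]:
    window.add(SketchMetric(elapsed_time_ms=1000, record_count=count))
print(window.records_per_second())  # 30.0 (only 20, 30 and 40 remain in the window)
```

A `collections.deque` would make eviction O(1), but with a window of 10 elements a plain list, as named in the documentation, is sufficient.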
- _read_record_count: int
Counter for how many records have been read by this channel. Note that this value can differ from the offset value after a crash-restart. The framework nevertheless tracks the difference between this value and the loaded offset and compensates the framework-calculated offset by that difference.
- _read_record_offset: int
The actual read offset of the read records. E.g., if 10 records have been read and provided to the plugin, the offset is 10. Note that this might differ from _read_record_count after a crash-restart.
- abstract _read_records() → list[Any]
This method shall implement the logic to read records from the input channel. A list of records is expected.
- Returns:
list of records, or an empty list if no records were read. None is not accepted.
- _status_sender_lock
This lock is used to prevent concurrent modification of metrics
- acknowledge_input() → None
This method sends the acknowledgment signal to the corresponding ChannelWriter.
- get_read_record_count() → int
- get_read_record_offset() → int
- abstract has_records() → bool
This method shall implement the logic to check whether the reader can still read records.
- Returns:
True if channel has still records, False if not
- invoke_commit_current_read_offset() → None
This method invokes the implementation of the abstract _commit_offset() method to commit the current read offset.
- invoke_commit_offset(offset: int, compensate_with_initial_offset: bool = True) → None
This method invokes the implementation of the abstract _commit_offset() method. The indirection is necessary to perform additional actions, such as compensating the calculated offset with the initial offset so that the proper offset is committed. This compensation only matters after a crash-restart, since only then is the initial offset likely to be non-zero. Note that the provided offset is calculated by calculateApplicableOffsetFrom, which breaks down the offset calculated by the framework.
- Parameters:
offset – offset calculated by the plugin for this channel
compensate_with_initial_offset – if True, the initial offset will be added to the provided offset
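The compensation step and the duplicate-commit check can be sketched together. `OffsetCommitter` and its `storage` list are hypothetical stand-ins for a reader and its underlying technology. The documentation supplies only two behaviors: the initial offset is added when `compensate_with_initial_offset` is True, and a commit is skipped when the offset equals `_last_offset_committed`.

```python
class OffsetCommitter:
    """Hypothetical sketch of the commit path of a reader."""

    def __init__(self, initial_input_record_offset: int = 0) -> None:
        self._initial_input_record_offset = initial_input_record_offset
        self._last_offset_committed = -9999  # mirrors NotInitialized
        self.storage: list[int] = []  # every offset actually committed

    def _commit_offset(self, offset: int) -> None:
        # A real implementation would write to the underlying technology.
        self.storage.append(offset)

    def invoke_commit_offset(self, offset: int,
                             compensate_with_initial_offset: bool = True) -> None:
        if compensate_with_initial_offset:
            # After a crash-restart the framework counts from 0 again, so the
            # previously stored offset has to be added back.
            offset += self._initial_input_record_offset
        if offset == self._last_offset_committed:
            return  # nothing new to commit
        self._commit_offset(offset)
        self._last_offset_committed = offset


committer = OffsetCommitter(initial_input_record_offset=100)
committer.invoke_commit_offset(10)   # commits 100 + 10 = 110
committer.invoke_commit_offset(10)   # skipped: 110 equals the last committed offset
committer.invoke_commit_offset(130, compensate_with_initial_offset=False)  # commits 130
print(committer.storage)  # [110, 130]
```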
- invoke_read_records() → list[Any]
An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.
- Returns:
list of read records or empty list in case no records read
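The following sketch shows a consumer loop that calls only the invoker, never `_read_records()` directly, and stops once `has_records()` returns False. `QueueReader` and `drain()` are hypothetical. In pypz, the loop driving a reader would normally be the framework's responsibility.

```python
from collections import deque
from typing import Any


class QueueReader:
    """Hypothetical reader backed by an in-memory deque."""

    def __init__(self, items: list[Any], batch_size: int) -> None:
        self._queue = deque(items)
        self._batch_size = batch_size
        self._read_record_count = 0

    def has_records(self) -> bool:
        return len(self._queue) > 0

    def _read_records(self) -> list[Any]:
        count = min(self._batch_size, len(self._queue))
        return [self._queue.popleft() for _ in range(count)]

    def invoke_read_records(self) -> list[Any]:
        records = self._read_records()
        self._read_record_count += len(records)  # bookkeeping lives in the invoker
        return records

    def get_read_record_count(self) -> int:
        return self._read_record_count


def drain(reader: QueueReader) -> list[Any]:
    """Reads until has_records() reports False, always through the invoker."""
    collected: list[Any] = []
    while reader.has_records():
        collected.extend(reader.invoke_read_records())
    return collected


reader = QueueReader(list(range(5)), batch_size=2)
print(drain(reader))                   # [0, 1, 2, 3, 4]
print(reader.get_read_record_count())  # 5
```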
- on_status_message_send() → None
This method can be implemented to hook into the event of sending a status message. For example, it can be used to calculate aggregated metrics between two sends and attach them to the healthCheckPayload. VERY IMPORTANT NOTE: keep the runtime as low as possible, because a longer runtime puts more load on the status message sender thread, which might cause double activation.
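The sketch below shows what a low-cost hook might look like. The reading thread updates running sums under a lock, and the hook only copies them and computes a single rate before returning. `StatusHookSketch`, `record_read()` and the `payload` dict are hypothetical; the dict only stands in for the healthCheckPayload. The lock name mirrors `_status_sender_lock`, and milliseconds are an assumed unit.

```python
import threading


class StatusHookSketch:
    """Hypothetical sketch of a cheap on_status_message_send() hook."""

    def __init__(self) -> None:
        self._status_sender_lock = threading.Lock()
        self._aggregated_record_count = 0
        self._aggregated_time_between_reads = 0  # assumed unit: milliseconds
        self.payload: dict[str, float] = {}  # stands in for healthCheckPayload

    def record_read(self, record_count: int, elapsed_ms: int) -> None:
        # Called from the reading thread after each read.
        with self._status_sender_lock:
            self._aggregated_record_count += record_count
            self._aggregated_time_between_reads += elapsed_ms

    def on_status_message_send(self) -> None:
        # Called from the status sender thread: only O(1) work under the lock.
        with self._status_sender_lock:
            count = self._aggregated_record_count
            elapsed = self._aggregated_time_between_reads
        rate = count * 1000 / elapsed if elapsed else 0.0
        self.payload["records_per_second"] = rate


hook = StatusHookSketch()
hook.record_read(50, 500)
hook.record_read(150, 1500)
hook.on_status_message_send()
print(hook.payload)  # {'records_per_second': 100.0}
```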
- set_initial_record_offset(initial_record_offset: int) → None
This method initializes the internal offset variables with the provided value.
- Parameters:
initial_record_offset – value to be initialized to
- set_initial_record_offset_auto() → None
This method sets the initial values of the internal variables to the offset retrieved by the _load_input_record_offset() implementation.
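A crash-restart scenario can be sketched as follows. It assumes that after initialization `_read_record_offset` continues from the stored offset while `_read_record_count` starts again from 0, which is how the two values come to differ. `RestartableReader`, `is_initialized()` and `on_records_read()` are hypothetical. The `NotInitialized` sentinel value is taken from the documentation.

```python
class RestartableReader:
    """Hypothetical sketch of offset initialization after a crash-restart."""

    NotInitialized = -9999  # mirrors the documented sentinel

    def __init__(self, stored_offset: int) -> None:
        self._stored_offset = stored_offset  # stands in for the external store
        self._initial_input_record_offset = self.NotInitialized
        self._read_record_offset = self.NotInitialized
        self._read_record_count = 0

    def _load_input_record_offset(self) -> int:
        # A real implementation would query the underlying technology.
        return self._stored_offset

    def set_initial_record_offset(self, initial_record_offset: int) -> None:
        self._initial_input_record_offset = initial_record_offset
        self._read_record_offset = initial_record_offset

    def set_initial_record_offset_auto(self) -> None:
        self.set_initial_record_offset(self._load_input_record_offset())

    def is_initialized(self) -> bool:  # hypothetical helper
        return self._initial_input_record_offset != self.NotInitialized

    def on_records_read(self, count: int) -> None:  # hypothetical helper
        self._read_record_count += count
        self._read_record_offset += count


reader = RestartableReader(stored_offset=100)  # 100 records consumed before the crash
print(reader.is_initialized())  # False
reader.set_initial_record_offset_auto()
reader.on_records_read(5)
print(reader._read_record_count, reader._read_record_offset)  # 5 105
```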
- class pypz.core.channels.io.ChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases: ChannelBase
This class is the base of the output channel classes. It provides a set of core, protected abstract methods that the developer must implement. These implementations are then called by invoker methods, which ensures that the additional logic required by the framework runs alongside the implemented logic.
- Parameters:
channel_name – name of the channel
context – the PortPlugin that operates this channel
executor – an external ThreadPoolExecutor; if not provided, one will be created internally
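On the writer side the same invoker pattern applies. The developer implements `_write_records()`, and `invoke_write_records()` adds the bookkeeping. `SketchChannelWriter` and `ListWriter` are hypothetical and do not import pypz. The sketch assumes that the written-record count is updated only after the write succeeds.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchChannelWriter(ABC):
    """Hypothetical, simplified stand-in for ChannelWriter (not the pypz code)."""

    def __init__(self) -> None:
        self._written_record_count = 0

    @abstractmethod
    def _write_records(self, records: list[Any]) -> None:
        """Implemented by the developer: write records to the output resource."""

    def invoke_write_records(self, records: list[Any]) -> None:
        self._write_records(records)
        # Reached only if the write did not raise.
        self._written_record_count += len(records)

    def get_written_record_count(self) -> int:
        return self._written_record_count


class ListWriter(SketchChannelWriter):
    """Hypothetical writer that appends records to an in-memory list."""

    def __init__(self) -> None:
        super().__init__()
        self.sink: list[Any] = []

    def _write_records(self, records: list[Any]) -> None:
        self.sink.extend(records)


writer = ListWriter()
writer.invoke_write_records([1, 2])
writer.invoke_write_records([3])
print(writer.sink, writer.get_written_record_count())  # [1, 2, 3] 3
```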
- MetricsBufferLength = 10
This variable specifies the length of the metric buffer and therefore the size of the averaging window.
- _aggregated_record_count: int
Metric value to store the summed up record count
- _aggregated_time_between_outputs: int
Metric value to store the summed-up times between outputs
- _current_output_record_count: int
Stores the record count of the most recent output.
- _current_output_timestamp: int
Stores the timestamp of the most recent output, which is used to calculate the elapsed time between outputs.
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements. Its only purpose is to identify the oldest element to discard, as in a circular buffer.
- _status_sender_lock
This lock is used to prevent concurrent modification of metrics
- abstract _write_records(records: list[Any]) → None
This method shall implement the logic that writes the records to the output resource. It will automatically be invoked by the plugin via the corresponding invoker method.
- Parameters:
records – list of records to be written
- _written_record_count: int
Number of records written by this channel.
- get_written_record_count()
- invoke_write_records(records: list[Any]) → None
This method invokes the implementation of the abstract _write_records() method. This is necessary to perform additional actions, e.g., updating the number of written records.
- Parameters:
records – records to be written
- is_all_connected_input_channels_acknowledged() → bool
This method checks whether all connected input channels have acknowledged their input (i.e., the output of this channel). This information is useful where synchronization logic is needed, e.g., when the output must not provide more data until all inputs have acknowledged. Note that the state is maintained entirely by the ChannelWriter. When a ChannelReader sends the acknowledgment message, a flag is set in the corresponding entry of the status map. This flag is reset once this ChannelWriter produces new data (see invoke_write_records() for details).
- Returns:
True if all connected input channels have acknowledged, False otherwise
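The status-map behavior can be sketched as one flag per connected input channel. A flag is set when that channel acknowledges and cleared for every channel when new data is written. `AckTracker`, its reader names and its method names (other than `is_all_connected_input_channels_acknowledged`) are hypothetical.

```python
class AckTracker:
    """Hypothetical sketch of the writer-side acknowledgment status map."""

    def __init__(self, reader_names: list[str]) -> None:
        # One flag per connected input channel, keyed by a hypothetical name.
        self._acknowledged = {name: False for name in reader_names}

    def on_ack_received(self, reader_name: str) -> None:
        # Set when a ChannelReader sends its acknowledgment message.
        self._acknowledged[reader_name] = True

    def on_records_written(self) -> None:
        # New output invalidates all previous acknowledgments.
        for name in self._acknowledged:
            self._acknowledged[name] = False

    def is_all_connected_input_channels_acknowledged(self) -> bool:
        return all(self._acknowledged.values())


tracker = AckTracker(["reader-0", "reader-1"])
tracker.on_records_written()
tracker.on_ack_received("reader-0")
print(tracker.is_all_connected_input_channels_acknowledged())  # False
tracker.on_ack_received("reader-1")
print(tracker.is_all_connected_input_channels_acknowledged())  # True
tracker.on_records_written()
print(tracker.is_all_connected_input_channels_acknowledged())  # False
```

Note that `all()` over an empty map returns True, so with no connected readers this sketch reports everything as acknowledged. A real implementation might choose otherwise.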
- on_status_message_send()
This method can be implemented to hook into the event of sending a status message. For example, it can be used to calculate aggregated metrics between two sends and attach them to the healthCheckPayload. VERY IMPORTANT NOTE: keep the runtime as low as possible, because a longer runtime puts more load on the status message sender thread, which might cause double activation.