pypz.core.channels.io module

class pypz.core.channels.io.ChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelBase

This class is the base of the input channel classes. The idea is that the class provides some core, protected abstract methods, which shall be implemented by the developer. These implementations then will be invoked by invoker methods. This makes sure that additional necessary logic will be performed along the implemented.

Parameters:
  • channel_name – name of the channel

  • context – the PortPlugin, which operates this channel

  • executor – an external ThreadPoolExecutor, if not provided, on will be created internally

MetricsBufferLength = 10

This variable specifies the length of the metric buffer and so the averaging window

NotInitialized = -9999

Some helper static to init or identify if offset is not initialized

_abc_impl = <_abc._abc_data object>
_aggregated_record_count: int

Metric value to store the summed up record count

_aggregated_time_between_reads: int

Metric value to store the summed up times between reads

abstract _commit_offset(offset: int) None

This method shall implement the logic that commits the provided offset using the underlying technology.

Parameters:

offset – provided offset to commit

_current_read_record_count: int

Stores the record count of the most current read

_current_read_timestamp: int

Stores the timestamp of the most current read to be able to calculate elapsed time

_initial_input_record_offset: int

This is variable stores the initial offset, which can be different than 0 in case of crash-restart. In this case this value is actually the difference by which the framework calculated offset shall be compensated.

_last_offset_committed: int

This value tracks the last committed offset. The main use-case of this variable is to prevent offset commit if the new offset equals the last committed.

abstract _load_input_record_offset() int

This method shall implement the logic to retrieve stored read offset from the underlying technology. This can be arbitrary, but the method shall return the stored offset.

Returns:

stored offset

_metrics_buffer: list[ChannelMetric]

This list holds metric elements, however the only purpose is to be aware of the first element to discard like in a circular buffer

_read_record_count: int

Counter for how many records have been read by this channel. Note that this value can differ from the offset value in case of crash-restart. Still the framework maintains the difference between this value and the loaded offset and will compensate the framework calculated offset by that difference.

_read_record_offset: int

The actual write offset of the read records. Eg. if 10 records have been read and provided to the plugin then the offset is 10. Note that this might differ from the m_inputRecordCount_i64 in case of crash-restart.

abstract _read_records() list[Any]

This method shall implement the logic to read records from the input channel. An ArrayList of records is expected.

Returns:

list of records OR empty ArrayList if no records read. Null is not accepted.

_status_sender_lock

This lock is used to prevent concurrent modification of metrics

acknowledge_input() None

This method sends the corresponding ack signal to the ChannelWriter

get_read_record_count() int
get_read_record_offset() int
abstract has_records() bool

This method shall implement the logic to check, if the reader can still read records.

Returns:

True if channel has still records, False if not

invoke_commit_current_read_offset() None

This method is used to invoke the implementation of the abstract method.

invoke_commit_offset(offset: int, compensate_with_initial_offset: bool = True) None

This method is used to invoke the implementation of the abstract method. This is necessary to perform some additional actions like compensating the calculated offset with the initial to make sure that the proper offset is committed. Note that this compensation makes only sense in case of crash-restart, since the initial values are probably not 0. Note that the provided offset is calculated by the calculateApplicableOffsetFrom by breaking down the calculated offset by the framework.

Parameters:
  • offset – offset calculated by the plugin for this channel

  • compensate_with_initial_offset – if True, the initial offset will be added to the provided offset

invoke_read_records() list[Any]

An invoker method that encapsulates the actual implementation. This method MUST be called instead of the implemented method directly to ensure proper channel functionality.

Returns:

list of read records or empty list in case no records read

on_status_message_send() None

This method can be implemented to hook into the event of sending status message. For example one can use it to calculate aggregated metrics between 2 sending and attach the metrics to the healthCheckPayload. VERY IMPORTANT NOTE: keep the runtime as low as possible, because the higher the runtime the more load on the status message sender thread, which might cause double activation.

set_initial_record_offset(initial_record_offset: int) None

This method initializes the internal variables

Parameters:

initial_record_offset – value to be initialized to

set_initial_record_offset_auto() None

This method sets the initial values of the internal variables to the values retrieved by the implementation.

class pypz.core.channels.io.ChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelBase

This class is the base of the output channel classes. The idea is that the class provides some core, protected abstract methods, which shall be implemented by the developer. These implementations then will be invoked by invoker methods. This makes sure that additional necessary logic will be performed along the implemented.

Parameters:
  • channel_name – name of the channel

  • context – the PortPlugin, which operates this channel

  • executor – an external ThreadPoolExecutor, if not provided, on will be created internally

MetricsBufferLength = 10

This variable specifies the length of the metric buffer and so the averaging window

_abc_impl = <_abc._abc_data object>
_aggregated_record_count: int

Metric value to store the summed up record count

_aggregated_time_between_outputs: int

Metric value to store the summed up times between reads

_current_output_record_count: int

Stores the record count of the most current read

_current_output_timestamp: int

Stores the timestamp of the most current read to be able to calculate elapsed time

_metrics_buffer: list[ChannelMetric]

This list holds metric elements, however the only purpose is to be aware of the first element to discard like in a circular buffer

_status_sender_lock

This lock is used to prevent concurrent modification of metrics

abstract _write_records(records: list[Any]) None

This method shall implement the logic that writes the records to the output resource. It will automatically be invoked by the plugin via the corresponding invoker method.

Parameters:

records – list of records to be written

_written_record_count: int

Number of outputted records.

get_written_record_count()
invoke_write_records(records: list[Any]) None

This method is used to invoke the implementation of the abstract method. This is necessary to perform some additional actions e.g. updating number of outputted records.

Parameters:

records – records to be written

is_all_connected_input_channels_acknowledged() bool

This method retrieves the number of InputChannels that have acknowledged their input (i.e. the output of this channel). This information can be useful for scenarios, where a synchronization logic needs to be implemented e.g. where the output shall not provide more data until the inputs did not acknowledge. Note that the state is maintained entirely by the ChannelWriter i.e. if the ChannelReader sent the proper message, a flag will be set in the corresponding entry of the status map. Then this flag will be reset once this ChannelWriter produces new data (check invoke_write_records() for more detail).

Returns:

number of acknowledged InputChannels

on_status_message_send()

This method can be implemented to hook into the event of sending status message. For example one can use it to calculate aggregated metrics between 2 sending and attach the metrics to the healthCheckPayload. VERY IMPORTANT NOTE: keep the runtime as low as possible, because the higher the runtime the more load on the status message sender thread, which might cause double activation.