pypz.plugins.rmq_io.channels module

class pypz.plugins.rmq_io.channels.RMQChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelReader

_abc_impl = <_abc._abc_data object>
_aggregated_record_count: int

Metric value to store the summed up record count

_aggregated_time_between_reads: int

Metric value to store the summed up times between reads

_close_channel() bool

This method shall implement the logic to close a channel. Closing a channel is normally the last step, so cleanup of used resources shall happen here as well.

Returns:

True, if done, False if it is still in progress

_commit_offset(offset: int) None

This method shall implement the logic that commits the provided offset using the underlying technology.

Parameters:

offset – provided offset to commit

_config_data_consumer_timeout_sec: float

Configuration parameter to specify the timeout for draining events from the data queue

_config_max_poll_records: int

Configuration parameter to specify the max number of messages to process in one go

_config_status_consumer_timeout_sec: float

Configuration parameter to specify the timeout for draining events from the status stream

_configure_channel(channel_configuration: dict) None

This method shall implement the logic to interpret the provided configuration.

Parameters:

channel_configuration – the channel configuration as a dict

_create_resources() bool

This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before _open_channel() to make sure that the resources exist beforehand.

Returns:

True, if done, False if it is still in progress
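The boolean return convention (True when done, False while still in progress) suggests that the caller polls each lifecycle step until it completes. The following is a minimal sketch of that idea, not the pypz implementation; `run_until_done` and `SketchChannel` are hypothetical names introduced only for illustration.

```python
from typing import Callable


def run_until_done(step: Callable[[], bool], max_attempts: int = 10) -> int:
    """Invoke a lifecycle step repeatedly until it returns True.

    Returns the number of invocations that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        if step():
            return attempt
    raise TimeoutError(f"step did not finish within {max_attempts} attempts")


class SketchChannel:
    """Hypothetical stand-in for a channel; not the pypz implementation."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self._create_polls = 0

    def _create_resources(self) -> bool:
        self.calls.append("create")
        self._create_polls += 1
        # Pretend that resource creation needs two polls to complete.
        return self._create_polls >= 2

    def _open_channel(self) -> bool:
        self.calls.append("open")
        return True


channel = SketchChannel()
# Resources are created first; the channel is opened only afterwards.
attempts_create = run_until_done(channel._create_resources)
attempts_open = run_until_done(channel._open_channel)
```

The same polling helper would apply in reverse order on shutdown, closing the channel before deleting its resources.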

_current_read_record_count: int

Stores the record count of the most recent read

_current_read_timestamp: int

Stores the timestamp of the most recent read to be able to calculate the elapsed time

_data_consumer: MessageConsumer | None

Consumer wrapper to consume data messages sent by the ChannelWriter

_data_exchange_name: str

Data goes through exchanges, a dedicated exchange with this name will be created for this channel.

_data_queue_name: str

Data queue name, where all the data messages go through

_delete_resources() bool

This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE - this method shall be invoked after _close_channel().

Returns:

True, if done, False if it is still in progress

_exchange_type

This controls how the exchange shall forward messages to the consumers. In group mode, all the consumers shall get all the messages, hence we need to specify it as “fanout”.

_generic_datum_reader: DatumReader | None

This is the generic datum reader, which converts bytes to generic records. It is only initialized if a schema is provided.

_initial_input_record_offset: int

This variable stores the initial offset, which can differ from 0 in case of crash-restart. In that case, this value is the difference by which the framework-calculated offset shall be compensated.

_last_offset_committed: int

This value tracks the last committed offset. The main use case of this variable is to prevent an offset commit if the new offset equals the last committed one.
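The guard described here can be sketched as follows. This is an illustration under assumptions, not the plugin's code: `OffsetCommitter` is a hypothetical helper, and the initial value of -1 is assumed to mean that nothing has been committed yet.

```python
class OffsetCommitter:
    """Hypothetical helper that skips redundant offset commits."""

    def __init__(self) -> None:
        # Assumption: -1 means that no offset has been committed yet.
        self.last_offset_committed = -1
        self.commit_calls: list[int] = []

    def commit(self, offset: int) -> bool:
        """Commit the offset unless it equals the last committed one."""
        if offset == self.last_offset_committed:
            return False  # nothing new to commit
        # A real implementation would talk to the broker here.
        self.commit_calls.append(offset)
        self.last_offset_committed = offset
        return True


committer = OffsetCommitter()
first = committer.commit(5)
repeated = committer.commit(5)
advanced = committer.commit(6)
```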

_load_input_record_offset() int

Offsets have no meaning in queues. Nevertheless, the value -1 is necessary, since it signals that no offset was ever committed.

_metrics_buffer: list[ChannelMetric]

This list holds metric elements. Its only purpose is to identify the first element to discard, as in a circular buffer.
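One way to read this together with the aggregated metric values is a sliding window with running sums: the buffer only remembers which element drops out next, so the sums can be adjusted without re-adding everything. The sketch below assumes that reading; `Metric`, `MetricsWindow`, and the field names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Metric:
    """Hypothetical metric element; field names are assumptions."""
    record_count: int
    elapsed_ms: int


class MetricsWindow:
    """Fixed-size window that keeps running sums of its elements."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buffer: list[Metric] = []
        self.aggregated_record_count = 0
        self.aggregated_elapsed_ms = 0

    def add(self, metric: Metric) -> None:
        if len(self.buffer) == self.size:
            # Discard the oldest element and subtract it from the sums.
            oldest = self.buffer.pop(0)
            self.aggregated_record_count -= oldest.record_count
            self.aggregated_elapsed_ms -= oldest.elapsed_ms
        self.buffer.append(metric)
        self.aggregated_record_count += metric.record_count
        self.aggregated_elapsed_ms += metric.elapsed_ms


window = MetricsWindow(size=2)
for count, elapsed in [(10, 100), (20, 200), (30, 300)]:
    window.add(Metric(count, elapsed))
```

After the third insert, only the last two elements contribute to the sums.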

_open_channel() bool

This method shall implement the logic to open a channel. What ‘open’ means, however, is defined by the actual implementation: one developer may define it as an opened connection, another as a created file, etc.

Returns:

True, if done, False if it is still in progress

_read_record_count: int

Counter for how many records have been read by this channel. Note that this value can differ from the offset value in case of crash-restart. The framework nevertheless maintains the difference between this value and the loaded offset and compensates the framework-calculated offset by that difference.

_read_record_offset: int

The actual offset of the read records. E.g., if 10 records have been read and provided to the plugin, then the offset is 10. Note that this might differ from _read_record_count in case of crash-restart.
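The relation between the record counter and the offset can be illustrated with a small worked example. It assumes that the offset is simply the initially loaded offset plus the number of records read since start, and that the initial offset is 0 on a fresh start; `compensated_offset` is a hypothetical function, not part of pypz.

```python
def compensated_offset(initial_offset: int, read_record_count: int) -> int:
    """Assumed relation: loaded offset plus records read since start."""
    return initial_offset + read_record_count


# Fresh start: nothing was committed before, 10 records read so far.
fresh_start_offset = compensated_offset(initial_offset=0, read_record_count=10)

# Crash-restart: offset 10 was loaded, 5 more records read since restart.
# The counter is 5 here, while the offset continues from the loaded value.
after_restart_offset = compensated_offset(initial_offset=10, read_record_count=5)
```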

_read_records() list[Any]

This method shall implement the logic to read records from the input channel. A list of records is expected.

Returns:

List of records, or an empty list if no records were read. None is not accepted.
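A minimal sketch of this contract, assuming an in-memory deque in place of the RabbitMQ data queue and a cap that plays the role of the max poll records setting. `read_records` here is a hypothetical free function, not the plugin's method.

```python
from collections import deque
from typing import Any


def read_records(source: deque, max_poll_records: int) -> list[Any]:
    """Return up to max_poll_records records; never return None."""
    records: list[Any] = []
    while source and len(records) < max_poll_records:
        records.append(source.popleft())
    # An empty list (not None) signals that nothing was read.
    return records


queue = deque([1, 2, 3])
first_read = read_records(queue, max_poll_records=2)
second_read = read_records(queue, max_poll_records=2)
empty_read = read_records(queue, max_poll_records=2)
```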

_reader_status_producer: MessageProducer | None

Producer wrapper to produce status messages for the ChannelWriters

_reader_status_stream_name: str

Name of the stream, which contains the reader status signals

_retrieve_status_messages() list | None

This implementation retrieves the status messages from the corresponding stream. Notice that, unlike the queue, where an arbitrary number of messages can be received via a single drain_events() call, a stream always delivers only 1 message per call. For this reason, we need to poll repeatedly for available records, at most MaxStatusMessageRetrieveCount times.
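The bounded polling described above can be sketched as a loop that fetches one message per call and stops either when nothing is available or when the cap is reached. `poll_one` and `max_count` are hypothetical stand-ins for the stream consumer and for MaxStatusMessageRetrieveCount.

```python
from collections import deque
from typing import Callable, Optional


def retrieve_status_messages(
    poll_one: Callable[[], Optional[str]], max_count: int
) -> list[str]:
    """Collect single-message polls, at most max_count of them."""
    messages: list[str] = []
    for _ in range(max_count):
        message = poll_one()
        if message is None:
            break  # nothing more available right now
        messages.append(message)
    return messages


# A deque stands in for the status stream in this sketch.
pending = deque(["s1", "s2", "s3"])


def poll_one() -> Optional[str]:
    return pending.popleft() if pending else None


first_batch = retrieve_status_messages(poll_one, max_count=2)
second_batch = retrieve_status_messages(poll_one, max_count=2)
```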

_send_status_message(message: str) None

This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string; however, you can append your own custom information as a string, separated by StateMessageSeparatorChar.

Parameters:

message – message that shall be sent
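Appending custom information to the state string might look like the sketch below. The separator value `;` and the example state string are assumptions made for illustration only; the actual value of StateMessageSeparatorChar and the state schema are defined by the framework.

```python
# Assumed value; the real StateMessageSeparatorChar may differ.
STATE_MESSAGE_SEPARATOR = ";"


def append_custom_info(state_message: str, *custom_fields: str) -> str:
    """Append custom string fields to the framework-provided state string."""
    return STATE_MESSAGE_SEPARATOR.join([state_message, *custom_fields])


# "state" is a placeholder for whatever the channel itself provides.
extended = append_custom_info("state", "host=worker-1", "attempt=2")
unchanged = append_custom_info("state")
```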

_writer_status_consumer: MessageConsumer | None

Consumer wrapper to consume status messages sent by the ChannelWriters

_writer_status_stream_name: str

Name of the stream, which contains the writer status signals

can_close() bool

This method can implement the logic to determine whether the channel can be closed.

Returns:

True if the channel can be closed, False otherwise

has_records() bool

This method shall implement the logic to check whether the reader can still read records.

Returns:

True if the channel still has records, False otherwise

class pypz.plugins.rmq_io.channels.RMQChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelWriter

_abc_impl = <_abc._abc_data object>
_aggregated_record_count: int

Metric value to store the summed up record count

_aggregated_time_between_outputs: int

Metric value to store the summed up times between outputs

_close_channel() bool

This method shall implement the logic to close a channel. Closing a channel is normally the last step, so cleanup of used resources shall happen here as well.

Returns:

True, if done, False if it is still in progress

_config_status_consumer_timeout_sec: float

Configuration parameter to specify the timeout for draining events from the status stream

_configure_channel(channel_configuration: dict) None

This method shall implement the logic to interpret the provided configuration.

Parameters:

channel_configuration – the channel configuration as a dict

_context: OutputPortPlugin

The context of this channel, which shall be an object of PortPlugin type

_create_resources() bool

This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before _open_channel() to make sure that the resources exist beforehand.

Returns:

True, if done, False if it is still in progress

_current_output_record_count: int

Stores the record count of the most recent output

_current_output_timestamp: int

Stores the timestamp of the most recent output to be able to calculate the elapsed time

_data_exchange_name: str

Data goes through exchanges, a dedicated exchange with this name will be created for this channel.

_data_producer: MessageProducer | None

Producer wrapper to produce data messages for the ChannelReaders

_data_queue_name: str

Data queue name

_delete_resources() bool

This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE - this method shall be invoked after _close_channel().

Returns:

True, if done, False if it is still in progress

_generic_datum_writer: DatumWriter | None

This is the datum writer, which converts generic records to bytes. It is only initialized if a schema is provided.

_metrics_buffer: list[ChannelMetric]

This list holds metric elements. Its only purpose is to identify the first element to discard, as in a circular buffer.

_open_channel() bool

This method shall implement the logic to open a channel. What ‘open’ means, however, is defined by the actual implementation: one developer may define it as an opened connection, another as a created file, etc.

Returns:

True, if done, False if it is still in progress

_reader_status_consumer: MessageConsumer | None

Consumer wrapper to consume status messages sent by the ChannelReaders

_reader_status_stream_name: str

Name of the stream, which contains the reader status signals

_retrieve_status_messages() list | None

This implementation retrieves the status messages from the corresponding stream. Notice that, unlike the queue, where an arbitrary number of messages can be received via a single drain_events() call, a stream always delivers only 1 message per call. For this reason, we need to poll repeatedly for available records, at most MaxStatusMessageRetrieveCount times.

_send_status_message(message: str) None

This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string; however, you can append your own custom information as a string, separated by StateMessageSeparatorChar.

Parameters:

message – message that shall be sent

_write_records(records: list[Any]) None

This method shall implement the logic that writes the records to the output resource. It will automatically be invoked by the plugin via the corresponding invoker method.

Parameters:

records – list of records to be written
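As a rough illustration of the writer side, the sketch below serializes each record and keeps a running count of written records. It is not the plugin's implementation: `InMemoryWriterSketch` is hypothetical, a plain list stands in for the data exchange, and JSON is used only to keep the example self-contained.

```python
import json
from typing import Any


class InMemoryWriterSketch:
    """Hypothetical writer; a list stands in for the data exchange."""

    def __init__(self) -> None:
        self.published: list[bytes] = []
        self.written_record_count = 0

    def write_records(self, records: list[Any]) -> None:
        for record in records:
            # JSON encoding is an assumption made for this sketch only.
            self.published.append(json.dumps(record).encode("utf-8"))
        self.written_record_count += len(records)


writer = InMemoryWriterSketch()
writer.write_records([{"id": 1}, {"id": 2}])
writer.write_records([])  # an empty batch changes nothing
```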

_writer_status_producer: MessageProducer | None

Producer wrapper to produce status messages for the ChannelReaders

_writer_status_stream_name: str

Name of the stream, which contains the writer status signals

_written_record_count: int

Number of records written.

can_close() bool

This method can implement the logic to determine whether the channel can be closed.

Returns:

True if the channel can be closed, False otherwise