pypz.plugins.rmq_io.channels module
- class pypz.plugins.rmq_io.channels.RMQChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases:
ChannelReader
- _abc_impl = <_abc._abc_data object>
- _aggregated_record_count: int
Metric value to store the summed up record count
- _aggregated_time_between_reads: int
Metric value to store the summed up times between reads
- _channel_name: str
This member stores the name of the channel, which normally reflects the resource’s name.
- _channel_opened: bool
This flag signals whether the channel is open and ready to use.
- _channel_started: bool
Status flag signaling whether the channel has been started.
- _channel_stopped: bool
Status flag signaling whether the channel has been stopped. This additional flag is necessary because _channel_started being False does not necessarily mean that the channel was ever started and has since finished.
- _close_channel() bool
This method shall implement the logic to close a channel. Closing a channel is normally the last step, so used resources shall be cleaned up here as well.
- Returns:
True, if done, False if it is still in progress
- _commit_offset(offset: int) None
This method shall implement the logic that commits the provided offset using the underlying technology.
- Parameters:
offset – provided offset to commit
- _config_data_consumer_timeout_sec: float
Configuration parameter to specify the timeout for draining events from the data queue
- _config_max_poll_records: int
Configuration parameter to specify the max number of messages to process in one go
- _config_status_consumer_timeout_sec: float
Configuration parameter to specify the timeout for draining events from the status stream
- _configuration: dict
This member stores the configuration of the channel. It is a kind of serialized storage, since the configuration can be of an arbitrary type with an arbitrary schema, as long as its interpretation is known.
- _configure_channel(channel_configuration: dict) None
This method shall implement the logic to interpret the provided configuration.
- Parameters:
channel_configuration – configuration of the channel, provided as a dict
- _context: PortPlugin
The context of this channel, which shall be an object of PortPlugin type
- _control_loop_exception_timer: int
The control loop is where control information is sent to the counterpart channel. Should an exception occur, the channel needs some time to recover before it is terminated. This timer defines that grace period.
- _create_resources() bool
This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before open_channel() to make sure that the resources exist beforehand.
- Returns:
True, if done, False if it is still in progress
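The "True if done, False if still in progress" contract used by the lifecycle methods suggests that the caller invokes them repeatedly until they report completion. The following is a minimal sketch of such a caller; run_until_done and FakeChannel are hypothetical names introduced for illustration and are not part of pypz.

```python
import time
from typing import Callable


def run_until_done(step: Callable[[], bool],
                   poll_interval_sec: float = 0.0,
                   max_attempts: int = 100) -> int:
    """Call step() until it returns True and return the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        if step():
            return attempt
        time.sleep(poll_interval_sec)
    raise TimeoutError(f"step did not complete within {max_attempts} attempts")


class FakeChannel:
    """Hypothetical stand-in whose resources become ready on the third call."""

    def __init__(self) -> None:
        self._calls = 0

    def _create_resources(self) -> bool:
        self._calls += 1
        return self._calls >= 3


channel = FakeChannel()
attempts = run_until_done(channel._create_resources)
```

Returning a boolean instead of blocking lets a single caller interleave several channels without dedicating a thread to each.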
- _current_read_record_count: int
Stores the record count of the most recent read
- _current_read_timestamp: int
Stores the timestamp of the most recent read to be able to calculate elapsed time
- _data_consumer: MessageConsumer | None
Consumer wrapper to consume data messages sent by the ChannelWriter
- _data_exchange_name: str
Data goes through exchanges, a dedicated exchange with this name will be created for this channel.
- _data_queue_name: str
Data queue name, where all the data messages go through
- _delete_resources() bool
This method shall implement the logic of deleting resources of the channel. IMPORTANT NOTE - this method shall be invoked after the close_channel()
- Returns:
True, if done, False if it is still in progress
- _exchange_type
This controls how the exchange forwards messages to the consumers. In group mode, all consumers shall get all messages, hence it is specified as “fanout”.
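The fanout behavior can be illustrated with a small in-memory model. This is only a sketch of the routing semantics, not RabbitMQ client code; the class FanoutExchange and its bind and publish methods are hypothetical names introduced for illustration.

```python
class FanoutExchange:
    """Hypothetical in-memory model of fanout routing (not a RabbitMQ API)."""

    def __init__(self) -> None:
        self._queues: dict[str, list[str]] = {}

    def bind(self, queue_name: str) -> list[str]:
        """Register a queue and return the list that backs it."""
        return self._queues.setdefault(queue_name, [])

    def publish(self, message: str) -> None:
        """Copy the message into every bound queue."""
        for queue in self._queues.values():
            queue.append(message)


exchange = FanoutExchange()
reader_a = exchange.bind("reader_a")
reader_b = exchange.bind("reader_b")
exchange.publish("record-1")
exchange.publish("record-2")
```

Both bound queues end up with their own copy of every message, which is the property group mode relies on.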
- _executor: concurrent.futures.ThreadPoolExecutor | None
This executor is used to execute the background thread, which will invoke the sendHealthStatus method
- _executor_started: bool
Flag signaling whether the executor thread has started
- _executor_stopped: SynchronizedReference[bool]
Flag to signalize that the executor thread has been terminated. This flag is necessary along with _stopping_executor to synchronize the termination. Otherwise, the channel might already be closed while the thread has not yet terminated, which results in exceptions.
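The two-flag handshake between a stop request and a confirmed termination can be sketched as follows. This is an illustration under assumptions: it uses threading.Event in place of SynchronizedReference[bool], and control_loop is a hypothetical placeholder for the background work.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

stopping = threading.Event()  # plays the role of _stopping_executor
stopped = threading.Event()   # plays the role of _executor_stopped


def control_loop() -> None:
    """Hypothetical background loop that runs until a stop is requested."""
    try:
        while not stopping.is_set():
            # Periodic work would go here; wait briefly between iterations.
            stopping.wait(0.01)
    finally:
        # Confirm termination so the owner knows it is safe to close.
        stopped.set()


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(control_loop)

stopping.set()                          # request termination
terminated = stopped.wait(timeout=5.0)  # wait for the confirmation
executor.shutdown(wait=True)
```

Only after the confirmation arrives would the owner go on to close the channel, which avoids the background thread touching resources that are already gone.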
- _generic_datum_reader: DatumReader | None
This is the generic datum reader, which converts bytes to generic records. It will be only initialized, if a schema is provided.
- _health_check_payload: dict[str, Any]
It is possible to send additional information along with health check events. This member serves as the storage of this additional payload.
- _initial_input_record_offset: int
This variable stores the initial offset, which can differ from 0 in case of crash-restart. In that case, this value is the difference by which the framework-calculated offset shall be compensated.
- _last_offset_committed: int
This value tracks the last committed offset. The main use-case of this variable is to prevent offset commit if the new offset equals the last committed.
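A minimal sketch of how a last-committed value can suppress redundant commits. OffsetCommitter and its members are hypothetical names; the list commit_calls stands in for calls to the underlying technology.

```python
class OffsetCommitter:
    """Hypothetical helper that skips commits of an unchanged offset."""

    def __init__(self) -> None:
        self.last_offset_committed = -1    # -1: nothing committed yet
        self.commit_calls: list[int] = []  # stand-in for backend commits

    def commit(self, offset: int) -> bool:
        """Commit the offset unless it equals the last committed one."""
        if offset == self.last_offset_committed:
            return False
        self.commit_calls.append(offset)
        self.last_offset_committed = offset
        return True


committer = OffsetCommitter()
results = [committer.commit(5), committer.commit(5), committer.commit(6)]
```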
- _load_input_record_offset() int
Offsets have no meaning in queues. Nevertheless, the value -1 is necessary, since it signals that no offset was ever committed.
- _location: str | None
This member stores the location string of the channel. The location depends on the technology. It can refer to either a remote or a local URL as well.
- _log_level: str
Default log level. It is acceptable to set it here: if the Operator’s context logger is present, its log level cannot be overridden, but if it is not, the DefaultContextLogger’s can be.
- _logger: ContextLogger
Channel logger
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements. Its only purpose is to keep track of the first element, which is discarded as in a circular buffer.
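One way to combine such a buffer with aggregated values is a sliding window that subtracts the oldest element when it is discarded. The sketch below assumes this interpretation; Metric, MetricsWindow and the window size are hypothetical and do not reflect the actual ChannelMetric type.

```python
from dataclasses import dataclass


@dataclass
class Metric:
    """Hypothetical metric element."""
    record_count: int
    elapsed_ms: int


class MetricsWindow:
    """Keeps running sums over the last `size` metric elements."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.buffer: list[Metric] = []
        self.aggregated_record_count = 0
        self.aggregated_elapsed_ms = 0

    def add(self, metric: Metric) -> None:
        if len(self.buffer) == self.size:
            # Discard the first element and remove it from the sums.
            oldest = self.buffer.pop(0)
            self.aggregated_record_count -= oldest.record_count
            self.aggregated_elapsed_ms -= oldest.elapsed_ms
        self.buffer.append(metric)
        self.aggregated_record_count += metric.record_count
        self.aggregated_elapsed_ms += metric.elapsed_ms


window = MetricsWindow(size=2)
for count, elapsed in [(10, 100), (20, 200), (30, 300)]:
    window.add(Metric(count, elapsed))
```

Keeping the sums up to date incrementally avoids iterating over the whole buffer on every read.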
- _metrics_enabled: bool
Enables metric calculation i.e., to calculate additional i/o related information
- _on_status_message_received_callbacks: set[Callable[[list[ChannelStatusMessage]], None]]
Stores the callbacks that will be executed when status messages are received
- _open_channel() bool
This method shall implement the logic to open a channel. The meaning of ‘open’, however, is to be defined by the actual implementation. One developer may define it as an opened connection, another as a created file, etc.
- Returns:
True, if done, False if it is still in progress
- _read_record_count: int
Counter for how many records have been read by this channel. Note that this value can differ from the offset value in case of crash-restart. Still the framework maintains the difference between this value and the loaded offset and will compensate the framework calculated offset by that difference.
- _read_record_offset: int
The actual write offset of the read records. E.g., if 10 records have been read and provided to the plugin, then the offset is 10. Note that this might differ from the number of records read (_read_record_count) in case of crash-restart.
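The relationship between the record count and the offset after a crash-restart can be made concrete with a small calculation. This sketch assumes that the compensation is a plain addition of the loaded initial offset; compensated_offset is a hypothetical helper, not a pypz function.

```python
def compensated_offset(initial_offset: int, read_record_count: int) -> int:
    """Offset loaded at start-up plus the records read since then (assumed)."""
    return initial_offset + read_record_count


# Fresh start: nothing was committed before, so count and offset agree.
fresh = compensated_offset(initial_offset=0, read_record_count=10)

# Crash-restart: 40 records were committed before the crash, 10 read since.
after_restart = compensated_offset(initial_offset=40, read_record_count=10)
```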
- _read_records() list[Any]
This method shall implement the logic to read records from the input channel. A list of records is expected.
- Returns:
list of records OR an empty list if no records were read. None is not accepted.
- _reader_status_producer: MessageProducer | None
Producer wrapper to produce status messages for the ChannelWriters
- _reader_status_stream_name: str
Name of the stream, which contains the reader status signals
- _resources_created: bool
Status flag to signalize that the channel’s resources are created
- _resources_deleted: bool
Status flag to signalize that the channel’s resources are deleted
- _retrieve_status_messages() list | None
This implementation retrieves the status messages from the corresponding stream. Note that, unlike a queue, where an arbitrary number of messages can be received via a single drain_events() call, a stream always delivers only 1 message per call. For this reason, the available records are polled one at a time, up to at most MaxStatusMessageRetrieveCount.
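The bounded polling described here can be sketched without any messaging library. poll_up_to and fetch_one are hypothetical names, and the deque stands in for the status stream.

```python
from collections import deque
from typing import Callable, Optional


def poll_up_to(fetch_one: Callable[[], Optional[str]],
               max_count: int) -> list[str]:
    """Fetch one message per call until none is left or max_count is reached."""
    messages: list[str] = []
    for _ in range(max_count):
        message = fetch_one()
        if message is None:
            break
        messages.append(message)
    return messages


pending = deque(["s1", "s2", "s3", "s4", "s5"])  # stand-in for the stream


def fetch_one() -> Optional[str]:
    """Return the next pending message, or None if the stream is drained."""
    return pending.popleft() if pending else None


first_batch = poll_up_to(fetch_one, max_count=3)
second_batch = poll_up_to(fetch_one, max_count=3)
```

The upper bound keeps a single retrieval from monopolizing the caller when many status messages are queued.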
- _send_status_message(message: str) None
This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string. However, you can append your own custom information as a string separated by StateMessageSeparatorChar.
- Parameters:
message – message that shall be sent
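Appending custom information to the state string might look like the sketch below. The separator value ";" is a hypothetical stand-in, since the actual value of StateMessageSeparatorChar is not given here, and both helper functions are introduced only for illustration.

```python
SEPARATOR = ";"  # hypothetical stand-in for StateMessageSeparatorChar


def append_custom_info(state_message: str, *custom_parts: str) -> str:
    """Append custom parts to the channel-provided state string."""
    return SEPARATOR.join([state_message, *custom_parts])


def split_status_message(message: str) -> list[str]:
    """Split a received message into the state string and custom parts."""
    return message.split(SEPARATOR)


message = append_custom_info("state-from-channel", "region=eu", "build=42")
parts = split_status_message(message)
```

The custom parts must not contain the separator themselves, otherwise the receiver cannot split the message unambiguously.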
- _silent_mode: bool
If this flag is set, then this channel will not send status messages. One use case is when a channelRW is created to sniff the status of channels.
- _status_map: dict[str, ChannelStatusMonitor]
This map stores the health statuses of all connected channels
- _stopping_executor: bool
Flag to signalize the termination attempt of the executor thread
- _unique_name: str
This name identifies the channel in its context. It is necessary because the channel name on its own is not unique and can be reused by different owners.
- _writer_status_consumer: MessageConsumer | None
Consumer wrapper to consume status messages sent by the ChannelWriters
- _writer_status_stream_name: str
Name of the stream, which contains the writer status signals
- can_close() bool
This method can implement the logic to determine whether the channel can be closed.
- Returns:
True if the channel can be closed, False otherwise
- has_records() bool
This method shall implement the logic to check whether the reader can still read records.
- Returns:
True if the channel still has records, False otherwise
- class pypz.plugins.rmq_io.channels.RMQChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases:
ChannelWriter
- _abc_impl = <_abc._abc_data object>
- _aggregated_record_count: int
Metric value to store the summed up record count
- _aggregated_time_between_outputs: int
Metric value to store the summed up times between outputs
- _channel_name: str
This member stores the name of the channel, which normally reflects the resource’s name.
- _channel_opened: bool
This flag signals whether the channel is open and ready to use.
- _channel_started: bool
Status flag signaling whether the channel has been started.
- _channel_stopped: bool
Status flag signaling whether the channel has been stopped. This additional flag is necessary because _channel_started being False does not necessarily mean that the channel was ever started and has since finished.
- _close_channel() bool
This method shall implement the logic to close a channel. Closing a channel is normally the last step, so used resources shall be cleaned up here as well.
- Returns:
True, if done, False if it is still in progress
- _config_status_consumer_timeout_sec: float
Configuration parameter to specify the timeout for draining events from the status stream
- _configuration: dict
This member stores the configuration of the channel. It is a kind of serialized storage, since the configuration can be of an arbitrary type with an arbitrary schema, as long as its interpretation is known.
- _configure_channel(channel_configuration: dict) None
This method shall implement the logic to interpret the provided configuration.
- Parameters:
channel_configuration – configuration of the channel, provided as a dict
- _context: PortPlugin
The context of this channel, which shall be an object of PortPlugin type
- _control_loop_exception_timer: int
The control loop is where control information is sent to the counterpart channel. Should an exception occur, the channel needs some time to recover before it is terminated. This timer defines that grace period.
- _create_resources() bool
This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before open_channel() to make sure that the resources exist beforehand.
- Returns:
True, if done, False if it is still in progress
- _current_output_record_count: int
Stores the record count of the most recent output
- _current_output_timestamp: int
Stores the timestamp of the most recent output to be able to calculate elapsed time
- _data_exchange_name: str
Data goes through exchanges, a dedicated exchange with this name will be created for this channel.
- _data_producer: MessageProducer | None
Producer wrapper to produce data messages for the ChannelReaders
- _data_queue_name: str
Data queue name
- _delete_resources() bool
This method shall implement the logic of deleting resources of the channel. IMPORTANT NOTE - this method shall be invoked after the close_channel()
- Returns:
True, if done, False if it is still in progress
- _executor: concurrent.futures.ThreadPoolExecutor | None
This executor is used to execute the background thread, which will invoke the sendHealthStatus method
- _executor_started: bool
Flag signaling whether the executor thread has started
- _executor_stopped: SynchronizedReference[bool]
Flag to signalize that the executor thread has been terminated. This flag is necessary along with _stopping_executor to synchronize the termination. Otherwise, the channel might already be closed while the thread has not yet terminated, which results in exceptions.
- _generic_datum_writer: DatumWriter | None
This is the datum writer that converts generic records to byte data. It is only initialized, if schema is provided.
- _health_check_payload: dict[str, Any]
It is possible to send additional information along with health check events. This member serves as the storage of this additional payload.
- _location: str | None
This member stores the location string of the channel. The location depends on the technology. It can refer to either a remote or a local URL as well.
- _log_level: str
Default log level. It is acceptable to set it here: if the Operator’s context logger is present, its log level cannot be overridden, but if it is not, the DefaultContextLogger’s can be.
- _logger: ContextLogger
Channel logger
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements. Its only purpose is to keep track of the first element, which is discarded as in a circular buffer.
- _metrics_enabled: bool
Enables metric calculation i.e., to calculate additional i/o related information
- _on_status_message_received_callbacks: set[Callable[[list[ChannelStatusMessage]], None]]
Stores the callbacks that will be executed when status messages are received
- _open_channel() bool
This method shall implement the logic to open a channel. The meaning of ‘open’, however, is to be defined by the actual implementation. One developer may define it as an opened connection, another as a created file, etc.
- Returns:
True, if done, False if it is still in progress
- _reader_status_consumer: MessageConsumer | None
Consumer wrapper to consume status messages sent by the ChannelReaders
- _reader_status_stream_name: str
Name of the stream, which contains the reader status signals
- _resources_created: bool
Status flag to signalize that the channel’s resources are created
- _resources_deleted: bool
Status flag to signalize that the channel’s resources are deleted
- _retrieve_status_messages() list | None
This implementation retrieves the status messages from the corresponding stream. Note that, unlike a queue, where an arbitrary number of messages can be received via a single drain_events() call, a stream always delivers only 1 message per call. For this reason, the available records are polled one at a time, up to at most MaxStatusMessageRetrieveCount.
- _send_status_message(message: str) None
This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string. However, you can append your own custom information as a string separated by StateMessageSeparatorChar.
- Parameters:
message – message that shall be sent
- _silent_mode: bool
If this flag is set, then this channel will not send status messages. One use case is when a channelRW is created to sniff the status of channels.
- _status_map: dict[str, ChannelStatusMonitor]
This map stores the health statuses of all connected channels
- _stopping_executor: bool
Flag to signalize the termination attempt of the executor thread
- _unique_name: str
This name identifies the channel in its context. It is necessary because the channel name on its own is not unique and can be reused by different owners.
- _write_records(records: list[Any]) None
This method shall implement the logic that writes the records to the output resource. It will automatically be invoked by the plugin via the corresponding invoker method.
- Parameters:
records – list of records to be written
- _writer_status_producer: MessageProducer | None
Producer wrapper to produce status messages for the ChannelReaders
- _writer_status_stream_name: str
Name of the stream, which contains the writer status signals
- _written_record_count: int
Number of records written.
- can_close() bool
This method can implement the logic to determine whether the channel can be closed.
- Returns:
True if the channel can be closed, False otherwise