pypz.plugins.kafka_io.channels module

class pypz.plugins.kafka_io.channels.KafkaChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelReader

DataConsumerTimeoutInMs = 5000
InitialDataConsumerTimeoutInMs = 10000
StatusConsumerTimeoutInMs = 1000
_admin_client: KafkaAdminClient | None

Kafka admin client. Necessary to check whether the topic exists when a poll returns 0 records.

_aggregated_record_count: int

Metric value to store the summed-up record count

_aggregated_time_between_reads: int

Metric value to store the summed-up time between reads

_channel_name: str

This member stores the name of the channel, which normally reflects the resource’s name.

_channel_opened: bool

This member is a flag signaling whether the channel is open and ready to use.

_channel_started: bool

Status flag signaling whether the channel has been started.

_channel_stopped: bool

Status flag signaling whether the channel has been stopped. Note that this additional flag is necessary, since _channel_started being False does not necessarily mean that the channel was ever started and has now finished.

_close_channel()

This method shall implement the logic to close a channel. Closing a channel is normally the last step, so cleanup of used resources shall happen here as well.

Returns:

True if done, False if still in progress
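
For illustration, a minimal close sequence over the Kafka clients documented on this page might look like the following sketch; the actual pypz implementation may retry and spread the work across invocations, which is what the bool return value allows.

    # Hedged sketch of a possible close sequence, not the verbatim pypz code.
    def _close_channel(self) -> bool:
        if self._data_consumer is not None:
            self._data_consumer.close()           # leaves the group, frees sockets
            self._data_consumer = None
        if self._writer_status_consumer is not None:
            self._writer_status_consumer.close()
            self._writer_status_consumer = None
        if self._reader_status_producer is not None:
            self._reader_status_producer.flush()  # drain buffered status messages
            self._reader_status_producer.close()
            self._reader_status_producer = None
        if self._admin_client is not None:
            self._admin_client.close()
            self._admin_client = None
        return True  # True means "done"; False would mean "still in progress"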

_commit_offset(offset: int) None

This method shall implement the logic that commits the provided offset using the underlying technology.

Parameters:

offset – provided offset to commit
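
As a hedged illustration of what such a commit could look like with kafka-python (the client library the type names on this page suggest), assuming the channel commits on its single assigned partition:

    from kafka.structs import OffsetAndMetadata

    # Sketch only: commit the provided offset on the assigned partition.
    # Recent kafka-python releases add a leader_epoch field to
    # OffsetAndMetadata, which would need to be passed as well.
    def _commit_offset(self, offset: int) -> None:
        if offset == self._last_offset_committed:
            return  # skip redundant commits, see _last_offset_committed
        self._data_consumer.commit(
            offsets={self._target_partition: OffsetAndMetadata(offset, None)}
        )
        self._last_offset_committed = offset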

_configuration: dict

This member stores the configuration of the channel. This is a kind of serialized storage, since the configuration can be of arbitrary type with arbitrary schema, as long as the interpretation is known.

_configure_channel(configuration: dict)

This method shall implement the logic to interpret the provided configuration.

Parameters:

configuration – the configuration dict

_consumer_timeout_ms: int

This member stores the timeout value for the consumer polling.

_context: PortPlugin

The context of this channel, which shall be an object of PortPlugin type

_control_loop_exception_timer: int

The control loop is where control information is sent to the counterpart channel. Should an exception occur there, the channel must be given some time to recover before it is terminated. This timer defines that grace period.

_create_resources()

This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before open_channel() to make sure that the resources exist beforehand.

Returns:

True if done, False if still in progress
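
A minimal, hedged sketch of idempotent topic creation with kafka-python; the replication factor and the error handling are illustrative assumptions, not the verbatim pypz logic:

    from kafka.admin import NewTopic
    from kafka.errors import TopicAlreadyExistsError

    # Sketch: create the data topic before the channel is opened.
    def _create_resources(self) -> bool:
        try:
            self._admin_client.create_topics([
                NewTopic(name=self._data_topic_name,
                         num_partitions=self._partition_count,
                         replication_factor=1)  # assumed value
            ])
        except TopicAlreadyExistsError:
            pass  # idempotent: an already existing topic counts as created
        self._resources_created = True
        return True  # finished in one step in this sketch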

_current_read_record_count: int

Stores the record count of the most recent read

_current_read_timestamp: int

Stores the timestamp of the most recent read to be able to calculate elapsed time

_data_consumer: KafkaConsumer | None

This is a Kafka consumer to poll data from the specified data topic.

_data_consumer_properties: dict

Properties for data consumer as expected by the KafkaConsumer.

_data_topic_name: str

This member stores the name of the data topic.

_delete_resources()

This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE - this method shall be invoked after close_channel().

Returns:

True if done, False if still in progress

_executor: concurrent.futures.ThreadPoolExecutor | None

This executor is used to execute the background thread, which will invoke the sendHealthStatus method

_executor_started: bool

Flag signaling whether the executor thread has started

_executor_stopped: SynchronizedReference[bool]

Flag signaling that the executor thread has been terminated. This flag is necessary along with _stopping_executor to synchronize the termination; otherwise the thread might not yet have terminated while the channel has already been closed, resulting in exceptions.

_generic_datum_reader: DatumReader | None

This is the generic datum reader, which converts bytes to generic records.

_health_check_payload: dict[str, Any]

It is possible to send additional information along with health check events. This member serves as the storage of this additional payload.

_initial_input_record_offset: int

This variable stores the initial offset, which can differ from 0 in case of a crash-restart. In that case this value is the difference by which the framework-calculated offset shall be compensated.

_last_offset_committed: int

This value tracks the last committed offset. Its main use-case is to prevent an offset commit if the new offset equals the last committed one.

_load_input_record_offset() int

This method shall implement the logic to retrieve the stored read offset from the underlying technology. The storage mechanism can be arbitrary, but the method shall return the stored offset.

Returns:

stored offset
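
With kafka-python this could, for instance, read back the committed offset of the assigned partition; a sketch under that assumption, not the guaranteed pypz mechanism:

    # Sketch: fall back to 0 if nothing has been committed yet.
    def _load_input_record_offset(self) -> int:
        committed = self._data_consumer.committed(self._target_partition)
        return committed if committed is not None else 0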

_location: str | None

This member stores the location string of the channel. The location depends on the technology; it can refer to either a remote or a local URL.

_log_level: str

Default log level. It is ok to set this here: if the Operator’s context logger is present, its log level cannot be overridden, but if not, the DefaultContextLogger’s can be.

_logger: ContextLogger

Channel logger

_metrics_buffer: list[ChannelMetric]

This list holds metric elements; however, its only purpose is to track which element to discard first, like in a circular buffer
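
A tiny sketch of the described discard policy; the capacity constant and the helper name are assumptions for illustration:

    METRICS_BUFFER_SIZE = 100  # assumed capacity, not from the source

    # Hypothetical helper: once the cap is reached, drop the oldest element
    # first, circular-buffer style, then append the new metric.
    def _append_metric(self, metric) -> None:
        if len(self._metrics_buffer) >= METRICS_BUFFER_SIZE:
            self._metrics_buffer.pop(0)
        self._metrics_buffer.append(metric)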

_metrics_enabled: bool

Enables metric calculation, i.e., the computation of additional I/O-related information

_on_status_message_received_callbacks: set[Callable[[list[ChannelStatusMessage]], None]]

Stores the callbacks, which will be executed if status messages are received

_open_channel()

This method shall implement the logic to open a channel. The meaning of ‘open’ is to be defined by the actual implementation: one implementation may define it as an opened connection, another as a created file, etc.

Returns:

True if done, False if still in progress

_partition_count: int

The number of partitions to be created for the data channel. In group mode it shall be 1, since all channel readers in the group read every record sent to the channel; otherwise it equals the size of the group.

_read_record_count: int

Counter for how many records have been read by this channel. Note that this value can differ from the offset value in case of a crash-restart. The framework maintains the difference between this value and the loaded offset and will compensate the framework-calculated offset by that difference.

_read_record_offset: int

The actual offset of the read records, e.g. if 10 records have been read and provided to the plugin, then the offset is 10. Note that this might differ from _read_record_count in case of a crash-restart.

_read_records()

This method shall implement the logic to read records from the input channel. A list of records is expected.

Returns:

list of records, or an empty list if no records were read. None is not accepted.
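
A hedged sketch of such a poll-and-decode loop with kafka-python and the Avro datum reader documented above; the timeout handling and the flattening are assumptions:

    from io import BytesIO
    from avro.io import BinaryDecoder

    # Sketch: poll raw bytes, decode each record with the generic datum reader.
    def _read_records(self) -> list:
        polled = self._data_consumer.poll(timeout_ms=self._consumer_timeout_ms)
        records = []
        for batch in polled.values():  # poll() maps TopicPartition -> [ConsumerRecord]
            for consumer_record in batch:
                decoder = BinaryDecoder(BytesIO(consumer_record.value))
                records.append(self._generic_datum_reader.read(decoder))
        return records  # an empty list, never None, when nothing was read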

_reader_status_producer: KafkaProducer | None

Control producer, responsible for producing status messages

_reader_status_producer_properties: dict

Properties for the status producer. These are mainly a copy of the data consumer properties; the only exception is the group ID.

_reader_status_topic_name: str

Input status topic name

_resources_created: bool

Status flag signaling that the channel’s resources have been created

_resources_deleted: bool

Status flag signaling that the channel’s resources have been deleted

_retrieve_status_messages() list | None

This method shall implement the logic that retrieves the status messages published by the counterpart channel. Notice that in the case of a ChannelWriter there can be multiple input channels sending messages, and in the case of a ChannelReader there can be multiple messages sent by the ChannelWriter; therefore this method shall return a list of messages. Note as well that if your technology uses callbacks, you can directly use the method onStatusMessageReceived; in that case simply return None from this method to ensure that it is ignored.

Returns:

list of retrieved status messages, or None if this method is not used

_send_status_message(message)

This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string; however, you can append your own custom information as a string separated by StateMessageSeparatorChar.

Parameters:

message – message that shall be sent
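
For illustration only, a sketch of publishing a status message on the reader status topic; the JSON serialization is an assumption, the actual wire format is defined by pypz:

    import json

    # Sketch: silent channels (see _silent_mode) must not publish status.
    def _send_status_message(self, message) -> None:
        if self._silent_mode:
            return
        self._reader_status_producer.send(
            self._reader_status_topic_name,
            value=json.dumps(message).encode("utf-8"),
        )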

_silent_mode: bool

If this flag is set, this channel will not send status messages. One use-case is a channel reader/writer created only to sniff the status of other channels.

_status_map: dict[str, ChannelStatusMonitor]

This map stores the health statuses of all connected channels

_stopping_executor: bool

Flag signaling an attempt to terminate the executor thread

_target_partition: TopicPartition | None

The topic partition that this channel will read

_unique_name: str

This name identifies the channel in its context. It is necessary, since the channel name on its own is not unique and can be reused by different owners.

_writer_status_consumer: KafkaConsumer | None

This is a Kafka consumer to poll status updates.

_writer_status_consumer_properties: dict

Properties for the control consumer as expected by the KafkaConsumer. Note that the complete data consumer properties are copied, with some extensions.

_writer_status_topic_name: str

This member stores the name of the control topic.

can_close() bool

This method can implement the logic to determine whether the channel can be closed.

Returns:

True if the channel can be closed, False otherwise

get_consumer_lag() int
has_records() bool

This method shall implement the logic to check whether the reader can still read records.

Returns:

True if the channel still has records, False otherwise
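
One plausible way to back both get_consumer_lag() and has_records() with kafka-python, sketched under the assumption of a single assigned partition:

    # Sketch: lag = end offset of the partition minus the consumer's position.
    def get_consumer_lag(self) -> int:
        end_offsets = self._data_consumer.end_offsets([self._target_partition])
        position = self._data_consumer.position(self._target_partition)
        return end_offsets[self._target_partition] - position

    def has_records(self) -> bool:
        return self.get_consumer_lag() > 0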

set_location(channel_location: str)
start_channel(send_status_message: bool = True)

This method sets the corresponding flags and sends a status message to signal that the channel has been started.

Parameters:

send_status_message – if True, a status message will be sent

stop_channel(send_status_message: bool = True)

This method sets the corresponding flags and sends a status message to signal that the channel has been stopped.

Parameters:

send_status_message – if True, a status message will be sent

class pypz.plugins.kafka_io.channels.KafkaChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)

Bases: ChannelWriter

_admin_client: KafkaAdminClient | None

Kafka admin client. Necessary to create/delete topics.

_aggregated_record_count: int

Metric value to store the summed-up record count

_aggregated_time_between_outputs: int

Metric value to store the summed-up time between outputs

_channel_name: str

This member stores the name of the channel, which normally reflects the resource’s name.

_channel_opened: bool

This member is a flag signaling whether the channel is open and ready to use.

_channel_started: bool

Status flag signaling whether the channel has been started.

_channel_stopped: bool

Status flag signaling whether the channel has been stopped. Note that this additional flag is necessary, since _channel_started being False does not necessarily mean that the output was ever started and has now finished.

_close_channel()

This method shall implement the logic to close a channel. Closing a channel is normally the last step, so cleanup of used resources shall happen here as well.

Returns:

True if done, False if still in progress

_configuration: dict

This member stores the configuration of the channel. This is a kind of serialized storage, since the configuration can be of arbitrary type with arbitrary schema, as long as the interpretation is known.

_configure_channel(configuration: dict)

This method shall implement the logic to interpret the provided configuration.

Parameters:

configuration – the configuration dict

_consumer_timeout_ms: int

This member stores the timeout value for the consumer polling.

_context: PortPlugin

The context of this channel, which shall be an object of PortPlugin type

_control_loop_exception_timer: int

The control loop is where control information is sent to the counterpart channel. Should an exception occur there, the channel must be given some time to recover before it is terminated. This timer defines that grace period.

_create_resources()

This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE - this method shall be invoked before open_channel() to make sure that the resources exist beforehand.

Returns:

True if done, False if still in progress

_current_output_record_count: int

Stores the record count of the most recent write

_current_output_timestamp: int

Stores the timestamp of the most recent write to be able to calculate elapsed time

_data_producer: KafkaProducer | None

Data producer, responsible for producing the data received by the plugin

_data_producer_properties: dict

Property collection for data producer

_data_topic_name: str

Data topic name

_delete_resources()

This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE - this method shall be invoked after close_channel().

Returns:

True if done, False if still in progress

_executor: concurrent.futures.ThreadPoolExecutor | None

This executor is used to execute the background thread, which will invoke the sendHealthStatus method

_executor_started: bool

Flag signaling whether the executor thread has started

_executor_stopped: SynchronizedReference[bool]

Flag signaling that the executor thread has been terminated. This flag is necessary along with _stopping_executor to synchronize the termination; otherwise the thread might not yet have terminated while the channel has already been closed, resulting in exceptions.

_generic_datum_writer: DatumWriter | None

This is the datum writer that converts generic records to byte data

_health_check_payload: dict[str, Any]

It is possible to send additional information along with health check events. This member serves as the storage of this additional payload.

_location: str | None

This member stores the location string of the channel. The location depends on the technology; it can refer to either a remote or a local URL.

_log_level: str

Default log level. It is ok to set this here: if the Operator’s context logger is present, its log level cannot be overridden, but if not, the DefaultContextLogger’s can be.

_logger: ContextLogger

Channel logger

_metrics_buffer: list[ChannelMetric]

This list holds metric elements; however, its only purpose is to track which element to discard first, like in a circular buffer

_metrics_enabled: bool

Enables metric calculation, i.e., the computation of additional I/O-related information

_on_status_message_received_callbacks: set[Callable[[list[ChannelStatusMessage]], None]]

Stores the callbacks, which will be executed if status messages are received

_open_channel()

This method shall implement the logic to open a channel. The meaning of ‘open’ is to be defined by the actual implementation: one implementation may define it as an opened connection, another as a created file, etc.

Returns:

True if done, False if still in progress

_reader_status_consumer: KafkaConsumer | None

Input status consumer for the channel. Reads the statuses provided by the KafkaChannelReader instances.

_reader_status_consumer_properties: dict

Properties for the status consumer. These are mainly a copy of the data consumer properties; the only exception is the group ID.

_reader_status_topic_name: str

Input status topic name

_resources_created: bool

Status flag signaling that the channel’s resources have been created

_resources_deleted: bool

Status flag signaling that the channel’s resources have been deleted

_retrieve_status_messages() list | None

This method shall implement the logic that retrieves the status messages published by the counterpart channel. Notice that in the case of a ChannelWriter there can be multiple input channels sending messages, and in the case of a ChannelReader there can be multiple messages sent by the ChannelWriter; therefore this method shall return a list of messages. Note as well that if your technology uses callbacks, you can directly use the method onStatusMessageReceived; in that case simply return None from this method to ensure that it is ignored.

Returns:

list of retrieved status messages, or None if this method is not used

_round_robin_partition_idx: int

This index ensures that we produce in true round-robin fashion. It is necessary, since Kafka’s round-robin considers batches of records instead of individual records.
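
A short sketch of the per-record rotation this index enables; the helper name is hypothetical:

    # Hypothetical helper: advance the index for every single record,
    # wrapping around at the partition count, unlike Kafka's batch-level
    # round-robin partitioner.
    def _next_partition(self) -> int:
        partition = self._round_robin_partition_idx
        self._round_robin_partition_idx = (partition + 1) % self._target_partition_count
        return partition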

_send_status_message(status_string)

This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for how and what the channel provides as a string; however, you can append your own custom information as a string separated by StateMessageSeparatorChar.

Parameters:

status_string – message that shall be sent

_silent_mode: bool

If this flag is set, this channel will not send status messages. One use-case is a channel reader/writer created only to sniff the status of other channels.

_status_map: dict[str, ChannelStatusMonitor]

This map stores the health statuses of all connected channels

_stopping_executor: bool

Flag signaling an attempt to terminate the executor thread

_target_partition_count: int

The number of partitions of the topic created by the channel reader. This value will be read directly from Kafka.

_unique_name: str

This name identifies the channel in its context. It is necessary, since the channel name on its own is not unique and can be reused by different owners.

_write_records(records: list[Any])

This method shall implement the logic that writes the records to the output resource. It is automatically invoked by the plugin via the corresponding invoker method.

Parameters:

records – list of records to be written
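
A hedged sketch combining the Avro datum writer documented above with explicit partition selection; _next_partition() is the hypothetical helper sketched under _round_robin_partition_idx:

    from io import BytesIO
    from avro.io import BinaryEncoder

    # Sketch: serialize each record to Avro bytes and produce it to an
    # explicitly chosen partition in round-robin fashion.
    def _write_records(self, records: list) -> None:
        for record in records:
            buffer = BytesIO()
            self._generic_datum_writer.write(record, BinaryEncoder(buffer))
            self._data_producer.send(
                self._data_topic_name,
                value=buffer.getvalue(),
                partition=self._next_partition(),  # hypothetical helper
            )
        self._written_record_count += len(records)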

_writer_status_producer: KafkaProducer | None

Output status producer, responsible for producing control signals and the schema

_writer_status_producer_properties: dict

Property collection for output status producer

_writer_status_topic_name: str

Output status topic name

_written_record_count: int

Number of records written.

can_close() bool

This method can implement the logic to determine whether the channel can be closed.

Returns:

True if the channel can be closed, False otherwise

set_location(channel_location: str)
start_channel(send_status_message: bool = True)

This method sets the corresponding flags and sends a status message to signal that the channel has been started.

Parameters:

send_status_message – if True, a status message will be sent

stop_channel(send_status_message: bool = True)

This method sets the corresponding flags and sends a status message to signal that the channel has been stopped.

Parameters:

send_status_message – if True, a status message will be sent