pypz.plugins.kafka_io.channels module
- class pypz.plugins.kafka_io.channels.KafkaChannelReader(channel_name: str, context: InputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases: ChannelReader
- DataConsumerTimeoutInMs = 5000
- InitialDataConsumerTimeoutInMs = 10000
- StatusConsumerTimeoutInMs = 1000
- _abc_impl = <_abc._abc_data object>
- _admin_client: KafkaAdminClient | None
Kafka admin client. Needed to check whether the topic exists when a poll returns 0 records.
- _aggregated_record_count: int
Metric value storing the summed-up record count.
- _aggregated_time_between_reads: int
Metric value storing the summed-up times between reads.
- _close_channel()
This method shall implement the logic to close a channel. Normally, closing a channel is the last step, so cleanup of used resources shall happen here as well.
- Returns:
True if done, False if still in progress
- _commit_offset(offset: int) → None
This method shall implement the logic that commits the provided offset using the underlying technology.
- Parameters:
offset – provided offset to commit
- _configure_channel(configuration: dict)
This method shall implement the logic to interpret the provided configuration.
- Parameters:
configuration – channel configuration as a dictionary
- _consumer_timeout_ms: int
This member stores the timeout value for the consumer polling.
- _create_resources()
This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE: this method shall be invoked before open_channel() to make sure that the resources exist beforehand.
- Returns:
True if done, False if still in progress
- _current_read_record_count: int
Stores the record count of the most recent read
- _current_read_timestamp: int
Stores the timestamp of the most recent read to be able to calculate the elapsed time
- _data_consumer: KafkaConsumer | None
Kafka consumer that polls data from the specified data topic.
- _data_consumer_properties: dict
Properties for the data consumer, as expected by KafkaConsumer.
- _data_topic_name: str
This member stores the name of the data topic.
- _delete_resources()
This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE: this method shall be invoked after close_channel().
- Returns:
True if done, False if still in progress
- _generic_datum_reader: DatumReader | None
This is the generic datum reader, which converts bytes to generic records.
- _initial_input_record_offset: int
This variable stores the initial offset, which can differ from 0 after a crash-restart. In that case, this value is the difference by which the framework-calculated offset shall be compensated.
- _last_offset_committed: int
This value tracks the last committed offset. Its main use is to skip the offset commit if the new offset equals the last committed one.
- _load_input_record_offset() → int
This method shall implement the logic to retrieve the stored read offset from the underlying technology. The storage mechanism can be arbitrary, but the method shall return the stored offset.
- Returns:
stored offset
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements; its only purpose is to keep track of the oldest element, which is discarded first, as in a circular buffer.
- _open_channel()
This method shall implement the logic to open a channel. The meaning of ‘open’ is defined by the actual implementation: one developer may define it as an opened connection, another as a created file, etc.
- Returns:
True if done, False if still in progress
- _partition_count: int
The number of partitions to be created for the data topic. In group mode, it shall be 1, since all channel readers in the group read all records sent to the channel. Otherwise, it equals the size of the group.
- _read_record_count: int
Counter of how many records have been read by this channel. Note that this value can differ from the offset value after a crash-restart. The framework nevertheless tracks the difference between this value and the loaded offset and compensates the framework-calculated offset by that difference.
- _read_record_offset: int
The actual offset of the read records, e.g., if 10 records have been read and provided to the plugin, the offset is 10. Note that this might differ from _read_record_count after a crash-restart.
- _read_records()
This method shall implement the logic to read records from the input channel. A list of records is expected.
- Returns:
list of records, or an empty list if no records were read. None is not accepted.
- _reader_status_producer: KafkaProducer | None
Control producer, responsible for producing status messages
- _reader_status_producer_properties: dict
Properties for the status producer. These are mostly a copy of the data consumer properties; the only exception is the group id.
- _reader_status_topic_name: str
Input status topic name
- _retrieve_status_messages() → list | None
This method shall implement the logic that retrieves the status messages published by the counterpart channel. Notice that a ChannelWriter can receive messages from multiple InputChannels, and a ChannelReader can receive multiple messages from the ChannelWriter; therefore, this method shall return a list of messages. Note also that if your technology uses callbacks, you can use the method onStatusMessageReceived directly. In this case, simply return None from this method to ensure that it is ignored.
- Returns:
list of retrieved status messages, or None if the method is not used
- _send_status_message(message)
This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for what the channel provides in this string; however, you can append your own custom information as a string separated by StateMessageSeparatorChar.
- Parameters:
message – message that shall be sent
- _target_partition: TopicPartition | None
The topic partition that this channel reads
- _writer_status_consumer: KafkaConsumer | None
Kafka consumer that polls status updates
- _writer_status_consumer_properties: dict
Properties for the status consumer, as expected by KafkaConsumer. Note that the complete data consumer properties are copied, with some extensions.
- _writer_status_topic_name: str
This member stores the name of the control topic.
- can_close() → bool
This method can implement the logic to determine whether the channel can be closed.
- Returns:
True if the channel can be closed, False otherwise
- get_consumer_lag() → int
- has_records() → bool
This method shall implement the logic to check whether the reader can still read records.
- Returns:
True if the channel still has records, False if not
- set_location(channel_location: str)
- start_channel(send_status_message: bool = True)
This method sets the corresponding flags and sends the corresponding status message to signal that the channel has been started.
- Parameters:
send_status_message – if True, the status message is sent
- stop_channel(send_status_message: bool = True)
This method sets the corresponding flags and sends the corresponding status message to signal that the channel has been stopped.
- Parameters:
send_status_message – if True, the status message is sent
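The interplay of _initial_input_record_offset, _read_record_count, _read_record_offset and _last_offset_committed can be illustrated with a small model. This is a minimal sketch, not the pypz implementation: the class OffsetBookkeeping and its methods are hypothetical, and the list `committed` merely stands in for the real Kafka commit performed by _commit_offset().

```python
class OffsetBookkeeping:
    """Hypothetical model of the reader's offset fields (not pypz code)."""

    def __init__(self, loaded_offset: int) -> None:
        # After a crash-restart the stored offset is loaded, while the
        # per-run record counter starts again at 0. The difference is the
        # compensation value (cf. _initial_input_record_offset).
        self.read_record_count = 0
        self.initial_input_record_offset = loaded_offset - self.read_record_count
        # Assumption: the loaded offset counts as already committed.
        self.last_offset_committed = loaded_offset
        self.committed: list[int] = []  # stands in for the real Kafka commit

    @property
    def read_record_offset(self) -> int:
        # Framework-calculated count, compensated by the initial offset
        return self.read_record_count + self.initial_input_record_offset

    def on_records_read(self, count: int) -> None:
        self.read_record_count += count

    def commit(self) -> bool:
        """Commit only if the offset moved since the last commit."""
        offset = self.read_record_offset
        if offset == self.last_offset_committed:
            return False
        self.committed.append(offset)
        self.last_offset_committed = offset
        return True
```

For example, a reader restarted with a stored offset of 120 that then reads 10 records reports an offset of 130 while its own counter is only 10; a second commit without new records is skipped.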
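The description of _metrics_buffer suggests that the aggregated metrics (_aggregated_record_count, _aggregated_time_between_reads) are running sums over a bounded window, where the oldest element is discarded first. The sketch below assumes exactly that interpretation; Metric is a hypothetical stand-in for ChannelMetric, and WindowedMetrics is an illustrative name, not part of pypz.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Metric:
    """Hypothetical stand-in for ChannelMetric."""
    record_count: int
    elapsed_ms: int  # time since the previous read


class WindowedMetrics:
    """Keeps running sums over the last `window_size` reads."""

    def __init__(self, window_size: int) -> None:
        if window_size < 1:
            raise ValueError("window_size must be at least 1")
        self.window_size = window_size
        self.buffer: deque = deque()
        self.aggregated_record_count = 0
        self.aggregated_time_between_reads = 0

    def add(self, metric: Metric) -> None:
        # When the window is full, discard the oldest element and remove
        # its contribution from the running sums.
        if len(self.buffer) == self.window_size:
            oldest = self.buffer.popleft()
            self.aggregated_record_count -= oldest.record_count
            self.aggregated_time_between_reads -= oldest.elapsed_ms
        self.buffer.append(metric)
        self.aggregated_record_count += metric.record_count
        self.aggregated_time_between_reads += metric.elapsed_ms

    def mean_records_per_read(self) -> float:
        return self.aggregated_record_count / len(self.buffer) if self.buffer else 0.0
```

A deque with maxlen would drop the oldest element silently, so popleft() is called explicitly here to be able to subtract the discarded element from the sums.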
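The lifecycle methods above return True when done and False while still in progress, and the documented ordering is: _create_resources() before _open_channel(), and _delete_resources() after _close_channel(). The following sketch drives that order with a retry loop. FakeChannel, run_until_done and run_lifecycle are hypothetical helpers; how often and by whom the real framework re-invokes these methods is not stated here and is not modeled.

```python
from typing import Callable


def run_until_done(step: Callable[[], bool], max_attempts: int = 100) -> int:
    """Invoke a non-blocking step until it reports completion.

    Returns the number of attempts needed; raises if it never finishes.
    """
    for attempt in range(1, max_attempts + 1):
        if step():
            return attempt
    raise TimeoutError(f"step did not complete within {max_attempts} attempts")


class FakeChannel:
    """Hypothetical channel whose steps need several calls to finish."""

    def __init__(self, calls_needed: int = 2) -> None:
        self.calls_needed = calls_needed
        self.calls: dict[str, int] = {}
        self.log: list[str] = []

    def _step(self, name: str) -> bool:
        self.calls[name] = self.calls.get(name, 0) + 1
        done = self.calls[name] >= self.calls_needed
        if done:
            self.log.append(name)
        return done

    def _create_resources(self) -> bool:
        return self._step("create_resources")

    def _open_channel(self) -> bool:
        return self._step("open_channel")

    def _close_channel(self) -> bool:
        return self._step("close_channel")

    def _delete_resources(self) -> bool:
        return self._step("delete_resources")


def run_lifecycle(channel) -> None:
    # Resources must exist before opening and are deleted only after closing.
    for step in (channel._create_resources, channel._open_channel,
                 channel._close_channel, channel._delete_resources):
        run_until_done(step)
```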
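The partition rule documented for _partition_count can be written down as a small function. The second helper, which maps a reader's index within its group to the partition it reads (cf. _target_partition), is a hypothetical illustration of one plausible assignment and is not confirmed by this reference.

```python
def data_topic_partition_count(group_mode: bool, group_size: int) -> int:
    """Partition count rule as described for _partition_count."""
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    # Group mode: every reader consumes every record, so one partition suffices.
    # Otherwise: one partition per reader in the group.
    return 1 if group_mode else group_size


def target_partition_index(group_mode: bool, group_index: int) -> int:
    """Hypothetical mapping of a reader's group index to its partition."""
    return 0 if group_mode else group_index
```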
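The contract of _read_records() (always a list, never None) and the role of _admin_client (checking topic existence when a poll returns 0 records) can be combined into one sketch. It assumes kafka-python conventions: KafkaConsumer.poll() returns a dict mapping TopicPartition to lists of ConsumerRecord, and KafkaAdminClient.list_topics() returns topic names. To stay self-contained, the Kafka types are replaced by hypothetical namedtuples and a fake admin client; raising LookupError for a missing topic is an assumption, and the Avro decoding via DatumReader is omitted.

```python
from collections import namedtuple

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def collect_records(poll_result: dict, admin_client, topic_name: str) -> list:
    """Flatten a poll() result into a list of raw values; never returns None."""
    records = [record.value for batch in poll_result.values() for record in batch]
    # An empty poll is ambiguous: no new data yet, or the topic is gone.
    if not records and topic_name not in admin_client.list_topics():
        raise LookupError(f"data topic {topic_name!r} does not exist")
    return records


class FakeAdminClient:
    """Test double exposing only list_topics(), like KafkaAdminClient."""

    def __init__(self, topics: list) -> None:
        self._topics = list(topics)

    def list_topics(self) -> list:
        return self._topics
```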
- class pypz.plugins.kafka_io.channels.KafkaChannelWriter(channel_name: str, context: OutputPortPlugin, executor: ThreadPoolExecutor | None = None, **kwargs)
Bases: ChannelWriter
- _abc_impl = <_abc._abc_data object>
- _admin_client: KafkaAdminClient | None
Kafka admin client. Necessary to create/delete topics.
- _aggregated_record_count: int
Metric value storing the summed-up record count.
- _aggregated_time_between_outputs: int
Metric value storing the summed-up times between outputs.
- _close_channel()
This method shall implement the logic to close a channel. Normally, closing a channel is the last step, so cleanup of used resources shall happen here as well.
- Returns:
True if done, False if still in progress
- _configure_channel(configuration: dict)
This method shall implement the logic to interpret the provided configuration.
- Parameters:
configuration – channel configuration as a dictionary
- _consumer_timeout_ms: int
This member stores the timeout value for the consumer polling.
- _create_resources()
This method shall implement the logic of creating the resources of the channel. IMPORTANT NOTE: this method shall be invoked before open_channel() to make sure that the resources exist beforehand.
- Returns:
True if done, False if still in progress
- _current_output_record_count: int
Stores the record count of the most recent output
- _current_output_timestamp: int
Stores the timestamp of the most recent output to be able to calculate the elapsed time
- _data_producer: KafkaProducer | None
Data producer, responsible for producing the data received from the plugin
- _data_producer_properties: dict
Property collection for data producer
- _data_topic_name: str
Data topic name
- _delete_resources()
This method shall implement the logic of deleting the resources of the channel. IMPORTANT NOTE: this method shall be invoked after close_channel().
- Returns:
True if done, False if still in progress
- _generic_datum_writer: DatumWriter | None
This is the datum writer that converts generic records to byte data
- _metrics_buffer: list[ChannelMetric]
This list holds metric elements; its only purpose is to keep track of the oldest element, which is discarded first, as in a circular buffer.
- _open_channel()
This method shall implement the logic to open a channel. The meaning of ‘open’ is defined by the actual implementation: one developer may define it as an opened connection, another as a created file, etc.
- Returns:
True if done, False if still in progress
- _reader_status_consumer: KafkaConsumer | None
Input status consumer for the channel. Reads the status messages provided by the KafkaInputChannels, i.e., the channel readers.
- _reader_status_consumer_properties: dict
Properties for status consumer. This is mainly a copy of the properties of the data consumer. The only exception is the group id.
- _reader_status_topic_name
Input status topic name
- _retrieve_status_messages() → list | None
This method shall implement the logic that retrieves the status messages published by the counterpart channel. Notice that a ChannelWriter can receive messages from multiple InputChannels, and a ChannelReader can receive multiple messages from the ChannelWriter; therefore, this method shall return a list of messages. Note also that if your technology uses callbacks, you can use the method onStatusMessageReceived directly. In this case, simply return None from this method to ensure that it is ignored.
- Returns:
list of retrieved status messages, or None if the method is not used
- _round_robin_partition_idx: int
This index ensures true round-robin production across partitions. It is necessary because Kafka’s round-robin works on batches of records rather than on individual records.
- _send_status_message(status_string)
This method shall implement the logic that publishes the channel’s state to the counterpart channel. The state string is provided by the channel itself. Note that there is a defined schema for what the channel provides in this string; however, you can append your own custom information as a string separated by StateMessageSeparatorChar.
- Parameters:
status_string – message that shall be sent
- _target_partition_count: int
The number of partitions of the topic created by the channel reader. This value is read directly from Kafka.
- _write_records(records: list[Any])
This method shall implement the logic that writes the records to the output resource. It will automatically be invoked by the plugin via the corresponding invoker method.
- Parameters:
records – list of records to be written
- _writer_status_producer: KafkaProducer | None
Output status producer, responsible for producing control signals and the schema
- _writer_status_producer_properties: dict
Property collection for the output status producer
- _writer_status_topic_name: str
Output status topic name
- _written_record_count: int
Number of records written.
- can_close() → bool
This method can implement the logic to determine whether the channel can be closed.
- Returns:
True if the channel can be closed, False otherwise
- set_location(channel_location: str)
- start_channel(send_status_message: bool = True)
This method sets the corresponding flags and sends the corresponding status message to signal that the channel has been started.
- Parameters:
send_status_message – if True, the status message is sent
- stop_channel(send_status_message: bool = True)
This method sets the corresponding flags and sends the corresponding status message to signal that the channel has been stopped.
- Parameters:
send_status_message – if True, the status message is sent
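The purpose of _round_robin_partition_idx is to spread individual records, not batches, across the _target_partition_count partitions. The sketch below shows per-record round-robin assignment whose index persists across calls, so consecutive batches continue where the previous one stopped. RoundRobinAssigner is a hypothetical name; the partition count is assumed to be known already (with kafka-python, KafkaProducer.partitions_for(topic) returns the set of partition ids).

```python
from collections import defaultdict


class RoundRobinAssigner:
    """Assigns each record to the next partition, one record at a time."""

    def __init__(self, partition_count: int) -> None:
        if partition_count < 1:
            raise ValueError("partition_count must be at least 1")
        self.partition_count = partition_count
        self._round_robin_partition_idx = 0  # persists across calls

    def assign(self, records: list) -> dict:
        """Group records by target partition, advancing the index per record."""
        by_partition = defaultdict(list)
        for record in records:
            by_partition[self._round_robin_partition_idx].append(record)
            self._round_robin_partition_idx = (
                self._round_robin_partition_idx + 1) % self.partition_count
        return dict(by_partition)
```

Each (partition, record) pair could then be produced explicitly, e.g. with kafka-python's producer.send(topic, value=payload, partition=partition), so that Kafka's batch-level partitioning does not skew the distribution.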