kafka.consumer package
Submodules
kafka.consumer.base module
class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)

    Bases: object

    Base class to be used by other consumers. Not to be used directly.

    This base class provides logic for:

    - initialization and fetching metadata of partitions
    - auto-commit logic
    - APIs for fetching pending message count
    commit(partitions=None)

        Commit stored offsets to Kafka via OffsetCommitRequest (v0).

        Keyword Arguments:
            partitions (list): list of partitions to commit; default is
                to commit all of them

        Returns: True on success, False on failure
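        For illustration, a minimal sketch of committing through a
        concrete subclass such as SimpleConsumer; the broker address,
        group, and topic names below are placeholders:

            from kafka import KafkaClient, SimpleConsumer

            client = KafkaClient("localhost:9092")  # placeholder broker
            consumer = SimpleConsumer(client, "my-group", "my-topic",
                                      auto_commit=False)

            # ... consume some messages, then:
            consumer.commit(partitions=[0])  # commit partition 0 only
            consumer.commit()                # commit all partitions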
kafka.consumer.kafka module
class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)

    Bases: object

    A simpler Kafka consumer.
    DEFAULT_CONFIG = {
        'auto_commit_enable': False,
        'auto_commit_interval_messages': None,
        'auto_commit_interval_ms': 60000,
        'auto_offset_reset': 'largest',
        'bootstrap_servers': [],
        'client_id': 'kafka.consumer.kafka',
        'consumer_timeout_ms': -1,
        'default_fetcher_backoff_ms': 1000,
        'deserializer_class': lambda msg: msg,
        'fetch_message_max_bytes': 1048576,
        'fetch_min_bytes': 1,
        'fetch_wait_max_ms': 100,
        'group_id': None,
        'num_consumer_fetchers': 1,
        'queued_max_message_chunks': 10,
        'rebalance_backoff_ms': 2000,
        'rebalance_max_retries': 4,
        'refresh_leader_backoff_ms': 200,
        'socket_receive_buffer_bytes': 65536,
        'socket_timeout_ms': 30000,
    }
    commit()

        Store consumed message offsets (marked via task_done()) to the
        kafka cluster for this consumer_group.

        Returns:
            True on success, or False if no offsets were found for commit

        Note:
            This functionality requires server version >= 0.8.1.1. See
            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
    configure(**configs)

        Configure the consumer instance.

        Configuration settings can be passed to the constructor;
        otherwise defaults will be used:

        Keyword Arguments:
            bootstrap_servers (list): List of initial broker nodes the
                consumer should contact to bootstrap initial cluster
                metadata. This does not have to be the full node list.
                It just needs to have at least one broker that will
                respond to a Metadata API Request.
            client_id (str): a unique name for this client. Defaults to
                'kafka.consumer.kafka'.
            group_id (str): the name of the consumer group to join.
                Offsets are fetched / committed to this group name.
            fetch_message_max_bytes (int, optional): Maximum bytes for
                each topic/partition fetch request. Defaults to
                1024*1024.
            fetch_min_bytes (int, optional): Minimum amount of data the
                server should return for a fetch request; otherwise wait
                up to fetch_wait_max_ms for more data to accumulate.
                Defaults to 1.
            fetch_wait_max_ms (int, optional): Maximum time for the
                server to block waiting for fetch_min_bytes messages to
                accumulate. Defaults to 100.
            refresh_leader_backoff_ms (int, optional): Milliseconds to
                back off when refreshing metadata on errors (subject to
                random jitter). Defaults to 200.
            socket_timeout_ms (int, optional): TCP socket timeout in
                milliseconds. Defaults to 30*1000.
            auto_offset_reset (str, optional): A policy for resetting
                offsets on OffsetOutOfRange errors. 'smallest' will move
                to the oldest available message, 'largest' will move to
                the most recent. Any other value will raise the
                exception. Defaults to 'largest'.
            deserializer_class (callable, optional): Any callable that
                takes a raw message value and returns a deserialized
                value. Defaults to lambda msg: msg.
            auto_commit_enable (bool, optional): Enabling auto-commit
                will cause the KafkaConsumer to periodically commit
                offsets without an explicit call to commit(). Defaults
                to False.
            auto_commit_interval_ms (int, optional): If
                auto_commit_enable is True, the number of milliseconds
                between automatic offset commits. Defaults to 60*1000.
            auto_commit_interval_messages (int, optional): If
                auto_commit_enable is True, the number of messages
                consumed between automatic offset commits. Defaults to
                None (disabled).
            consumer_timeout_ms (int, optional): number of milliseconds
                after which to raise a timeout exception to the consumer
                if no message is available for consumption. Defaults to
                -1 (don't raise an exception).

        Configuration parameters are described in more detail at
        http://kafka.apache.org/documentation.html#highlevelconsumerapi
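        A hedged sketch of both configuration paths; the server address,
        topic, and group names below are placeholders:

            from kafka import KafkaConsumer

            # Configure at construction time ...
            consumer = KafkaConsumer("my-topic",
                                     bootstrap_servers=["localhost:9092"],
                                     group_id="my-group",
                                     auto_commit_enable=True,
                                     auto_commit_interval_ms=30 * 1000)

            # ... or reconfigure later; topics must then be reassigned
            consumer.configure(bootstrap_servers=["localhost:9092"],
                               consumer_timeout_ms=5000)
            consumer.set_topic_partitions("my-topic")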
    fetch_messages()

        Sends FetchRequests for all topic/partitions set for
        consumption.

        Returns:
            Generator that yields KafkaMessage structs after
            deserializing with the configured deserializer_class

        Note:
            Refreshes metadata on errors, and resets fetch offset on
            OffsetOutOfRange, per the configured auto_offset_reset
            policy

        See Also:
            Key KafkaConsumer configuration parameters:

            - fetch_message_max_bytes
            - fetch_wait_max_ms
            - fetch_min_bytes
            - deserializer_class
            - auto_offset_reset
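        A sketch of driving the generator directly, assuming the
        consumer configured above (ordinarily this is done implicitly by
        iterating the consumer itself):

            for msg in consumer.fetch_messages():
                # Each KafkaMessage carries topic, partition, offset,
                # key, and value fields
                print(msg.topic, msg.partition, msg.offset, msg.value)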
    get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)

        Request available fetch offsets for a single topic/partition.

        Keyword Arguments:
            topic (str): topic for offset request
            partition (int): partition for offset request
            request_time_ms (int): Used to ask for all messages before a
                certain time (ms). There are two special values: specify
                -1 to receive the latest offset (i.e. the offset of the
                next coming message) and -2 to receive the earliest
                available offset. Note that because offsets are pulled in
                descending order, asking for the earliest offset will
                always return you a single element.
            max_num_offsets (int): Maximum offsets to include in the
                OffsetResponse

        Returns:
            a list of offsets in the OffsetResponse submitted for the
            provided topic / partition. See:
            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
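        A small sketch using the two special request_time_ms values,
        with a placeholder topic and partition:

            # Earliest available offset (always a single element)
            (head,) = consumer.get_partition_offsets("my-topic", 0, -2, 1)

            # Latest offset, i.e. the offset of the next coming message
            (tail,) = consumer.get_partition_offsets("my-topic", 0, -1, 1)

            print("%d messages available" % (tail - head))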
    next()

        Return the next available message.

        Blocks indefinitely unless consumer_timeout_ms > 0.

        Returns:
            a single KafkaMessage from the message iterator

        Raises:
            ConsumerTimeout after consumer_timeout_ms and no message

        Note:
            This is also the method called internally during iteration
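        A hedged sketch of bounded iteration; the import path for
        ConsumerTimeout is assumed to be kafka.common, and the broker
        address and topic are placeholders:

            from kafka import KafkaConsumer
            from kafka.common import ConsumerTimeout

            consumer = KafkaConsumer("my-topic",
                                     bootstrap_servers=["localhost:9092"],
                                     consumer_timeout_ms=5000)
            try:
                for message in consumer:  # calls next() internally
                    print(message.value)
            except ConsumerTimeout:
                print("no message within 5 seconds")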
    offsets(group=None)

        Get internal consumer offset values.

        Keyword Arguments:
            group: Either "fetch", "commit", "task_done", or
                "highwater". If no group is specified, returns all
                groups.

        Returns:
            A copy of the internal offsets struct
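        For example (a sketch, assuming the consumer above; the fetch
        group is assumed to be keyed by topic/partition):

            all_offsets = consumer.offsets()           # every group
            fetch_offsets = consumer.offsets("fetch")  # next-fetch offsets

            for topic_partition, offset in fetch_offsets.items():
                print(topic_partition, offset)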
    set_topic_partitions(*topics)

        Set the topic/partitions to consume. Optionally specify offsets
        to start from.

        Accepts types:

        - str (utf-8): topic name (will consume all available
          partitions)
        - tuple: (topic, partition)
        - dict:
            - { topic: partition }
            - { topic: [partition list] }
            - { topic: (partition tuple,) }

        Optionally, offsets can be specified directly:

        - tuple: (topic, partition, offset)
        - dict: { (topic, partition): offset, ... }

        Example:

            kafka = KafkaConsumer()

            # Consume topic1-all; topic2-partition2; topic3-partition0
            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
            # using tuples --
            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))

            # using dict --
            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
    task_done(message)

        Mark a fetched message as consumed.

        Offsets for messages marked as "task_done" will be stored back
        to the kafka cluster for this consumer group on commit().

        Arguments:
            message (KafkaMessage): the message to mark as complete

        Returns:
            True, unless the topic-partition for this message has not
            been configured for the consumer. In normal operation, this
            should not happen. But see github issue 364.
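        Together with commit(), this supports an at-least-once
        processing loop; a hedged sketch with placeholder names and a
        hypothetical process() function:

            from kafka import KafkaConsumer

            consumer = KafkaConsumer("my-topic",
                                     bootstrap_servers=["localhost:9092"],
                                     group_id="my-group",
                                     auto_commit_enable=False)

            for i, message in enumerate(consumer):
                process(message)             # hypothetical user function
                consumer.task_done(message)  # mark offset as processed
                if i % 100 == 99:
                    consumer.commit()        # store task_done offsets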
kafka.consumer.multiprocess module
class kafka.consumer.multiprocess.Events(start, pause, exit)

    Bases: tuple

    exit
        Alias for field number 2

    pause
        Alias for field number 1

    start
        Alias for field number 0
class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0, **simple_consumer_options)

    Bases: kafka.consumer.base.Consumer

    A consumer implementation that consumes partitions for a topic in
    parallel using multiple processes.

    Arguments:
        client: a connected KafkaClient
        group: a name for this consumer, used for offset storage and
            must be unique. If you are connecting to a server that does
            not support offset commit/fetch (any prior to 0.8.1.1), then
            you must set this to None.
        topic: the topic to consume

    Keyword Arguments:
        partitions: An optional list of partitions to consume the data
            from
        auto_commit: default True. Whether or not to auto commit the
            offsets
        auto_commit_every_n: default 100. How many messages to consume
            before a commit
        auto_commit_every_t: default 5000. How much time (in
            milliseconds) to wait before commit
        num_procs: Number of processes to start for consuming messages.
            The available partitions will be divided among these
            processes
        partitions_per_proc: Number of partitions to be allocated per
            process (overrides num_procs)

    Auto commit details: If both auto_commit_every_n and
    auto_commit_every_t are set, they will reset one another when one is
    triggered. These triggers simply call the commit method on this
    class. A manual call to commit will also reset these triggers.
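    A minimal usage sketch; broker, group, and topic names are
    placeholders:

        from kafka import KafkaClient, MultiProcessConsumer

        client = KafkaClient("localhost:9092")  # placeholder broker

        # Spread the topic's partitions across 4 consumer processes
        consumer = MultiProcessConsumer(client, "my-group", "my-topic",
                                        num_procs=4)

        for message in consumer:
            print(message)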
    get_messages(count=1, block=True, timeout=10)

        Fetch the specified number of messages.

        Keyword Arguments:
            count: Indicates the maximum number of messages to be
                fetched
            block: If True, the API will block till all messages are
                fetched. If block is a positive integer, the API will
                block until that many messages are fetched.
            timeout: When blocking is requested, the function will block
                for the specified time (in seconds) until count messages
                is fetched. If None, it will block forever.
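        For example, a hedged batch-fetch sketch using the consumer
        constructed above:

            # Wait up to 10 seconds for up to 5 messages
            messages = consumer.get_messages(count=5, block=True,
                                             timeout=10)
            for message in messages:
                print(message)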
kafka.consumer.simple module
class kafka.consumer.simple.FetchContext(consumer, block, timeout)

    Bases: object

    Class for managing the state of a consumer during fetch.
class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None, auto_offset_reset='largest')

    Bases: kafka.consumer.base.Consumer

    A simple consumer implementation that consumes all/specified
    partitions for a topic.

    Arguments:
        client: a connected KafkaClient
        group: a name for this consumer, used for offset storage and
            must be unique. If you are connecting to a server that does
            not support offset commit/fetch (any prior to 0.8.1.1), then
            you must set this to None.
        topic: the topic to consume

    Keyword Arguments:
        partitions: An optional list of partitions to consume the data
            from
        auto_commit: default True. Whether or not to auto commit the
            offsets
        auto_commit_every_n: default 100. How many messages to consume
            before a commit
        auto_commit_every_t: default 5000. How much time (in
            milliseconds) to wait before commit
        fetch_size_bytes: number of bytes to request in a FetchRequest
        buffer_size: default 4K. Initial number of bytes to tell kafka
            we have available. This will double as needed.
        max_buffer_size: default 32K. Max number of bytes to tell kafka
            we have available. None means no limit.
        iter_timeout: default None. How much time (in seconds) to wait
            for a message in the iterator before exiting. None means no
            timeout, so it will wait forever.
        auto_offset_reset: default 'largest'. Reset partition offsets
            upon OffsetOutOfRangeError. Valid values are 'largest' and
            'smallest'. Otherwise, do not reset the offsets and raise
            OffsetOutOfRangeError.

    Auto commit details: If both auto_commit_every_n and
    auto_commit_every_t are set, they will reset one another when one is
    triggered. These triggers simply call the commit method on this
    class. A manual call to commit will also reset these triggers.
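    A minimal iteration sketch; broker, group, and topic names are
    placeholders, and the OffsetAndMessage struct shape is assumed from
    kafka.common:

        from kafka import KafkaClient, SimpleConsumer

        client = KafkaClient("localhost:9092")  # placeholder broker
        consumer = SimpleConsumer(client, "my-group", "my-topic")

        # Each item is an OffsetAndMessage(offset, message) struct
        for offset_and_message in consumer:
            print(offset_and_message.offset,
                  offset_and_message.message.value)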
    get_messages(count=1, block=True, timeout=0.1)

        Fetch the specified number of messages.

        Keyword Arguments:
            count: Indicates the maximum number of messages to be
                fetched
            block: If True, the API will block till all messages are
                fetched. If block is a positive integer, the API will
                block until that many messages are fetched.
            timeout: When blocking is requested, the function will block
                for the specified time (in seconds) until count messages
                is fetched. If None, it will block forever.
    reset_partition_offset(partition)

        Update offsets using the auto_offset_reset policy
        (smallest|largest).

        Arguments:
            partition (int): the partition for which offsets should be
                updated

        Returns: Updated offset on success, None on failure
    seek(offset, whence=None, partition=None)

        Alter the current offset in the consumer, similar to fseek.

        Arguments:
            offset: how much to modify the offset
            whence: where to modify it from, default is None

                - None is an absolute offset
                - 0 is relative to the earliest available offset (head)
                - 1 is relative to the current offset
                - 2 is relative to the latest known offset (tail)

            partition: modify which partition, default is None. If
                partition is None, would modify all partitions.
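        A sketch of the whence semantics above, using the consumer from
        the earlier example:

            consumer.seek(42)                  # absolute offset 42
            consumer.seek(0, 0)                # rewind to the head
            consumer.seek(0, 2)                # jump to the tail
            consumer.seek(-1, 1, partition=0)  # back one on partition 0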
Module contents
class kafka.consumer.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None, auto_offset_reset='largest')

    Bases: kafka.consumer.base.Consumer

    A simple consumer implementation that consumes all/specified
    partitions for a topic.

    Arguments:
        client: a connected KafkaClient
        group: a name for this consumer, used for offset storage and
            must be unique. If you are connecting to a server that does
            not support offset commit/fetch (any prior to 0.8.1.1), then
            you must set this to None.
        topic: the topic to consume

    Keyword Arguments:
        partitions: An optional list of partitions to consume the data
            from
        auto_commit: default True. Whether or not to auto commit the
            offsets
        auto_commit_every_n: default 100. How many messages to consume
            before a commit
        auto_commit_every_t: default 5000. How much time (in
            milliseconds) to wait before commit
        fetch_size_bytes: number of bytes to request in a FetchRequest
        buffer_size: default 4K. Initial number of bytes to tell kafka
            we have available. This will double as needed.
        max_buffer_size: default 32K. Max number of bytes to tell kafka
            we have available. None means no limit.
        iter_timeout: default None. How much time (in seconds) to wait
            for a message in the iterator before exiting. None means no
            timeout, so it will wait forever.
        auto_offset_reset: default 'largest'. Reset partition offsets
            upon OffsetOutOfRangeError. Valid values are 'largest' and
            'smallest'. Otherwise, do not reset the offsets and raise
            OffsetOutOfRangeError.

    Auto commit details: If both auto_commit_every_n and
    auto_commit_every_t are set, they will reset one another when one is
    triggered. These triggers simply call the commit method on this
    class. A manual call to commit will also reset these triggers.
    get_messages(count=1, block=True, timeout=0.1)

        Fetch the specified number of messages.

        Keyword Arguments:
            count: Indicates the maximum number of messages to be
                fetched
            block: If True, the API will block till all messages are
                fetched. If block is a positive integer, the API will
                block until that many messages are fetched.
            timeout: When blocking is requested, the function will block
                for the specified time (in seconds) until count messages
                is fetched. If None, it will block forever.
    reset_partition_offset(partition)

        Update offsets using the auto_offset_reset policy
        (smallest|largest).

        Arguments:
            partition (int): the partition for which offsets should be
                updated

        Returns: Updated offset on success, None on failure
    seek(offset, whence=None, partition=None)

        Alter the current offset in the consumer, similar to fseek.

        Arguments:
            offset: how much to modify the offset
            whence: where to modify it from, default is None

                - None is an absolute offset
                - 0 is relative to the earliest available offset (head)
                - 1 is relative to the current offset
                - 2 is relative to the latest known offset (tail)

            partition: modify which partition, default is None. If
                partition is None, would modify all partitions.
class kafka.consumer.MultiProcessConsumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0, **simple_consumer_options)

    Bases: kafka.consumer.base.Consumer

    A consumer implementation that consumes partitions for a topic in
    parallel using multiple processes.

    Arguments:
        client: a connected KafkaClient
        group: a name for this consumer, used for offset storage and
            must be unique. If you are connecting to a server that does
            not support offset commit/fetch (any prior to 0.8.1.1), then
            you must set this to None.
        topic: the topic to consume

    Keyword Arguments:
        partitions: An optional list of partitions to consume the data
            from
        auto_commit: default True. Whether or not to auto commit the
            offsets
        auto_commit_every_n: default 100. How many messages to consume
            before a commit
        auto_commit_every_t: default 5000. How much time (in
            milliseconds) to wait before commit
        num_procs: Number of processes to start for consuming messages.
            The available partitions will be divided among these
            processes
        partitions_per_proc: Number of partitions to be allocated per
            process (overrides num_procs)

    Auto commit details: If both auto_commit_every_n and
    auto_commit_every_t are set, they will reset one another when one is
    triggered. These triggers simply call the commit method on this
    class. A manual call to commit will also reset these triggers.
    get_messages(count=1, block=True, timeout=10)

        Fetch the specified number of messages.

        Keyword Arguments:
            count: Indicates the maximum number of messages to be
                fetched
            block: If True, the API will block till all messages are
                fetched. If block is a positive integer, the API will
                block until that many messages are fetched.
            timeout: When blocking is requested, the function will block
                for the specified time (in seconds) until count messages
                is fetched. If None, it will block forever.
class kafka.consumer.KafkaConsumer(*topics, **configs)

    Bases: object

    A simpler Kafka consumer.
    DEFAULT_CONFIG = {
        'auto_commit_enable': False,
        'auto_commit_interval_messages': None,
        'auto_commit_interval_ms': 60000,
        'auto_offset_reset': 'largest',
        'bootstrap_servers': [],
        'client_id': 'kafka.consumer.kafka',
        'consumer_timeout_ms': -1,
        'default_fetcher_backoff_ms': 1000,
        'deserializer_class': lambda msg: msg,
        'fetch_message_max_bytes': 1048576,
        'fetch_min_bytes': 1,
        'fetch_wait_max_ms': 100,
        'group_id': None,
        'num_consumer_fetchers': 1,
        'queued_max_message_chunks': 10,
        'rebalance_backoff_ms': 2000,
        'rebalance_max_retries': 4,
        'refresh_leader_backoff_ms': 200,
        'socket_receive_buffer_bytes': 65536,
        'socket_timeout_ms': 30000,
    }
    commit()

        Store consumed message offsets (marked via task_done()) to the
        kafka cluster for this consumer_group.

        Returns:
            True on success, or False if no offsets were found for commit

        Note:
            This functionality requires server version >= 0.8.1.1. See
            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
    configure(**configs)

        Configure the consumer instance.

        Configuration settings can be passed to the constructor;
        otherwise defaults will be used:

        Keyword Arguments:
            bootstrap_servers (list): List of initial broker nodes the
                consumer should contact to bootstrap initial cluster
                metadata. This does not have to be the full node list.
                It just needs to have at least one broker that will
                respond to a Metadata API Request.
            client_id (str): a unique name for this client. Defaults to
                'kafka.consumer.kafka'.
            group_id (str): the name of the consumer group to join.
                Offsets are fetched / committed to this group name.
            fetch_message_max_bytes (int, optional): Maximum bytes for
                each topic/partition fetch request. Defaults to
                1024*1024.
            fetch_min_bytes (int, optional): Minimum amount of data the
                server should return for a fetch request; otherwise wait
                up to fetch_wait_max_ms for more data to accumulate.
                Defaults to 1.
            fetch_wait_max_ms (int, optional): Maximum time for the
                server to block waiting for fetch_min_bytes messages to
                accumulate. Defaults to 100.
            refresh_leader_backoff_ms (int, optional): Milliseconds to
                back off when refreshing metadata on errors (subject to
                random jitter). Defaults to 200.
            socket_timeout_ms (int, optional): TCP socket timeout in
                milliseconds. Defaults to 30*1000.
            auto_offset_reset (str, optional): A policy for resetting
                offsets on OffsetOutOfRange errors. 'smallest' will move
                to the oldest available message, 'largest' will move to
                the most recent. Any other value will raise the
                exception. Defaults to 'largest'.
            deserializer_class (callable, optional): Any callable that
                takes a raw message value and returns a deserialized
                value. Defaults to lambda msg: msg.
            auto_commit_enable (bool, optional): Enabling auto-commit
                will cause the KafkaConsumer to periodically commit
                offsets without an explicit call to commit(). Defaults
                to False.
            auto_commit_interval_ms (int, optional): If
                auto_commit_enable is True, the number of milliseconds
                between automatic offset commits. Defaults to 60*1000.
            auto_commit_interval_messages (int, optional): If
                auto_commit_enable is True, the number of messages
                consumed between automatic offset commits. Defaults to
                None (disabled).
            consumer_timeout_ms (int, optional): number of milliseconds
                after which to raise a timeout exception to the consumer
                if no message is available for consumption. Defaults to
                -1 (don't raise an exception).

        Configuration parameters are described in more detail at
        http://kafka.apache.org/documentation.html#highlevelconsumerapi
    fetch_messages()

        Sends FetchRequests for all topic/partitions set for
        consumption.

        Returns:
            Generator that yields KafkaMessage structs after
            deserializing with the configured deserializer_class

        Note:
            Refreshes metadata on errors, and resets fetch offset on
            OffsetOutOfRange, per the configured auto_offset_reset
            policy

        See Also:
            Key KafkaConsumer configuration parameters:

            - fetch_message_max_bytes
            - fetch_wait_max_ms
            - fetch_min_bytes
            - deserializer_class
            - auto_offset_reset
    get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)

        Request available fetch offsets for a single topic/partition.

        Keyword Arguments:
            topic (str): topic for offset request
            partition (int): partition for offset request
            request_time_ms (int): Used to ask for all messages before a
                certain time (ms). There are two special values: specify
                -1 to receive the latest offset (i.e. the offset of the
                next coming message) and -2 to receive the earliest
                available offset. Note that because offsets are pulled in
                descending order, asking for the earliest offset will
                always return you a single element.
            max_num_offsets (int): Maximum offsets to include in the
                OffsetResponse

        Returns:
            a list of offsets in the OffsetResponse submitted for the
            provided topic / partition. See:
            https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
    next()

        Return the next available message.

        Blocks indefinitely unless consumer_timeout_ms > 0.

        Returns:
            a single KafkaMessage from the message iterator

        Raises:
            ConsumerTimeout after consumer_timeout_ms and no message

        Note:
            This is also the method called internally during iteration
    offsets(group=None)

        Get internal consumer offset values.

        Keyword Arguments:
            group: Either "fetch", "commit", "task_done", or
                "highwater". If no group is specified, returns all
                groups.

        Returns:
            A copy of the internal offsets struct
    set_topic_partitions(*topics)

        Set the topic/partitions to consume. Optionally specify offsets
        to start from.

        Accepts types:

        - str (utf-8): topic name (will consume all available
          partitions)
        - tuple: (topic, partition)
        - dict:
            - { topic: partition }
            - { topic: [partition list] }
            - { topic: (partition tuple,) }

        Optionally, offsets can be specified directly:

        - tuple: (topic, partition, offset)
        - dict: { (topic, partition): offset, ... }

        Example:

            kafka = KafkaConsumer()

            # Consume topic1-all; topic2-partition2; topic3-partition0
            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
            # using tuples --
            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))

            # using dict --
            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
    task_done(message)

        Mark a fetched message as consumed.

        Offsets for messages marked as "task_done" will be stored back
        to the kafka cluster for this consumer group on commit().

        Arguments:
            message (KafkaMessage): the message to mark as complete

        Returns:
            True, unless the topic-partition for this message has not
            been configured for the consumer. In normal operation, this
            should not happen. But see github issue 364.