Kafka Consumers: Consumer Groups vs. Standalone Consumers

Introduction

Apache Kafka provides two primary consumption patterns: Consumer Groups and Standalone Consumers. Understanding when and how to use each pattern is crucial for building scalable, fault-tolerant streaming applications.

🎯 Interview Insight: Interviewers often ask: “When would you choose consumer groups over standalone consumers?” The key is understanding that consumer groups provide automatic load balancing and fault tolerance, while standalone consumers offer more control but require manual management.

Consumer Groups Deep Dive

What are Consumer Groups?

Consumer groups enable multiple consumer instances to work together to consume messages from a topic. Each message is delivered to only one consumer instance within the group, providing natural load balancing.


graph TD
A[Topic: orders] --> B[Partition 0]
A --> C[Partition 1] 
A --> D[Partition 2]
A --> E[Partition 3]

B --> F[Consumer 1<br/>Group: order-processors]
C --> F
D --> G[Consumer 2<br/>Group: order-processors]
E --> G

style F fill:#e1f5fe
style G fill:#e1f5fe

Key Characteristics

1. Automatic Partition Assignment

  • Kafka automatically assigns partitions to consumers within a group
  • Uses configurable assignment strategies (Range, RoundRobin, Sticky, Cooperative Sticky)
  • Handles consumer failures gracefully through rebalancing

2. Offset Management

  • Group coordinator manages offset commits
  • Supports at-least-once or at-most-once delivery, depending on when offsets are committed relative to processing (exactly-once requires transactional processing on top of offset management)
  • Automatic offset commits can be enabled for convenience

Consumer Group Configuration

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Assignment strategy - crucial for performance
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// Offset management
props.put("enable.auto.commit", "false"); // Manual commit for reliability
props.put("auto.offset.reset", "earliest");

// Session and heartbeat configuration
props.put("session.timeout.ms", "30000");
props.put("heartbeat.interval.ms", "3000");
props.put("max.poll.interval.ms", "300000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders", "payments"));

🎯 Interview Insight: Common question: “What happens if a consumer in a group fails?” Answer should cover: failure detection via the heartbeat mechanism, partition reassignment to the remaining healthy consumers, and the role of session.timeout.ms in how quickly the failure is detected.

Assignment Strategies

Range Assignment (Default)


graph LR
subgraph "Topic: orders (6 partitions)"
    P0[P0] 
    P1[P1]
    P2[P2]
    P3[P3]
    P4[P4]
    P5[P5]
end

subgraph "Consumer Group"
    C1[Consumer 1]
    C2[Consumer 2]
    C3[Consumer 3]
end

P0 --> C1
P1 --> C1
P2 --> C2
P3 --> C2
P4 --> C3
P5 --> C3

  • Assigns each consumer a contiguous block of partitions per topic, which can skew load when the partition count does not divide evenly across consumers

Cooperative Sticky Assignment

  • Minimizes partition reassignments during rebalancing
  • Maintains consumer-to-partition affinity when possible
  • Reduces processing interruptions
// Best practice implementation with Cooperative Sticky
public class OptimizedConsumerGroup {

    private final KafkaConsumer<String, String> consumer; // configured as shown above

    public OptimizedConsumerGroup(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void startConsumption() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            // Process records in batches for efficiency
            Map<TopicPartition, List<ConsumerRecord<String, String>>> partitionRecords =
                    records.partitions().stream()
                            .collect(Collectors.toMap(
                                    partition -> partition,
                                    partition -> records.records(partition)
                            ));

            for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> entry :
                    partitionRecords.entrySet()) {

                processPartitionBatch(entry.getKey(), entry.getValue());

                // Commit offsets per partition for better fault tolerance
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(entry.getKey(),
                        new OffsetAndMetadata(
                                entry.getValue().get(entry.getValue().size() - 1).offset() + 1));
                consumer.commitSync(offsets);
            }
        }
    }
}

Consumer Group Rebalancing


sequenceDiagram
participant C1 as Consumer1
participant C2 as Consumer2
participant GC as GroupCoordinator
participant C3 as Consumer3New

Note over C1,C2: Normal Processing
C3->>GC: Join Group Request
GC->>C1: Rebalance Notification
GC->>C2: Rebalance Notification

C1->>GC: Rejoin Group - partitions revoked
C2->>GC: Rejoin Group - partitions revoked

GC->>C1: New Assignment P0 and P1
GC->>C2: New Assignment P2 and P3
GC->>C3: New Assignment P4 and P5

Note over C1,C3: Resume Processing with New Assignments

🎯 Interview Insight: Key question: “How do you minimize rebalancing impact?” Best practices include: using cooperative rebalancing, proper session timeout configuration, avoiding long-running message processing, and implementing graceful shutdown.
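
One practical way to implement the graceful shutdown mentioned above is to interrupt the poll loop with consumer.wakeup() from a JVM shutdown hook, so the coordinator sees a clean departure instead of waiting for session.timeout.ms to expire. The sketch below is illustrative (the class and processRecord helper are not from the original); wakeup() is the one KafkaConsumer method that is safe to call from another thread.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownConsumer {

    private final KafkaConsumer<String, String> consumer; // configured and subscribed elsewhere
    private final Thread pollingThread = Thread.currentThread();

    public GracefulShutdownConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
        // wakeup() is the one KafkaConsumer method that may be called from another thread
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                pollingThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(this::processRecord);
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected during shutdown - fall through to close()
        } finally {
            // close() leaves the group so partitions are reassigned promptly
            consumer.close();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // business logic
    }
}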

Standalone Consumers

When to Use Standalone Consumers

Standalone consumers assign partitions manually and don’t participate in consumer groups. They’re ideal when you need:

  • Precise partition control: Processing specific partitions with custom logic
  • No automatic rebalancing: When you want to manage partition assignment manually
  • Custom offset management: Storing offsets in external systems
  • Simple scenarios: Single consumer applications

Implementation Example

public class StandaloneConsumerExample {

    public void consumeWithManualAssignment() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Note: No group.id for standalone consumer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Manual partition assignment
        TopicPartition partition0 = new TopicPartition("orders", 0);
        TopicPartition partition1 = new TopicPartition("orders", 1);
        consumer.assign(Arrays.asList(partition0, partition1));

        // Seek to specific offset if needed
        consumer.seekToBeginning(Arrays.asList(partition0, partition1));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);

                // Manual offset management
                storeOffsetInExternalSystem(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

Custom Offset Storage

public class CustomOffsetManager {

    private final JdbcTemplate jdbcTemplate;

    public CustomOffsetManager(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void storeOffset(String topic, int partition, long offset) {
        String sql = """
                INSERT INTO consumer_offsets (topic, partition, offset, updated_at)
                VALUES (?, ?, ?, ?)
                ON DUPLICATE KEY UPDATE offset = ?, updated_at = ?
                """;

        Timestamp now = new Timestamp(System.currentTimeMillis());
        jdbcTemplate.update(sql, topic, partition, offset, now, offset, now);
    }

    public long getStoredOffset(String topic, int partition) {
        String sql = "SELECT offset FROM consumer_offsets WHERE topic = ? AND partition = ?";
        return jdbcTemplate.queryForObject(sql, Long.class, topic, partition);
    }
}
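
On startup, a standalone consumer can use this store to resume exactly where it left off. The sketch below is illustrative rather than part of the original: it assumes a consumer configured without group-managed commits, reuses the CustomOffsetManager above, and assumes an offset row already exists for every assigned partition.

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExternallyCheckpointedConsumer {

    private final KafkaConsumer<String, String> consumer;  // standalone: no group.id, no auto commit
    private final CustomOffsetManager offsetManager;        // from the example above

    public ExternallyCheckpointedConsumer(KafkaConsumer<String, String> consumer,
                                          CustomOffsetManager offsetManager) {
        this.consumer = consumer;
        this.offsetManager = offsetManager;
    }

    public void consumeFrom(String topic, List<Integer> partitions) {
        List<TopicPartition> assignment = partitions.stream()
                .map(p -> new TopicPartition(topic, p))
                .collect(Collectors.toList());
        consumer.assign(assignment);

        // Resume from the position recorded in the external store
        for (TopicPartition tp : assignment) {
            long stored = offsetManager.getStoredOffset(tp.topic(), tp.partition());
            consumer.seek(tp, stored + 1); // next record after the last one processed
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
                offsetManager.storeOffset(record.topic(), record.partition(), record.offset());
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // business logic
    }
}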

🎯 Interview Insight: Interviewers may ask: “What are the trade-offs of using standalone consumers?” Key points: more control but more complexity, manual fault tolerance, no automatic load balancing, and the need for custom monitoring.

Comparison and Use Cases

Feature Comparison Matrix

| Feature              | Consumer Groups       | Standalone Consumers      |
|----------------------|-----------------------|---------------------------|
| Partition Assignment | Automatic             | Manual                    |
| Load Balancing       | Built-in              | Manual implementation     |
| Fault Tolerance      | Automatic rebalancing | Manual handling required  |
| Offset Management    | Kafka-managed         | Custom implementation     |
| Scalability          | Horizontal scaling    | Limited scaling           |
| Complexity           | Lower                 | Higher                    |
| Control              | Limited               | Full control              |

Decision Flow Chart


flowchart TD
A[Need to consume from Kafka?] --> B{Multiple consumers needed?}
B -->|Yes| C{Need automatic load balancing?}
B -->|No| D[Consider Standalone Consumer]

C -->|Yes| E[Use Consumer Groups]
C -->|No| F{Need custom partition logic?}

F -->|Yes| D
F -->|No| E

D --> G{Custom offset storage needed?}
G -->|Yes| H[Implement custom offset management]
G -->|No| I[Use Kafka offset storage]

E --> J[Configure appropriate assignment strategy]

style E fill:#c8e6c9
style D fill:#ffecb3

Use Case Examples

Consumer Groups - Best For:

// E-commerce order processing with multiple workers
@Service
public class OrderProcessingService {

    @KafkaListener(topics = "orders", groupId = "order-processors")
    public void processOrder(OrderEvent order) {
        // Automatic load balancing across multiple instances
        validateOrder(order);
        updateInventory(order);
        processPayment(order);
        sendConfirmation(order);
    }
}
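
Scaling out is not limited to adding application instances: spring-kafka can also run several consumer threads of the same group inside one JVM via the container factory's concurrency setting. The sketch below is illustrative; the bean uses String deserialization for brevity (an OrderEvent payload as above would need a JSON or Avro deserializer), and the concurrency value is an assumption that should not exceed the partition count.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class OrderListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        // Three listener threads in this instance join the same group;
        // additional application instances still balance via the consumer group
        factory.setConcurrency(3);
        return factory;
    }
}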

Standalone Consumers - Best For:

// Data archival service processing specific partitions
@Service
public class DataArchivalService {

    private KafkaConsumer<String, String> consumer; // standalone consumer: configured without group.id

    public void archivePartitionData(int partitionId) {
        // Process only specific partitions for compliance
        TopicPartition partition = new TopicPartition("user-events", partitionId);
        consumer.assign(Collections.singletonList(partition));

        // Custom offset management for compliance tracking
        long lastArchivedOffset = getLastArchivedOffset(partitionId);
        consumer.seek(partition, lastArchivedOffset + 1);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            archiveToComplianceSystem(records);
            updateArchivedOffset(partitionId, getLastOffset(records));
        }
    }
}

Offset Management

Automatic vs Manual Offset Commits


graph TD
A[Offset Management Strategies] --> B[Automatic Commits]
A --> C[Manual Commits]

B --> D[enable.auto.commit=true]
B --> E[Pros: Simple, Less code]
B --> F[Cons: Potential message loss, Duplicates]

C --> G[Synchronous Commits]
C --> H[Asynchronous Commits]
C --> I[Batch Commits]

G --> J[commitSync]
H --> K[commitAsync]
I --> L[Commit after batch processing]

style G fill:#c8e6c9
style I fill:#c8e6c9

Best Practice: Manual Offset Management

public class RobustConsumerImplementation {

    private final KafkaConsumer<String, String> consumer; // configured with enable.auto.commit=false

    public RobustConsumerImplementation(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void consumeWithReliableOffsetManagement() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                // Process records in order
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecord(record);

                        // Commit immediately after successful processing
                        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)
                        );

                        consumer.commitSync(offsets);

                    } catch (Exception e) {
                        log.error("Failed to process record at offset {}", record.offset(), e);
                        // Implement retry logic or dead letter queue
                        handleProcessingFailure(record, e);
                    }
                }
            }
        } catch (Exception e) {
            log.error("Consumer error", e);
        } finally {
            consumer.close();
        }
    }
}
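
Committing synchronously after every record, as above, gives the strongest guarantees but adds a blocking broker round trip per record. A common lower-overhead variant commits asynchronously after each poll and falls back to a final synchronous commit on shutdown. The following is a minimal sketch, assuming enable.auto.commit=false and a wakeup()-based shutdown; class and helper names are illustrative.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class AsyncCommitConsumer {

    private final KafkaConsumer<String, String> consumer;

    public AsyncCommitConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
                // Non-blocking commit; an occasional failure is tolerable because a later
                // commit (or the final commitSync below) supersedes it
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        // log and continue; do not retry async commits out of order
                    }
                });
            }
        } catch (WakeupException e) {
            // shutdown signal
        } finally {
            try {
                consumer.commitSync(); // blocking, retried commit before leaving the group
            } finally {
                consumer.close();
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // business logic
    }
}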

🎯 Interview Insight: Critical question: “How do you handle exactly-once processing?” Key concepts: idempotent processing, transactional producers/consumers, and the importance of offset management in achieving exactly-once semantics.
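
For the transactional part of that answer, the consume-transform-produce loop looks roughly like the sketch below. It assumes a consumer configured with isolation.level=read_committed and enable.auto.commit=false, a producer with transactional.id set, and an illustrative output topic name; it is a minimal sketch, not a production implementation.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceProcessor {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;

    public ExactlyOnceProcessor(KafkaConsumer<String, String> consumer,
                                KafkaProducer<String, String> producer) {
        this.consumer = consumer;
        this.producer = producer;
        producer.initTransactions();
    }

    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                continue;
            }
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("orders-enriched", record.key(), transform(record.value())));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // Consumed offsets are committed in the same transaction as the output records
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                // Fatal errors such as ProducerFencedException require closing the producer instead
                producer.abortTransaction();
            }
        }
    }

    private String transform(String value) {
        return value;
    }
}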

Rebalancing Mechanisms

Types of Rebalancing


graph TB
A[Rebalancing Triggers] --> B[Consumer Join/Leave]
A --> C[Partition Count Change]  
A --> D[Consumer Failure]
A --> E[Configuration Change]

B --> F[Cooperative Rebalancing]
B --> G[Eager Rebalancing]

F --> H[Incremental Assignment]
F --> I[Minimal Disruption]

G --> J[Stop-the-world]
G --> K[All Partitions Reassigned]

style F fill:#c8e6c9
style H fill:#c8e6c9
style I fill:#c8e6c9
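
Switching an existing group from an eager assignor to cooperative rebalancing is done with two rolling bounces, as described in the Kafka client documentation: first every member lists both assignors, then a second bounce removes the eager one. A minimal configuration sketch (property values only, everything else assumed):

import java.util.Properties;

public class CooperativeUpgradeConfig {

    public Properties phaseOne() {
        Properties props = new Properties();
        // Every member advertises both protocols; the group still rebalances eagerly for now
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                        + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public Properties phaseTwo() {
        Properties props = new Properties();
        // Once every member runs phase one, a second rolling bounce drops the eager assignor
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }
}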

Minimizing Rebalancing Impact

@Configuration
public class OptimalConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();

        // Basic connectivity and deserialization
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processing-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Rebalancing optimization
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());

        // Heartbeat configuration
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

        // Processing optimization
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Rebalancing Listener Implementation

public class RebalanceAwareConsumer implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, Long> currentOffsets = new HashMap<>();

    public RebalanceAwareConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);

        // Commit current offsets before losing partitions
        commitCurrentOffsets();

        // Gracefully finish processing current batch
        finishCurrentProcessing();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);

        // Initialize any partition-specific resources
        initializePartitionResources(partitions);

        // Seek to appropriate starting position if needed
        seekToDesiredPosition(partitions);
    }

    private void commitCurrentOffsets() {
        if (!currentOffsets.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                    currentOffsets.entrySet().stream()
                            .collect(Collectors.toMap(
                                    Map.Entry::getKey,
                                    entry -> new OffsetAndMetadata(entry.getValue() + 1)
                            ));

            try {
                consumer.commitSync(offsetsToCommit);
                log.info("Committed offsets: {}", offsetsToCommit);
            } catch (Exception e) {
                log.error("Failed to commit offsets during rebalance", e);
            }
        }
    }
}
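
The listener only takes part in rebalances when it is registered at subscription time; assign()-based standalone consumers never invoke it. A minimal usage sketch, reusing the constructor and topics from the earlier examples:

import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceAwareConsumerUsage {

    public static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        // Register the listener so onPartitionsRevoked/onPartitionsAssigned fire during rebalances
        RebalanceAwareConsumer listener = new RebalanceAwareConsumer(consumer);
        consumer.subscribe(Arrays.asList("orders", "payments"), listener);
    }
}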

🎯 Interview Insight: Scenario-based question: “Your consumer group is experiencing frequent rebalancing. How would you troubleshoot?” Look for: session timeout analysis, processing time optimization, network issues investigation, and proper rebalance listener implementation.

Performance Optimization

Consumer Configuration Tuning

public class HighPerformanceConsumerConfig {

    public Properties getOptimizedConsumerProperties() {
        Properties props = new Properties();

        // Network optimization
        props.put("fetch.min.bytes", "50000");               // Batch fetching
        props.put("fetch.max.wait.ms", "500");                // Reduce latency
        props.put("max.partition.fetch.bytes", "1048576");    // 1MB per partition

        // Processing optimization
        props.put("max.poll.records", "1000");                // Larger batches
        props.put("max.poll.interval.ms", "600000");          // 10 minutes

        // Memory optimization
        props.put("receive.buffer.bytes", "65536");            // 64KB
        props.put("send.buffer.bytes", "131072");              // 128KB

        return props;
    }
}

Parallel Processing Pattern

@Service
public class ParallelProcessingConsumer {

    private KafkaConsumer<String, String> consumer; // created from the consumer configuration shown earlier

    private final ExecutorService processingPool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public void consumeWithParallelProcessing() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            if (!records.isEmpty()) {
                // Group records by partition to maintain order within partition
                Map<TopicPartition, List<ConsumerRecord<String, String>>> partitionGroups =
                        records.partitions().stream()
                                .collect(Collectors.toMap(
                                        Function.identity(),
                                        partition -> records.records(partition)
                                ));

                List<CompletableFuture<Void>> futures = partitionGroups.entrySet().stream()
                        .map(entry -> CompletableFuture.runAsync(
                                () -> processPartitionRecords(entry.getKey(), entry.getValue()),
                                processingPool
                        ))
                        .collect(Collectors.toList());

                // Wait for all partitions to finish, then commit from the polling thread
                // (KafkaConsumer is not thread-safe, so the commit must not run on a pool thread).
                // A sketch of the commit helper follows below.
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
                commitOffsetsAfterProcessing(partitionGroups);
            }
        }
    }

    private void processPartitionRecords(TopicPartition partition,
                                         List<ConsumerRecord<String, String>> records) {
        // Process records from single partition sequentially to maintain order
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
    }
}
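
The original does not show commitOffsetsAfterProcessing(...). A plausible sketch of that helper, kept inside the same ParallelProcessingConsumer class so it runs on the polling thread with the same consumer, commits the offset following the last record of each fully processed partition batch:

    // Sketch only: lives inside ParallelProcessingConsumer alongside the code above
    private void commitOffsetsAfterProcessing(
            Map<TopicPartition, List<ConsumerRecord<String, String>>> partitionGroups) {

        Map<TopicPartition, OffsetAndMetadata> offsets = partitionGroups.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> {
                            List<ConsumerRecord<String, String>> records = entry.getValue();
                            long lastOffset = records.get(records.size() - 1).offset();
                            return new OffsetAndMetadata(lastOffset + 1);
                        }));

        consumer.commitSync(offsets);
    }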

Monitoring and Metrics

@Component
public class ConsumerMetricsCollector {

    private final MeterRegistry meterRegistry;
    private final Timer processingTimer;
    private final Counter processedRecords;
    // One lag value per topic-partition, registered as a gauge the first time it is seen
    private final Map<String, AtomicLong> lagByPartition = new ConcurrentHashMap<>();

    public ConsumerMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.processingTimer = Timer.builder("kafka.consumer.processing.time")
                .register(meterRegistry);
        this.processedRecords = Counter.builder("kafka.consumer.records.processed")
                .register(meterRegistry);
    }

    public void recordProcessingMetrics(ConsumerRecord<String, String> record,
                                        Duration processingTime) {
        processingTimer.record(processingTime);
        processedRecords.increment();

        // Record lag as the age of the record (now - record timestamp),
        // registering the gauge only once per topic-partition
        long currentLag = System.currentTimeMillis() - record.timestamp();
        String key = record.topic() + "-" + record.partition();
        lagByPartition.computeIfAbsent(key, k ->
                meterRegistry.gauge("kafka.consumer.lag.ms",
                        Tags.of("topic", record.topic(),
                                "partition", String.valueOf(record.partition())),
                        new AtomicLong())
        ).set(currentLag);
    }
}

🎯 Interview Insight: Performance question: “How do you measure and optimize consumer performance?” Key metrics: consumer lag, processing rate, rebalancing frequency, and memory usage. Tools: JMX metrics, Kafka Manager, and custom monitoring.
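
Consumer lag can also be measured from outside the application by comparing each partition's committed offset with its log end offset. The following is a hedged sketch using the AdminClient; the group id handling and bootstrap address are placeholders.

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerLagCalculator {

    public Map<TopicPartition, Long> lagFor(String groupId)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group has committed so far
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

            // Latest offsets available in the log for the same partitions
            Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnd =
                    admin.listOffsets(latestSpec).all().get();

            // Lag = log end offset - committed offset, per partition
            return committed.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            e -> logEnd.get(e.getKey()).offset() - e.getValue().offset()));
        }
    }
}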

Troubleshooting Common Issues

Consumer Lag Investigation


flowchart TD
A[High Consumer Lag Detected] --> B{Check Consumer Health}
B -->|Healthy| C[Analyze Processing Time]
B -->|Unhealthy| D[Check Resource Usage]

C --> E{Processing Time > Poll Interval?}
E -->|Yes| F[Optimize Processing Logic]
E -->|No| G[Check Partition Distribution]

D --> H[CPU/Memory Issues?]
H -->|Yes| I[Scale Resources]
H -->|No| J[Check Network Connectivity]

F --> K[Increase max.poll.interval.ms]
F --> L[Implement Async Processing]
F --> M[Reduce max.poll.records]

G --> N[Rebalance Consumer Group]
G --> O[Add More Consumers]

Common Issues and Solutions

1. Rebalancing Loops

// Problem: Frequent rebalancing due to long processing
public class ProblematicConsumer {

    @KafkaListener(topics = "slow-topic")
    public void processSlowly(String message) throws InterruptedException {
        // Blocking the listener thread for longer than max.poll.interval.ms
        // gets the consumer kicked out of the group and triggers a rebalance
        Thread.sleep(60000); // 1 minute of processing
    }
}

// Solution: Optimize processing or increase timeouts
public class OptimizedConsumer {

    @KafkaListener(topics = "slow-topic",
            containerFactory = "optimizedKafkaListenerContainerFactory")
    public void processEfficiently(String message) {
        // Hand the slow work to another thread so the listener returns quickly.
        // Note: the offset may be committed before the async work finishes, so this
        // trades rebalance stability for at-most-once semantics on failure.
        CompletableFuture.runAsync(() -> {
            performLongRunningTask(message);
        });
    }
}

// In a @Configuration class: container factory tuned for slow consumers
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
        optimizedKafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    // Allow each poll more time; max.poll.interval.ms itself is a consumer property
    // and should be raised in the ConsumerFactory configuration shown earlier
    factory.getContainerProperties().setPollTimeout(30000);

    return factory;
}

2. Memory Issues with Large Messages

public class MemoryOptimizedConsumer {

    public void consumeWithMemoryManagement() {
        // Limit fetch size to prevent OOM
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "memory-optimized-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.partition.fetch.bytes", "1048576"); // 1MB limit
        props.put("max.poll.records", "100");              // Process smaller batches

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("user-events")); // topic name illustrative

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            // Process promptly and avoid holding references to record values after use,
            // so fetched batches can be garbage collected between polls
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }

            // Explicit GC hints are rarely beneficial; the fetch limits above are the primary control
            if (records.count() > 50) {
                System.gc();
            }
        }
    }
}

3. Handling Consumer Failures

@Component
public class ResilientConsumer {

    private static final int MAX_RETRIES = 3;
    private final RetryTemplate retryTemplate;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public ResilientConsumer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        this.retryTemplate = RetryTemplate.builder()
                .maxAttempts(MAX_RETRIES)
                .exponentialBackoff(1000, 2, 10000)
                .retryOn(TransientException.class)
                .build();
    }

    @KafkaListener(topics = "orders")
    public void processWithRetry(ConsumerRecord<String, String> record) {
        try {
            retryTemplate.execute(context -> {
                processRecord(record);
                return null;
            });
        } catch (Exception e) {
            // Send to dead letter queue after max retries
            sendToDeadLetterQueue(record, e);
        }
    }

    private void sendToDeadLetterQueue(ConsumerRecord<String, String> record, Exception error) {
        DeadLetterRecord dlq = DeadLetterRecord.builder()
                .originalTopic(record.topic())
                .originalPartition(record.partition())
                .originalOffset(record.offset())
                .payload(record.value())
                .error(error.getMessage())
                .timestamp(Instant.now())
                .build();

        kafkaTemplate.send("dead-letter-topic", dlq);
    }
}

🎯 Interview Insight: Troubleshooting question: “A consumer group stops processing messages. Walk me through your debugging approach.” Expected steps: check consumer logs, verify group coordination, examine partition assignments, monitor resource usage, and validate network connectivity.

Best Practices Summary

Consumer Groups Best Practices

  1. Use Cooperative Sticky Assignment

    props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
  2. Implement Proper Error Handling

    @RetryableTopic(attempts = "3",
            backoff = @Backoff(delay = 1000, multiplier = 2))
    @KafkaListener(topics = "orders")
    public void processOrder(Order order) {
        // Processing logic with automatic retry
    }
  3. Monitor Consumer Lag

    @Scheduled(fixedRate = 30000)
    public void monitorConsumerLag() throws Exception {
        try (AdminClient adminClient = AdminClient.create(adminProps)) {

            // Check lag for all consumer groups
            Map<String, ConsumerGroupDescription> groups =
                    adminClient.describeConsumerGroups(groupIds).all().get();

            groups.forEach((groupId, description) -> {
                // Calculate and alert on high lag
                checkLagThresholds(groupId, description);
            });
        }
    }

Standalone Consumer Best Practices

  1. Implement Custom Offset Management
  2. Handle Partition Changes Gracefully (see the sketch after this list)
  3. Monitor Processing Health
  4. Implement Circuit Breakers
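
For example, point 2 above can be approximated by periodically re-reading the topic metadata, since assign()-based consumers never learn about new partitions on their own. The class name, processing stub, and 60-second check interval below are illustrative assumptions.

import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionAwareStandaloneConsumer {

    private final KafkaConsumer<String, String> consumer; // standalone: no group.id
    private long lastPartitionCheck = 0;

    public PartitionAwareStandaloneConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run(String topic) {
        refreshAssignment(topic);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> { /* process */ });

            // assign() never rebalances automatically, so re-check the topic metadata ourselves
            if (System.currentTimeMillis() - lastPartitionCheck > 60_000) {
                refreshAssignment(topic);
            }
        }
    }

    private void refreshAssignment(String topic) {
        Set<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(topic, info.partition()))
                .collect(Collectors.toSet());
        if (!partitions.equals(consumer.assignment())) {
            consumer.assign(partitions); // newly added partitions start from auto.offset.reset or a seek
        }
        lastPartitionCheck = System.currentTimeMillis();
    }
}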

Universal Best Practices

public class UniversalBestPractices {

    private KafkaConsumer<String, String> consumer;

    // 1. Always close consumers properly
    @PreDestroy
    public void cleanup() {
        consumer.close(Duration.ofSeconds(30));
    }

    // 2-4. Baseline configuration applied when the consumer is created
    public Properties baselineProperties() {
        Properties props = new Properties();

        // 2. Use appropriate serialization
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        // 3. Configure timeouts appropriately
        props.put("request.timeout.ms", "30000");
        props.put("session.timeout.ms", "10000");

        // 4. Enable security when needed
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");

        return props;
    }
}

🎯 Interview Insight: Final synthesis question: “Design a robust consumer architecture for a high-throughput e-commerce platform.” Look for: proper consumer group strategy, error handling, monitoring, scaling considerations, and failure recovery mechanisms.

Key Takeaways

  • Consumer Groups: Best for distributed processing with automatic load balancing
  • Standalone Consumers: Best for precise control and custom logic requirements
  • Offset Management: Critical for exactly-once or at-least-once processing guarantees
  • Rebalancing: Minimize impact through proper configuration and cooperative assignment
  • Monitoring: Essential for maintaining healthy consumer performance
  • Error Handling: Implement retries, dead letter queues, and circuit breakers

Choose the right pattern based on your specific requirements for control, scalability, and fault tolerance. Both patterns have their place in a well-architected Kafka ecosystem.