Design Universal Message Queue SDK

Introduction to Universal Message Queue SDK

The Universal Message Queue Component SDK is a sophisticated middleware solution designed to abstract the complexity of different message queue implementations while providing a unified interface for asynchronous communication patterns. This SDK addresses the critical need for vendor-agnostic messaging capabilities in distributed systems, enabling seamless integration with Kafka, Redis, and RocketMQ through a single, consistent API.

Core Value Proposition

Modern distributed systems require reliable asynchronous communication patterns to achieve scalability, resilience, and performance. The Universal MQ SDK provides:

  • Vendor Independence: Switch between Kafka, Redis, and RocketMQ without code changes
  • Unified API: Single interface for all messaging operations
  • Production Resilience: Built-in failure handling and recovery mechanisms
  • Asynchronous RPC: Transform synchronous HTTP calls into asynchronous message-driven operations

Interview Insight: Why use a universal SDK instead of direct MQ client libraries?
Answer: A universal SDK provides abstraction that enables vendor flexibility, reduces learning curve for developers, standardizes error handling patterns, and centralizes configuration management. It also allows for gradual migration between MQ technologies without application code changes.
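As an illustrative sketch of what vendor independence looks like in application code (using the SPI types defined later in this document; providerFactory, producerConfig, and the payload are assumptions, not a published API), the same producer code runs unchanged whichever provider name the configuration supplies:

// Illustrative only: switching "kafka" to "redis" or "rocketmq" requires no
// other code change; the provider name would normally come from configuration.
MessageQueueProvider provider = providerFactory.getProvider("kafka");
MessageProducer producer = provider.createProducer(producerConfig);

producer.send("order-events", orderCreatedPayload)
        .thenAccept(result -> log.info("delivered at offset {}", result.getOffset()));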

Architecture Overview

The SDK follows a layered architecture pattern with clear separation of concerns:


flowchart TB
subgraph "Client Applications"
    A[Service A] --> B[Service B]
    C[Service C] --> D[Service D]
end

subgraph "Universal MQ SDK"
    E[Unified API Layer]
    F[SPI Interface]
    G[Async RPC Manager]
    H[Message Serialization]
    I[Failure Handling]
end

subgraph "MQ Implementations"
    J[Kafka Provider]
    K[Redis Provider]
    L[RocketMQ Provider]
end

subgraph "Message Brokers"
    M[Apache Kafka]
    N[Redis Streams]
    O[Apache RocketMQ]
end

A --> E
C --> E
E --> F
F --> G
F --> H
F --> I
F --> J
F --> K
F --> L
J --> M
K --> N
L --> O

Key Components

Unified API Layer: Provides consistent interface for all messaging operations
SPI (Service Provider Interface): Enables pluggable MQ implementations
Async RPC Manager: Handles request-response correlation and callback execution
Message Serialization: Manages data format conversion and schema evolution
Failure Handling: Implements retry, circuit breaker, and dead letter queue patterns

Service Provider Interface (SPI) Design

The SPI mechanism enables runtime discovery and loading of different MQ implementations without modifying core SDK code.

// Core SPI Interface
public interface MessageQueueProvider {
    String getProviderName();
    MessageProducer createProducer(ProducerConfig config);
    MessageConsumer createConsumer(ConsumerConfig config);
    void initialize(Properties properties);
    void shutdown();

    // Health check capabilities
    boolean isHealthy();
    ProviderMetrics getMetrics();
}

// SPI Configuration
@Component
public class MQProviderFactory {
    private final Map<String, MessageQueueProvider> providers = new HashMap<>();

    @PostConstruct
    public void loadProviders() {
        ServiceLoader<MessageQueueProvider> loader =
                ServiceLoader.load(MessageQueueProvider.class);

        for (MessageQueueProvider provider : loader) {
            providers.put(provider.getProviderName(), provider);
        }
    }

    public MessageQueueProvider getProvider(String providerName) {
        MessageQueueProvider provider = providers.get(providerName);
        if (provider == null) {
            throw new UnsupportedMQProviderException(
                    "Provider not found: " + providerName);
        }
        return provider;
    }
}

Provider Implementation Example - Kafka

// META-INF/services/com.example.mq.MessageQueueProvider
com.example.mq.kafka.KafkaMessageQueueProvider

@Component
public class KafkaMessageQueueProvider implements MessageQueueProvider {
    private KafkaProducer<String, Object> kafkaProducer;

    @Override
    public String getProviderName() {
        return "kafka";
    }

    @Override
    public void initialize(Properties properties) {
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                properties.getProperty("kafka.bootstrap.servers"));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class.getName());

        // Production settings
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        this.kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    @Override
    public MessageProducer createProducer(ProducerConfig config) {
        return new KafkaMessageProducer(kafkaProducer, config);
    }

    // createConsumer, shutdown, isHealthy, and getMetrics omitted for brevity
}

Interview Insight: How does SPI improve maintainability compared to factory patterns?
Answer: SPI provides compile-time independence - new providers can be added without modifying existing code. It supports modular deployment where providers can be packaged separately, enables runtime provider discovery, and follows the Open/Closed Principle by being open for extension but closed for modification.

Asynchronous RPC Implementation

The Async RPC pattern transforms traditional synchronous HTTP calls into message-driven asynchronous operations, providing better scalability and fault tolerance.


sequenceDiagram
participant Client as Client Service
participant SDK as MQ SDK
participant Server as Server Service
participant MQ as Message Queue
participant Callback as Callback Handler

Client->>SDK: asyncPost(url, data, callback)
SDK->>SDK: Generate messageKey & responseTopic
SDK->>Server: Direct HTTP POST with MQ headers
Note over Server: X-Message-Key: uuid-12345<br/>X-Response-Topic: client-responses
Server->>Server: Process business logic asynchronously
Server-->>SDK: HTTP 202 Accepted (immediate response)
Server->>MQ: Publish response message when ready
MQ->>SDK: SDK consumes response message
SDK->>Callback: Execute callback(response)

Async RPC Manager Implementation

@Component
public class AsyncRpcManager {
    private final MessageQueueProvider mqProvider;
    private final RestTemplate restTemplate; // used for the direct HTTP request leg
    private final Map<String, CallbackContext> pendingRequests = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timeoutExecutor;

    public <T> void asyncPost(String url, Object requestBody,
                              AsyncCallback<T> callback, Class<T> responseType) {
        String messageKey = UUID.randomUUID().toString();
        String responseTopic = "async-rpc-responses-" + getServiceName();

        // Store callback context
        CallbackContext context = new CallbackContext(callback, responseType,
                System.currentTimeMillis());
        pendingRequests.put(messageKey, context);

        // Schedule timeout handling
        scheduleTimeout(messageKey);

        // Make direct HTTP request with MQ response headers
        HttpHeaders headers = new HttpHeaders();
        headers.set("X-Message-Key", messageKey);
        headers.set("X-Response-Topic", responseTopic);
        headers.set("X-Correlation-ID", messageKey);
        headers.set("Content-Type", "application/json");

        try {
            // Send HTTP request directly - expect 202 Accepted for async processing
            ResponseEntity<Void> response = restTemplate.postForEntity(url,
                    new HttpEntity<>(requestBody, headers), Void.class);

            if (response.getStatusCode() != HttpStatus.ACCEPTED) {
                // Server doesn't support async processing
                pendingRequests.remove(messageKey);
                callback.onError(new AsyncRpcException("Server returned " + response.getStatusCode()
                        + " - async processing not supported"));
            }
            // If 202 Accepted, we wait for the MQ response

        } catch (Exception e) {
            // Clean up on immediate HTTP failure
            pendingRequests.remove(messageKey);
            callback.onError(e);
        }
    }

    @EventListener
    public void handleResponseMessage(MessageReceivedEvent event) {
        String messageKey = event.getMessageKey();
        CallbackContext context = pendingRequests.remove(messageKey);

        if (context != null) {
            try {
                Object response = deserializeResponse(event.getPayload(),
                        context.getResponseType());
                context.getCallback().onSuccess(response);
            } catch (Exception e) {
                context.getCallback().onError(e);
            }
        }
    }

    private void scheduleTimeout(String messageKey) {
        timeoutExecutor.schedule(() -> {
            CallbackContext context = pendingRequests.remove(messageKey);
            if (context != null) {
                context.getCallback().onTimeout();
            }
        }, 30, TimeUnit.SECONDS);
    }
}

Server-Side Response Handler

@RestController
public class AsyncRpcController {
    private final MessageQueueProvider mqProvider;
    private final BusinessService businessService; // long-running domain logic

    @PostMapping("/api/process-async")
    public ResponseEntity<Void> processAsync(@RequestBody ProcessRequest request,
                                             HttpServletRequest httpRequest) {
        String messageKey = httpRequest.getHeader("X-Message-Key");
        String responseTopic = httpRequest.getHeader("X-Response-Topic");

        if (messageKey == null || responseTopic == null) {
            return ResponseEntity.badRequest().build();
        }

        // Return 202 Accepted immediately - process asynchronously
        CompletableFuture.runAsync(() -> {
            try {
                // Long-running business logic
                ProcessResponse response = businessService.process(request);

                // Send response via MQ when processing is complete
                MessageProducer producer = mqProvider.createProducer(
                        ProducerConfig.builder()
                                .topic(responseTopic)
                                .build());

                Message message = Message.builder()
                        .key(messageKey)
                        .payload(response)
                        .header("correlation-id", messageKey)
                        .header("processing-time", String.valueOf(System.currentTimeMillis()))
                        .build();

                producer.send(message);

            } catch (Exception e) {
                // Send error response via MQ
                sendErrorResponse(responseTopic, messageKey, e);
            }
        });

        // Client gets immediate acknowledgment that the request was accepted
        return ResponseEntity.accepted()
                .header("X-Message-Key", messageKey)
                .build();
    }
}

Interview Insight: Why use direct HTTP for requests instead of publishing to MQ?
Answer: Direct HTTP for requests provides immediate feedback (request validation, routing errors), utilizes existing HTTP infrastructure (load balancers, proxies, security), maintains request traceability, and reduces latency. The MQ is only used for the response path where asynchronous benefits (decoupling, persistence, fault tolerance) are most valuable. This hybrid approach gets the best of both worlds - immediate request processing feedback and asynchronous response handling.
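The AsyncCallback interface used throughout these examples is never defined above. A minimal definition consistent with its usage in this document (some callers override onTimeout, others omit it, so a default implementation is assumed here) would be:

import java.util.concurrent.TimeoutException;

// Minimal callback contract inferred from its usage in this document.
public interface AsyncCallback<T> {
    void onSuccess(T response);
    void onError(Exception error);

    // Default so callers that do not care about timeouts need not override it
    default void onTimeout() {
        onError(new TimeoutException("Async RPC response timed out"));
    }
}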

Message Producer and Consumer Interfaces

The SDK defines unified interfaces for message production and consumption that abstract the underlying MQ implementation details.

public interface MessageProducer extends AutoCloseable {
    CompletableFuture<SendResult> send(Message message);
    CompletableFuture<SendResult> send(String topic, Object payload);
    CompletableFuture<SendResult> sendWithKey(String topic, String key, Object payload);

    // Batch operations for performance
    CompletableFuture<List<SendResult>> sendBatch(List<Message> messages);

    // Transactional support
    void beginTransaction();
    void commitTransaction();
    void rollbackTransaction();
}

public interface MessageConsumer extends AutoCloseable {
    void subscribe(String topic, MessageHandler handler);
    void subscribe(List<String> topics, MessageHandler handler);
    void unsubscribe(String topic);

    // Manual acknowledgment control
    void acknowledge(Message message);
    void reject(Message message, boolean requeue);

    // Consumer group management
    void joinConsumerGroup(String groupId);
    void leaveConsumerGroup();
}

// Unified message format
// toBuilder() enables the copy-and-modify calls used by the security, tracing,
// and schema wrappers later in this document.
@Data
@Builder(toBuilder = true)
public class Message {
    private String id;
    private String key;
    private Object payload;
    private Map<String, String> headers;
    private long timestamp;
    private String topic;
    private int partition;
    private long offset;

    // Serialization metadata
    private String contentType;
    private String encoding;

    // Hand-written builder extension so callers can write .header(key, value),
    // as the examples in this document do; copies defensively so built
    // messages keep a mutable header map.
    public static class MessageBuilder {
        public MessageBuilder header(String key, String value) {
            this.headers = (this.headers == null)
                    ? new HashMap<>() : new HashMap<>(this.headers);
            this.headers.put(key, value);
            return this;
        }
    }
}
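Two supporting types referenced by these interfaces, MessageHandler and SendResult, are also left undefined in this section. A plausible minimal form, consistent with how they are read elsewhere in this document (partition and offset appear in the tracing example), is:

// Functional handler invoked for each consumed message.
@FunctionalInterface
public interface MessageHandler {
    void handle(Message message);
}

// Producer result metadata; field set is an assumption based on usage here.
@Data
@Builder
public class SendResult {
    private String topic;
    private int partition;
    private long offset;
    private long timestamp;
}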

Redis Streams Implementation Example

public class RedisMessageConsumer implements MessageConsumer {
    private final RedisTemplate<String, Object> redisTemplate;
    private final String consumerGroup;
    private final String consumerName;
    private volatile boolean consuming = false;

    @Override
    public void subscribe(String topic, MessageHandler handler) {
        // Create the consumer group if it does not exist yet
        try {
            redisTemplate.opsForStream().createGroup(topic, consumerGroup);
        } catch (Exception e) {
            // Group might already exist
        }

        consuming = true;
        CompletableFuture.runAsync(() -> {
            while (consuming) {
                try {
                    List<MapRecord<String, Object, Object>> records =
                            redisTemplate.opsForStream().read(
                                    Consumer.from(consumerGroup, consumerName),
                                    StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),
                                    StreamOffset.create(topic, ReadOffset.lastConsumed())
                            );

                    if (records == null) {
                        continue; // blocking read timed out with no new entries
                    }

                    for (MapRecord<String, Object, Object> record : records) {
                        Message message = convertToMessage(record);
                        try {
                            handler.handle(message);
                            // Acknowledge (XACK) so the entry leaves the pending list
                            redisTemplate.opsForStream().acknowledge(topic, consumerGroup,
                                    record.getId());
                        } catch (Exception e) {
                            // Handle processing error
                            handleProcessingError(message, e);
                        }
                    }
                } catch (Exception e) {
                    handleConsumerError(e);
                }
            }
        });
    }

    private void handleProcessingError(Message message, Exception error) {
        // Implement retry logic or dead letter queue
        RetryPolicy retryPolicy = getRetryPolicy();
        if (retryPolicy.shouldRetry(message)) {
            scheduleRetry(message, retryPolicy.getNextDelay());
        } else {
            sendToDeadLetterQueue(message, error);
        }
    }
}
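A short usage sketch (the constructor arguments and handler types are illustrative): a handler that returns normally triggers the acknowledge above, while a handler that throws leaves the entry in the pending entries list for retry or dead-lettering:

MessageConsumer consumer = new RedisMessageConsumer(redisTemplate,
        "order-service-group", "order-service-1");

consumer.subscribe("order-events", message -> {
    OrderEvent event = (OrderEvent) message.getPayload();
    orderEventHandler.apply(event); // throwing here skips the acknowledge
});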

Failure Handling and Resilience Patterns

Robust failure handling is crucial for production systems. The SDK implements multiple resilience patterns to handle various failure scenarios.


flowchart LR
A[Message Send] --> B{Send Success?}
B -->|Yes| C[Success]
B -->|No| D[Retry Logic]
D --> E{Max Retries?}
E -->|No| F[Exponential Backoff]
F --> A
E -->|Yes| G[Circuit Breaker]
G --> H{Circuit Open?}
H -->|Yes| I[Fail Fast]
H -->|No| J[Dead Letter Queue]
J --> K[Alert/Monitor]

Resilience Implementation

@Component
public class ResilientMessageProducer implements MessageProducer {
    private final MessageProducer delegate;
    private final RetryTemplate retryTemplate;
    private final CircuitBreaker circuitBreaker;
    private final DeadLetterQueueManager dlqManager;

    public ResilientMessageProducer(MessageProducer delegate) {
        this.delegate = delegate;
        this.retryTemplate = buildRetryTemplate();
        this.circuitBreaker = buildCircuitBreaker();
        this.dlqManager = new DeadLetterQueueManager();
    }

    @Override
    public CompletableFuture<SendResult> send(Message message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return circuitBreaker.executeSupplier(() ->
                        retryTemplate.execute(context -> delegate.send(message).get()));
            } catch (Exception e) {
                // Send to the dead letter queue after all retries are exhausted
                dlqManager.sendToDeadLetterQueue(message, e);
                throw new MessageSendException("Failed to send message after retries", e);
            }
        });
    }

    private RetryTemplate buildRetryTemplate() {
        return RetryTemplate.builder()
                .maxAttempts(3)
                .exponentialBackoff(1000, 2, 10000)
                .retryOn(MessageSendException.class)
                .build();
    }

    private CircuitBreaker buildCircuitBreaker() {
        return CircuitBreaker.ofDefaults("messageProducer");
    }
}

// Dead Letter Queue Management
@Component
public class DeadLetterQueueManager {
    private final MessageProducer dlqProducer;
    private final NotificationService notificationService;
    private final FailedMessageLogger persistentLogger; // last-resort durable log (type name illustrative)

    public void sendToDeadLetterQueue(Message originalMessage, Exception error) {
        Message dlqMessage = Message.builder()
                .payload(originalMessage)
                .headers(Map.of(
                        "original-topic", originalMessage.getTopic(),
                        "error-message", error.getMessage(),
                        "error-type", error.getClass().getSimpleName(),
                        "failure-timestamp", String.valueOf(System.currentTimeMillis())
                ))
                .topic("dead-letter-queue")
                .build();

        try {
            dlqProducer.send(dlqMessage);
            notificationService.alertDeadLetter(originalMessage, error);
        } catch (Exception dlqError) {
            // Log to persistent storage as the last resort
            persistentLogger.logFailedMessage(originalMessage, error, dlqError);
        }
    }
}

Network Partition Handling

@Component
public class NetworkPartitionHandler {
    private static final Logger logger = LoggerFactory.getLogger(NetworkPartitionHandler.class);

    private final HealthCheckService healthCheckService;
    private final LocalMessageBuffer localBuffer;
    private final MessageProducer delegate; // underlying producer used for the flush

    @EventListener
    public void handleNetworkPartition(NetworkPartitionEvent event) {
        if (event.isPartitioned()) {
            // Switch to local buffering mode
            localBuffer.enableBuffering();

            // Start health check monitoring
            healthCheckService.startPartitionRecoveryMonitoring();
        } else {
            // Network recovered - flush buffered messages
            flushBufferedMessages();
            localBuffer.disableBuffering();
        }
    }

    private void flushBufferedMessages() {
        List<Message> bufferedMessages = localBuffer.getAllBufferedMessages();

        CompletableFuture.runAsync(() -> {
            for (Message message : bufferedMessages) {
                try {
                    delegate.send(message).get();
                    localBuffer.removeBufferedMessage(message.getId());
                } catch (Exception e) {
                    // Keep in the buffer for the next flush attempt
                    logger.warn("Failed to flush buffered message: {}", message.getId(), e);
                }
            }
        });
    }
}

Interview Insight: How do you handle message ordering in failure scenarios?
Answer: Message ordering can be maintained through partitioning strategies (same key goes to same partition), single-threaded consumers per partition, and implementing sequence numbers with gap detection. However, strict ordering often conflicts with high availability, so systems typically choose between strong ordering and high availability based on business requirements.

Configuration and Best Practices

Configuration Management

# application.yml
universal-mq:
  provider: kafka # kafka, redis, rocketmq

  # Common configuration
  async-rpc:
    timeout: 30s
    max-pending-requests: 1000
    response-topic-prefix: "async-rpc-responses"

  resilience:
    retry:
      max-attempts: 3
      initial-delay: 1s
      max-delay: 10s
      multiplier: 2.0
    circuit-breaker:
      failure-rate-threshold: 50
      wait-duration-in-open-state: 60s
      sliding-window-size: 100
    dead-letter-queue:
      enabled: true
      topic: "dead-letter-queue"

  # Provider-specific configuration
  kafka:
    bootstrap-servers: "localhost:9092"
    producer:
      acks: "all"
      retries: 2147483647
      enable-idempotence: true
      batch-size: 16384
      linger-ms: 5
    consumer:
      group-id: "universal-mq-consumers"
      auto-offset-reset: "earliest"
      enable-auto-commit: false

  redis:
    host: "localhost"
    port: 6379
    database: 0
    stream:
      consumer-group: "universal-mq-group"
      consumer-name: "${spring.application.name}-${random.uuid}"

  rocketmq:
    name-server: "localhost:9876"
    producer:
      group: "universal-mq-producers"
      send-msg-timeout: 3000
    consumer:
      group: "universal-mq-consumers"
      consume-message-batch-max-size: 32
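The UniversalMqProperties class enabled in the next listing binds this YAML. A minimal sketch mapping only the top-level and async-rpc keys (field names follow Spring Boot's relaxed binding; the rest of the tree would be mapped the same way):

import java.time.Duration;
import org.springframework.boot.context.properties.ConfigurationProperties;

// Minimal sketch binding a subset of the YAML above.
@Data
@ConfigurationProperties(prefix = "universal-mq")
public class UniversalMqProperties {
    private String provider = "kafka";
    private AsyncRpc asyncRpc = new AsyncRpc();

    @Data
    public static class AsyncRpc {
        private Duration timeout = Duration.ofSeconds(30);
        private int maxPendingRequests = 1000;
        private String responseTopicPrefix = "async-rpc-responses";
    }
}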

Production Best Practices

@Configuration
@EnableConfigurationProperties(UniversalMqProperties.class)
public class UniversalMqConfiguration {

    @Bean
    @ConditionalOnProperty(name = "universal-mq.monitoring.enabled", havingValue = "true")
    public MqMetricsCollector metricsCollector(MeterRegistry meterRegistry) {
        return new MqMetricsCollector(meterRegistry);
    }

    @Bean
    public MessageQueueHealthIndicator healthIndicator(MessageQueueProvider provider) {
        return new MessageQueueHealthIndicator(provider);
    }

    // Production-ready thread pool configuration
    @Bean("mqExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("mq-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

// Monitoring and Metrics
@Component
public class MqMetricsCollector {
    private final MeterRegistry meterRegistry;
    private final Timer messageProcessingTime;

    public MqMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.messageProcessingTime = Timer.builder("mq.message.processing.time")
                .description("Message processing time")
                .register(meterRegistry);
    }

    public void recordMessageProduced(String topic, String provider) {
        // Counters are resolved per topic/provider tag combination at record time
        meterRegistry.counter("mq.messages.produced",
                "topic", topic, "provider", provider).increment();
    }

    public void recordMessageConsumed(String topic, String provider, long processingTimeMs) {
        meterRegistry.counter("mq.messages.consumed",
                "topic", topic, "provider", provider).increment();
        messageProcessingTime.record(processingTimeMs, TimeUnit.MILLISECONDS);
    }
}

Use Cases and Examples

Use Case 1: E-commerce Order Processing

// Order processing with async notification
@Service
public class OrderService {
    private final AsyncRpcManager asyncRpcManager;
    private final MessageProducer messageProducer;
    private final OrderRepository orderRepository;

    public void processOrder(Order order) {
        // Validate and save order
        Order savedOrder = orderRepository.save(order);

        // Async inventory check via direct HTTP + MQ response
        asyncRpcManager.asyncPost(
                "http://inventory-service/check",
                new InventoryCheckRequest(order.getItems()),
                new AsyncCallback<InventoryCheckResponse>() {
                    @Override
                    public void onSuccess(InventoryCheckResponse response) {
                        if (response.isAvailable()) {
                            processPayment(savedOrder);
                        } else {
                            cancelOrder(savedOrder, "Insufficient inventory");
                        }
                    }

                    @Override
                    public void onError(Exception error) {
                        cancelOrder(savedOrder, "Inventory check failed: " + error.getMessage());
                    }

                    @Override
                    public void onTimeout() {
                        cancelOrder(savedOrder, "Inventory check timeout");
                    }
                },
                InventoryCheckResponse.class
        );
    }

    private void processPayment(Order order) {
        // Async payment processing via direct HTTP + MQ response
        asyncRpcManager.asyncPost(
                "http://payment-service/process",
                new PaymentRequest(order.getTotalAmount(), order.getPaymentMethod()),
                new PaymentCallback(order),
                PaymentResponse.class
        );
    }
}

// Inventory service response handler
@RestController
public class InventoryController {
    private final InventoryService inventoryService;
    private final MessageProducer messageProducer;

    @PostMapping("/check")
    public ResponseEntity<Void> checkInventory(@RequestBody InventoryCheckRequest request,
                                               HttpServletRequest httpRequest) {
        String messageKey = httpRequest.getHeader("X-Message-Key");
        String responseTopic = httpRequest.getHeader("X-Response-Topic");

        // Process asynchronously and return 202 Accepted immediately
        CompletableFuture.runAsync(() -> {
            InventoryCheckResponse response = inventoryService.checkAvailability(request.getItems());

            // Send response via MQ after processing is complete
            Message responseMessage = Message.builder()
                    .key(messageKey)
                    .payload(response)
                    .topic(responseTopic)
                    .build();

            messageProducer.send(responseMessage);
        });

        return ResponseEntity.accepted()
                .header("X-Message-Key", messageKey)
                .build();
    }
}

Use Case 2: Real-time Analytics Pipeline

// Event streaming for analytics
@Component
public class AnalyticsEventProcessor {
    private final MessageConsumer eventConsumer;
    private final MessageProducer enrichedEventProducer;
    private final UserProfileService userProfileService;
    private final MetricsService metricsService;

    @PostConstruct
    public void startProcessing() {
        eventConsumer.subscribe("user-events", this::processUserEvent);
        eventConsumer.subscribe("system-events", this::processSystemEvent);
    }

    private void processUserEvent(Message message) {
        UserEvent event = (UserEvent) message.getPayload();

        // Enrich event with user profile data
        UserProfile profile = userProfileService.getProfile(event.getUserId());
        EnrichedUserEvent enrichedEvent = EnrichedUserEvent.builder()
                .originalEvent(event)
                .userProfile(profile)
                .enrichmentTimestamp(System.currentTimeMillis())
                .build();

        // Send to analytics pipeline
        enrichedEventProducer.send("enriched-user-events", enrichedEvent);

        // Update real-time metrics
        metricsService.updateUserMetrics(enrichedEvent);
    }
}

Use Case 3: Microservice Choreography

// Saga pattern implementation
@Component
public class OrderSagaOrchestrator {
    private final AsyncRpcManager asyncRpcManager;
    private final Map<String, SagaState> activeSagas = new ConcurrentHashMap<>();

    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        String sagaId = UUID.randomUUID().toString();
        SagaState saga = new SagaState(sagaId, event.getOrder());
        activeSagas.put(sagaId, saga);

        // Start saga - reserve inventory
        asyncRpcManager.asyncPost(
                "http://inventory-service/reserve",
                new ReserveInventoryRequest(event.getOrder().getItems(), sagaId),
                new InventoryReservationCallback(sagaId),
                ReservationResponse.class
        );
    }

    public class InventoryReservationCallback implements AsyncCallback<ReservationResponse> {
        private final String sagaId;

        public InventoryReservationCallback(String sagaId) {
            this.sagaId = sagaId;
        }

        @Override
        public void onSuccess(ReservationResponse response) {
            SagaState saga = activeSagas.get(sagaId);
            if (response.isSuccess()) {
                saga.markInventoryReserved();
                // Next step - process payment
                processPayment(saga);
            } else {
                // Compensate - cancel order
                compensateOrder(saga);
            }
        }

        @Override
        public void onError(Exception error) {
            compensateOrder(activeSagas.get(sagaId));
        }
    }
}

Interview Insight: How do you handle distributed transactions across multiple services?
Answer: Use saga patterns (orchestration or choreography) rather than two-phase commit. Implement compensating actions for each step, maintain saga state, and use event sourcing for audit trails. The Universal MQ SDK enables reliable event delivery needed for saga coordination.
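The compensateOrder step invoked by the callbacks in the orchestrator above is a compensating action of exactly this kind. A hedged sketch (the request, event, and SagaState accessor names are assumptions, not part of the SDK):

// Illustrative compensation: undo completed steps in reverse order and
// publish a cancellation event. All names here are assumptions.
private void compensateOrder(SagaState saga) {
    if (saga.isInventoryReserved()) {
        // Release the reservation made earlier in the saga
        asyncRpcManager.asyncPost(
                "http://inventory-service/release",
                new ReleaseInventoryRequest(saga.getOrder().getItems(), saga.getSagaId()),
                new LoggingCallback<>("inventory-release"),
                ReleaseResponse.class);
    }
    messageProducer.send("order-events",
            new OrderCancelledEvent(saga.getOrder().getId(), "saga compensation"));
    activeSagas.remove(saga.getSagaId());
}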

Performance Optimization

Batching and Throughput Optimization

@Component
public class BatchingMessageProducer {
    private final MessageProducer delegate; // underlying provider producer
    private final BlockingQueue<Message> messageBuffer = new ArrayBlockingQueue<>(10000);
    private final ScheduledExecutorService batchProcessor =
            Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void startBatchProcessing() {
        batchProcessor.scheduleWithFixedDelay(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);
    }

    public CompletableFuture<SendResult> send(Message message) {
        CompletableFuture<SendResult> future = new CompletableFuture<>();
        // Assumes Message carries a transient result future for batch completion
        message.setResultFuture(future);

        if (!messageBuffer.offer(message)) {
            future.completeExceptionally(new BufferFullException("Message buffer is full"));
        }

        return future;
    }

    private void processBatch() {
        List<Message> batch = new ArrayList<>();
        messageBuffer.drainTo(batch, 100); // Max batch size

        if (!batch.isEmpty()) {
            try {
                List<SendResult> results = delegate.sendBatch(batch).get();

                // Complete futures in submission order
                for (int i = 0; i < batch.size(); i++) {
                    batch.get(i).getResultFuture().complete(results.get(i));
                }
            } catch (Exception e) {
                // Complete all futures exceptionally
                batch.forEach(msg ->
                        msg.getResultFuture().completeExceptionally(e));
            }
        }
    }
}

Connection Pooling and Resource Management

@Component
public class MqConnectionPool {
    private final GenericObjectPool<Connection> connectionPool;

    public MqConnectionPool(ConnectionFactory factory) {
        GenericObjectPoolConfig<Connection> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(50);
        config.setMaxIdle(10);
        config.setMinIdle(5);
        config.setTestOnBorrow(true);
        config.setTestWhileIdle(true);

        this.connectionPool = new GenericObjectPool<>(factory, config);
    }

    public <T> T executeWithConnection(ConnectionCallback<T> callback) throws Exception {
        Connection connection = connectionPool.borrowObject();
        try {
            return callback.execute(connection);
        } finally {
            connectionPool.returnObject(connection);
        }
    }
}
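Usage is a borrow/return pair hidden behind a callback, so callers cannot leak connections. A short sketch, assuming ConnectionCallback is a functional interface and that the pooled Connection exposes a publish operation:

// Example: publish using a pooled connection; the pool handles return/reuse.
String result = connectionPool.executeWithConnection(connection -> {
    connection.publish("orders", serializedPayload);
    return "ok";
});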

Testing Strategies

Integration Testing with Testcontainers

@SpringBootTest
@Testcontainers
class UniversalMqIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @Container
    static GenericContainer<?> redis = new GenericContainer<>("redis:7-alpine")
            .withExposedPorts(6379);

    static MockWebServer mockWebServer; // OkHttp mock server standing in for the remote service

    @Autowired
    private UniversalMqSDK mqSDK;

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("universal-mq.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("universal-mq.redis.host", redis::getHost);
        registry.add("universal-mq.redis.port", () -> redis.getMappedPort(6379));
    }

    @Test
    void testAsyncRpcWithKafka() throws Exception {
        // Configure for Kafka
        mqSDK.switchProvider("kafka");

        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> responseRef = new AtomicReference<>();

        // Mock HTTP server response
        mockWebServer.enqueue(new MockResponse().setResponseCode(202));

        // Send async request
        mqSDK.asyncPost("http://localhost:" + mockWebServer.getPort() + "/test",
                new TestRequest("test data"),
                new AsyncCallback<TestResponse>() {
                    @Override
                    public void onSuccess(TestResponse response) {
                        responseRef.set(response.getMessage());
                        latch.countDown();
                    }

                    @Override
                    public void onError(Exception error) {
                        latch.countDown();
                    }
                },
                TestResponse.class);

        // Simulate server response (a complete test would capture the message
        // key from the outbound request headers rather than hard-coding it)
        simulateServerResponse("test-message-key", "Test response");

        assertTrue(latch.await(10, TimeUnit.SECONDS));
        assertEquals("Test response", responseRef.get());
    }

    @Test
    void testProviderSwitching() {
        // Test switching between providers
        mqSDK.switchProvider("kafka");
        assertEquals("kafka", mqSDK.getCurrentProvider());

        mqSDK.switchProvider("redis");
        assertEquals("redis", mqSDK.getCurrentProvider());

        // Verify both providers work
        assertDoesNotThrow(() -> {
            mqSDK.send("test-topic", "test message");
        });
    }

    @Test
    void testFailureRecovery() throws Exception {
        // Test circuit breaker and retry mechanisms; the latch counts one
        // error callback per failed attempt in this sketch
        CountDownLatch errorLatch = new CountDownLatch(3);

        // Mock server to fail initially then succeed
        mockWebServer.enqueue(new MockResponse().setResponseCode(500));
        mockWebServer.enqueue(new MockResponse().setResponseCode(500));
        mockWebServer.enqueue(new MockResponse().setResponseCode(202));

        mqSDK.asyncPost("http://localhost:" + mockWebServer.getPort() + "/fail-test",
                new TestRequest("retry test"),
                new AsyncCallback<TestResponse>() {
                    @Override
                    public void onSuccess(TestResponse response) {
                        // Should eventually succeed
                    }

                    @Override
                    public void onError(Exception error) {
                        errorLatch.countDown();
                    }
                },
                TestResponse.class);

        // Verify retry attempts
        assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
    }
}

Unit Testing with Mocks

@ExtendWith(MockitoExtension.class)
class AsyncRpcManagerTest {

    @Mock
    private MessageQueueProvider mqProvider;

    @Mock
    private MessageProducer messageProducer;

    @Mock
    private RestTemplate restTemplate;

    @InjectMocks
    private AsyncRpcManager asyncRpcManager;

    @Test
    void testSuccessfulAsyncCall() {
        // Setup
        when(mqProvider.createProducer(any())).thenReturn(messageProducer);
        when(restTemplate.postForEntity(any(String.class), any(), eq(Void.class)))
                .thenReturn(ResponseEntity.accepted().build());

        // Test
        CompletableFuture<String> future = new CompletableFuture<>();
        asyncRpcManager.asyncPost("http://test.com/api",
                new TestRequest("test"),
                new AsyncCallback<String>() {
                    @Override
                    public void onSuccess(String response) {
                        future.complete(response);
                    }

                    @Override
                    public void onError(Exception error) {
                        future.completeExceptionally(error);
                    }
                },
                String.class);

        // Simulate the response (a complete test would capture the generated
        // message key from the outbound request rather than hard-coding it)
        MessageReceivedEvent event = new MessageReceivedEvent("test-message-key", "Success response");
        asyncRpcManager.handleResponseMessage(event);

        // Verify
        assertDoesNotThrow(() -> assertEquals("Success response", future.get(1, TimeUnit.SECONDS)));
    }

    @Test
    void testTimeoutHandling() {
        // Configure a short timeout for testing
        asyncRpcManager.setTimeout(Duration.ofMillis(100));

        CompletableFuture<Boolean> timeoutFuture = new CompletableFuture<>();

        asyncRpcManager.asyncPost("http://test.com/api",
                new TestRequest("timeout test"),
                new AsyncCallback<String>() {
                    @Override
                    public void onSuccess(String response) {
                        timeoutFuture.complete(false);
                    }

                    @Override
                    public void onTimeout() {
                        timeoutFuture.complete(true);
                    }

                    @Override
                    public void onError(Exception error) {
                        timeoutFuture.completeExceptionally(error);
                    }
                },
                String.class);

        // Verify the timeout occurs
        assertDoesNotThrow(() -> assertTrue(timeoutFuture.get(1, TimeUnit.SECONDS)));
    }
}

Security Considerations

Message Encryption and Authentication

@Component
public class SecureMessageProducer implements MessageProducer {
    private final MessageProducer delegate;
    private final EncryptionService encryptionService;
    private final AuthenticationService authService;

    @Override
    public CompletableFuture<SendResult> send(Message message) {
        // Add authentication headers
        String authToken = authService.generateToken();
        message.getHeaders().put("Authorization", "Bearer " + authToken);
        message.getHeaders().put("X-Client-ID", authService.getClientId());

        // Encrypt sensitive payload
        if (isSensitiveMessage(message)) {
            Object encryptedPayload = encryptionService.encrypt(message.getPayload());
            message = message.toBuilder()
                    .payload(encryptedPayload)
                    .header("Encrypted", "true")
                    .header("Encryption-Algorithm", encryptionService.getAlgorithm())
                    .build();
        }

        return delegate.send(message);
    }

    private boolean isSensitiveMessage(Message message) {
        return message.getHeaders().containsKey("Sensitive") ||
                message.getPayload() instanceof PersonalData ||
                message.getTopic().contains("pii");
    }
}

@Component
public class SecureMessageConsumer implements MessageConsumer {
    private final MessageConsumer delegate;
    private final EncryptionService encryptionService;
    private final AuthenticationService authService;

    @Override
    public void subscribe(String topic, MessageHandler handler) {
        delegate.subscribe(topic, new SecureMessageHandler(handler));
    }

    private class SecureMessageHandler implements MessageHandler {
        private final MessageHandler delegate;

        public SecureMessageHandler(MessageHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void handle(Message message) {
            // Verify authentication
            String authHeader = message.getHeaders().get("Authorization");
            if (!authService.validateToken(authHeader)) {
                throw new UnauthorizedMessageException("Invalid authentication token");
            }

            // Decrypt if necessary
            if ("true".equals(message.getHeaders().get("Encrypted"))) {
                Object decryptedPayload = encryptionService.decrypt(message.getPayload());
                message = message.toBuilder().payload(decryptedPayload).build();
            }

            delegate.handle(message);
        }
    }
}

Access Control and Topic Security

@Component
public class TopicAccessController {
    private final AccessControlService accessControlService;

    public void validateTopicAccess(String topic, String operation, String clientId) {
        TopicPermission permission = TopicPermission.valueOf(operation.toUpperCase());

        if (!accessControlService.hasPermission(clientId, topic, permission)) {
            throw new AccessDeniedException(
                    String.format("Client %s does not have %s permission for topic %s",
                            clientId, operation, topic));
        }
    }

    @PreAuthorize("hasRole('ADMIN') or hasPermission(#topic, 'WRITE')")
    public void createTopic(String topic, TopicConfiguration config) {
        // Topic creation logic
    }

    @PreAuthorize("hasPermission(#topic, 'READ')")
    public void subscribeTo(String topic) {
        // Subscription logic
    }
}

Monitoring and Observability

Distributed Tracing Integration

@Component
public class TracingMessageProducer implements MessageProducer {
    private final MessageProducer delegate;
    private final Tracer tracer; // Brave-style tracer API assumed throughout

    @Override
    public CompletableFuture<SendResult> send(Message message) {
        Span span = tracer.nextSpan()
                .name("message-send")
                .tag("mq.topic", message.getTopic())
                .tag("mq.key", message.getKey())
                .start();

        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
            // Inject trace context into message headers
            TraceContext traceContext = span.context();
            message.getHeaders().put("X-Trace-ID", String.valueOf(traceContext.traceId()));
            message.getHeaders().put("X-Span-ID", String.valueOf(traceContext.spanId()));

            return delegate.send(message)
                    .whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            span.tag("error", throwable.getMessage());
                        } else {
                            span.tag("mq.partition", String.valueOf(result.getPartition()));
                            span.tag("mq.offset", String.valueOf(result.getOffset()));
                        }
                        span.end();
                    });
        }
    }
}

@Component
public class TracingMessageConsumer implements MessageConsumer {
    private final MessageConsumer delegate;
    private final Tracer tracer;

    @Override
    public void subscribe(String topic, MessageHandler handler) {
        delegate.subscribe(topic, new TracingMessageHandler(handler));
    }

    private class TracingMessageHandler implements MessageHandler {
        private final MessageHandler delegate;

        TracingMessageHandler(MessageHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void handle(Message message) {
            // Extract trace context from message headers
            String traceId = message.getHeaders().get("X-Trace-ID");
            String spanId = message.getHeaders().get("X-Span-ID");

            // Continue the producer's trace when propagation headers are present;
            // createSpanContext is assumed to rebuild the parent context via the
            // tracing library's extractor (the exact API varies by tracer)
            Span span = (traceId != null && spanId != null)
                    ? tracer.nextSpan(createSpanContext(traceId, spanId))
                    : tracer.nextSpan();

            span.name("message-consume")
                    .tag("mq.topic", message.getTopic())
                    .tag("mq.key", message.getKey())
                    .start();

            try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
                delegate.handle(message);
            } catch (Exception e) {
                span.tag("error", e.getMessage());
                throw e;
            } finally {
                span.end();
            }
        }
    }
}

Health Checks and Metrics Dashboard

@Component
public class MqHealthIndicator implements HealthIndicator {
    private final List<MessageQueueProvider> providers;

    public MqHealthIndicator(List<MessageQueueProvider> providers) {
        this.providers = providers;
    }

    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();

        boolean allHealthy = true;
        Map<String, Object> details = new HashMap<>();

        for (MessageQueueProvider provider : providers) {
            boolean healthy = provider.isHealthy();
            allHealthy &= healthy;

            details.put(provider.getProviderName() + ".status",
                    healthy ? "UP" : "DOWN");
            details.put(provider.getProviderName() + ".metrics",
                    provider.getMetrics());
        }

        return allHealthy ? builder.up().withDetails(details).build()
                          : builder.down().withDetails(details).build();
    }
}

// Custom metrics for business monitoring
@Component
public class BusinessMetricsCollector {
    private final MeterRegistry meterRegistry;

    public BusinessMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    // Track message processing latency by business context
    public void recordBusinessOperationLatency(String operation, long latencyMs) {
        Timer.builder("business.operation.latency")
                .tag("operation", operation)
                .register(meterRegistry)
                .record(latencyMs, TimeUnit.MILLISECONDS);
    }

    // Track business-specific error rates
    public void recordBusinessError(String errorType, String context) {
        Counter.builder("business.errors")
                .tag("error.type", errorType)
                .tag("context", context)
                .register(meterRegistry)
                .increment();
    }
}

Migration and Deployment Strategies

Blue-Green Deployment with Message Queue Migration


flowchart TB
subgraph "Blue Environment (Current)"
    B1[Service A] --> B2[Kafka Cluster]
    B3[Service B] --> B2
end

subgraph "Green Environment (New)"
    G1[Service A] --> G2[RocketMQ Cluster]
    G3[Service B] --> G2
end

subgraph "Migration Process"
    M1[Dual Write Phase]
    M2[Consumer Migration]
    M3[Producer Migration]
    M4[Verification]
end

B2 --> M1
G2 --> M1
M1 --> M2
M2 --> M3
M3 --> M4

Migration Implementation

@Component
public class MqMigrationManager {
    private final Map<String, MessageQueueProvider> providers;
    private final MigrationConfig migrationConfig;
    private final ApplicationContext applicationContext;

    public void startMigration(String fromProvider, String toProvider) {
        MigrationPlan plan = createMigrationPlan(fromProvider, toProvider);

        // Phase 1: Start dual write
        enableDualWrite(fromProvider, toProvider);

        // Phase 2: Migrate consumers gradually
        migrateConsumersGradually(fromProvider, toProvider, plan);

        // Phase 3: Stop the old producer after lag verification
        stopOldProducerAfterVerification(fromProvider, toProvider);

        // Phase 4: Clean up
        cleanupOldInfrastructure(fromProvider);
    }

    private void enableDualWrite(String fromProvider, String toProvider) {
        // Configure all producers to write to both systems
        MessageProducer oldProducer = providers.get(fromProvider).createProducer(getConfig());
        MessageProducer newProducer = providers.get(toProvider).createProducer(getConfig());

        DualWriteProducer dualProducer = new DualWriteProducer(oldProducer, newProducer);

        // Replace existing producers
        applicationContext.getBean(MessageProducerFactory.class)
                .setDefaultProducer(dualProducer);
    }

    private void migrateConsumersGradually(String fromProvider, String toProvider,
                                           MigrationPlan plan) {
        for (ConsumerMigrationStep step : plan.getConsumerSteps()) {
            // Stop a percentage of the old consumers
            stopConsumers(fromProvider, step.getTopics(), step.getPercentage());

            // Start the same percentage of new consumers
            startConsumers(toProvider, step.getTopics(), step.getPercentage());

            // Wait and verify the lag is manageable
            waitAndVerifyLag(step.getVerificationTimeMs());
        }
    }
}

// Dual write producer for zero-downtime migration
public class DualWriteProducer implements MessageProducer {
    private static final Logger logger = LoggerFactory.getLogger(DualWriteProducer.class);

    private final MessageProducer primaryProducer;
    private final MessageProducer secondaryProducer;
    private final MigrationMetrics metrics;

    public DualWriteProducer(MessageProducer primaryProducer, MessageProducer secondaryProducer) {
        this.primaryProducer = primaryProducer;
        this.secondaryProducer = secondaryProducer;
        this.metrics = new MigrationMetrics();
    }

    @Override
    public CompletableFuture<SendResult> send(Message message) {
        // Send to the primary (current) system
        CompletableFuture<SendResult> primaryFuture = primaryProducer.send(message);

        // Send to the secondary (new) system - don't fail if this fails
        secondaryProducer.send(message)
                .handle((result, throwable) -> {
                    if (throwable != null) {
                        metrics.recordSecondaryWriteFailure(message.getTopic(), throwable);
                        // Log but don't propagate the error
                        logger.warn("Secondary write failed for topic: {}",
                                message.getTopic(), throwable);
                    }
                    return result;
                });

        // Return the primary result - migration is transparent to clients
        return primaryFuture;
    }
}

Advanced Topics

Message Schema Evolution

// Schema registry integration
@Component
public class SchemaAwareMessageProducer implements MessageProducer {
    private final MessageProducer delegate;
    private final SchemaRegistry schemaRegistry;

    @Override
    public CompletableFuture<SendResult> send(Message message) {
        // Validate and evolve schema
        Schema currentSchema = schemaRegistry.getLatestSchema(message.getTopic());

        if (currentSchema != null) {
            // Validate message against schema
            SchemaValidationResult validation = currentSchema.validate(message.getPayload());

            if (!validation.isValid()) {
                // Attempt automatic schema evolution
                Object evolvedPayload = schemaRegistry.evolvePayload(
                        message.getPayload(), currentSchema);
                message = message.toBuilder().payload(evolvedPayload).build();
            }

            // Add schema metadata to the message (only when a schema is registered)
            message.getHeaders().put("Schema-ID", currentSchema.getId());
            message.getHeaders().put("Schema-Version", String.valueOf(currentSchema.getVersion()));
        }

        return delegate.send(message);
    }
}

// Backward compatibility handler
@Component
public class BackwardCompatibilityConsumer implements MessageConsumer {
    private final MessageConsumer delegate;
    private final SchemaRegistry schemaRegistry;

    @Override
    public void subscribe(String topic, MessageHandler handler) {
        delegate.subscribe(topic, new CompatibilityMessageHandler(handler, topic));
    }

    private class CompatibilityMessageHandler implements MessageHandler {
        private final MessageHandler delegate;
        private final String topic;

        CompatibilityMessageHandler(MessageHandler delegate, String topic) {
            this.delegate = delegate;
            this.topic = topic;
        }

        @Override
        public void handle(Message message) {
            String schemaId = message.getHeaders().get("Schema-ID");

            if (schemaId != null) {
                // Get the schema used to produce this message
                Schema producerSchema = schemaRegistry.getSchema(schemaId);
                Schema currentSchema = schemaRegistry.getLatestSchema(topic);

                if (!producerSchema.equals(currentSchema)) {
                    // Migrate the message to the current schema
                    Object migratedPayload = schemaRegistry.migratePayload(
                            message.getPayload(), producerSchema, currentSchema);
                    message = message.toBuilder().payload(migratedPayload).build();
                }
            }

            delegate.handle(message);
        }
    }
}

Event Sourcing Integration

// Event store using the Universal MQ SDK
@Component
public class EventStore {
    private final MessageProducer eventProducer;
    private final MessageConsumer eventConsumer;
    private final EventRepository eventRepository;
    private final ProjectionService projectionService;
    private final SideEffectProcessor sideEffectProcessor;

    public void storeEvent(DomainEvent event) {
        // Store in the persistent event log
        EventRecord record = EventRecord.builder()
                .eventId(event.getEventId())
                .aggregateId(event.getAggregateId())
                .eventType(event.getClass().getSimpleName())
                .eventData(serialize(event))
                .timestamp(event.getTimestamp())
                .version(event.getVersion())
                .build();

        eventRepository.save(record);

        // Publish for real-time processing
        Message message = Message.builder()
                .key(event.getAggregateId())
                .payload(event)
                .topic("domain-events")
                .header("Event-Type", event.getClass().getSimpleName())
                .header("Aggregate-ID", event.getAggregateId())
                .build();

        eventProducer.send(message);
    }

    public List<DomainEvent> getEventHistory(String aggregateId, int fromVersion) {
        // Replay events from the persistent store
        List<EventRecord> records = eventRepository.findByAggregateIdAndVersionGreaterThan(
                aggregateId, fromVersion);

        return records.stream()
                .map(this::deserializeEvent)
                .collect(Collectors.toList());
    }

    @PostConstruct
    public void startEventProjections() {
        // Subscribe to events for read model updates
        eventConsumer.subscribe("domain-events", this::handleDomainEvent);
    }

    private void handleDomainEvent(Message message) {
        DomainEvent event = (DomainEvent) message.getPayload();

        // Update read models/projections
        projectionService.updateProjections(event);

        // Trigger side effects
        sideEffectProcessor.processSideEffects(event);
    }
}

Interview Questions and Expert Insights

Q: How would you handle message ordering guarantees across different MQ providers?

Expert Answer: Message ordering is achieved differently across providers:

  • Kafka: Uses partitioning - messages with the same key go to the same partition, maintaining order within that partition
  • Redis Streams: Inherently ordered within a stream, use stream keys for partitioning
  • RocketMQ: Supports both ordered and unordered messages, use MessageQueueSelector for ordering

The Universal SDK abstracts this by implementing a consistent partitioning strategy based on message keys, ensuring the same ordering semantics regardless of the underlying provider.
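A sketch of such a key-based partitioning strategy (provider-neutral; the hash choice is illustrative): the same key always maps to the same partition index, which each provider adapter then translates into a Kafka partition, a Redis stream key suffix, or a RocketMQ queue selector.

import java.util.concurrent.ThreadLocalRandom;

// Deterministic key -> partition mapping shared by all provider adapters.
public final class KeyPartitioner {
    private KeyPartitioner() {}

    public static int partitionFor(String key, int partitionCount) {
        if (key == null) {
            // No key means no ordering was requested, so any partition will do
            return ThreadLocalRandom.current().nextInt(partitionCount);
        }
        // Mask the sign bit so the modulo result is non-negative
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }
}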

Q: What are the performance implications of your SPI-based approach?

Expert Answer: The SPI approach has minimal runtime overhead:

  • Initialization cost: Provider discovery happens once at startup
  • Runtime cost: Single level of indirection (interface call)
  • Memory overhead: Multiple providers loaded but only active one used
  • Optimization: Use provider-specific optimizations under the unified interface

Benefits outweigh costs: vendor flexibility, simplified testing, and operational consistency justify the slight performance trade-off.

Q: How do you ensure exactly-once delivery semantics?

Expert Answer: Exactly-once is challenging and provider-dependent:

  • Kafka: Use idempotent producers + transactional consumers
  • Redis: Leverage Redis transactions and consumer group acknowledgments
  • RocketMQ: Built-in transactional message support

The SDK implements idempotency through:

  • Message deduplication using correlation IDs (sketched after this list)
  • Idempotent message handlers
  • At-least-once delivery with deduplication at the application level
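A minimal deduplicating handler wrapper along these lines; the unbounded in-memory seen-set is an assumption for illustration only, where a production system would typically use a TTL cache, Redis SETNX, or a database unique constraint so the guarantee holds across instances and restarts:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Wraps a MessageHandler so redelivered messages (same message id) are
// processed at most once by this instance.
public class DeduplicatingHandler implements MessageHandler {
    private final MessageHandler delegate;
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    public DeduplicatingHandler(MessageHandler delegate) {
        this.delegate = delegate;
    }

    @Override
    public void handle(Message message) {
        // add() returns false if the id was already present
        if (!seen.add(message.getId())) {
            return; // duplicate delivery: skip
        }
        try {
            delegate.handle(message);
        } catch (Exception e) {
            seen.remove(message.getId()); // allow a retry after failure
            throw e;
        }
    }
}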

Q: How would you handle schema evolution in a microservices environment?

Expert Answer: Schema evolution requires careful planning:

  1. Forward Compatibility: New producers can write data that old consumers can read
  2. Backward Compatibility: Old producers can write data that new consumers can read
  3. Full Compatibility: Both forward and backward compatibility

Implementation strategies:

  • Use Avro or Protocol Buffers for schema definition
  • Implement schema registry for centralized schema management
  • Version schemas and maintain compatibility matrices
  • Gradual rollout of schema changes with monitoring


Conclusion

The Universal Message Queue Component SDK provides a robust, production-ready solution for abstracting message queue implementations while maintaining high performance and reliability. By leveraging the SPI mechanism, implementing comprehensive failure handling, and supporting advanced patterns like async RPC, this SDK enables organizations to build resilient distributed systems that can evolve with changing technology requirements.

The key to success with this SDK lies in understanding the trade-offs between abstraction and performance, implementing proper monitoring and observability, and following established patterns for distributed system design. With careful attention to schema evolution, security, and operational concerns, this SDK can serve as a foundation for scalable microservices architectures.