Redis serves as a high-performance in-memory data structure store, commonly used as a cache, database, and message broker. Understanding caching patterns and consistency mechanisms is crucial for building scalable, reliable systems.
🎯 Interview Insight: Interviewers often ask about the trade-offs between performance and consistency. Be prepared to discuss CAP theorem implications and when to choose eventual consistency over strong consistency.
Why Caching Matters
Reduced Latency: Sub-millisecond response times for cached data
Decreased Database Load: Offloads read operations from primary databases

Why Redis for Caching
Performance: Sub-millisecond latency for most operations
Scalability: Handles millions of requests per second
Flexibility: Rich data structures (strings, hashes, lists, sets, sorted sets)
Persistence: Optional durability with RDB/AOF
High Availability: Redis Sentinel and Cluster support
Core Caching Patterns
1. Cache-Aside (Lazy Loading)
The application manages the cache directly, loading data on cache misses.
sequenceDiagram
participant App as Application
participant Cache as Redis Cache
participant DB as Database
App->>Cache: GET user:123
Cache-->>App: Cache Miss (null)
App->>DB: SELECT * FROM users WHERE id=123
DB-->>App: User data
App->>Cache: SET user:123 {user_data} EX 3600
Cache-->>App: OK
App-->>App: Return user data
import json

import redis

class CacheAsidePattern:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.cache_ttl = 3600  # 1 hour

    def get_user(self, user_id):
        cache_key = f"user:{user_id}"
        # Try cache first
        cached_data = self.redis_client.get(cache_key)
        if cached_data:
            return json.loads(cached_data)
        # Cache miss - fetch from database
        user_data = self.fetch_user_from_db(user_id)
        if user_data:
            # Store in cache with TTL
            self.redis_client.setex(
                cache_key,
                self.cache_ttl,
                json.dumps(user_data)
            )
        return user_data

    def update_user(self, user_id, user_data):
        # Update database
        self.update_user_in_db(user_id, user_data)
        # Invalidate cache
        cache_key = f"user:{user_id}"
        self.redis_client.delete(cache_key)
        return user_data
Pros:
Simple to implement and understand
Cache only contains requested data
Resilient to cache failures
Cons:
Cache miss penalty (extra database call)
Potential cache stampede issues
Data staleness between updates
💡 Interview Insight: Discuss cache stampede scenarios: multiple requests hitting the same missing key simultaneously. Solutions include distributed locking or probabilistic early refresh (see the sketch below).
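A minimal sketch of the locking approach, using SET NX as a short-lived rebuild mutex; the `loader` callback and the retry bounds are illustrative assumptions, not part of the pattern itself:

import json
import time

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def get_with_stampede_protection(key: str, loader, ttl: int = 3600):
    # Fast path: value already cached
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    # SET NX acts as a short-lived mutex: only one caller rebuilds
    if r.set(f"{key}:lock", "1", nx=True, ex=10):
        try:
            value = loader()  # e.g., a database query
            r.setex(key, ttl, json.dumps(value))
            return value
        finally:
            r.delete(f"{key}:lock")
    # Another worker is rebuilding; poll briefly for the fresh value
    for _ in range(50):
        time.sleep(0.1)
        cached = r.get(key)
        if cached:
            return json.loads(cached)
    # Give up waiting and hit the source directly as a last resort
    return loader()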
2. Write-Through
Data is written to both cache and database simultaneously.
sequenceDiagram
participant App as Application
participant Cache as Redis Cache
participant DB as Database
App->>Cache: SET key data
Cache->>DB: UPDATE data
DB-->>Cache: Success
Cache-->>App: Success
Note over App,DB: Read requests served directly from cache
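Redis has no built-in path for forwarding writes to a backing store, so in practice the application performs both writes in one code path. A minimal sketch, where `save_to_db` is an assumed persistence helper:

import json

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def write_through(key: str, value: dict, ttl: int = 3600):
    # Persist first so a cache failure cannot lose the write
    save_to_db(key, value)  # assumed persistence helper
    r.setex(key, ttl, json.dumps(value))

def read_through_cache(key: str):
    # Reads are served directly from the cache
    cached = r.get(key)
    return json.loads(cached) if cached else None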
3. Write-Behind (Write-Back)
Data is written to the cache immediately, while the database update is deferred and performed asynchronously.
🎯 Interview Insight: Write-behind offers better write performance but introduces complexity and potential data loss risks. Discuss scenarios where this pattern is appropriate: high write volume with acceptable eventual consistency, where some data loss is tolerable, such as analytics or logging systems.
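A minimal write-behind sketch using a Redis list as the write queue; `save_to_db` is an assumed persistence helper, and a production system would typically add batching, retries, and a durable queue:

import json

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def write_behind(key: str, value: dict, ttl: int = 3600):
    # Acknowledge after updating the cache and enqueueing the write
    r.setex(key, ttl, json.dumps(value))
    r.rpush("write_queue", json.dumps({"key": key, "value": value}))

def flush_worker():
    # Background worker draining queued writes to the database
    while True:
        item = r.blpop("write_queue", timeout=5)
        if item is None:
            continue  # queue idle; keep polling
        entry = json.loads(item[1])
        save_to_db(entry["key"], entry["value"])  # assumed persistence helper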
4. Refresh-Ahead
Proactively refresh cache entries before they expire.
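One common implementation checks the remaining TTL on each read and refreshes when it falls below a window. A minimal sketch, where the `loader` callback and the 5-minute window are illustrative assumptions (production code would usually hand the refresh to a background task):

import json

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

REFRESH_WINDOW = 300  # refresh when fewer than 5 minutes of TTL remain

def get_with_refresh_ahead(key: str, loader, ttl: int = 3600):
    cached = r.get(key)
    remaining = r.ttl(key)  # seconds left; negative if missing or no expiry
    if cached is not None and 0 < remaining < REFRESH_WINDOW:
        # Entry is close to expiry: refresh it now, before any miss occurs
        value = loader()
        r.setex(key, ttl, json.dumps(value))
        return value
    if cached is not None:
        return json.loads(cached)
    # Cold start: fall back to lazy loading
    value = loader()
    r.setex(key, ttl, json.dumps(value))
    return value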
Cache Consistency Models
1. Strong Consistency
Every read reflects the most recent write, typically enforced with synchronous updates or distributed locking.
🎯 Interview Insight: Strong consistency comes with performance costs. Discuss scenarios where it's necessary (financial transactions, inventory management) vs. where eventual consistency is acceptable (user profiles, social media posts).
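As one illustration, writers for a key can be serialized with redis-py's built-in distributed lock so the cache and database are always updated together; `update_user_in_db` is an assumed persistence helper:

import json

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def update_with_strong_consistency(user_id: int, data: dict):
    # Serialize all writers for this key; readers that also take the
    # lock never observe a cache/database mismatch
    with r.lock(f"lock:user:{user_id}", timeout=5, blocking_timeout=2):
        update_user_in_db(user_id, data)  # assumed persistence helper
        r.setex(f"user:{user_id}", 3600, json.dumps(data))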
2. Eventual Consistency
Updates propagate through the system over time, allowing temporary inconsistencies. A useful middle ground is read-your-writes consistency, which guarantees that a session always sees its own updates:

import json
from typing import Dict

import redis

class ReadYourWritesCache:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.user_versions = {}  # Track user-specific versions per session

    def write_user_data(self, user_id: int, data: Dict, session_id: str):
        # Increment version for this user
        version = self.redis.incr(f"user_version:{user_id}")
        # Store data with version metadata
        cache_key = f"user:{user_id}"
        versioned_data = {**data, "_version": version, "_updated_by": session_id}
        # Write to cache and database
        self.redis.setex(cache_key, 3600, json.dumps(versioned_data))
        self.update_database(user_id, data)
        # Track the version this session has written
        self.user_versions[session_id] = version

    def read_user_data(self, user_id: int, session_id: str) -> Dict:
        cache_key = f"user:{user_id}"
        cached_data = self.redis.get(cache_key)
        if cached_data:
            data = json.loads(cached_data)
            cached_version = data.get("_version", 0)
            expected_version = self.user_versions.get(session_id, 0)
            # Ensure the user sees their own writes
            if cached_version >= expected_version:
                return data
        # Fallback to database for consistency
        return self.fetch_from_database(user_id)
1. Cache Warming
Proactively populate the cache with frequently accessed data, typically at startup or ahead of anticipated load.

import asyncio
import json
from typing import List

class CacheWarmer:
    def __init__(self, redis_client, batch_size=100):
        self.redis = redis_client
        self.batch_size = batch_size

    async def warm_user_cache(self, user_ids: List[int]):
        """Warm cache for multiple users concurrently"""
        async def warm_single_user(user_id: int):
            try:
                user_data = await self.fetch_user_from_db(user_id)
                if user_data:
                    cache_key = f"user:{user_id}"
                    self.redis.setex(cache_key, 3600, json.dumps(user_data))
                return True
            except Exception as e:
                print(f"Failed to warm cache for user {user_id}: {e}")
                return False

        # Process in batches to avoid overwhelming the system
        for i in range(0, len(user_ids), self.batch_size):
            batch = user_ids[i:i + self.batch_size]
            tasks = [warm_single_user(uid) for uid in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            success_count = sum(1 for r in results if r is True)
            print(f"Warmed {success_count}/{len(batch)} cache entries")
            # Small delay between batches
            await asyncio.sleep(0.1)

    def warm_on_startup(self):
        """Warm cache with the most accessed data on application startup"""
        popular_users = self.get_popular_user_ids()
        asyncio.run(self.warm_user_cache(popular_users))
2. Multi-Level Caching
Implement multiple cache layers for optimal performance.
graph TD
A[Application] --> B[L1 Cache - Local Memory]
B --> C[L2 Cache - Redis]
C --> D[L3 Cache - CDN]
D --> E[Database]
style B fill:#e1f5fe
style C fill:#f3e5f5
style D fill:#e8f5e8
style E fill:#fff3e0
🎯 Interview Insight: Multi-level caching questions often focus on cache coherence. Discuss strategies for maintaining consistency across levels and the trade-offs between complexity and performance.
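A minimal sketch of the first two levels, assuming an in-process dict as L1 and Redis as L2; the short L1 TTL is the knob that bounds how stale an application node can be after another node updates L2:

import json
import time

import redis

class TwoLevelCache:
    def __init__(self, l1_ttl: int = 5):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.l1 = {}          # L1: per-process memory
        self.l1_ttl = l1_ttl  # short TTL bounds cross-node staleness

    def get(self, key: str):
        entry = self.l1.get(key)
        if entry and entry['expires'] > time.time():
            return entry['value']          # L1 hit
        cached = self.redis.get(key)       # L2 lookup
        if cached is not None:
            value = json.loads(cached)
            self.l1[key] = {'value': value, 'expires': time.time() + self.l1_ttl}
            return value
        return None                        # miss at both levels

    def set(self, key: str, value, ttl: int = 3600):
        # Write L2 first, then refresh the local L1 entry
        self.redis.setex(key, ttl, json.dumps(value))
        self.l1[key] = {'value': value, 'expires': time.time() + self.l1_ttl}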
Tag-Based Invalidation
Associate cache entries with tags so that related entries can be invalidated as a group.

import json
from typing import Any, List

import redis

class TagBasedInvalidation:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)

    def set_with_tags(self, key: str, value: Any, tags: List[str], ttl: int = 3600):
        """Store data with associated tags for bulk invalidation"""
        # Store the actual data
        self.redis.setex(key, ttl, json.dumps(value))
        # Associate the key with each tag
        for tag in tags:
            tag_key = f"tag:{tag}"
            self.redis.sadd(tag_key, key)
            self.redis.expire(tag_key, ttl + 300)  # Tags live longer than data

    def invalidate_by_tag(self, tag: str):
        """Invalidate all cache entries associated with a tag"""
        tag_key = f"tag:{tag}"
        # Get all keys associated with this tag
        keys_to_invalidate = self.redis.smembers(tag_key)
        if keys_to_invalidate:
            # Delete all associated keys
            self.redis.delete(*keys_to_invalidate)
            # Clean up tag associations
            for key in keys_to_invalidate:
                self._remove_key_from_all_tags(key.decode())
        # Remove the tag itself
        self.redis.delete(tag_key)

    def _remove_key_from_all_tags(self, key: str):
        """Remove a key from all tag associations"""
        # This can be expensive - consider background cleanup
        tag_pattern = "tag:*"
        for tag_key in self.redis.scan_iter(match=tag_pattern):
            self.redis.srem(tag_key, key)
# Usage example
cache = TagBasedInvalidation()

# Store user data with tags
user_data = {"name": "John", "department": "Engineering"}
cache.set_with_tags(
    key="user:123",
    value=user_data,
    tags=["user", "department:engineering", "active_users"]
)

# Invalidate all engineering department data
cache.invalidate_by_tag("department:engineering")
🎯 Interview Insight: Tag-based invalidation is a sophisticated pattern. Discuss the trade-offs between granular control and storage overhead. Mention alternatives like dependency graphs for complex invalidation scenarios.
Hot Key Detection and Mitigation
A hot key receives a disproportionate share of traffic and can overwhelm a single Redis node. Detect hot keys by tracking access frequency over a sliding window, then mitigate with local caching and key copies.

import random
import threading
import time
from collections import defaultdict, deque

from redis import Redis

class HotKeyDetector:
    def __init__(self, threshold=100, window_seconds=60):
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.threshold = threshold
        self.window_seconds = window_seconds
        # Track key access patterns
        self.access_counts = defaultdict(deque)
        self.lock = threading.RLock()
        # Hot key mitigation state
        self.hot_keys = set()
        self.local_cache = {}  # Local caching for hot keys

    def track_access(self, key: str):
        """Track key access for hot key detection"""
        current_time = time.time()
        with self.lock:
            # Record the current access
            self.access_counts[key].append(current_time)
            # Drop accesses outside the sliding window
            cutoff_time = current_time - self.window_seconds
            while (self.access_counts[key] and
                   self.access_counts[key][0] < cutoff_time):
                self.access_counts[key].popleft()
            # Check if the key is hot
            if len(self.access_counts[key]) > self.threshold:
                if key not in self.hot_keys:
                    self.hot_keys.add(key)
                    self._handle_hot_key(key)

    def get_with_hot_key_handling(self, key: str):
        """Get data with hot key optimization"""
        self.track_access(key)
        # If it's a hot key, try the local cache first
        if key in self.hot_keys:
            local_data = self.local_cache.get(key)
            if local_data and local_data['expires'] > time.time():
                return local_data['value']
        # Get from Redis
        data = self.redis.get(key)
        # Cache locally if hot key
        if key in self.hot_keys and data:
            self.local_cache[key] = {
                'value': data,
                'expires': time.time() + 30  # Short local cache TTL
            }
        return data

    def _handle_hot_key(self, key: str):
        """Implement hot key mitigation strategies"""
        # Strategy 1: Enable local caching
        print(f"Hot key detected: {key} - enabling local caching")
        # Strategy 2: Create multiple copies for random read distribution
        original_data = self.redis.get(key)
        if original_data:
            for i in range(3):  # Create 3 copies
                copy_key = f"{key}:copy:{i}"
                self.redis.setex(copy_key, 300, original_data)  # 5 min TTL
        # Strategy 3: Use read replicas (if available)
        # This would involve routing reads to replica nodes

    def get_distributed_hot_key(self, key: str):
        """Get hot key data using the copy-distribution strategy"""
        if key not in self.hot_keys:
            return self.redis.get(key)
        # Randomly select one of the copies
        copy_index = random.randint(0, 2)
        copy_key = f"{key}:copy:{copy_index}"
        data = self.redis.get(copy_key)
        if not data:
            # Fallback to the original key
            data = self.redis.get(key)
        return data
🎯 Interview Insight: Hot key problems are common in production. Discuss identification techniques (monitoring access patterns), mitigation strategies (local caching, key distribution), and prevention approaches (better key design, load balancing).
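1. Debugging and Troubleshooting
The snippets below assume a small diagnostic helper along these lines. `RedisDebugger` and its method names are an illustrative sketch built on SCAN, MEMORY USAGE, and INFO, shaped to match the calls that follow:

import redis

class RedisDebugger:
    """Hypothetical helper for diagnosing production issues."""
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)

    def find_large_keys(self, threshold_bytes: int = 10240, sample_limit: int = 10000):
        """Scan the keyspace and report keys above a size threshold."""
        large_keys = []
        for i, key in enumerate(self.redis.scan_iter(count=1000)):
            if i >= sample_limit:
                break  # Bound the scan cost on large keyspaces
            size = self.redis.memory_usage(key)
            if size and size > threshold_bytes:
                large_keys.append({'key': key.decode(), 'bytes': size})
        return sorted(large_keys, key=lambda k: k['bytes'], reverse=True)

    def connection_pool_stats(self):
        """Summarize client connections from INFO."""
        info = self.redis.info('clients')
        return {
            'connected_clients': info.get('connected_clients'),
            'blocked_clients': info.get('blocked_clients'),
        }

debugger = RedisDebugger()  # instance used by the snippets below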
# Find large keys
large_keys = debugger.find_large_keys(threshold_bytes=10240)  # 10KB threshold

# Check connection pool
pool_stats = debugger.connection_pool_stats()
print(f"Connection pool stats: {pool_stats}")
🎯 Interview Insight: Debugging questions often focus on production issues. Discuss tools like Redis MONITOR (and its performance impact), the MEMORY USAGE command, and the importance of having proper monitoring in place before issues occur.
2. Security Considerations
🎯 Interview Insight: Security questions often cover data encryption, session management, and rate limiting. Discuss the balance between security and performance, and mention compliance requirements (GDPR, HIPAA) that might affect caching strategies.
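As a concrete example of the rate-limiting point, a fixed-window limiter needs only INCR and EXPIRE; the key scheme and limits here are illustrative:

import time

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def allow_request(client_id: str, limit: int = 100, window: int = 60) -> bool:
    # One counter per client per time window
    window_key = f"rate:{client_id}:{int(time.time() // window)}"
    count = r.incr(window_key)
    if count == 1:
        r.expire(window_key, window)  # first hit sets the window expiry
    return count <= limit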
3. Operational Excellence
import glob
import os
import subprocess
import time
from datetime import datetime, timedelta
from typing import Dict

from redis import Redis

class RedisOperationalExcellence:
    def __init__(self):
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.backup_location = '/var/backups/redis'

    def automated_backup(self):
        """Automated backup with rotation"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{self.backup_location}/redis_backup_{timestamp}.rdb"
        try:
            # Record the last save time, then trigger a background save
            last_save = self.redis.lastsave()
            self.redis.bgsave()
            # Wait for the background save to complete
            while self.redis.lastsave() == last_save:
                time.sleep(1)
            # Copy RDB file
            subprocess.run(
                ['cp', '/var/lib/redis/dump.rdb', backup_file],
                check=True
            )
            # Compress backup
            subprocess.run(['gzip', backup_file], check=True)
            # Cleanup old backups (keep last 7 days)
            self._cleanup_old_backups()
            print(f"Backup completed: {backup_file}.gz")
        except Exception as e:
            print(f"Backup failed: {e}")
            # Send alert to monitoring system
            self._send_alert("Redis backup failed", str(e))

    def _cleanup_old_backups(self):
        """Remove backups older than 7 days"""
        cutoff_date = datetime.now() - timedelta(days=7)
        pattern = f"{self.backup_location}/redis_backup_*.rdb.gz"
        for backup_file in glob.glob(pattern):
            file_time = datetime.fromtimestamp(os.path.getctime(backup_file))
            if file_time < cutoff_date:
                os.remove(backup_file)
                print(f"Removed old backup: {backup_file}")

    def capacity_planning_analysis(self) -> Dict:
        """Analyze Redis usage for capacity planning"""
        info = self.redis.info()

        # Memory analysis
        used_memory = info['used_memory']
        used_memory_peak = info['used_memory_peak']
        max_memory = info.get('maxmemory', 0)

        # Connection analysis
        connected_clients = info['connected_clients']

        # Key analysis
        total_keys = sum(info.get(f'db{i}', {}).get('keys', 0) for i in range(16))

        # Performance metrics
        ops_per_sec = info.get('instantaneous_ops_per_sec', 0)

        # Calculate trends (simplified - in production, use time series data)
        memory_growth_rate = self._calculate_memory_growth_rate()

        recommendations = []

        # Memory recommendations
        if max_memory > 0:
            memory_usage_pct = (used_memory / max_memory) * 100
            if memory_usage_pct > 80:
                recommendations.append("Memory usage is high - consider scaling up")

        # Connection recommendations
        if connected_clients > 1000:
            recommendations.append("High connection count - review connection pooling")

        # Performance recommendations
        if ops_per_sec > 100000:
            recommendations.append("High operation rate - consider read replicas")

        return {
            'memory': {
                'used_bytes': used_memory,
                'used_human': info['used_memory_human'],
                'peak_bytes': used_memory_peak,
                'peak_human': info['used_memory_peak_human'],
                'max_bytes': max_memory,
                'usage_percentage': (used_memory / max_memory * 100) if max_memory > 0 else 0,
                'growth_rate_mb_per_day': memory_growth_rate
            },
            'connections': {
                'current': connected_clients,
                'max_clients': info.get('maxclients', 'unlimited')
            },
            'keys': {
                'total': total_keys,
                'expired': info.get('expired_keys', 0),
                'evicted': info.get('evicted_keys', 0)
            },
            'performance': {
                'ops_per_second': ops_per_sec,
                'keyspace_hits': info.get('keyspace_hits', 0),
                'keyspace_misses': info.get('keyspace_misses', 0),
                'hit_rate': self._calculate_hit_rate(info)
            },
            'recommendations': recommendations
        }

    def _calculate_hit_rate(self, info: Dict) -> float:
        """Compute the cache hit rate as a percentage"""
        hits = info.get('keyspace_hits', 0)
        misses = info.get('keyspace_misses', 0)
        total = hits + misses
        return (hits / total * 100) if total > 0 else 0.0

    def _calculate_memory_growth_rate(self) -> float:
        """Calculate memory growth rate (simplified)"""
        # In production, this would analyze historical data
        # For demo purposes, return a placeholder
        return 50.0

    def _send_alert(self, subject: str, message: str):
        """Placeholder hook for the monitoring/alerting system"""
        print(f"ALERT: {subject} - {message}")
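A brief usage sketch of the capacity report:

ops = RedisOperationalExcellence()
report = ops.capacity_planning_analysis()
print(f"Memory usage: {report['memory']['usage_percentage']:.1f}%")
for recommendation in report['recommendations']:
    print(f"- {recommendation}")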