Distributed Coordination
Learn how to use Redis for distributed job coordination, locking, and synchronization in TickerQ.
Overview
Redis provides robust distributed coordination capabilities for TickerQ, enabling:
- Distributed Locking - Prevent concurrent job execution
- Job Coordination - Synchronize job execution across multiple instances
- Leader Election - Coordinate singleton jobs
- State Synchronization - Share job state across instances
Distributed Locking
Basic Distributed Lock
csharp
public class EmailService
{
private readonly IDistributedLockProvider _lockProvider;
public async Task SendBulkEmailsAsync()
{
using var lockHandle = await _lockProvider.AcquireLockAsync(
"bulk-email-job",
TimeSpan.FromMinutes(30));
// Only one instance will execute this code
await ProcessBulkEmails();
}
}Lock Configuration
csharp
builder.Services.AddTickerQ(options =>
{
options.UseRedis(redisOptions =>
{
redisOptions.LockingOptions = lockingOptions =>
{
lockingOptions.DefaultLockTimeout = TimeSpan.FromMinutes(5);
lockingOptions.LockRetryDelay = TimeSpan.FromMilliseconds(100);
lockingOptions.MaxLockRetries = 10;
lockingOptions.EnableLockExtension = true;
lockingOptions.LockExtensionInterval = TimeSpan.FromMinutes(1);
};
});
});Advanced Locking Patterns
Reentrant Locks
csharp
public class ReentrantLockExample
{
private readonly IDistributedLockProvider _lockProvider;
public async Task ProcessOrderAsync(string orderId)
{
using var lockHandle = await _lockProvider.AcquireLockAsync(
$"order:{orderId}",
TimeSpan.FromMinutes(5),
reentrant: true);
await UpdateOrderStatus(orderId);
await ProcessPayment(orderId); // Can acquire same lock
}
private async Task ProcessPayment(string orderId)
{
// This will succeed because the lock is reentrant
using var lockHandle = await _lockProvider.AcquireLockAsync(
$"order:{orderId}",
TimeSpan.FromMinutes(2),
reentrant: true);
// Process payment logic
}
}Read-Write Locks
csharp
public class ReadWriteLockExample
{
private readonly IDistributedLockProvider _lockProvider;
public async Task<Order> GetOrderAsync(string orderId)
{
using var readLock = await _lockProvider.AcquireReadLockAsync(
$"order:{orderId}",
TimeSpan.FromMinutes(1));
return await _orderRepository.GetAsync(orderId);
}
public async Task UpdateOrderAsync(Order order)
{
using var writeLock = await _lockProvider.AcquireWriteLockAsync(
$"order:{order.Id}",
TimeSpan.FromMinutes(5));
await _orderRepository.UpdateAsync(order);
}
}Job Coordination
Singleton Jobs
csharp
[SingletonJob("data-backup")]
public class DataBackupJob : IJob
{
public async Task ExecuteAsync(JobContext context)
{
// Only one instance across all servers will execute this
await BackupDatabase();
}
}Job Barriers
csharp
public class JobBarrierExample
{
private readonly IJobBarrier _jobBarrier;
public async Task ProcessBatchAsync(string batchId, int totalJobs)
{
// Wait for all jobs in the batch to complete
await _jobBarrier.WaitAsync($"batch:{batchId}", totalJobs);
// This code runs only after all batch jobs complete
await ProcessBatchResults(batchId);
}
}Job Dependencies
csharp
public class DependentJobExample
{
private readonly IJobDependencyManager _dependencyManager;
public async Task ScheduleDependentJobsAsync()
{
var job1Id = await ScheduleJob<DataExtractionJob>();
var job2Id = await ScheduleJob<DataTransformationJob>();
// This job waits for both dependencies
await ScheduleJob<DataLoadJob>(dependencies: new[] { job1Id, job2Id });
}
}Leader Election
Simple Leader Election
csharp
public class LeaderElectionService : IHostedService
{
private readonly ILeaderElection _leaderElection;
private readonly ILogger<LeaderElectionService> _logger;
private CancellationTokenSource _cancellationTokenSource;
public async Task StartAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource = new CancellationTokenSource();
_ = Task.Run(async () =>
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
using var leadership = await _leaderElection.AcquireLeadershipAsync(
"job-scheduler",
TimeSpan.FromMinutes(1));
_logger.LogInformation("Acquired leadership");
// Execute leader-only tasks
await ExecuteLeaderTasks();
}
catch (LeadershipLostException)
{
_logger.LogInformation("Leadership lost");
}
await Task.Delay(TimeSpan.FromSeconds(30), _cancellationTokenSource.Token);
}
});
}
private async Task ExecuteLeaderTasks()
{
// Schedule recurring jobs
// Clean up expired data
// Send system notifications
}
}Leader Election with Callbacks
csharp
builder.Services.AddTickerQ(options =>
{
options.UseRedis(redisOptions =>
{
redisOptions.LeaderElectionOptions = leaderOptions =>
{
leaderOptions.LeadershipTimeout = TimeSpan.FromMinutes(1);
leaderOptions.RenewalInterval = TimeSpan.FromSeconds(30);
leaderOptions.OnLeadershipAcquired = async (context) =>
{
// Start leader-only services
await context.ServiceProvider.GetService<IJobScheduler>().StartAsync();
};
leaderOptions.OnLeadershipLost = async (context) =>
{
// Stop leader-only services
await context.ServiceProvider.GetService<IJobScheduler>().StopAsync();
};
};
});
});State Synchronization
Shared State Management
csharp
public class SharedStateExample
{
private readonly IDistributedState _distributedState;
public async Task UpdateJobProgressAsync(string jobId, int progress)
{
await _distributedState.SetAsync($"progress:{jobId}", progress);
// Notify other instances of progress update
await _distributedState.PublishAsync("job-progress-updated", new
{
JobId = jobId,
Progress = progress
});
}
public async Task<int> GetJobProgressAsync(string jobId)
{
return await _distributedState.GetAsync<int>($"progress:{jobId}");
}
}Event Synchronization
csharp
public class EventSynchronizationExample
{
private readonly IDistributedEventBus _eventBus;
public async Task PublishJobCompletedEventAsync(string jobId)
{
await _eventBus.PublishAsync("job-completed", new JobCompletedEvent
{
JobId = jobId,
CompletedAt = DateTime.UtcNow
});
}
public async Task SubscribeToJobEventsAsync()
{
await _eventBus.SubscribeAsync<JobCompletedEvent>("job-completed", async (evt) =>
{
// Handle job completion across all instances
await HandleJobCompletion(evt);
});
}
}Coordination Patterns
Producer-Consumer Pattern
csharp
public class ProducerConsumerExample
{
private readonly IDistributedQueue<WorkItem> _workQueue;
// Producer
public async Task ProduceWorkAsync(IEnumerable<WorkItem> items)
{
foreach (var item in items)
{
await _workQueue.EnqueueAsync(item);
}
}
// Consumer
public async Task ConsumeWorkAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var item = await _workQueue.DequeueAsync(TimeSpan.FromSeconds(30));
if (item != null)
{
await ProcessWorkItem(item);
}
}
}
}Work Stealing Pattern
csharp
public class WorkStealingExample
{
private readonly IWorkStealingQueue _workQueue;
public async Task ProcessWorkAsync(string workerId)
{
while (true)
{
// Try to get work from own queue first
var work = await _workQueue.TryDequeueAsync(workerId);
if (work == null)
{
// Steal work from other workers
work = await _workQueue.StealWorkAsync(workerId);
}
if (work != null)
{
await ProcessWork(work);
}
else
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
}
}Circuit Breaker Pattern
csharp
public class CircuitBreakerExample
{
private readonly IDistributedCircuitBreaker _circuitBreaker;
public async Task<T> ExecuteWithCircuitBreakerAsync<T>(
string operationName,
Func<Task<T>> operation)
{
return await _circuitBreaker.ExecuteAsync(operationName, operation);
}
}
// Configuration
redisOptions.CircuitBreakerOptions = circuitBreakerOptions =>
{
circuitBreakerOptions.FailureThreshold = 5;
circuitBreakerOptions.RecoveryTimeout = TimeSpan.FromMinutes(1);
circuitBreakerOptions.SamplingDuration = TimeSpan.FromMinutes(5);
};Coordination Monitoring
Lock Monitoring
csharp
public class LockMonitoringService
{
private readonly ILockMonitor _lockMonitor;
public async Task<LockStatistics> GetLockStatisticsAsync()
{
return await _lockMonitor.GetStatisticsAsync();
}
public async Task<IEnumerable<ActiveLock>> GetActiveLocks()
{
return await _lockMonitor.GetActiveLocksAsync();
}
public async Task ReleaseStaleLocks()
{
var staleLocks = await _lockMonitor.GetStaleLocksAsync();
foreach (var staleLock in staleLocks)
{
await _lockMonitor.ReleaseLockAsync(staleLock.Key);
}
}
}Coordination Health Checks
csharp
public class CoordinationHealthCheck : IHealthCheck
{
private readonly IDistributedLockProvider _lockProvider;
private readonly ILeaderElection _leaderElection;
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// Test lock acquisition
using var testLock = await _lockProvider.AcquireLockAsync(
"health-check-lock",
TimeSpan.FromSeconds(5));
// Test leader election
var leaderInfo = await _leaderElection.GetLeaderInfoAsync("test-election");
return HealthCheckResult.Healthy("Coordination services are healthy");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Coordination services are unhealthy", ex);
}
}
}Performance Optimization
Lock Optimization
csharp
redisOptions.LockingOptions = lockingOptions =>
{
// Use Lua scripts for atomic operations
lockingOptions.UseLuaScripts = true;
// Optimize lock acquisition
lockingOptions.LockRetryDelay = TimeSpan.FromMilliseconds(50);
lockingOptions.MaxLockRetries = 20;
// Enable lock pooling
lockingOptions.EnableLockPooling = true;
lockingOptions.LockPoolSize = 100;
};Coordination Batching
csharp
redisOptions.CoordinationOptions = coordinationOptions =>
{
coordinationOptions.EnableBatching = true;
coordinationOptions.BatchSize = 50;
coordinationOptions.BatchTimeout = TimeSpan.FromMilliseconds(100);
};Troubleshooting
Common Issues
Lock Contention
csharp
// Monitor lock contention
public class LockContentionMonitor
{
public async Task MonitorLockContentionAsync()
{
var contentionMetrics = await _lockMonitor.GetContentionMetricsAsync();
if (contentionMetrics.AverageWaitTime > TimeSpan.FromSeconds(5))
{
// High contention detected
_logger.LogWarning("High lock contention detected: {Metrics}", contentionMetrics);
}
}
}Split-Brain Prevention
csharp
redisOptions.LeaderElectionOptions = leaderOptions =>
{
leaderOptions.RequireQuorum = true;
leaderOptions.MinimumNodes = 3;
leaderOptions.HeartbeatInterval = TimeSpan.FromSeconds(10);
};Debugging Tools
csharp
// Enable coordination debugging
redisOptions.DebuggingOptions = debuggingOptions =>
{
debuggingOptions.EnableLockTracing = true;
debuggingOptions.EnableLeaderElectionLogging = true;
debuggingOptions.LogCoordinationEvents = true;
};Next Steps
- Integration Guide - Integrate with your application
