本文将详细讲解EF Core与MySQL的事务和并发处理,分为三个部分:使用事务、处理并发冲突(乐观并发)以及悲观并发(MySQL中使用锁)。
-
使用事务
在EF Core中,可以使用事务来确保一系列操作要么全部成功,要么全部失败。EF Core支持多种事务管理方式,包括自动事务(SaveChanges自动包装事务)和显式事务。
-
处理并发冲突(乐观并发)
乐观并发假设多个事务很少冲突,因此不会立即锁定资源。而是在更新时检查数据是否被其他事务修改过。EF Core通过配置并发令牌(Concurrency Token)来实现乐观并发。
-
悲观并发(MySQL中使用锁)
悲观并发假设冲突经常发生,因此在读取数据时就会锁定资源,防止其他事务修改。在MySQL中,可以通过使用事务和SELECT ... FOR UPDATE语句来实现悲观并发。
1. 使用事务
基本事务用法
// 使用DbContext的Database.BeginTransaction()
using (var transaction = context.Database.BeginTransaction())
{
try
{
// 执行多个数据库操作
var product = new Product { Name = "New Product", Price = 99.99m, Stock = 10 };
context.Products.Add(product);
var order = new Order { CustomerId = 1, OrderDate = DateTime.Now };
context.Orders.Add(order);
// 保存更改
context.SaveChanges();
// 提交事务
transaction.Commit();
}
catch (Exception)
{
// 回滚事务
transaction.Rollback();
throw;
}
}
使用TransactionScope(分布式事务)
// 需要安装System.Transactions包
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
try
{
// 多个数据库操作
using (var context1 = new ApplicationDbContext())
using (var context2 = new ApplicationDbContext())
{
context1.Products.Add(new Product { Name = "Product 1", Price = 10.99m });
context2.Orders.Add(new Order { CustomerId = 1, OrderDate = DateTime.Now });
await context1.SaveChangesAsync();
await context2.SaveChangesAsync();
}
// 完成事务
scope.Complete();
}
catch (Exception)
{
// 事务会自动回滚
throw;
}
}
设置事务隔离级别
using (var transaction = context.Database.BeginTransaction(IsolationLevel.ReadCommitted))
{
try
{
// 执行操作
context.Products.Add(new Product { Name = "New Product", Price = 99.99m });
context.SaveChanges();
transaction.Commit();
}
catch (Exception)
{
transaction.Rollback();
throw;
}
}
异步事务处理
public async Task<bool> ProcessOrderAsync(int productId, int quantity)
{
using (var transaction = await context.Database.BeginTransactionAsync())
{
try
{
// 检查库存
var product = await context.Products
.FirstOrDefaultAsync(p => p.Id == productId);
if (product == null || product.Stock < quantity)
return false;
// 减少库存
product.Stock -= quantity;
context.Products.Update(product);
// 创建订单
var order = new Order
{
ProductId = productId,
Quantity = quantity,
OrderDate = DateTime.Now
};
context.Orders.Add(order);
await context.SaveChangesAsync();
await transaction.CommitAsync();
return true;
}
catch (Exception)
{
await transaction.RollbackAsync();
throw;
}
}
}
2. 处理并发冲突(乐观并发)
配置并发令牌
// 在实体中定义并发令牌属性
public class Product
{
public int Id { get; set; }
public string Name { get; set; }
public decimal Price { get; set; }
public int Stock { get; set; }
// 使用时间戳作为并发令牌
1777992846
public byte[] Version { get; set; }
// 或者使用自定义的并发令牌
// public uint RowVersion { get; set; }
}
// 在DbContext中配置并发令牌
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Product>()
.Property(p => p.Version)
.IsRowVersion()
.IsConcurrencyToken();
// 或者使用非时间戳字段作为并发令牌
modelBuilder.Entity<Product>()
.Property(p => p.Name)
.IsConcurrencyToken();
}
处理并发异常
public async Task<bool> UpdateProductAsync(Product updatedProduct)
{
try
{
context.Products.Update(updatedProduct);
await context.SaveChangesAsync();
return true;
}
catch (DbUpdateConcurrencyException ex)
{
// 处理并发冲突
foreach (var entry in ex.Entries)
{
if (entry.Entity is Product)
{
// 获取数据库中的当前值
var databaseValues = await entry.GetDatabaseValuesAsync();
if (databaseValues == null)
{
// 实体已被删除
return false;
}
// 转换为实体
var databaseProduct = (Product)databaseValues.ToObject();
// 决定如何解决冲突
// 选项1: 使用客户端值
// entry.OriginalValues.SetValues(databaseValues);
// await context.SaveChangesAsync();
// 选项2: 使用数据库值
// entry.CurrentValues.SetValues(databaseValues);
// 选项3: 合并值
var currentValues = entry.CurrentValues;
var resolvedValues = currentValues.Clone();
// 合并逻辑示例
resolvedValues["Name"] = currentValues["Name"]; // 保留新名称
resolvedValues["Price"] = databaseValues["Price"]; // 使用数据库价格
resolvedValues["Stock"] = Math.Max(
(int)currentValues["Stock"],
(int)databaseValues["Stock"]); // 使用较大库存
// 设置解析后的值
entry.OriginalValues.SetValues(databaseValues);
entry.CurrentValues.SetValues(resolvedValues);
// 重试保存
await context.SaveChangesAsync();
return true;
}
}
return false;
}
}
自定义并发冲突解决策略
public class ConcurrencyHandler
{
public static async Task<bool> ResolveConcurrencyAsync(
DbUpdateConcurrencyException ex,
ApplicationDbContext context)
{
var saved = false;
foreach (var entry in ex.Entries)
{
// 获取数据库当前值
var databaseValues = await entry.GetDatabaseValuesAsync();
if (databaseValues == null)
{
// 实体已被删除
entry.State = EntityState.Detached;
continue;
}
// 获取原始值和当前值
var originalValues = entry.OriginalValues;
var currentValues = entry.CurrentValues;
// 根据实体类型应用不同的解决策略
if (entry.Entity is Product product)
{
await ResolveProductConcurrency(entry, databaseValues);
}
else if (entry.Entity is Order order)
{
await ResolveOrderConcurrency(entry, databaseValues);
}
}
// 重试保存
try
{
await context.SaveChangesAsync();
saved = true;
}
catch (DbUpdateConcurrencyException)
{
// 如果再次失败,可能需要递归处理或放弃
}
return saved;
}
private static async Task ResolveProductConcurrency(
EntityEntry entry,
PropertyValues databaseValues)
{
var databaseProduct = (Product)databaseValues.ToObject();
var currentProduct = (Product)entry.Entity;
// 解决策略:保留较高的库存值,其他字段使用最新值
var resolvedStock = Math.Max(currentProduct.Stock, databaseProduct.Stock);
// 更新实体
entry.OriginalValues.SetValues(databaseValues);
currentProduct.Stock = resolvedStock;
}
}
3. 悲观并发(MySQL中使用锁)
使用SELECT ... FOR UPDATE
// 使用原始SQL进行行级锁定
public async Task<Product> GetProductWithLockAsync(int productId)
{
// 开始事务
using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
{
try
{
// 使用SELECT ... FOR UPDATE锁定行
var product = await context.Products
.FromSqlRaw("SELECT * FROM Products WHERE Id = {0} FOR UPDATE", productId)
.AsNoTracking()
.FirstOrDefaultAsync();
// 执行需要原子性的操作
// ...
await transaction.CommitAsync();
return product;
}
catch (Exception)
{
await transaction.RollbackAsync();
throw;
}
}
}
使用EF Core的锁机制
// 在EF Core中模拟行级锁
public async Task<bool> ReserveProductAsync(int productId, int quantity)
{
using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
{
try
{
// 获取产品并锁定行
var product = await context.Products
.FirstOrDefaultAsync(p => p.Id == productId);
if (product == null || product.Stock < quantity)
{
await transaction.RollbackAsync();
return false;
}
// 更新库存
product.Stock -= quantity;
await context.SaveChangesAsync();
await transaction.CommitAsync();
return true;
}
catch (Exception)
{
await transaction.RollbackAsync();
throw;
}
}
}
处理锁超时
// 在MySQL连接字符串中设置锁等待超时
var connectionString = "server=localhost;database=efcoredb;user=root;password=yourpassword;" +
"Connection Timeout=30;Default Command Timeout=30;";
// 或者在DbContext中配置
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString),
options => options.EnableRetryOnFailure().CommandTimeout(30));
}
// 处理锁超时异常
public async Task<bool> TryUpdateWithLockAsync(int productId, Action<Product> updateAction)
{
const int maxRetries = 3;
for (int attempt = 0; attempt < maxRetries; attempt++)
{
using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
{
try
{
// 获取并锁定产品
var product = await context.Products
.FirstOrDefaultAsync(p => p.Id == productId);
if (product == null)
return false;
// 执行更新操作
updateAction(product);
await context.SaveChangesAsync();
await transaction.CommitAsync();
return true;
}
catch (MySqlException ex) when (ex.Number == 1205) // Lock wait timeout
{
// 锁等待超时,重试
await transaction.RollbackAsync();
if (attempt == maxRetries - 1)
throw;
// 指数退避策略
await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt)));
}
catch (Exception)
{
await transaction.RollbackAsync();
throw;
}
}
}
return false;
}
表级锁定
// 使用表级锁定(谨慎使用,影响性能)
public async Task<bool> PerformMaintenanceAsync()
{
using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
{
try
{
// 锁定整个表
await context.Database.ExecuteSqlRawAsync("LOCK TABLES Products WRITE");
// 执行维护操作
await context.Database.ExecuteSqlRawAsync("UPDATE Products SET Price = Price * 1.1");
// 解锁表
await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES");
await transaction.CommitAsync();
return true;
}
catch (Exception)
{
await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES");
await transaction.RollbackAsync();
throw;
}
}
}
4. 高级并发控制模式
使用版本号实现乐观并发
public class VersionedEntity
{
public int Id { get; set; }
public uint Version { get; set; } // 使用无符号整数作为版本号
}
// 配置版本号并发令牌
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Product>()
.Property(p => p.Version)
.IsConcurrencyToken()
.ValueGeneratedOnAddOrUpdate();
}
// 在更新时自动增加版本号
public override int SaveChanges()
{
var entries = ChangeTracker.Entries()
.Where(e => e.State == EntityState.Modified || e.State == EntityState.Added);
foreach (var entry in entries)
{
if (entry.Entity is VersionedEntity entity)
{
entity.Version++;
}
}
return base.SaveChanges();
}
使用自定义锁机制
// 实现简单的应用级锁机制
public class ApplicationLockService
{
private static readonly ConcurrentDictionary<string, SemaphoreSlim> Locks =
new ConcurrentDictionary<string, SemaphoreSlim>();
public async Task<T> ExecuteWithLockAsync<T>(string lockKey, Func<Task<T>> operation)
{
var lockObject = Locks.GetOrAdd(lockKey, key => new SemaphoreSlim(1, 1));
await lockObject.WaitAsync();
try
{
return await operation();
}
finally
{
lockObject.Release();
}
}
}
// 使用应用级锁
public async Task<bool> UpdateProductSafelyAsync(int productId, Action<Product> updateAction)
{
var lockService = new ApplicationLockService();
var lockKey = $"product_{productId}";
return await lockService.ExecuteWithLockAsync(lockKey, async () =>
{
using (var context = new ApplicationDbContext())
{
var product = await context.Products.FindAsync(productId);
if (product == null) return false;
updateAction(product);
await context.SaveChangesAsync();
return true;
}
});
}
5. 监控和诊断并发问题
记录并发冲突
public class ConcurrencyLogger
{
private readonly ILogger<ConcurrencyLogger> _logger;
public ConcurrencyLogger(ILogger<ConcurrencyLogger> logger)
{
_logger = logger;
}
public void LogConcurrencyConflict(DbUpdateConcurrencyException ex, string operation)
{
_logger.LogWarning("并发冲突发生在操作: {Operation}", operation);
foreach (var entry in ex.Entries)
{
var databaseValues = entry.GetDatabaseValues();
var currentValues = entry.CurrentValues;
var originalValues = entry.OriginalValues;
_logger.LogWarning("实体: {EntityType}", entry.Metadata.Name);
_logger.LogWarning("数据库值: {DatabaseValues}", databaseValues?.Properties);
_logger.LogWarning("当前值: {CurrentValues}", currentValues?.Properties);
_logger.LogWarning("原始值: {OriginalValues}", originalValues?.Properties);
}
}
}
性能计数器监控
// 监控并发冲突率
public class ConcurrencyMonitor
{
private long _totalOperations;
private long _concurrencyExceptions;
public void RecordOperation(bool hadConcurrencyException = false)
{
Interlocked.Increment(ref _totalOperations);
if (hadConcurrencyException)
{
Interlocked.Increment(ref _concurrencyExceptions);
}
}
public double GetConcurrencyExceptionRate()
{
var total = Interlocked.Read(ref _totalOperations);
var exceptions = Interlocked.Read(ref _concurrencyExceptions);
return total > 0 ? (double)exceptions / total : 0;
}
}
总结
本文详细介绍了EF Core与MySQL中的事务和并发处理,包括:
- 事务管理:
-
-
基本事务用法和TransactionScope
-
设置事务隔离级别
-
异步事务处理
-
乐观并发控制:
-
悲观并发控制:
-
高级并发模式:
-
版本号实现乐观并发
-
应用级锁机制
-
监控和诊断并发问题
在实际应用中,应根据具体场景选择合适的并发控制策略:
正确实施事务和并发控制是构建高并发、高可用应用程序的关键。建议在生产环境中进行充分的压力测试,确保并发控制策略能够满足实际需求。