从c#到SQL Server的批量插入策略

在我们当前的项目中,客户将向我们的系统发送复杂/嵌套消息的集合。 这些消息的频率约为。 1000-2000 msg /每秒。

这些复杂对象包含事务数据(要添加)以及主数据(如果未找到则将添加)。 但客户不是传递主数据的ID,而是传递“名称”列。

系统检查这些名称是否存在主数据。 如果找到,它将使用数据库中的ID,否则首先创建此主数据,然后使用这些ID。

解析主数据ID后,系统会将事务数据插入SQL Server数据库(使用主数据ID)。 每条消息的主实体数量约为15-20。

以下是我们可以采用的一些策略。

  1. 我们可以首先从C#代码中解析master ID(如果没有找到则插入主数据)并将这些ID存储在C#cache中。 解决所有ID后,我们可以使用SqlBulkCopy类批量插入事务数据。 我们可以访问数据库15次以获取不同实体的ID,然后再次命中数据库以插入最终数据。 我们可以使用相同的连接在完成所有这些处理后关闭它。

  2. 我们可以将包含主数据和事务数据的所有这些消息一次性发送到数据库(以多个TVP的forms),然后在内部存储过程中,首先为缺失的数据创建主数据,然后插入事务数据。

有人可以建议这个用例的最佳方法吗?

由于一些隐私问题,我无法分享实际的对象结构。 但这是假设的对象结构,它非常接近我们的业务对象

一条此类消息将包含有关一个产品(其主数据)的信息以及来自不同供应商的价格详细信息(交易数据):

主数据(如果未找到则需要添加)

产品名称:ABC,ProductCateory:XYZ,制造商:XXX和其他一些细节(属性数量在15-20范围内)。

交易数据(将始终添加)

供应商名称:A,ListPrice:XXX,折扣:XXX

供应商名称:B,ListPrice:XXX,折扣:XXX

供应商名称:C,ListPrice:XXX,折扣:XXX

供应商名称:D,ListPrice:XXX,折扣:XXX

对于属于一个产品的消息,大多数有关主数据的信息将保持不变(并且将更改频率更低),但交易数据将始终波动。 因此,系统将检查系统中是否存在产品“XXX”。 如果没有,请检查本产品中提到的“类别”是否存在。 如果没有,它将为类别插入新记录,然后为产品插入新记录。 这将针对制造商和其他主数据进行。

多个供应商将同时发送有关多个产品(2000-5000)的数据。

因此,假设我们有1000个供应商,每个供应商都在发送大约10-15种不同产品的数据。 每2-3秒后,每个供应商都会向我们发送这10个产品的价格更新。 他可能会开始发送有关新产品的数据,但这种情况并不常见。

你可能最好用你的#2想法(即使用多个TVP一次性将所有15-20个实体发送到数据库,并处理整组最多2000条消息)。

在应用层缓存主数据查找并在发送到数据库之前进行翻译听起来很棒,但是错过了一些东西:

  1. 无论如何,您将不得不点击数据库以获取初始列表
  2. 无论如何,您将不得不点击数据库以插入新条目
  3. 查找字典中的值以替换为ID 正是数据库的作用(假设每个这些名称到ID查找都使用非聚集索引)
  4. 经常查询的值会将其数据页缓存在缓冲池中(这一个内存缓存)

为什么在应用层重复现在在DB层提供和发生的内容 ,特别是给出:

  • 15-20个实体可以有多达20k的记录(这是一个相对较小的数字,特别是当考虑到非聚集索引只需要两个字段时: NameID可以在使用时将许多行打包到单个数据页中100%填充因子)。
  • 并非所有20k条目都是“活动”或“当前”,因此您无需担心缓存所有条目。 因此,无论当前值是什么值都将被轻易识别为被查询的值,并且那些数据页(可能包括一些非活动条目,但在那里没什么大不了)将被缓存在缓冲池中。

因此,您无需担心老条目的老化或由于可能更改的值(即特定ID更新Name )而导致任何密钥到期或重新加载,因为这是自然处理的。

是的,内存中缓存是一种很棒的技术,可以大大加快网站的速度,但这些场景/用例是指非数据库进程在纯粹的只读目的中反复请求相同的数据。 但是这种特殊情况是合并数据并且查找值列表可能频繁更改(更多因为新条目而不是更新条目)。


总而言之,选项#2是要走的路。 虽然没有15个TVP,但我已经多次成功完成了这项技术。 可能需要对方法进行一些优化/调整以调整这种特定情况,但我发现效果很好的是:

  • 通过TVP接受数据。 我比SqlBulkCopy更喜欢这个,因为:
    • 它使得一个易于自包含的存储过程成为可能
    • 它非常适合应用程序代码,可以将集合完全流式传输到数据库,而无需首先将集合复制到DataTable ,这会复制集合,这会浪费CPU和内存。 这要求您为每个返回IEnumerable的集合创建一个方法,接受集合作为输入,并使用yield return; 发送forforeach循环中的每条记录。
  • TVP不适合统计,因此不适合JOINing(尽管可以通过在查询中使用TOP (@RecordCount)来减轻这种情况),但无论如何你都不需要担心,因为它们只用于填充具有任何缺失值的真实表
  • 第1步:为每个实体插入缺少的名称。 请记住,每个实体的[Name]字段都应该有一个NonClustered Index,并且假设该ID是Clustered Index,该值自然会成为索引的一部分,因此[Name]仅提供覆盖索引除了帮助以下操作。 并且还要记住,此客户端的任何先前执行(即大致相同的实体值)将导致这些索引的数据页保持缓存在缓冲池(即内存)中。

     ;WITH cte AS ( SELECT DISTINCT tmp.[Name] FROM @EntityNumeroUno tmp ) INSERT INTO EntityNumeroUno ([Name]) SELECT cte.[Name] FROM cte WHERE NOT EXISTS( SELECT * FROM EntityNumeroUno tab WHERE tab.[Name] = cte.[Name] ) 
  • 步骤2:在简单的INSERT...SELECT中插入所有“消息”,其中由于步骤1,查找表的数据页(即“实体”)已经缓存在缓冲池中


最后,请记住猜想/假设/有根据的猜测不能替代测试。 您需要尝试一些方法来查看哪种方法最适合您的特定情况,因为可能还有其他未共享的细节可能影响此处的“理想”。

我会说,如果消息只是插入,那么弗拉德的想法可能会更快。 我在这里描述的方法我已经在更复杂的情况下使用,需要完全同步(更新和删除),并进行了额外的validation和相关操作数据的创建(而不是查找值)。 在直接插入时使用SqlBulkCopy 可能会更快(尽管只有2000条记录,我怀疑它有什么区别,如果有的话),但这假设您直接加载到目标表(消息和查找)而不是中间/临时表(我相信Vlad的想法是将SqlBulkCopy直接发送到目标表)。 然而,如上所述,由于更新查找值的问题,使用外部高速缓存(即不是缓冲池)也更容易出错。 它可能需要更多的代码来考虑使外部缓存无效,特别是如果使用外部缓存只是稍微快一些。 需要将额外的风险/维护考虑在哪种方法总体上更好地满足您的需求。


UPDATE

根据评论中提供的信息,我们现在知道:

  • 有多个供应商
  • 每个供应商提供多种产品
  • 产品并非供应商所独有; 产品由1个或更多供应商销售
  • 产品属性是单一的
  • 定价信息具有可以包含多个记录的属性
  • 定价信息仅限INSERT(即时间点历史记录)
  • 唯一产品由SKU(或类似领域)确定
  • 一旦创建,使用现有SKU但不同属性(例如类别,制造商等)的产品将被视为同一产品 ; 差异将被忽略

考虑到所有这些,我仍然会推荐TVP,但要重新思考这种方法并使其以供应商为中心,而不是以产品为中心。 这里的假设是供应商随时发送文件。 所以当你得到一个文件时,导入它。 您提前进行的唯一查询是供应商。 这是基本布局:

  1. 似乎有理由假设您此时已经有VendorID,因为系统为什么要从未知来源导入文件?
  2. 您可以批量导入
  3. 创建一个SendRows方法:
    • 接受FileStream或允许通过文件前进的东西
    • 接受类似int BatchSize东西
    • 返回IEnumerable
    • 创建一个SqlDataRecord以匹配TVP结构
    • for循环通过FileStream直到满足BatchSize或文件中没有更多记录
    • 对数据执行任何必要的validation
    • 将数据映射到SqlDataRecord
    • 呼叫yield return;
  4. 打开文件
  5. 虽然文件中有数据
    • 调用存储过程
    • 传递VendorID
    • 传入TVP的SendRows(FileStream, BatchSize)
  6. 关闭文件
  7. 试验:
    • 在围绕FileStream循环之前打开SqlConnection,并在循环完成后关闭它
    • 打开SqlConnection,执行存储过程,并关闭FileStream循环内部的SqlConnection
  8. 试验各种BatchSize值。 从100开始,然后是200,500等。
  9. 存储过程将处理插入新产品

使用这种类型的结构,您将发送未使用的产品属性(即仅使用SKU查找现有产品)。 但是,它的扩展非常好,因为文件大小没有上限。 如果卖方发送50个产品,那很好。 如果他们发送50k产品,罚款。 如果他们发送400万个产品(这是我工作的系统,它确实处理了更新任何属性的产品信息!),那很好。 应用层或数据库层的内存不会增加,甚至可以处理1000万个产品。 导入所用的时间应随着发送的产品数量的增加而增加。


更新2
与源数据相关的新详细信息:

  • 来自Azure EventHub
  • 以C#对象的forms出现(没有文件)
  • 产品详细信息通过OP系统的API提供
  • 收集在单个队列中(只需将数据拉出插入数据库)

如果数据源是C#对象,那么我肯定会使用TVP,因为你可以通过我在第一次更新中描述的方法(即返回IEnumerable )发送它们。 针对每个供应商的价格/优惠详细信息发送一个或多个TVP,但针对单个属性属性定期输入参数。 例如:

 CREATE PROCEDURE dbo.ImportProduct ( @SKU VARCHAR(50), @ProductName NVARCHAR(100), @Manufacturer NVARCHAR(100), @Category NVARCHAR(300), @VendorPrices dbo.VendorPrices READONLY, @DiscountCoupons dbo.DiscountCoupons READONLY ) SET NOCOUNT ON; -- Insert Product if it doesn't already exist IF (NOT EXISTS( SELECT * FROM dbo.Products pr WHERE pr.SKU = @SKU ) ) BEGIN INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...) VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...); END; ...INSERT data from TVPs -- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows 

从数据库的角度来看,没有比BULK INSERT快的东西(例如来自csv文件)。 最好是尽快批量处理所有数据,然后使用存储过程对其进行处理。

AC#层只会减慢进程,因为C#和SQL之间的所有查询都比Sql-Server可以直接处理的慢几千倍。