将最多100,000条记录插入DocumentDB的最快方法

正如标题所示,我需要以编程方式将100,000多条记录插入到DocumentDb集合中。 这些数据将在稍后用于创建报告。 我正在使用Azure Documents SDK和用于批量插入文档的存储过程(请参阅使用存储过程查询Azure documentdb批量插入 )。

以下控制台应用程序显示了我如何插入文档。

InsertDocuments生成500个测试文档以传递给存储过程。 main函数调用InsertDocuments 10次,整体插入5,000个文档。 运行此应用程序会导致每隔几秒钟插入500个文档。 如果我增加每次通话的文件数量,我就会开始收到错误并丢失文件。

任何人都可以推荐更快的方式来插入文件?

static void Main(string[] args) { Console.WriteLine("Starting..."); MainAsync().Wait(); } static async Task MainAsync() { int campaignId = 1001, count = 500; for (int i = 0; i < 10; i++) { await InsertDocuments(campaignId, (count * i) + 1, (count * i) + count); } } static async Task InsertDocuments(int campaignId, int startId, int endId) { using (DocumentClient client = new DocumentClient(new Uri(documentDbUrl), documentDbKey)) { List items = new List(); // Create x number of documents to insert for (int i = startId; i <= endId; i++) { var item = new { id = Guid.NewGuid(), campaignId = campaignId, userId = i, status = "Pending" }; items.Add(item); } var task = client.ExecuteStoredProcedureAsync("/dbs/default/colls/campaignusers/sprocs/bulkImport", new RequestOptions() { PartitionKey = new PartitionKey(campaignId) }, new { items = items }); try { await task; int insertCount = (int)task.Result.Response; Console.WriteLine("{0} documents inserted...", insertCount); } catch (Exception e) { Console.WriteLine("Error: {0}", e.Message); } } } 

将文档插入Azure DocumentDB的最快方法。 可在Github上获取样本: https : //github.com/Azure/azure-documentdb-dotnet/tree/master/samples/documentdb-benchmark

以下提示将帮助您使用.NET SDK实现最佳通过:

  • 初始化单个DocumentClient
  • 使用直接连接和TCP协议( ConnectionMode.DirectConnectionProtocol.Tcp
  • 并行使用100个任务(取决于您的硬件)
  • 将DocumentClient构造函数中的MaxConnectionLimit增加到一个较高的值,比如1000个连接
  • 打开gcServer
  • 确保您的集合具有适当的预配置吞吐量(以及良好的分区键)
  • 在同一个Azure区域中运行也会有所帮助

使用10,000 RU / s,您可以在大约50秒内插入100,000个文档(每次写入大约5个请求单位)。

使用100,000 RU / s,您可以在大约5秒内插入。 您可以通过配置吞吐量(以及非常高的插入数量,跨多个VM /工作人员的扩展插入)来尽可能快地实现此目的

Cosmos Db团队刚刚发布了批量导入和更新SDK,遗憾的是只在Framework 4.5.1中提供,但这显然会为您带来很多繁重的工作并最大限度地利用吞吐量。 看到

https://docs.microsoft.com/en-us/azure/cosmos-db/bulk-executor-overview https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk -堆执行人点网

其他方法是其他人提到的存储过程。 存储过程需要分区密钥。 此外,存储过程应在4秒内按文档结束,否则所有记录都将回滚。 请参阅下面的代码,使用python azure documentdb sdk和基于javascript的存储过程。 我已经修改了脚本并解决了很多错误,下面的代码工作正常: –

 function bulkimport2(docObject) { var collection = getContext().getCollection(); var collectionLink = collection.getSelfLink(); // The count of imported docs, also used as current doc index. var count = 0; getContext().getResponse().setBody(docObject.items); //return // Validate input. //if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject); docObject.items=JSON.stringify(docObject.items) docObject.items = docObject.items.replace("\\\\r", ""); docObject.items = docObject.items.replace("\\\\n", ""); var docs = JSON.parse(docObject.items); var docsLength = docObject.items.length; if (docsLength == 0) { getContext().getResponse().setBody(0); return; } // Call the CRUD API to create a document. tryCreate(docs[count], callback, collectionLink,count); // Note that there are 2 exit conditions: // 1) The createDocument request was not accepted. // In this case the callback will not be called, we just call setBody and we are done. // 2) The callback was called docs.length times. // In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done. function tryCreate(doc, callback, collectionLink,count ) { doc=JSON.stringify(doc); if (typeof doc == "undefined") { getContext().getResponse().setBody(count); return ; } else { doc = doc.replace("\\r", ""); doc = doc.replace("\\n", ""); doc=JSON.parse(doc); } getContext().getResponse().setBody(doc); var isAccepted = collection.upsertDocument(collectionLink, doc, callback); // If the request was accepted, callback will be called. // Otherwise report current count back to the client, // which will call the script again with remaining set of docs. // This condition will happen when this stored procedure has been running too long // and is about to get cancelled by the server. This will allow the calling client // to resume this batch from the point we got to before isAccepted was set to false if (!isAccepted) { getContext().getResponse().setBody(count); } } // This is called when collection.createDocument is done and the document has been persisted. function callback(err, doc, options) { if (err) throw getContext().getResponse().setBody(err + doc); // One more document has been inserted, increment the count. count++; if (count >= docsLength) { // If we have created all documents, we are done. Just set the response. getContext().getResponse().setBody(count); return ; } else { // Create next document. tryCreate(docs[count], callback, collectionLink,count); } } 

}

编辑: – getContext()。getResponse()。setBody(count); 回归; //完全处理所有记录时

python脚本加载存储过程并进行批量导入

 # Initialize the Python DocumentDB client client = document_client.DocumentClient(config['ENDPOINT'], {'masterKey': config['MASTERKEY'] ,'DisableSSLVerification' : 'true' }) # Create a database #db = client.CreateDatabase({ 'id': config['DOCUMENTDB_DATABASE'] }) db=client.ReadDatabases({ 'id': 'db2' }) print(db) # Create collection options options = { 'offerEnableRUPerMinuteThroughput': True, 'offerVersion': "V2", 'offerThroughput': 400 } # Create a collection #collection = client.CreateCollection('dbs/db2' , { 'id': 'coll2'}, options) #collection = client.CreateCollection({ 'id':'db2'},{ 'id': 'coll2'}, options) database_link = 'dbs/db2' collection_link = database_link + '/colls/coll2' """ #List collections collection = client.ReadCollection(collection_link) print(collection) print('Databases:') databases = list(client.ReadDatabases()) if not databases: print('No Databases:') for database in databases: print(database['id']) """ # Create some documents """ document1 = client.CreateDocument(collection['_self'], { 'Web Site': 0, 'Cloud Service': 0, 'Virtual Machine': 0, 'name': 'some' }) document2 = client.CreateDocument(collection['_self'], { 'Web Site': 1, 'Cloud Service': 0, 'Virtual Machine': 0, 'name': 'some' }) """ # Query them in SQL """ query = { 'query': 'SELECT * FROM server s' } options = {} options['enableCrossPartitionQuery'] = True options['maxItemCount'] = 20 #result_iterable = client.QueryDocuments(collection['_self'], query, options) result_iterable = client.QueryDocuments(collection_link, query, options) results = list(result_iterable); print(results) """ ##How to store procedure and use it """ sproc3 = { 'id': 'storedProcedure2', 'body': ( 'function (input) {' + ' getContext().getResponse().setBody(' + ' \'a\' + input.temp);' + '}') } retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3) result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/storedProcedure3',{'temp': 'so'}) """ ## delete all records in collection """ result = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkDeleteSproc',"SELECT * FROM c ORDER BY c._ts DESC ") print(result) """ multiplerecords="""[{ "Virtual Machine": 0, "name": "some", "Web Site": 0, "Cloud Service": 0 }, { "Virtual Machine": 0, "name": "some", "Web Site": 1, "Cloud Service": 0 }]""" multiplerecords=json.loads(multiplerecords) print(multiplerecords) print(str(json.dumps(json.dumps(multiplerecords).encode('utf8')))) #bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(multiplerecords).encode('utf8')) #bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkImport',json.dumps(json.loads(r'{"items": [{"name":"John","age":30,"city":"New York"},{"name":"John","age":30,"city":"New York"}]}')).encode('utf8')) str1='{name":"John","age":30,"city":"New York","PartitionKey" : "Morisplane"}' str2='{name":"John","age":30,"city":"New York","partitionKey" : "Morisplane"}' key1=base64.b64encode(str1.encode("utf-8")) key2=base64.b64encode(str2.encode("utf-8")) data= {"items":[{"id": key1 ,"name":"John","age":30,"city":"Morisplane","PartitionKey" : "Morisplane" },{"id": key2,"name":"John","age":30,"city":"Morisplane","partitionKey" : "Morisplane"}] , "city": "Morisplane", "partitionKey" : "Morisplane"} print(repr(data)) #retrieved_sproc3 =client.DeleteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2') sproc3 = { 'id': 'bulkimport2', 'body': ( """function bulkimport2(docObject) { var collection = getContext().getCollection(); var collectionLink = collection.getSelfLink(); // The count of imported docs, also used as current doc index. var count = 0; getContext().getResponse().setBody(docObject.items); //return // Validate input. //if (!docObject.items || !docObject.items.length) getContext().getResponse().setBody(docObject); docObject.items=JSON.stringify(docObject.items) docObject.items = docObject.items.replace("\\\\r", ""); docObject.items = docObject.items.replace("\\\\n", ""); var docs = JSON.parse(docObject.items); var docsLength = docObject.items.length; if (docsLength == 0) { getContext().getResponse().setBody(0); return; } // Call the CRUD API to create a document. tryCreate(docs[count], callback, collectionLink,count); // Note that there are 2 exit conditions: // 1) The createDocument request was not accepted. // In this case the callback will not be called, we just call setBody and we are done. // 2) The callback was called docs.length times. // In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done. function tryCreate(doc, callback, collectionLink,count ) { doc=JSON.stringify(doc); if (typeof doc == "undefined") { getContext().getResponse().setBody(count); return ; } else { doc = doc.replace("\\r", ""); doc = doc.replace("\\n", ""); doc=JSON.parse(doc); } getContext().getResponse().setBody(doc); return var isAccepted = collection.upsertDocument(collectionLink, doc, callback); // If the request was accepted, callback will be called. // Otherwise report current count back to the client, // which will call the script again with remaining set of docs. // This condition will happen when this stored procedure has been running too long // and is about to get cancelled by the server. This will allow the calling client // to resume this batch from the point we got to before isAccepted was set to false if (!isAccepted) { getContext().getResponse().setBody(count); } } // This is called when collection.createDocument is done and the document has been persisted. function callback(err, doc, options) { if (err) throw getContext().getResponse().setBody(err + doc); // One more document has been inserted, increment the count. count++; if (count >= docsLength) { // If we have created all documents, we are done. Just set the response. getContext().getResponse().setBody(count); return ; } else { // Create next document. tryCreate(docs[count], callback, collectionLink,count); } } }""" ) } #retrieved_sproc3 = client.CreateStoredProcedure(collection_link,sproc3) bulkloadresult = client.ExecuteStoredProcedure('dbs/db2/colls/coll2/sprocs/bulkimport2', data , {"partitionKey" : "Morisplane"} ) print(repr(bulkloadresult)) 
 private async Task ExecuteDataUpload(IEnumerable data,PartitionKey partitionKey) { using (var client = new DocumentClient(m_endPointUrl, m_authKey, connPol)) { while (true) { try { var result = await client.ExecuteStoredProcedureAsync(m_spSelfLink, new RequestOptions { PartitionKey = partitionKey }, data); return result; } catch (DocumentClientException ex) { if (429 == (int)ex.StatusCode) { Thread.Sleep(ex.RetryAfter); continue; } if (HttpStatusCode.RequestTimeout == ex.StatusCode) { Thread.Sleep(ex.RetryAfter); continue; } throw ex; } catch (Exception) { Thread.Sleep(TimeSpan.FromSeconds(1)); continue; } } } } public async Task uploadData(IEnumerable data, string partitionKey) { int groupSize = 600; int dataSize = data.Count(); int chunkSize = dataSize > groupSize ? groupSize : dataSize; List uploadTasks = new List(); while (dataSize > 0) { IEnumerable chunkData = data.Take(chunkSize); object[] taskData = new object[3]; taskData[0] = chunkData; taskData[1] = chunkSize; taskData[2] = partitionKey; uploadTasks.Add(Task.Factory.StartNew(async (arg) => { object[] reqdData = (object[])arg; int chunkSizes = (int)reqdData[1]; IEnumerable chunkDatas = (IEnumerable)reqdData[0]; var partKey = new PartitionKey((string)reqdData[2]); int chunkDatasCount = chunkDatas.Count(); while (chunkDatasCount > 0) { int insertedCount = await ExecuteDataUpload(chunkDatas, partKey); chunkDatas = chunkDatas.Skip(insertedCount); chunkDatasCount = chunkDatasCount - insertedCount; } }, taskData)); data = data.Skip(chunkSize); dataSize = dataSize - chunkSize; chunkSize = dataSize > groupSize ? groupSize : dataSize; } await Task.WhenAll(uploadTasks); } 

现在,与要上载的对象列表并行调用uploadData。 只记住一件事只发送类似Partitionkey的数据。