From 305bf5273b0c700824f359adf4fa95b5bfc2aba0 Mon Sep 17 00:00:00 2001
From: NexVeridian
Date: Tue, 30 Jan 2024 23:07:19 -0800
Subject: [PATCH] with_capacity, fix progress bar, profiling

---
 .cargo/config.toml |  2 ++
 CONTRIBUTING.md    |  3 ++-
 Cargo.toml         |  1 +
 benches/bench.rs   |  3 ++-
 src/utils.rs       | 32 +++++++++++++++++++++++++-------
 5 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 0a0833d..071ca97 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -4,6 +4,8 @@ rustflags = ["-C", "link-arg=-fuse-ld=/usr/bin/mold"]
 
 [alias]
 t = "nextest run"
+# https://github.com/tikv/pprof-rs?tab=readme-ov-file#use-with-pprof
+profile = "bench --bench bench -- --profile-time 10"
 
 [build]
 target-dir = "target/target"
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 3114298..55804f6 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,6 +1,7 @@
 # Contributing code
-- Make sure the test pass
+- Make sure the tests pass: `cargo t`
 - Run `cargo clippy --fix --allow-dirty`
+- Run `cargo bench`
 
 # Dev Install
 ## Dev Containers
diff --git a/Cargo.toml b/Cargo.toml
index 861fd2c..be350d9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ indicatif = "0.17"
 [dev-dependencies]
 surrealdb = { version = "1.1", features = ["kv-mem"] }
 criterion = { version = "0.5", features = ["async_tokio"] }
+pprof = { version = "0.13", features = ["criterion", "protobuf-codec"] }
 
 [[bench]]
 name = "bench"
diff --git a/benches/bench.rs b/benches/bench.rs
index 57e1534..33fb032 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -1,5 +1,6 @@
 use anyhow::{Error, Ok, Result};
 use criterion::{criterion_group, criterion_main, Criterion};
+use pprof::criterion::{Output, PProfProfiler};
 use std::{env, time::Duration};
 use surrealdb::{
     engine::local::{Db, Mem},
@@ -73,7 +74,7 @@ fn bench(c: &mut Criterion) {
 
 criterion_group! {
     name = benches;
-    config = Criterion::default().measurement_time(Duration::from_secs(60));
+    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Protobuf)).measurement_time(Duration::from_secs(60));
     targets= bench
 }
 criterion_main!(benches);
diff --git a/src/utils.rs b/src/utils.rs
index 5f9b89e..2e89453 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -100,6 +100,7 @@ pub async fn create_db_entities_bulk(
     db: &Surreal,
     lines: &[String],
     pb: &Option,
+    batch_size: usize,
 ) -> Result<(), Error> {
     let lines = lines
         .iter()
@@ -108,9 +109,10 @@ pub async fn create_db_entities_bulk(
         .collect::>();
 
     let mut data_vec: Vec = Vec::new();
-    let mut claims_vec: Vec = Vec::new();
+    let mut claims_vec: Vec = Vec::with_capacity(batch_size);
     let mut property_vec: Vec = Vec::new();
     let mut lexeme_vec: Vec = Vec::new();
+    let mut first_loop = true;
 
     for line in lines {
         let json: Value = from_str(&line).expect("Failed to parse JSON");
@@ -123,6 +125,19 @@ pub async fn create_db_entities_bulk(
             _ => panic!("Unknown table"),
         }
         claims_vec.push(claims);
+
+        if first_loop {
+            first_loop = false;
+            if !data_vec.is_empty() {
+                data_vec.reserve(batch_size);
+            }
+            if !property_vec.is_empty() {
+                property_vec.reserve(batch_size);
+            }
+            if !lexeme_vec.is_empty() {
+                lexeme_vec.reserve(batch_size);
+            }
+        }
     }
 
     db.query("insert into Entity ($data_vec) RETURN NONE;")
@@ -139,7 +154,7 @@ pub async fn create_db_entities_bulk(
         .await?;
 
     if let Some(ref p) = pb {
-        p.inc(100)
+        p.inc(batch_size as u64)
     }
     Ok(())
 }
@@ -155,10 +170,13 @@ impl CreateVersion {
         db: &Surreal,
         chunk: &Vec,
         pb: &Option,
+        batch_size: usize,
     ) -> bool {
         match self {
             CreateVersion::Single => create_db_entities(db, chunk, pb).await.is_ok(),
-            CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb).await.is_ok(),
+            CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb, batch_size)
+                .await
+                .is_ok(),
         }
     }
 }
@@ -172,7 +190,7 @@ pub async fn create_db_entities_threaded(
     create_version: CreateVersion,
 ) -> Result<(), Error> {
     let mut futures = Vec::new();
-    let mut chunk = Vec::new();
+    let mut chunk = Vec::with_capacity(batch_size);
     let mut chunk_counter = 0;
     let mut lines = reader.lines();
     let mut last_loop = false;
@@ -193,7 +211,7 @@ pub async fn create_db_entities_threaded(
             loop {
                 match dbo {
                     Some(ref db) => {
-                        if create_version.run(db, &chunk, &pb).await {
+                        if create_version.run(db, &chunk, &pb, batch_size).await {
                             break;
                         }
                         if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
@@ -206,7 +224,7 @@ pub async fn create_db_entities_threaded(
                         } else {
                             continue;
                         };
-                        if create_version.run(&db, &chunk, &pb).await {
+                        if create_version.run(&db, &chunk, &pb, batch_size).await {
                             break;
                         }
                     }
@@ -220,7 +238,7 @@ pub async fn create_db_entities_threaded(
             }
         }));
        chunk_counter += 1;
-        chunk = Vec::new();
+        chunk = Vec::with_capacity(batch_size);
     }
 
     if chunk_counter >= batch_num || last_loop {
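
A few notes on the changes above. The src/utils.rs hunks size the per-chunk buffers from the new `batch_size` argument: the buffer filled on every line (`claims_vec`) is allocated with `Vec::with_capacity(batch_size)` up front, while buffers that may stay empty are only `reserve`d after the first line shows which table the chunk belongs to. Below is a minimal standalone sketch of that pattern using only the standard library; the `String` element type and the `starts_with('Q')` check are placeholders for the repo's real row types and table match, which are not visible in this patch:

fn fill_buffers(lines: &[String], batch_size: usize) -> (Vec<String>, Vec<String>) {
    // Filled for every line, so allocate the whole batch immediately.
    let mut claims_vec: Vec<String> = Vec::with_capacity(batch_size);
    // Only used for some chunks, so start empty and reserve lazily.
    let mut data_vec: Vec<String> = Vec::new();
    let mut first_loop = true;

    for line in lines {
        if line.starts_with('Q') {
            data_vec.push(line.clone());
        }
        claims_vec.push(line.clone());

        // After the first line we know whether the optional buffer is in use;
        // reserve the rest of the batch once instead of growing it repeatedly.
        if first_loop {
            first_loop = false;
            if !data_vec.is_empty() {
                data_vec.reserve(batch_size);
            }
        }
    }
    (data_vec, claims_vec)
}

fn main() {
    let lines: Vec<String> = vec!["Q42".into(), "P31".into(), "Q64".into()];
    let (data_vec, claims_vec) = fill_buffers(&lines, 100);
    println!("{} data rows, {} claims", data_vec.len(), claims_vec.len());
}

The progress bar now advances by `batch_size as u64` per bulk insert instead of a hard-coded 100, so it stays in step with whatever chunk size the caller configures. For profiling, the new alias means `cargo profile` expands to `cargo bench --bench bench -- --profile-time 10`; with `Output::Protobuf`, the profile data is written under criterion's per-benchmark output directory (see the pprof-rs README linked from the alias for the exact path).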