with_capacity, fix progress bar, profiling

This commit is contained in:
Elijah McMorris 2024-01-30 23:07:19 -08:00
parent 82edfdfbd3
commit 305bf5273b
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
5 changed files with 32 additions and 9 deletions

View file

@ -4,6 +4,8 @@ rustflags = ["-C", "link-arg=-fuse-ld=/usr/bin/mold"]
[alias] [alias]
t = "nextest run" t = "nextest run"
# https://github.com/tikv/pprof-rs?tab=readme-ov-file#use-with-pprof
profile = "bench --bench bench -- --profile-time 10"
[build] [build]
target-dir = "target/target" target-dir = "target/target"

View file

@ -1,6 +1,7 @@
# Contributing code # Contributing code
- Make sure the tests pass - Make sure the tests pass `cargo t`
- Run `cargo clippy --fix --allow-dirty` - Run `cargo clippy --fix --allow-dirty`
- Run `cargo bench`
# Dev Install # Dev Install
## Dev Containers ## Dev Containers

View file

@ -19,6 +19,7 @@ indicatif = "0.17"
[dev-dependencies] [dev-dependencies]
surrealdb = { version = "1.1", features = ["kv-mem"] } surrealdb = { version = "1.1", features = ["kv-mem"] }
criterion = { version = "0.5", features = ["async_tokio"] } criterion = { version = "0.5", features = ["async_tokio"] }
pprof = { version = "0.13", features = ["criterion", "protobuf-codec"] }
[[bench]] [[bench]]
name = "bench" name = "bench"

View file

@ -1,5 +1,6 @@
use anyhow::{Error, Ok, Result}; use anyhow::{Error, Ok, Result};
use criterion::{criterion_group, criterion_main, Criterion}; use criterion::{criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use std::{env, time::Duration}; use std::{env, time::Duration};
use surrealdb::{ use surrealdb::{
engine::local::{Db, Mem}, engine::local::{Db, Mem},
@ -73,7 +74,7 @@ fn bench(c: &mut Criterion) {
criterion_group! { criterion_group! {
name = benches; name = benches;
config = Criterion::default().measurement_time(Duration::from_secs(60)); config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Protobuf)).measurement_time(Duration::from_secs(60));
targets= bench targets= bench
} }
criterion_main!(benches); criterion_main!(benches);

View file

@ -100,6 +100,7 @@ pub async fn create_db_entities_bulk(
db: &Surreal<impl Connection>, db: &Surreal<impl Connection>,
lines: &[String], lines: &[String],
pb: &Option<ProgressBar>, pb: &Option<ProgressBar>,
batch_size: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let lines = lines let lines = lines
.iter() .iter()
@ -108,9 +109,10 @@ pub async fn create_db_entities_bulk(
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let mut data_vec: Vec<EntityMini> = Vec::new(); let mut data_vec: Vec<EntityMini> = Vec::new();
let mut claims_vec: Vec<Claims> = Vec::new(); let mut claims_vec: Vec<Claims> = Vec::with_capacity(batch_size);
let mut property_vec: Vec<EntityMini> = Vec::new(); let mut property_vec: Vec<EntityMini> = Vec::new();
let mut lexeme_vec: Vec<EntityMini> = Vec::new(); let mut lexeme_vec: Vec<EntityMini> = Vec::new();
let mut first_loop = true;
for line in lines { for line in lines {
let json: Value = from_str(&line).expect("Failed to parse JSON"); let json: Value = from_str(&line).expect("Failed to parse JSON");
@ -123,6 +125,19 @@ pub async fn create_db_entities_bulk(
_ => panic!("Unknown table"), _ => panic!("Unknown table"),
} }
claims_vec.push(claims); claims_vec.push(claims);
if first_loop {
first_loop = false;
if !data_vec.is_empty() {
data_vec.reserve(batch_size);
}
if !property_vec.is_empty() {
property_vec.reserve(batch_size);
}
if !lexeme_vec.is_empty() {
lexeme_vec.reserve(batch_size);
}
}
} }
db.query("insert into Entity ($data_vec) RETURN NONE;") db.query("insert into Entity ($data_vec) RETURN NONE;")
@ -139,7 +154,7 @@ pub async fn create_db_entities_bulk(
.await?; .await?;
if let Some(ref p) = pb { if let Some(ref p) = pb {
p.inc(100) p.inc(batch_size as u64)
} }
Ok(()) Ok(())
} }
@ -155,10 +170,13 @@ impl CreateVersion {
db: &Surreal<impl Connection>, db: &Surreal<impl Connection>,
chunk: &Vec<String>, chunk: &Vec<String>,
pb: &Option<ProgressBar>, pb: &Option<ProgressBar>,
batch_size: usize,
) -> bool { ) -> bool {
match self { match self {
CreateVersion::Single => create_db_entities(db, chunk, pb).await.is_ok(), CreateVersion::Single => create_db_entities(db, chunk, pb).await.is_ok(),
CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb).await.is_ok(), CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb, batch_size)
.await
.is_ok(),
} }
} }
} }
@ -172,7 +190,7 @@ pub async fn create_db_entities_threaded(
create_version: CreateVersion, create_version: CreateVersion,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut futures = Vec::new(); let mut futures = Vec::new();
let mut chunk = Vec::new(); let mut chunk = Vec::with_capacity(batch_size);
let mut chunk_counter = 0; let mut chunk_counter = 0;
let mut lines = reader.lines(); let mut lines = reader.lines();
let mut last_loop = false; let mut last_loop = false;
@ -193,7 +211,7 @@ pub async fn create_db_entities_threaded(
loop { loop {
match dbo { match dbo {
Some(ref db) => { Some(ref db) => {
if create_version.run(db, &chunk, &pb).await { if create_version.run(db, &chunk, &pb, batch_size).await {
break; break;
} }
if db.use_ns("wikidata").use_db("wikidata").await.is_err() { if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
@ -206,7 +224,7 @@ pub async fn create_db_entities_threaded(
} else { } else {
continue; continue;
}; };
if create_version.run(&db, &chunk, &pb).await { if create_version.run(&db, &chunk, &pb, batch_size).await {
break; break;
} }
} }
@ -220,7 +238,7 @@ pub async fn create_db_entities_threaded(
} }
})); }));
chunk_counter += 1; chunk_counter += 1;
chunk = Vec::new(); chunk = Vec::with_capacity(batch_size);
} }
if chunk_counter >= batch_num || last_loop { if chunk_counter >= batch_num || last_loop {