bulk insert and benchmarks

commit 82edfdfbd3 (parent dc85c7d997)
Author: Elijah McMorris
Date: 2024-01-15 22:17:25 -08:00
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
12 changed files with 289 additions and 54 deletions


@@ -1,7 +1,6 @@
use anyhow::{Error, Ok, Result};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use lazy_static::lazy_static;
use std::{env, fmt::Write, io::BufRead};
use std::{env, io::BufRead};
use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::time::{sleep, Duration};
mod utils;
@@ -16,25 +15,16 @@ lazy_static! {
.expect("THREADED_REQUESTS not set")
.parse()
.expect("Failed to parse THREADED_REQUESTS");
static ref WIKIDATA_BULK_INSERT: bool = env::var("WIKIDATA_BULK_INSERT")
.expect("WIKIDATA_BULK_INSERT not set")
.parse()
.expect("Failed to parse WIKIDATA_BULK_INSERT");
}
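The WIKIDATA_BULK_INSERT flag goes through str::parse::<bool>(), which accepts only the exact strings "true" and "false"; anything else (such as "1" or "TRUE") panics via the expect. A quick check of that behavior:

assert_eq!("true".parse::<bool>().unwrap(), true);
assert!("1".parse::<bool>().is_err());    // only "true"/"false" parse
assert!("TRUE".parse::<bool>().is_err()); // and the match is case-sensitive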
#[tokio::main]
async fn main() -> Result<(), Error> {
sleep(Duration::from_secs(10)).await;
let total_size = 113_000_000;
let pb = ProgressBar::new(total_size);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ETA:[{eta}]",
)?
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
let sec = state.eta().as_secs();
let min = (sec / 60) % 60;
let hr = (sec / 60) / 60;
write!(w, "{}:{:02}:{:02}", hr, min, sec % 60).unwrap()
}),
);
let pb = create_pb().await;
let db = create_db_ws().await?;
let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?;
@@ -64,9 +54,26 @@ async fn main() -> Result<(), Error> {
pb.inc(100);
}
}
} else if *WIKIDATA_BULK_INSERT {
create_db_entities_threaded(
None::<Surreal<Client>>,
reader,
Some(pb.clone()),
2500,
100,
CreateVersion::Bulk,
)
.await?;
} else {
create_db_entities_threaded(None::<Surreal<Client>>, reader, Some(pb.clone()), 2500, 100)
.await?;
create_db_entities_threaded(
None::<Surreal<Client>>,
reader,
Some(pb.clone()),
2500,
100,
CreateVersion::Single,
)
.await?;
}
pb.finish();
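The last two branches call create_db_entities_threaded with identical arguments apart from the final CreateVersion, so the dispatch could also be written by selecting the version first. An equivalent sketch (the local name "version" is illustrative, not part of the commit):

let version = if *WIKIDATA_BULK_INSERT {
    CreateVersion::Bulk
} else {
    CreateVersion::Single
};
create_db_entities_threaded(None::<Surreal<Client>>, reader, Some(pb.clone()), 2500, 100, version)
    .await?;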


@@ -1,7 +1,7 @@
use anyhow::{Error, Result};
use bzip2::read::MultiBzDecoder;
use futures::future::join_all;
use indicatif::ProgressBar;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use lazy_static::lazy_static;
use serde_json::{from_str, Value};
use std::{
@@ -96,21 +96,95 @@ pub async fn create_db_entities(
Ok(())
}
pub async fn create_db_entities_bulk(
db: &Surreal<impl Connection>,
lines: &[String],
pb: &Option<ProgressBar>,
) -> Result<(), Error> {
let lines = lines
.iter()
.map(|line| line.trim().trim_end_matches(',').to_string())
.filter(|line| line != "[" && line != "]")
.collect::<Vec<String>>();
let mut data_vec: Vec<EntityMini> = Vec::new();
let mut claims_vec: Vec<Claims> = Vec::new();
let mut property_vec: Vec<EntityMini> = Vec::new();
let mut lexeme_vec: Vec<EntityMini> = Vec::new();
for line in lines {
let json: Value = from_str(&line).expect("Failed to parse JSON");
let data = Entity::from_json(json).expect("Failed to parse JSON");
let (claims, data) = EntityMini::from_entity(data);
match data.id.clone().expect("No ID").tb.as_str() {
"Property" => property_vec.push(data),
"Lexeme" => lexeme_vec.push(data),
"Entity" => data_vec.push(data),
_ => panic!("Unknown table"),
}
claims_vec.push(claims);
}
db.query("insert into Entity ($data_vec) RETURN NONE;")
.bind(("data_vec", data_vec))
.await?;
db.query("insert into Claims ($claims_vec) RETURN NONE;")
.bind(("claims_vec", claims_vec))
.await?;
db.query("insert into Property ($property_vec) RETURN NONE;")
.bind(("property_vec", property_vec))
.await?;
db.query("insert into Lexeme ($lexeme_vec) RETURN NONE;")
.bind(("lexeme_vec", lexeme_vec))
.await?;
if let Some(ref p) = pb {
p.inc(100)
}
Ok(())
}
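The four queries above differ only in table name and bound vector. A hypothetical helper could factor out the repetition; this is a sketch assuming bind accepts any Serialize value, which the tuple bindings above already rely on (the names bulk_insert and rows are illustrative, not from the commit):

async fn bulk_insert<T: serde::Serialize>(
    db: &Surreal<impl Connection>,
    table: &str,
    rows: Vec<T>,
) -> Result<(), Error> {
    // One INSERT per table, discarding the returned rows as above.
    db.query(format!("insert into {table} ($rows) RETURN NONE;"))
        .bind(("rows", rows))
        .await?;
    Ok(())
}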
#[derive(Clone, Copy)]
pub enum CreateVersion {
Single,
Bulk,
}
impl CreateVersion {
pub async fn run(
self,
db: &Surreal<impl Connection>,
chunk: &Vec<String>,
pb: &Option<ProgressBar>,
) -> bool {
match self {
CreateVersion::Single => create_db_entities(db, chunk, pb).await.is_ok(),
CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb).await.is_ok(),
}
}
}
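run returns bool rather than Result so the retry loop below can treat both insert paths uniformly. A usage sketch (db and chunk are illustrative locals):

// Illustrative: retry one chunk until the chosen insert path succeeds.
while !CreateVersion::Bulk.run(&db, &chunk, &None).await {
    sleep(Duration::from_millis(100)).await;
}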
pub async fn create_db_entities_threaded(
dbo: Option<Surreal<impl Connection>>, // None::<Surreal<Client>>
reader: Box<dyn BufRead>,
pb: Option<ProgressBar>,
batch_size: usize,
batch_num: usize,
create_version: CreateVersion,
) -> Result<(), Error> {
let mut futures = Vec::new();
let mut chunk = Vec::new();
let mut chunk_counter = 0;
let mut lines = reader.lines();
let mut last_loop = false;
for line in reader.lines() {
chunk.push(line?);
loop {
let line = lines.next();
match line {
Some(line) => chunk.push(line?),
None => last_loop = true,
};
if chunk.len() >= batch_size {
if chunk.len() >= batch_size || last_loop {
let dbo = dbo.clone();
let pb = pb.clone();
@@ -119,7 +193,7 @@ pub async fn create_db_entities_threaded(
loop {
match dbo {
Some(ref db) => {
if create_db_entities(db, &chunk, &pb).await.is_ok() {
if create_version.run(db, &chunk, &pb).await {
break;
}
if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
@@ -132,7 +206,7 @@
} else {
continue;
};
if create_db_entities(&db, &chunk, &pb).await.is_ok() {
if create_version.run(&db, &chunk, &pb).await {
break;
}
}
@@ -142,18 +216,21 @@
panic!("Failed to create entities, too many retries");
}
retries += 1;
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_millis(100)).await;
}
}));
chunk_counter += 1;
chunk = Vec::new();
}
if chunk_counter >= batch_num {
if chunk_counter >= batch_num || last_loop {
join_all(futures).await;
futures = Vec::new();
chunk_counter = 0;
}
if last_loop {
break;
}
}
match dbo {
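The rework replaces the old for loop over reader.lines() with a manual loop so the final partial chunk is flushed at end-of-input instead of being dropped when chunk.len() never reaches batch_size. The batching pattern in isolation, minus the retry and DB handling (a simplified sketch; batch_lines and batch are illustrative names, not from the commit):

async fn batch_lines(
    reader: Box<dyn BufRead>,
    batch_size: usize,
    batch_num: usize,
) -> Result<(), Error> {
    let mut futures = Vec::new();
    let mut chunk: Vec<String> = Vec::new();
    let mut lines = reader.lines();
    loop {
        // None marks end-of-input; the partial chunk still has to be flushed.
        let last_loop = match lines.next() {
            Some(line) => {
                chunk.push(line?);
                false
            }
            None => true,
        };
        if chunk.len() >= batch_size || last_loop {
            let batch = std::mem::take(&mut chunk);
            futures.push(tokio::spawn(async move {
                // insert `batch` into the DB here
                drop(batch);
            }));
        }
        if futures.len() >= batch_num || last_loop {
            join_all(std::mem::take(&mut futures)).await;
        }
        if last_loop {
            break;
        }
    }
    Ok(())
}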
@@ -180,3 +257,24 @@ pub async fn create_db_ws() -> Result<Surreal<Client>, Error> {
Ok(db)
}
pub async fn create_pb() -> ProgressBar {
let total_size = 110_000_000;
let pb = ProgressBar::new(total_size);
pb.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ETA:[{eta}]",
)
.unwrap()
.with_key(
"eta",
|state: &ProgressState, w: &mut dyn std::fmt::Write| {
let sec = state.eta().as_secs();
let min = (sec / 60) % 60;
let hr = (sec / 60) / 60;
write!(w, "{}:{:02}:{:02}", hr, min, sec % 60).unwrap()
},
),
);
pb
}
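The eta key renders remaining time as H:MM:SS. A quick check of the arithmetic:

// 3_723 s = 1 h 2 min 3 s
let sec: u64 = 3_723;
let (hr, min, s) = ((sec / 60) / 60, (sec / 60) % 60, sec % 60);
assert_eq!(format!("{}:{:02}:{:02}", hr, min, s), "1:02:03");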