refactor: CreateVersion and run_threaded

Author: Elijah McMorris, 2024-08-27 18:59:25 -07:00
parent 17a115f473
commit b885315cd7
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
5 changed files with 210 additions and 235 deletions
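
In short: create_db_entity is renamed to create_entity, the free function create_db_entities_threaded becomes the run_threaded method on CreateVersion, and the create_db_entities / create_db_entities_bulk / create_db_entities_bulk_filter helpers move into CreateVersion as the create_single / create_bulk / create_bulk_filter methods. A minimal before/after sketch of the call shape, with argument values taken from the bench diff below:

    // before: the version was passed as a trailing argument
    create_db_entities_threaded(Some(db.clone()), reader, None, 1000, 100, CreateVersion::Single)
        .await
        .unwrap();

    // after: the CreateVersion variant is the receiver
    CreateVersion::Single
        .run_threaded(Some(db.clone()), reader, None, 1000, 100)
        .await
        .unwrap();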

View file

@@ -28,16 +28,10 @@ fn bench(c: &mut Criterion) {
                     .reader("tests/data/bench.json")
                     .unwrap();
 
-                create_db_entities_threaded(
-                    Some(db.clone()),
-                    reader,
-                    None,
-                    1000,
-                    100,
-                    CreateVersion::Single,
-                )
-                .await
-                .unwrap();
+                CreateVersion::Single
+                    .run_threaded(Some(db.clone()), reader, None, 1000, 100)
+                    .await
+                    .unwrap();
             })
         })
     });
@@ -51,16 +45,10 @@ fn bench(c: &mut Criterion) {
                     .reader("tests/data/bench.json")
                     .unwrap();
 
-                create_db_entities_threaded(
-                    Some(db.clone()),
-                    reader,
-                    None,
-                    1000,
-                    100,
-                    CreateVersion::Bulk,
-                )
-                .await
-                .unwrap();
+                CreateVersion::Bulk
+                    .run_threaded(Some(db.clone()), reader, None, 1000, 100)
+                    .await
+                    .unwrap();
             })
         })
     });

View file

@@ -49,7 +49,7 @@ async fn main() -> Result<(), Error> {
             let line = line?;
             loop {
-                if create_db_entity(&db, &line).await.is_ok() {
+                if create_entity(&db, &line).await.is_ok() {
                     break;
                 }
                 if retries >= 60 * 10 {
@@ -69,37 +69,37 @@ async fn main() -> Result<(), Error> {
             }
         }
         CreateMode::ThreadedSingle => {
-            create_db_entities_threaded(
-                None::<Surreal<Client>>,
-                reader,
-                Some(pb.clone()),
-                2_500,
-                100,
-                CreateVersion::Single,
-            )
-            .await?;
+            CreateVersion::Single
+                .run_threaded(
+                    None::<Surreal<Client>>,
+                    reader,
+                    Some(pb.clone()),
+                    2_500,
+                    100,
+                )
+                .await?;
         }
         CreateMode::ThreadedBulk => {
-            create_db_entities_threaded(
-                None::<Surreal<Client>>,
-                reader,
-                Some(pb.clone()),
-                500,
-                1_000,
-                CreateVersion::Bulk,
-            )
-            .await?;
+            CreateVersion::Bulk
+                .run_threaded(
+                    None::<Surreal<Client>>,
+                    reader,
+                    Some(pb.clone()),
+                    500,
+                    1_000,
+                )
+                .await?;
         }
         CreateMode::ThreadedBulkFilter => {
-            create_db_entities_threaded(
-                None::<Surreal<Client>>,
-                reader,
-                Some(pb.clone()),
-                500,
-                1_000,
-                CreateVersion::BulkFilter,
-            )
-            .await?;
+            CreateVersion::BulkFilter
+                .run_threaded(
+                    None::<Surreal<Client>>,
+                    reader,
+                    Some(pb.clone()),
+                    500,
+                    1_000,
+                )
+                .await?;
         }
     }

View file

@@ -51,7 +51,7 @@ impl File_Format {
     }
 }
 
-pub async fn create_db_entity(db: &Surreal<impl Connection>, line: &str) -> Result<(), Error> {
+pub async fn create_entity(db: &Surreal<impl Connection>, line: &str) -> Result<(), Error> {
     let line = line.trim().trim_end_matches(',').to_string();
     if line == "[" || line == "]" {
         return Ok(());
@@ -76,105 +76,6 @@ pub async fn create_entity(db: &Surreal<impl Connection>, line: &str) -> Result<(), Error> {
     Ok(())
 }
 
-pub async fn create_db_entities(
-    db: &Surreal<impl Connection>,
-    lines: &[String],
-    pb: &Option<ProgressBar>,
-) -> Result<(), Error> {
-    let mut counter = 0;
-    for line in lines {
-        create_db_entity(db, line).await?;
-        counter += 1;
-        if counter % 100 == 0 {
-            if let Some(ref p) = pb {
-                p.inc(100)
-            }
-        }
-    }
-    Ok(())
-}
-
-pub async fn create_db_entities_bulk(
-    db: &Surreal<impl Connection>,
-    lines: &[String],
-    pb: &Option<ProgressBar>,
-    batch_size: usize,
-) -> Result<(), Error> {
-    let lines = lines
-        .iter()
-        .map(|line| line.trim().trim_end_matches(',').to_string())
-        .filter(|line| line != "[" && line != "]")
-        .collect::<Vec<String>>();
-
-    let mut data_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
-    let mut claims_vec: Vec<Claims> = Vec::with_capacity(batch_size);
-    let mut property_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
-    let mut lexeme_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
-
-    for line in lines {
-        let json: Value = from_str(&line).expect("Failed to parse JSON");
-        let data = Entity::from_json(json).expect("Failed to parse JSON");
-        let (claims, data) = EntityMini::from_entity(data);
-        match data.id.clone().expect("No ID").tb.as_str() {
-            "Property" => property_vec.push(data),
-            "Lexeme" => lexeme_vec.push(data),
-            "Entity" => data_vec.push(data),
-            _ => panic!("Unknown table"),
-        }
-        claims_vec.push(claims);
-    }
-
-    db.insert::<Vec<EntityMini>>("Entity")
-        .content(data_vec)
-        .await?;
-    db.insert::<Vec<Claims>>("Claims")
-        .content(claims_vec)
-        .await?;
-    db.insert::<Vec<EntityMini>>("Property")
-        .content(property_vec)
-        .await?;
-    db.insert::<Vec<EntityMini>>("Lexeme")
-        .content(lexeme_vec)
-        .await?;
-
-    if let Some(ref p) = pb {
-        p.inc(batch_size as u64)
-    }
-    Ok(())
-}
-
-pub async fn create_db_entities_bulk_filter(
-    db: &Surreal<impl Connection>,
-    lines: &[String],
-    pb: &Option<ProgressBar>,
-    batch_size: usize,
-) -> Result<(), Error> {
-    let db_mem = init_db::create_db_mem().await?;
-    create_db_entities_bulk(&db_mem, lines, &None, batch_size).await?;
-
-    let filter = tokio::fs::read_to_string(&*FILTER_PATH).await?;
-    db_mem.query(filter).await?;
-
-    let file_name: String = rand::thread_rng()
-        .sample_iter(&Alphanumeric)
-        .take(30)
-        .map(char::from)
-        .collect();
-    let file_path = format!("data/temp/{}.surql", file_name);
-
-    tokio::fs::create_dir_all("data/temp").await?;
-    db_mem.export(&file_path).await?;
-    db.import(&file_path).await?;
-    tokio::fs::remove_file(&file_path).await?;
-
-    if let Some(ref p) = pb {
-        p.inc(batch_size as u64)
-    }
-    Ok(())
-}
-
 #[derive(Clone, Copy)]
 pub enum CreateVersion {
     Single,
@@ -191,95 +92,182 @@ impl CreateVersion {
         batch_size: usize,
     ) -> bool {
         match self {
-            CreateVersion::Single => create_db_entities(db, chunk, pb).await.is_ok(),
-            CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb, batch_size)
-                .await
-                .is_ok(),
-            CreateVersion::BulkFilter => create_db_entities_bulk_filter(db, chunk, pb, batch_size)
-                .await
-                .is_ok(),
+            CreateVersion::Single => self.create_single(db, chunk, pb).await.is_ok(),
+            CreateVersion::Bulk => self.create_bulk(db, chunk, pb, batch_size).await.is_ok(),
+            CreateVersion::BulkFilter => self
+                .create_bulk_filter(db, chunk, pb, batch_size)
+                .await
+                .is_ok(),
         }
     }
-}
 
-pub async fn create_db_entities_threaded(
-    dbo: Option<Surreal<impl Connection>>, // None::<Surreal<Client>>
-    reader: Box<dyn BufRead>,
-    pb: Option<ProgressBar>,
-    batch_size: usize,
-    batch_num: usize,
-    create_version: CreateVersion,
-) -> Result<(), Error> {
-    let mut futures = Vec::new();
-    let mut chunk = Vec::with_capacity(batch_size);
-    let mut chunk_counter = 0;
-    let mut lines = reader.lines();
-    let mut last_loop = false;
-
-    loop {
-        let line = lines.next();
-        match line {
-            Some(line) => chunk.push(line?),
-            None => last_loop = true,
-        };
-
-        if chunk.len() >= batch_size || last_loop {
-            let dbo = dbo.clone();
-            let pb = pb.clone();
-            futures.push(tokio::spawn(async move {
-                let mut retries = 0;
-                loop {
-                    match dbo {
-                        Some(ref db) => {
-                            if create_version.run(db, &chunk, &pb, batch_size).await {
-                                break;
-                            }
-                            if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
-                                continue;
-                            };
-                        }
-                        None => {
-                            let db = if let Ok(db) = init_db::create_db_ws().await {
-                                db
-                            } else {
-                                continue;
-                            };
-                            if create_version.run(&db, &chunk, &pb, batch_size).await {
-                                break;
-                            }
-                        }
-                    }
-                    if retries >= 60 * 10 {
-                        panic!("Failed to create entities, too many retries");
-                    }
-                    retries += 1;
-                    sleep(Duration::from_millis(250)).await;
-                }
-            }));
-            chunk_counter += 1;
-            chunk = Vec::with_capacity(batch_size);
-        }
-
-        if chunk_counter >= batch_num || last_loop {
-            join_all(futures).await;
-            futures = Vec::new();
-            chunk_counter = 0;
-        }
-        if last_loop {
-            break;
-        }
-    }
-
-    match dbo {
-        Some(db) => {
-            create_db_entities(&db, &chunk, &pb).await?;
-        }
-        None => {
-            create_db_entities(&init_db::create_db_ws().await?, &chunk, &pb).await?;
-        }
-    }
-    join_all(futures).await;
-    Ok(())
+    pub async fn run_threaded(
+        self,
+        dbo: Option<Surreal<impl Connection>>, // None::<Surreal<Client>>
+        reader: Box<dyn BufRead>,
+        pb: Option<ProgressBar>,
+        batch_size: usize,
+        batch_num: usize,
+    ) -> Result<(), Error> {
+        let mut lines = reader.lines().peekable();
+        let mut futures = Vec::new();
+
+        while lines.peek().is_some() {
+            let chunk: Vec<String> = lines
+                .by_ref()
+                .take(batch_size)
+                .filter_map(Result::ok)
+                .collect();
+            futures.push(self.spawn_chunk(dbo.clone(), chunk, pb.clone(), batch_size));
+
+            if futures.len() >= batch_num {
+                join_all(futures).await;
+                futures = Vec::new();
+            }
+        }
+
+        join_all(futures).await;
+        Ok(())
+    }
+
+    fn spawn_chunk(
+        &self,
+        dbo: Option<Surreal<impl Connection>>,
+        chunk: Vec<String>,
+        pb: Option<ProgressBar>,
+        batch_size: usize,
+    ) -> tokio::task::JoinHandle<()> {
+        let create_version = *self;
+
+        tokio::spawn(async move {
+            let mut retries = 0;
+            loop {
+                match dbo {
+                    Some(ref db) => {
+                        if create_version.run(db, &chunk, &pb, batch_size).await {
+                            break;
+                        }
+                    }
+                    None => {
+                        let db = match init_db::create_db_ws().await {
+                            Ok(db) => db,
+                            Err(_) => continue,
+                        };
+                        if create_version.run(&db, &chunk, &pb, batch_size).await {
+                            break;
+                        }
+                    }
+                }
+
+                if retries >= 60 * 10 {
+                    panic!("Failed to create entities, too many retries");
+                }
+                retries += 1;
+                sleep(Duration::from_millis(250)).await;
+            }
+        })
+    }
+
+    async fn create_single(
+        self,
+        db: &Surreal<impl Connection>,
+        lines: &[String],
+        pb: &Option<ProgressBar>,
+    ) -> Result<(), Error> {
+        let mut counter = 0;
+        for line in lines {
+            create_entity(db, line).await?;
+            counter += 1;
+            if counter % 100 == 0 {
+                if let Some(ref p) = pb {
+                    p.inc(100)
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn create_bulk(
+        self,
+        db: &Surreal<impl Connection>,
+        lines: &[String],
+        pb: &Option<ProgressBar>,
+        batch_size: usize,
+    ) -> Result<(), Error> {
+        let lines = lines
+            .iter()
+            .map(|line| line.trim().trim_end_matches(',').to_string())
+            .filter(|line| line != "[" && line != "]")
+            .collect::<Vec<String>>();
+
+        let mut data_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
+        let mut claims_vec: Vec<Claims> = Vec::with_capacity(batch_size);
+        let mut property_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
+        let mut lexeme_vec: Vec<EntityMini> = Vec::with_capacity(batch_size);
+
+        for line in lines {
+            let json: Value = from_str(&line).expect("Failed to parse JSON");
+            let data = Entity::from_json(json).expect("Failed to parse JSON");
+            let (claims, data) = EntityMini::from_entity(data);
+            match data.id.clone().expect("No ID").tb.as_str() {
+                "Property" => property_vec.push(data),
+                "Lexeme" => lexeme_vec.push(data),
+                "Entity" => data_vec.push(data),
+                _ => panic!("Unknown table"),
+            }
+            claims_vec.push(claims);
+        }
+
+        db.insert::<Vec<EntityMini>>("Entity")
+            .content(data_vec)
+            .await?;
+        db.insert::<Vec<Claims>>("Claims")
+            .content(claims_vec)
+            .await?;
+        db.insert::<Vec<EntityMini>>("Property")
+            .content(property_vec)
+            .await?;
+        db.insert::<Vec<EntityMini>>("Lexeme")
+            .content(lexeme_vec)
+            .await?;
+
+        if let Some(ref p) = pb {
+            p.inc(batch_size as u64)
+        }
+        Ok(())
+    }
+
+    async fn create_bulk_filter(
+        self,
+        db: &Surreal<impl Connection>,
+        lines: &[String],
+        pb: &Option<ProgressBar>,
+        batch_size: usize,
+    ) -> Result<(), Error> {
+        let db_mem = init_db::create_db_mem().await?;
+        self.create_bulk(&db_mem, lines, &None, batch_size).await?;
+
+        let filter = tokio::fs::read_to_string(&*FILTER_PATH).await?;
+        db_mem.query(filter).await?;
+
+        let file_name: String = rand::thread_rng()
+            .sample_iter(&Alphanumeric)
+            .take(30)
+            .map(char::from)
+            .collect();
+        let file_path = format!("data/temp/{}.surql", file_name);
+
+        tokio::fs::create_dir_all("data/temp").await?;
+        db_mem.export(&file_path).await?;
+        db.import(&file_path).await?;
+        tokio::fs::remove_file(&file_path).await?;
+
+        if let Some(ref p) = pb {
+            p.inc(batch_size as u64)
+        }
+        Ok(())
+    }
 }
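
The heart of this refactor is the chunking loop in run_threaded: the manual last_loop / chunk_counter bookkeeping is replaced by a peekable line iterator drained in batch_size steps. A self-contained sketch of that pattern, runnable on its own (the Cursor input and the chunks vector are illustrative, not from the repo):

    use std::io::{BufRead, Cursor};

    fn main() {
        // Stand-in for the Box<dyn BufRead> over a JSON dump.
        let reader = Cursor::new("a\nb\nc\nd\ne\n");
        let batch_size = 2;

        let mut lines = reader.lines().peekable();
        let mut chunks = Vec::new();

        // Same shape as run_threaded: peek() decides whether any input
        // remains; by_ref().take(batch_size) drains at most batch_size
        // lines without consuming the iterator itself.
        while lines.peek().is_some() {
            let chunk: Vec<String> = lines
                .by_ref()
                .take(batch_size)
                .filter_map(Result::ok)
                .collect();
            chunks.push(chunk);
        }

        assert_eq!(chunks, vec![vec!["a", "b"], vec!["c", "d"], vec!["e"]]);
    }

One behavioral difference worth noting: the old loop propagated read errors with line?, while filter_map(Result::ok) silently drops lines that fail to read.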

View file

@@ -34,5 +34,6 @@ pub async fn create_db_ws() -> Result<Surreal<Client>, Error> {
+
 pub async fn create_db_mem() -> Result<Surreal<Db>, Error> {
     let db = Surreal::new::<Mem>(()).await?;
     db.use_ns("wikidata").use_db("wikidata").await?;
     Ok(db)
 }

View file

@@ -42,7 +42,7 @@ async fn entity() {
     let reader = init_reader("json", "Entity");
 
     for line in reader.lines() {
-        create_db_entity(&db, &line.unwrap()).await.unwrap();
+        create_entity(&db, &line.unwrap()).await.unwrap();
     }
 
     assert_eq!(51.0, entity_query(&db).await.unwrap().unwrap())
@@ -56,7 +56,9 @@ async fn entity_threaded(#[case] version: CreateVersion) -> Result<(), Error> {
     let db = inti_db().await?;
     let reader = init_reader("json", "Entity");
 
-    create_db_entities_threaded(Some(db.clone()), reader, None, 1_000, 100, version).await?;
+    version
+        .run_threaded(Some(db.clone()), reader, None, 1_000, 100)
+        .await?;
 
     assert_eq!(51.0, entity_query(&db).await?.unwrap());
     Ok(())
@@ -68,15 +70,9 @@ async fn entity_threaded_filter() -> Result<(), Error> {
     let db = inti_db().await?;
     let reader = init_reader("json", "bench");
 
-    create_db_entities_threaded(
-        Some(db.clone()),
-        reader,
-        None,
-        1_000,
-        100,
-        CreateVersion::BulkFilter,
-    )
-    .await?;
+    CreateVersion::BulkFilter
+        .run_threaded(Some(db.clone()), reader, None, 1_000, 100)
+        .await?;
 
     let count: Option<f32> = db
         .query("return count(select * from Entity);")
@@ -105,7 +101,7 @@ async fn property() {
     let reader = init_reader("json", "Property");
 
     for line in reader.lines() {
-        create_db_entity(&db, &line.unwrap()).await.unwrap();
+        create_entity(&db, &line.unwrap()).await.unwrap();
     }
 
     assert_eq!(2.0, property_query(&db).await.unwrap().unwrap())
@@ -119,7 +115,9 @@ async fn property_threaded(#[case] version: CreateVersion) -> Result<(), Error> {
     let db = inti_db().await?;
     let reader = init_reader("json", "Property");
 
-    create_db_entities_threaded(Some(db.clone()), reader, None, 1_000, 100, version).await?;
+    version
+        .run_threaded(Some(db.clone()), reader, None, 1_000, 100)
+        .await?;
 
     assert_eq!(2.0, property_query(&db).await?.unwrap());
     Ok(())