more error handling and overwrite option

This commit is contained in:
Elijah McMorris 2023-12-18 02:38:46 -08:00
parent e37d413372
commit 8905c88819
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
8 changed files with 76 additions and 23 deletions

View file

@ -2,17 +2,27 @@ use anyhow::{Error, Ok, Result};
use bzip2::read::MultiBzDecoder;
use futures::future::join_all;
use indicatif::ProgressBar;
use lazy_static::lazy_static;
use serde_json::{from_str, Value};
use std::{
env,
fs::File,
io::{BufRead, BufReader},
};
use surrealdb::{Connection, Surreal};
use tokio::time::{sleep, Duration};
use wikidata::Entity;
mod tables;
use tables::*;
lazy_static! {
    /// Flag read from the `OVERWRITE_DB` environment variable.
    ///
    /// The value is computed lazily, on the first dereference of the static.
    ///
    /// # Panics
    ///
    /// Panics on first access if `OVERWRITE_DB` is not set, or if its value
    /// cannot be parsed as a `bool`.
    static ref OVERWRITE_DB: bool = {
        let raw = env::var("OVERWRITE_DB").expect("OVERWRITE_DB not set");
        raw.parse::<bool>().expect("Failed to parse OVERWRITE_DB")
    };
}
#[allow(non_camel_case_types)]
pub enum File_Format {
json,
@ -35,7 +45,7 @@ impl File_Format {
}
}
pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: String) -> Result<(), Error> {
pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: &str) -> Result<(), Error> {
let line = line.trim().trim_end_matches(',').to_string();
if line == "[" || line == "]" {
return Ok(());
@ -48,15 +58,13 @@ pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: String) -> R
let id = data.id.clone().expect("No ID");
data.id = None;
let _ = db.create::<Option<EntityMini>>(&id).await.is_err();
{
if db.create::<Option<EntityMini>>(&id).await.is_err() && *OVERWRITE_DB {
db.update::<Option<EntityMini>>(&id).content(data).await?;
};
}
let id = claims.id.clone().expect("No ID");
claims.id = None;
let _ = db.create::<Option<Claims>>(&id).await.is_err();
{
if db.create::<Option<Claims>>(&id).await.is_err() && *OVERWRITE_DB {
db.update::<Option<Claims>>(&id).content(claims).await?;
}
Ok(())
@ -64,12 +72,12 @@ pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: String) -> R
pub async fn create_db_entities<C: Connection>(
db: &Surreal<C>,
lines: Vec<String>,
pb: Option<ProgressBar>,
lines: &Vec<String>,
pb: &Option<ProgressBar>,
) -> Result<(), Error> {
let mut counter = 0;
for line in lines {
create_db_entity(db, line.to_string()).await?;
create_db_entity(db, line).await?;
counter += 1;
if counter % 100 == 0 {
if let Some(ref p) = pb {
@ -92,7 +100,7 @@ pub async fn create_db_entities_threaded<C: Connection>(
let mut chunk_counter = 0;
for line in reader.lines() {
chunk.push(line.unwrap());
chunk.push(line?);
if chunk.len() >= batch_size {
let db = db.clone();
@ -100,7 +108,20 @@ pub async fn create_db_entities_threaded<C: Connection>(
let pb = pb.clone();
futures.push(tokio::spawn(async move {
create_db_entities(&db, lines, pb).await.unwrap();
let mut retries = 0;
loop {
if create_db_entities(&db, &lines, &pb).await.is_ok() {
break;
}
if retries >= 60 * 10 {
panic!("Failed to create entities, too many retries");
}
retries += 1;
sleep(Duration::from_secs(1)).await;
if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
continue;
};
}
}));
chunk_counter += 1;
chunk.clear();
@ -113,7 +134,7 @@ pub async fn create_db_entities_threaded<C: Connection>(
}
}
create_db_entities(db, chunk, pb).await.unwrap();
create_db_entities(db, &chunk, &pb).await?;
join_all(futures).await;
Ok(())
}