From 8905c88819a40c3b00c97796112e3c017348961b Mon Sep 17 00:00:00 2001 From: NexVeridian Date: Mon, 18 Dec 2023 02:38:46 -0800 Subject: [PATCH] more error handling and overwrite option --- CONTRIBUTING.md | 4 +--- Cargo.toml | 2 +- README.md | 4 +++- docker-compose.dev.yml | 7 +++++++ docker-compose.yml | 7 +++++++ src/main.rs | 25 +++++++++++++++++++---- src/utils.rs | 45 +++++++++++++++++++++++++++++++----------- tests/integration.rs | 5 +++-- 8 files changed, 76 insertions(+), 23 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 65206e3..dd8fa23 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -22,9 +22,7 @@ Run tests with `cargo t` Remove the cargo cache for buildkit with `docker builder prune --filter type=exec.cachemount` # License -All code in this repository is dual-licensed under either [License-MIT](./LICENSE-MIT) or [LICENSE-APACHE](./LICENSE-Apache) at your option. This means you can select the license you prefer. - -[Why dual license](https://github.com/bevyengine/bevy/issues/2373) +All code in this repository is dual-licensed under either [License-MIT](./LICENSE-MIT) or [LICENSE-APACHE](./LICENSE-Apache) at your option. This means you can select the license you prefer. [Why dual license](https://github.com/bevyengine/bevy/issues/2373). # Your contributions Any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/Cargo.toml b/Cargo.toml index da41542..68f9af2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" surrealdb = "1.0" -tokio = "1.35" +tokio = { version = "1.35", features = ["time"] } futures = "0.3" wikidata = "0.3.1" bzip2 = { version = "0.4", features = ["tokio"] } diff --git a/README.md b/README.md index a8c6e93..6fff867 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ https://www.wikidata.org/wiki/Special:EntityData/P527.json ``` # Install -Copy docker-compose.yml +Copy [docker-compose.yml](./docker-compose.yml) Create data folder next to docker-compose.yml and .env, place data inside, and set the data type in .env ``` @@ -40,6 +40,8 @@ FILE_NAME=data/latest-all.json.bz2 # If not using docker file for Wikidata to SurrealDB, use 0.0.0.0:8000 WIKIDATA_DB_PORT=surrealdb:8000 THREADED_REQUESTS=true +# true=overwrite existing data, false=skip if already exists +OVERWRITE_DB=false ``` ## View Progress diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 534ad9f..35e16b4 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -13,6 +13,13 @@ services: - --pass - $DB_PASSWORD - file:/data/surrealdb + restart: always + deploy: + resources: + limits: + memory: 8GB + reservations: + cpus: '0.5' ports: - 8000:8000 volumes: diff --git a/docker-compose.yml b/docker-compose.yml index 6ebcc22..c985f4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,13 @@ services: - --pass - $DB_PASSWORD - file:/data/surrealdb + restart: always + deploy: + resources: + limits: + memory: 8GB + reservations: + cpus: '0.5' ports: - 8000:8000 volumes: diff --git a/src/main.rs b/src/main.rs index c688f6c..25846ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ use anyhow::{Error, Ok, Result}; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use lazy_static::lazy_static; -use std::{env, fmt::Write, io::BufRead, thread, time::Duration}; +use std::{env, fmt::Write, io::BufRead}; use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal}; +use tokio::time::{sleep, Duration}; mod utils; use utils::*; @@ -18,7 +19,7 @@ lazy_static! { #[tokio::main] async fn main() -> Result<(), Error> { - thread::sleep(Duration::from_secs(10)); + sleep(Duration::from_secs(10)).await; let total_size = 113_000_000; let pb = ProgressBar::new(total_size); @@ -48,14 +49,30 @@ async fn main() -> Result<(), Error> { if !*THREADED_REQUESTS { let mut counter = 0; for line in reader.lines() { - create_db_entity(&db, line?).await?; + let mut retries = 0; + let line = line?; + + loop { + if create_db_entity(&db, &line).await.is_ok() { + break; + } + if retries >= 60 * 10 { + panic!("Failed to create entities, too many retries"); + } + retries += 1; + sleep(Duration::from_secs(1)).await; + if db.use_ns("wikidata").use_db("wikidata").await.is_err() { + continue; + }; + } + counter += 1; if counter % 100 == 0 { pb.inc(100); } } } else { - create_db_entities_threaded(&db, reader, Some(pb.clone()), 1000, 100).await?; + create_db_entities_threaded(&db, reader, Some(pb.clone()), 2500, 100).await?; } pb.finish(); diff --git a/src/utils.rs b/src/utils.rs index e16df68..1b1dcb1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -2,17 +2,27 @@ use anyhow::{Error, Ok, Result}; use bzip2::read::MultiBzDecoder; use futures::future::join_all; use indicatif::ProgressBar; +use lazy_static::lazy_static; use serde_json::{from_str, Value}; use std::{ + env, fs::File, io::{BufRead, BufReader}, }; use surrealdb::{Connection, Surreal}; +use tokio::time::{sleep, Duration}; use wikidata::Entity; mod tables; use tables::*; +lazy_static! { + static ref OVERWRITE_DB: bool = env::var("OVERWRITE_DB") + .expect("OVERWRITE_DB not set") + .parse() + .expect("Failed to parse OVERWRITE_DB"); +} + #[allow(non_camel_case_types)] pub enum File_Format { json, @@ -35,7 +45,7 @@ impl File_Format { } } -pub async fn create_db_entity(db: &Surreal, line: String) -> Result<(), Error> { +pub async fn create_db_entity(db: &Surreal, line: &str) -> Result<(), Error> { let line = line.trim().trim_end_matches(',').to_string(); if line == "[" || line == "]" { return Ok(()); @@ -48,15 +58,13 @@ pub async fn create_db_entity(db: &Surreal, line: String) -> R let id = data.id.clone().expect("No ID"); data.id = None; - let _ = db.create::>(&id).await.is_err(); - { + if db.create::>(&id).await.is_err() && *OVERWRITE_DB { db.update::>(&id).content(data).await?; - }; + } let id = claims.id.clone().expect("No ID"); claims.id = None; - let _ = db.create::>(&id).await.is_err(); - { + if db.create::>(&id).await.is_err() && *OVERWRITE_DB { db.update::>(&id).content(claims).await?; } Ok(()) @@ -64,12 +72,12 @@ pub async fn create_db_entity(db: &Surreal, line: String) -> R pub async fn create_db_entities( db: &Surreal, - lines: Vec, - pb: Option, + lines: &Vec, + pb: &Option, ) -> Result<(), Error> { let mut counter = 0; for line in lines { - create_db_entity(db, line.to_string()).await?; + create_db_entity(db, line).await?; counter += 1; if counter % 100 == 0 { if let Some(ref p) = pb { @@ -92,7 +100,7 @@ pub async fn create_db_entities_threaded( let mut chunk_counter = 0; for line in reader.lines() { - chunk.push(line.unwrap()); + chunk.push(line?); if chunk.len() >= batch_size { let db = db.clone(); @@ -100,7 +108,20 @@ pub async fn create_db_entities_threaded( let pb = pb.clone(); futures.push(tokio::spawn(async move { - create_db_entities(&db, lines, pb).await.unwrap(); + let mut retries = 0; + loop { + if create_db_entities(&db, &lines, &pb).await.is_ok() { + break; + } + if retries >= 60 * 10 { + panic!("Failed to create entities, too many retries"); + } + retries += 1; + sleep(Duration::from_secs(1)).await; + if db.use_ns("wikidata").use_db("wikidata").await.is_err() { + continue; + }; + } })); chunk_counter += 1; chunk.clear(); @@ -113,7 +134,7 @@ pub async fn create_db_entities_threaded( } } - create_db_entities(db, chunk, pb).await.unwrap(); + create_db_entities(db, &chunk, &pb).await?; join_all(futures).await; Ok(()) } diff --git a/tests/integration.rs b/tests/integration.rs index 563a7ca..8a12fcd 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -9,6 +9,7 @@ use wikidata_to_surrealdb::utils::*; async fn inti_db() -> Result, Error> { env::set_var("WIKIDATA_LANG", "en"); + env::set_var("OVERWRITE_DB", "true"); let db = Surreal::new::(()).await?; db.use_ns("wikidata").use_db("wikidata").await?; @@ -40,7 +41,7 @@ async fn entity() { .unwrap(); for line in reader.lines() { - create_db_entity(&db, line.unwrap()).await.unwrap(); + create_db_entity(&db, &line.unwrap()).await.unwrap(); } assert_eq!(51.0, entity_query(&db).await.unwrap().unwrap()) @@ -82,7 +83,7 @@ async fn property() { .unwrap(); for line in reader.lines() { - create_db_entity(&db, line.unwrap()).await.unwrap(); + create_db_entity(&db, &line.unwrap()).await.unwrap(); } assert_eq!(2.0, property_query(&db).await.unwrap().unwrap())