From dc85c7d997f17d999c85c9df59ef007b37a72bd5 Mon Sep 17 00:00:00 2001 From: NexVeridian Date: Tue, 19 Dec 2023 03:10:22 -0800 Subject: [PATCH] Individual ws --- README.md | 8 ++--- docker-compose.dev.yml | 2 -- docker-compose.yml | 2 -- src/main.rs | 30 +++++++---------- src/utils.rs | 74 +++++++++++++++++++++++++++++++++--------- tests/integration.rs | 4 +-- 6 files changed, 76 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 6fff867..e77baac 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,10 @@ Create data folder next to docker-compose.yml and .env, place data inside, and s └── .env ``` -`docker compose up --pull always` +`docker compose up --pull always -d` + +## View Progress +`docker attach wikidata-to-surrealdb` ## Example .env ``` @@ -44,9 +47,6 @@ THREADED_REQUESTS=true OVERWRITE_DB=false ``` -## View Progress -`docker attach wikidata-to-surrealdb` - # [Dev Install](./CONTRIBUTING.md#dev-install) # How to Query diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 35e16b4..225c815 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -16,8 +16,6 @@ services: restart: always deploy: resources: - limits: - memory: 8GB reservations: cpus: '0.5' ports: diff --git a/docker-compose.yml b/docker-compose.yml index c985f4b..4483e59 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,8 +16,6 @@ services: restart: always deploy: resources: - limits: - memory: 8GB reservations: cpus: '0.5' ports: diff --git a/src/main.rs b/src/main.rs index 25846ae..8925e7e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,19 +2,20 @@ use anyhow::{Error, Ok, Result}; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use lazy_static::lazy_static; use std::{env, fmt::Write, io::BufRead}; -use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal}; +use surrealdb::{engine::remote::ws::Client, Surreal}; use tokio::time::{sleep, Duration}; mod utils; use utils::*; lazy_static! { - #[derive(Debug)] - static ref DB_USER: String = env::var("DB_USER").expect("DB_USER not set"); - static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set"); - static ref WIKIDATA_FILE_FORMAT: String = env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set"); - static ref WIKIDATA_FILE_NAME: String = env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set"); - static ref WIKIDATA_DB_PORT: String = env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set"); - static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS").expect("THREADED_REQUESTS not set").parse().expect("Failed to parse THREADED_REQUESTS"); + static ref WIKIDATA_FILE_FORMAT: String = + env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set"); + static ref WIKIDATA_FILE_NAME: String = + env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set"); + static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS") + .expect("THREADED_REQUESTS not set") + .parse() + .expect("Failed to parse THREADED_REQUESTS"); } #[tokio::main] @@ -35,15 +36,7 @@ async fn main() -> Result<(), Error> { }), ); - let db = Surreal::new::(WIKIDATA_DB_PORT.as_str()).await?; - - db.signin(Root { - username: &DB_USER, - password: &DB_PASSWORD, - }) - .await?; - db.use_ns("wikidata").use_db("wikidata").await?; - + let db = create_db_ws().await?; let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?; if !*THREADED_REQUESTS { @@ -72,7 +65,8 @@ async fn main() -> Result<(), Error> { } } } else { - create_db_entities_threaded(&db, reader, Some(pb.clone()), 2500, 100).await?; + create_db_entities_threaded(None::>, reader, Some(pb.clone()), 2500, 100) + .await?; } pb.finish(); diff --git a/src/utils.rs b/src/utils.rs index 1b1dcb1..8ce875a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -use anyhow::{Error, Ok, Result}; +use anyhow::{Error, Result}; use bzip2::read::MultiBzDecoder; use futures::future::join_all; use indicatif::ProgressBar; @@ -9,7 +9,11 @@ use std::{ fs::File, io::{BufRead, BufReader}, }; -use surrealdb::{Connection, Surreal}; +use surrealdb::{ + engine::remote::ws::{Client, Ws}, + opt::auth::Root, + Connection, Surreal, +}; use tokio::time::{sleep, Duration}; use wikidata::Entity; @@ -21,6 +25,10 @@ lazy_static! { .expect("OVERWRITE_DB not set") .parse() .expect("Failed to parse OVERWRITE_DB"); + static ref DB_USER: String = env::var("DB_USER").expect("DB_USER not set"); + static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set"); + static ref WIKIDATA_DB_PORT: String = + env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set"); } #[allow(non_camel_case_types)] @@ -45,7 +53,7 @@ impl File_Format { } } -pub async fn create_db_entity(db: &Surreal, line: &str) -> Result<(), Error> { +pub async fn create_db_entity(db: &Surreal, line: &str) -> Result<(), Error> { let line = line.trim().trim_end_matches(',').to_string(); if line == "[" || line == "]" { return Ok(()); @@ -70,8 +78,8 @@ pub async fn create_db_entity(db: &Surreal, line: &str) -> Res Ok(()) } -pub async fn create_db_entities( - db: &Surreal, +pub async fn create_db_entities( + db: &Surreal, lines: &Vec, pb: &Option, ) -> Result<(), Error> { @@ -88,8 +96,8 @@ pub async fn create_db_entities( Ok(()) } -pub async fn create_db_entities_threaded( - db: &Surreal, +pub async fn create_db_entities_threaded( + dbo: Option>, // None::> reader: Box, pb: Option, batch_size: usize, @@ -103,28 +111,42 @@ pub async fn create_db_entities_threaded( chunk.push(line?); if chunk.len() >= batch_size { - let db = db.clone(); - let lines = chunk.clone(); + let dbo = dbo.clone(); let pb = pb.clone(); futures.push(tokio::spawn(async move { let mut retries = 0; loop { - if create_db_entities(&db, &lines, &pb).await.is_ok() { - break; + match dbo { + Some(ref db) => { + if create_db_entities(db, &chunk, &pb).await.is_ok() { + break; + } + if db.use_ns("wikidata").use_db("wikidata").await.is_err() { + continue; + }; + } + None => { + let db = if let Ok(db) = create_db_ws().await { + db + } else { + continue; + }; + if create_db_entities(&db, &chunk, &pb).await.is_ok() { + break; + } + } } + if retries >= 60 * 10 { panic!("Failed to create entities, too many retries"); } retries += 1; sleep(Duration::from_secs(1)).await; - if db.use_ns("wikidata").use_db("wikidata").await.is_err() { - continue; - }; } })); chunk_counter += 1; - chunk.clear(); + chunk = Vec::new(); } if chunk_counter >= batch_num { @@ -134,7 +156,27 @@ pub async fn create_db_entities_threaded( } } - create_db_entities(db, &chunk, &pb).await?; + match dbo { + Some(db) => { + create_db_entities(&db, &chunk, &pb).await?; + } + None => { + create_db_entities(&create_db_ws().await?, &chunk, &pb).await?; + } + } join_all(futures).await; Ok(()) } + +pub async fn create_db_ws() -> Result, Error> { + let db = Surreal::new::(WIKIDATA_DB_PORT.as_str()).await?; + + db.signin(Root { + username: &DB_USER, + password: &DB_PASSWORD, + }) + .await?; + db.use_ns("wikidata").use_db("wikidata").await?; + + Ok(db) +} diff --git a/tests/integration.rs b/tests/integration.rs index 8a12fcd..4429efb 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -54,7 +54,7 @@ async fn entity_threaded() { .reader("tests/data/Entity.json") .unwrap(); - create_db_entities_threaded(&db, reader, None, 1000, 100) + create_db_entities_threaded(Some(db.clone()), reader, None, 1000, 100) .await .unwrap(); @@ -96,7 +96,7 @@ async fn property_threaded() { .reader("tests/data/Property.json") .unwrap(); - create_db_entities_threaded(&db, reader, None, 1000, 100) + create_db_entities_threaded(Some(db.clone()), reader, None, 1000, 100) .await .unwrap();