Individual ws

This commit is contained in:
Elijah McMorris 2023-12-19 03:10:22 -08:00
parent 8905c88819
commit dc85c7d997
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
6 changed files with 76 additions and 44 deletions

View file

@ -28,7 +28,10 @@ Create data folder next to docker-compose.yml and .env, place data inside, and s
└── .env └── .env
``` ```
`docker compose up --pull always` `docker compose up --pull always -d`
## View Progress
`docker attach wikidata-to-surrealdb`
## Example .env ## Example .env
``` ```
@ -44,9 +47,6 @@ THREADED_REQUESTS=true
OVERWRITE_DB=false OVERWRITE_DB=false
``` ```
## View Progress
`docker attach wikidata-to-surrealdb`
# [Dev Install](./CONTRIBUTING.md#dev-install) # [Dev Install](./CONTRIBUTING.md#dev-install)
# How to Query # How to Query

View file

@ -16,8 +16,6 @@ services:
restart: always restart: always
deploy: deploy:
resources: resources:
limits:
memory: 8GB
reservations: reservations:
cpus: '0.5' cpus: '0.5'
ports: ports:

View file

@ -16,8 +16,6 @@ services:
restart: always restart: always
deploy: deploy:
resources: resources:
limits:
memory: 8GB
reservations: reservations:
cpus: '0.5' cpus: '0.5'
ports: ports:

View file

@ -2,19 +2,20 @@ use anyhow::{Error, Ok, Result};
use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::{env, fmt::Write, io::BufRead}; use std::{env, fmt::Write, io::BufRead};
use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal}; use surrealdb::{engine::remote::ws::Client, Surreal};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
mod utils; mod utils;
use utils::*; use utils::*;
lazy_static! { lazy_static! {
#[derive(Debug)] static ref WIKIDATA_FILE_FORMAT: String =
static ref DB_USER: String = env::var("DB_USER").expect("DB_USER not set"); env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set");
static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set"); static ref WIKIDATA_FILE_NAME: String =
static ref WIKIDATA_FILE_FORMAT: String = env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set"); env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set");
static ref WIKIDATA_FILE_NAME: String = env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set"); static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS")
static ref WIKIDATA_DB_PORT: String = env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set"); .expect("THREADED_REQUESTS not set")
static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS").expect("THREADED_REQUESTS not set").parse().expect("Failed to parse THREADED_REQUESTS"); .parse()
.expect("Failed to parse THREADED_REQUESTS");
} }
#[tokio::main] #[tokio::main]
@ -35,15 +36,7 @@ async fn main() -> Result<(), Error> {
}), }),
); );
let db = Surreal::new::<Ws>(WIKIDATA_DB_PORT.as_str()).await?; let db = create_db_ws().await?;
db.signin(Root {
username: &DB_USER,
password: &DB_PASSWORD,
})
.await?;
db.use_ns("wikidata").use_db("wikidata").await?;
let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?; let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?;
if !*THREADED_REQUESTS { if !*THREADED_REQUESTS {
@ -72,7 +65,8 @@ async fn main() -> Result<(), Error> {
} }
} }
} else { } else {
create_db_entities_threaded(&db, reader, Some(pb.clone()), 2500, 100).await?; create_db_entities_threaded(None::<Surreal<Client>>, reader, Some(pb.clone()), 2500, 100)
.await?;
} }
pb.finish(); pb.finish();

View file

@ -1,4 +1,4 @@
use anyhow::{Error, Ok, Result}; use anyhow::{Error, Result};
use bzip2::read::MultiBzDecoder; use bzip2::read::MultiBzDecoder;
use futures::future::join_all; use futures::future::join_all;
use indicatif::ProgressBar; use indicatif::ProgressBar;
@ -9,7 +9,11 @@ use std::{
fs::File, fs::File,
io::{BufRead, BufReader}, io::{BufRead, BufReader},
}; };
use surrealdb::{Connection, Surreal}; use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
Connection, Surreal,
};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use wikidata::Entity; use wikidata::Entity;
@ -21,6 +25,10 @@ lazy_static! {
.expect("OVERWRITE_DB not set") .expect("OVERWRITE_DB not set")
.parse() .parse()
.expect("Failed to parse OVERWRITE_DB"); .expect("Failed to parse OVERWRITE_DB");
static ref DB_USER: String = env::var("DB_USER").expect("DB_USER not set");
static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set");
static ref WIKIDATA_DB_PORT: String =
env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set");
} }
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
@ -45,7 +53,7 @@ impl File_Format {
} }
} }
pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: &str) -> Result<(), Error> { pub async fn create_db_entity(db: &Surreal<impl Connection>, line: &str) -> Result<(), Error> {
let line = line.trim().trim_end_matches(',').to_string(); let line = line.trim().trim_end_matches(',').to_string();
if line == "[" || line == "]" { if line == "[" || line == "]" {
return Ok(()); return Ok(());
@ -70,8 +78,8 @@ pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: &str) -> Res
Ok(()) Ok(())
} }
pub async fn create_db_entities<C: Connection>( pub async fn create_db_entities(
db: &Surreal<C>, db: &Surreal<impl Connection>,
lines: &Vec<String>, lines: &Vec<String>,
pb: &Option<ProgressBar>, pb: &Option<ProgressBar>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -88,8 +96,8 @@ pub async fn create_db_entities<C: Connection>(
Ok(()) Ok(())
} }
pub async fn create_db_entities_threaded<C: Connection>( pub async fn create_db_entities_threaded(
db: &Surreal<C>, dbo: Option<Surreal<impl Connection>>, // None::<Surreal<Client>>
reader: Box<dyn BufRead>, reader: Box<dyn BufRead>,
pb: Option<ProgressBar>, pb: Option<ProgressBar>,
batch_size: usize, batch_size: usize,
@ -103,28 +111,42 @@ pub async fn create_db_entities_threaded<C: Connection>(
chunk.push(line?); chunk.push(line?);
if chunk.len() >= batch_size { if chunk.len() >= batch_size {
let db = db.clone(); let dbo = dbo.clone();
let lines = chunk.clone();
let pb = pb.clone(); let pb = pb.clone();
futures.push(tokio::spawn(async move { futures.push(tokio::spawn(async move {
let mut retries = 0; let mut retries = 0;
loop { loop {
if create_db_entities(&db, &lines, &pb).await.is_ok() { match dbo {
Some(ref db) => {
if create_db_entities(db, &chunk, &pb).await.is_ok() {
break; break;
} }
if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
continue;
};
}
None => {
let db = if let Ok(db) = create_db_ws().await {
db
} else {
continue;
};
if create_db_entities(&db, &chunk, &pb).await.is_ok() {
break;
}
}
}
if retries >= 60 * 10 { if retries >= 60 * 10 {
panic!("Failed to create entities, too many retries"); panic!("Failed to create entities, too many retries");
} }
retries += 1; retries += 1;
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
if db.use_ns("wikidata").use_db("wikidata").await.is_err() {
continue;
};
} }
})); }));
chunk_counter += 1; chunk_counter += 1;
chunk.clear(); chunk = Vec::new();
} }
if chunk_counter >= batch_num { if chunk_counter >= batch_num {
@ -134,7 +156,27 @@ pub async fn create_db_entities_threaded<C: Connection>(
} }
} }
create_db_entities(db, &chunk, &pb).await?; match dbo {
Some(db) => {
create_db_entities(&db, &chunk, &pb).await?;
}
None => {
create_db_entities(&create_db_ws().await?, &chunk, &pb).await?;
}
}
join_all(futures).await; join_all(futures).await;
Ok(()) Ok(())
} }
/// Opens a fresh WebSocket connection to SurrealDB and prepares it for use.
///
/// Steps, in order:
/// 1. connect over `Ws` to the address held in `WIKIDATA_DB_PORT`,
/// 2. sign in as a root user with `DB_USER` / `DB_PASSWORD`,
/// 3. select the `wikidata` namespace and the `wikidata` database.
///
/// # Errors
/// Returns the first error produced by the connect, sign-in, or
/// namespace/database selection step.
pub async fn create_db_ws() -> Result<Surreal<Client>, Error> {
    // Resolve the endpoint first so the lazy statics are touched in the
    // same order as before: address, then credentials.
    let endpoint = WIKIDATA_DB_PORT.as_str();
    let client = Surreal::new::<Ws>(endpoint).await?;

    let credentials = Root {
        username: &DB_USER,
        password: &DB_PASSWORD,
    };
    client.signin(credentials).await?;

    client.use_ns("wikidata").use_db("wikidata").await?;

    Ok(client)
}

View file

@ -54,7 +54,7 @@ async fn entity_threaded() {
.reader("tests/data/Entity.json") .reader("tests/data/Entity.json")
.unwrap(); .unwrap();
create_db_entities_threaded(&db, reader, None, 1000, 100) create_db_entities_threaded(Some(db.clone()), reader, None, 1000, 100)
.await .await
.unwrap(); .unwrap();
@ -96,7 +96,7 @@ async fn property_threaded() {
.reader("tests/data/Property.json") .reader("tests/data/Property.json")
.unwrap(); .unwrap();
create_db_entities_threaded(&db, reader, None, 1000, 100) create_db_entities_threaded(Some(db.clone()), reader, None, 1000, 100)
.await .await
.unwrap(); .unwrap();