feat: remove lazy_static and make tables.rs async

This commit is contained in:
Elijah McMorris 2024-09-24 14:28:15 -07:00
parent dbd039eebf
commit 5fa50cf69f
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
8 changed files with 142 additions and 83 deletions

1
Cargo.lock generated
View file

@ -4347,7 +4347,6 @@ dependencies = [
"criterion", "criterion",
"futures 0.3.30", "futures 0.3.30",
"indicatif", "indicatif",
"lazy_static",
"pprof", "pprof",
"rand", "rand",
"rstest", "rstest",

View file

@ -9,11 +9,10 @@ anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
surrealdb = { version = "2.0.1", features = ["protocol-http", "kv-mem"] } surrealdb = { version = "2.0.1", features = ["protocol-http", "kv-mem"] }
tokio = { version = "1.39", features = ["fs", "time"] } tokio = { version = "1.39", features = ["fs", "time", "sync"] }
futures = "0.3" futures = "0.3"
wikidata = "1.1" wikidata = "1.1"
bzip2 = { version = "0.4", features = ["tokio"] } bzip2 = { version = "0.4", features = ["tokio"] }
lazy_static = "1.5"
indicatif = "0.17" indicatif = "0.17"
rand = "0.8" rand = "0.8"
backon = { version = "1.2", features = ["tokio-sleep"] } backon = { version = "1.2", features = ["tokio-sleep"] }

View file

@ -64,7 +64,7 @@ Env string CREATE_VERSION must be in the enum CREATE_VERSION
pub enum CreateVersion { pub enum CreateVersion {
#[default] #[default]
Bulk, Bulk,
/// must create a filter.surql file in the data directory /// must create a `filter.surql` file in the data directory
BulkFilter, BulkFilter,
} }
``` ```

View file

@ -1,39 +1,59 @@
use anyhow::{Error, Ok, Result}; use anyhow::{Error, Ok, Result};
use lazy_static::lazy_static;
use std::env; use std::env;
use surrealdb::{engine::remote::http::Client, Surreal}; use surrealdb::{engine::remote::http::Client, Surreal};
use tokio::time::{sleep, Duration}; use tokio::{
fs,
sync::OnceCell,
time::{sleep, Duration},
};
mod utils; mod utils;
use init_reader::File_Format; use init_reader::File_Format;
use utils::*; use utils::*;
lazy_static! { static WIKIDATA_FILE_FORMAT: OnceCell<String> = OnceCell::const_new();
static ref WIKIDATA_FILE_FORMAT: String = static WIKIDATA_FILE_NAME: OnceCell<String> = OnceCell::const_new();
env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set"); static CREATE_VERSION: OnceCell<CreateVersion> = OnceCell::const_new();
static ref WIKIDATA_FILE_NAME: String =
env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set"); async fn get_wikidata_file_format() -> &'static String {
static ref CREATE_VERSION: CreateVersion = match env::var("CREATE_VERSION") WIKIDATA_FILE_FORMAT
.get_or_init(|| async { env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set") })
.await
}
async fn get_wikidata_file_name() -> &'static String {
WIKIDATA_FILE_NAME
.get_or_init(|| async { env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set") })
.await
}
async fn get_create_version() -> &'static CreateVersion {
CREATE_VERSION
.get_or_init(|| async {
match env::var("CREATE_VERSION")
.expect("CREATE_VERSION not set") .expect("CREATE_VERSION not set")
.as_str() .as_str()
{ {
"Bulk" => CreateVersion::Bulk, "Bulk" => CreateVersion::Bulk,
"BulkFilter" => CreateVersion::BulkFilter, "BulkFilter" => CreateVersion::BulkFilter,
_ => panic!("Unknown CREATE_VERSION"), _ => panic!("Unknown CREATE_VERSION"),
}; }
})
.await
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;
let pb = init_progress_bar::create_pb().await; let pb = init_progress_bar::create_pb().await;
let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?; let reader = File_Format::new(get_wikidata_file_format().await)
.reader(get_wikidata_file_name().await)?;
tokio::fs::create_dir_all("data/temp").await?; fs::create_dir_all("data/temp").await?;
tokio::fs::remove_dir_all("data/temp").await?; fs::remove_dir_all("data/temp").await?;
tokio::fs::create_dir_all("data/temp").await?; fs::create_dir_all("data/temp").await?;
CREATE_VERSION get_create_version()
.await
.run( .run(
None::<Surreal<Client>>, None::<Surreal<Client>>,
reader, reader,

View file

@ -3,11 +3,11 @@ use backon::Retryable;
use core::panic; use core::panic;
use futures::future::join_all; use futures::future::join_all;
use indicatif::ProgressBar; use indicatif::ProgressBar;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde_json::{from_str, Value}; use serde_json::{from_str, Value};
use std::{env, io::BufRead}; use std::{env, io::BufRead};
use surrealdb::{Connection, Surreal}; use surrealdb::{Connection, Surreal};
use tokio::sync::OnceCell;
use wikidata::Entity; use wikidata::Entity;
pub mod init_backoff; pub mod init_backoff;
@ -17,20 +17,33 @@ pub mod init_reader;
mod tables; mod tables;
use tables::*; use tables::*;
lazy_static! { static OVERWRITE_DB: OnceCell<bool> = OnceCell::const_new();
static ref OVERWRITE_DB: bool = env::var("OVERWRITE_DB") static FILTER_PATH: OnceCell<String> = OnceCell::const_new();
async fn get_overwrite_db() -> bool {
*OVERWRITE_DB
.get_or_init(|| async {
env::var("OVERWRITE_DB")
.expect("OVERWRITE_DB not set") .expect("OVERWRITE_DB not set")
.parse() .parse::<bool>()
.expect("Failed to parse OVERWRITE_DB"); .expect("Failed to parse OVERWRITE_DB")
static ref FILTER_PATH: String = })
env::var("FILTER_PATH").unwrap_or("data/filter.surql".to_string()); .await
}
async fn get_filter_path() -> &'static String {
FILTER_PATH
.get_or_init(|| async {
env::var("FILTER_PATH").unwrap_or("data/filter.surql".to_string())
})
.await
} }
#[derive(Clone, Copy, Default)] #[derive(Clone, Copy, Default)]
pub enum CreateVersion { pub enum CreateVersion {
#[default] #[default]
Bulk, Bulk,
/// must create a filter.surql file in the root directory /// must create a `filter.surql` file in the root directory
BulkFilter, BulkFilter,
} }
@ -77,7 +90,7 @@ impl CreateVersion {
Some(db) => self.create_retry(&db, &chunk, &pb, batch_size).await, Some(db) => self.create_retry(&db, &chunk, &pb, batch_size).await,
None => { None => {
let db = init_db::create_db_remote let db = init_db::create_db_remote
.retry(*init_backoff::exponential) .retry(*init_backoff::get_exponential().await)
.await .await
.expect("Failed to create remote db"); .expect("Failed to create remote db");
self.create_retry(&db, &chunk, &pb, batch_size).await self.create_retry(&db, &chunk, &pb, batch_size).await
@ -96,7 +109,7 @@ impl CreateVersion {
batch_size: usize, batch_size: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
(|| async { self.create(db, chunk, pb, batch_size).await }) (|| async { self.create(db, chunk, pb, batch_size).await })
.retry(*init_backoff::exponential) .retry(*init_backoff::get_exponential().await)
.await .await
} }
@ -137,7 +150,7 @@ impl CreateVersion {
Ok(data) => data, Ok(data) => data,
Err(_) => continue, Err(_) => continue,
}; };
let (claims, data) = EntityMini::from_entity(data); let (claims, data) = EntityMini::from_entity(data).await;
match data.id.clone().expect("No ID").tb.as_str() { match data.id.clone().expect("No ID").tb.as_str() {
"Property" => property_vec.push(data), "Property" => property_vec.push(data),
"Lexeme" => lexeme_vec.push(data), "Lexeme" => lexeme_vec.push(data),
@ -147,7 +160,7 @@ impl CreateVersion {
claims_vec.push(claims); claims_vec.push(claims);
} }
if *OVERWRITE_DB { if get_overwrite_db().await {
db.upsert::<Vec<EntityMini>>("Entity") db.upsert::<Vec<EntityMini>>("Entity")
.content(entity_vec) .content(entity_vec)
.await?; .await?;
@ -191,7 +204,7 @@ impl CreateVersion {
let db_mem = init_db::create_db_mem().await?; let db_mem = init_db::create_db_mem().await?;
self.create_bulk(&db_mem, lines, &None, batch_size).await?; self.create_bulk(&db_mem, lines, &None, batch_size).await?;
let filter = tokio::fs::read_to_string(&*FILTER_PATH).await?; let filter = tokio::fs::read_to_string(get_filter_path().await).await?;
db_mem.query(filter).await?; db_mem.query(filter).await?;
let file_name: String = rand::thread_rng() let file_name: String = rand::thread_rng()

View file

@ -1,9 +1,14 @@
use backon::ExponentialBuilder; use backon::ExponentialBuilder;
use lazy_static::lazy_static; use tokio::{sync::OnceCell, time::Duration};
use tokio::time::Duration;
lazy_static! { static BACKOFF_EXPONENTIAL: OnceCell<ExponentialBuilder> = OnceCell::const_new();
pub static ref exponential: ExponentialBuilder = ExponentialBuilder::default()
pub async fn get_exponential() -> &'static ExponentialBuilder {
BACKOFF_EXPONENTIAL
.get_or_init(|| async {
ExponentialBuilder::default()
.with_max_times(30) .with_max_times(30)
.with_max_delay(Duration::from_secs(60)); .with_max_delay(Duration::from_secs(60))
})
.await
} }

View file

@ -1,6 +1,5 @@
use anyhow::Error; use anyhow::Error;
use anyhow::Result; use anyhow::Result;
use lazy_static::lazy_static;
use std::env; use std::env;
use surrealdb::{ use surrealdb::{
engine::{ engine::{
@ -10,20 +9,36 @@ use surrealdb::{
opt::auth::Root, opt::auth::Root,
Surreal, Surreal,
}; };
use tokio::sync::OnceCell;
lazy_static! { static DB_USER: OnceCell<String> = OnceCell::const_new();
static ref DB_USER: String = env::var("DB_USER").expect("DB_USER not set"); static DB_PASSWORD: OnceCell<String> = OnceCell::const_new();
static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set"); static WIKIDATA_DB_PORT: OnceCell<String> = OnceCell::const_new();
static ref WIKIDATA_DB_PORT: String =
env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set"); pub async fn get_db_user() -> &'static String {
DB_USER
.get_or_init(|| async { env::var("DB_USER").expect("DB_USER not set") })
.await
}
pub async fn get_db_password() -> &'static String {
DB_PASSWORD
.get_or_init(|| async { env::var("DB_PASSWORD").expect("DB_PASSWORD not set") })
.await
}
pub async fn get_wikidata_db_port() -> &'static String {
WIKIDATA_DB_PORT
.get_or_init(|| async { env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set") })
.await
} }
pub async fn create_db_remote() -> Result<Surreal<Client>, Error> { pub async fn create_db_remote() -> Result<Surreal<Client>, Error> {
let db = Surreal::new::<Http>(WIKIDATA_DB_PORT.as_str()).await?; let db = Surreal::new::<Http>(get_wikidata_db_port().await).await?;
db.signin(Root { db.signin(Root {
username: &DB_USER, username: get_db_user().await,
password: &DB_PASSWORD, password: get_db_password().await,
}) })
.await?; .await?;
db.use_ns("wikidata").use_db("wikidata").await?; db.use_ns("wikidata").use_db("wikidata").await?;

View file

@ -1,13 +1,16 @@
use lazy_static::lazy_static; use futures::future::join_all;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::env; use std::env;
use surrealdb::sql::{Id, Thing}; use surrealdb::sql::{Id, Thing};
use tokio::sync::OnceCell;
use wikidata::{ClaimValue, ClaimValueData, Entity, Lang, Pid, WikiId}; use wikidata::{ClaimValue, ClaimValueData, Entity, Lang, Pid, WikiId};
lazy_static! { static WIKIDATA_LANG: OnceCell<String> = OnceCell::const_new();
static ref WIKIDATA_LANG: String = env::var("WIKIDATA_LANG")
.expect("WIKIDATA_LANG not set") async fn get_wikidata_lang() -> &'static String {
.to_string(); WIKIDATA_LANG
.get_or_init(|| async { env::var("WIKIDATA_LANG").expect("WIKIDATA_LANG not set") })
.await
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@ -17,7 +20,7 @@ pub enum ClaimData {
} }
impl ClaimData { impl ClaimData {
fn from_cvd(cvd: ClaimValueData) -> Self { async fn from_cvd(cvd: ClaimValueData) -> Self {
match cvd { match cvd {
ClaimValueData::Item(qid) => ClaimData::Thing(Thing::from(("Entity", Id::from(qid.0)))), ClaimValueData::Item(qid) => ClaimData::Thing(Thing::from(("Entity", Id::from(qid.0)))),
ClaimValueData::Property(pid) => { ClaimValueData::Property(pid) => {
@ -54,48 +57,53 @@ pub struct EntityMini {
} }
impl EntityMini { impl EntityMini {
pub fn from_entity(entity: Entity) -> (Claims, Self) { pub async fn from_entity(entity: Entity) -> (Claims, Self) {
let thing_claim = Thing::from(("Claims", get_id_entity(&entity).id)); let thing_claim = Thing::from(("Claims", get_id_entity(&entity).await.id));
( (
Claims { Claims {
id: Some(thing_claim.clone()), id: Some(thing_claim.clone()),
..Self::flatten_claims(entity.claims.clone()) ..Self::flatten_claims(entity.claims.clone()).await
}, },
Self { Self {
id: Some(get_id_entity(&entity)), id: Some(get_id_entity(&entity).await),
label: get_name(&entity), label: get_name(&entity).await,
claims: thing_claim, claims: thing_claim,
description: get_description(&entity), description: get_description(&entity).await,
}, },
) )
} }
fn flatten_claims(claims: Vec<(Pid, ClaimValue)>) -> Claims { async fn flatten_claims(claims: Vec<(Pid, ClaimValue)>) -> Claims {
Claims { Claims {
id: None, id: None,
claims: claims claims: {
.iter() let futures = claims.iter().map(|(pid, claim_value)| async {
.flat_map(|(pid, claim_value)| {
let mut flattened = vec![Claim { let mut flattened = vec![Claim {
id: Thing::from(("Property", Id::from(pid.0))), id: Thing::from(("Property", Id::from(pid.0))),
value: ClaimData::from_cvd(claim_value.data.clone()), value: ClaimData::from_cvd(claim_value.data.clone()).await,
}]; }];
flattened.extend(claim_value.qualifiers.iter().map( let inner_futures = claim_value.qualifiers.iter().map(
|(qualifier_pid, qualifier_value)| Claim { |(qualifier_pid, qualifier_value)| async {
let qualifier_data = ClaimData::from_cvd(qualifier_value.clone()).await;
Claim {
id: Thing::from(("Claims", Id::from(qualifier_pid.0))), id: Thing::from(("Claims", Id::from(qualifier_pid.0))),
value: ClaimData::from_cvd(qualifier_value.clone()), value: qualifier_data,
}
}, },
)); );
flattened.extend(join_all(inner_futures).await);
flattened flattened
}) });
.collect(),
join_all(futures).await.into_iter().flatten().collect()
},
} }
} }
} }
fn get_id_entity(entity: &Entity) -> Thing { async fn get_id_entity(entity: &Entity) -> Thing {
let (id, tb) = match entity.id { let (id, tb) = match entity.id {
WikiId::EntityId(qid) => (qid.0, "Entity".to_string()), WikiId::EntityId(qid) => (qid.0, "Entity".to_string()),
WikiId::PropertyId(pid) => (pid.0, "Property".to_string()), WikiId::PropertyId(pid) => (pid.0, "Property".to_string()),
@ -106,18 +114,18 @@ fn get_id_entity(entity: &Entity) -> Thing {
Thing::from((tb, Id::from(id))) Thing::from((tb, Id::from(id)))
} }
fn get_name(entity: &Entity) -> String { async fn get_name(entity: &Entity) -> String {
entity entity
.labels .labels
.get(&Lang(WIKIDATA_LANG.to_string())) .get(&Lang(get_wikidata_lang().await.to_string()))
.map(|label| label.to_string()) .map(|label| label.to_string())
.unwrap_or_default() .unwrap_or_default()
} }
fn get_description(entity: &Entity) -> String { async fn get_description(entity: &Entity) -> String {
entity entity
.descriptions .descriptions
.get(&Lang(WIKIDATA_LANG.to_string())) .get(&Lang(get_wikidata_lang().await.to_string()))
.cloned() .cloned()
.unwrap_or_default() .unwrap_or_default()
} }