mirror of
https://github.com/NexVeridian/wikidata-to-surrealdb.git
synced 2025-09-02 09:59:13 +00:00
multi thread
This commit is contained in:
parent
ee15b46ae2
commit
4ea36f78b7
3 changed files with 75 additions and 27 deletions
|
@ -10,6 +10,7 @@ serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
surrealdb = "1.0"
|
surrealdb = "1.0"
|
||||||
tokio = "1.35"
|
tokio = "1.35"
|
||||||
|
futures = "0.3"
|
||||||
wikidata = "0.3.1"
|
wikidata = "0.3.1"
|
||||||
bzip2 = { version = "0.4", features = ["tokio"] }
|
bzip2 = { version = "0.4", features = ["tokio"] }
|
||||||
lazy_static = "1.4"
|
lazy_static = "1.4"
|
||||||
|
|
|
@ -39,6 +39,7 @@ FILE_FORMAT=bz2
|
||||||
FILE_NAME=data/latest-all.json.bz2
|
FILE_NAME=data/latest-all.json.bz2
|
||||||
# If not using docker file for Wikidata to SurrealDB, use 0.0.0.0:8000
|
# If not using docker file for Wikidata to SurrealDB, use 0.0.0.0:8000
|
||||||
WIKIDATA_DB_PORT=surrealdb:8000
|
WIKIDATA_DB_PORT=surrealdb:8000
|
||||||
|
THREADED_REQUESTS=true
|
||||||
```
|
```
|
||||||
|
|
||||||
## View Progress
|
## View Progress
|
||||||
|
|
100
src/main.rs
100
src/main.rs
|
@ -1,5 +1,6 @@
|
||||||
use anyhow::{Error, Ok, Result};
|
use anyhow::{Error, Ok, Result};
|
||||||
use bzip2::read::MultiBzDecoder;
|
use bzip2::read::MultiBzDecoder;
|
||||||
|
use futures::future::join_all;
|
||||||
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde_json::{from_str, Value};
|
use serde_json::{from_str, Value};
|
||||||
|
@ -11,7 +12,11 @@ use std::{
|
||||||
thread,
|
thread,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal};
|
use surrealdb::{
|
||||||
|
engine::remote::ws::{Client, Ws},
|
||||||
|
opt::auth::Root,
|
||||||
|
Surreal,
|
||||||
|
};
|
||||||
use wikidata::Entity;
|
use wikidata::Entity;
|
||||||
|
|
||||||
mod utils;
|
mod utils;
|
||||||
|
@ -24,6 +29,7 @@ lazy_static! {
|
||||||
static ref WIKIDATA_FILE_FORMAT: String = env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set");
|
static ref WIKIDATA_FILE_FORMAT: String = env::var("WIKIDATA_FILE_FORMAT").expect("FILE_FORMAT not set");
|
||||||
static ref WIKIDATA_FILE_NAME: String = env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set");
|
static ref WIKIDATA_FILE_NAME: String = env::var("WIKIDATA_FILE_NAME").expect("FILE_NAME not set");
|
||||||
static ref WIKIDATA_DB_PORT: String = env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set");
|
static ref WIKIDATA_DB_PORT: String = env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set");
|
||||||
|
static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS").expect("THREADED_REQUESTS not set").parse().expect("Failed to parse THREADED_REQUESTS");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(non_camel_case_types)]
|
#[allow(non_camel_case_types)]
|
||||||
|
@ -48,17 +54,45 @@ impl File_Format {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn create_db_entity(db: &Surreal<Client>, line: String) -> Result<(), Error> {
|
||||||
|
let line = line.trim().trim_end_matches(',').to_string();
|
||||||
|
if line == "[" || line == "]" {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let json: Value = from_str(&line)?;
|
||||||
|
let data = Entity::from_json(json).expect("Failed to parse JSON");
|
||||||
|
|
||||||
|
let (mut claims, mut data) = EntityMini::from_entity(data);
|
||||||
|
|
||||||
|
let id = data.id.clone().expect("No ID");
|
||||||
|
data.id = None;
|
||||||
|
let _: Option<EntityMini> = db.delete(&id).await?;
|
||||||
|
let _: Option<EntityMini> = db.create(&id).content(data.clone()).await?;
|
||||||
|
|
||||||
|
let id = claims.id.clone().expect("No ID");
|
||||||
|
claims.id = None;
|
||||||
|
let _: Option<Claims> = db.delete(&id).await?;
|
||||||
|
let _: Option<Claims> = db.create(&id).content(claims).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_db_entities(db: &Surreal<Client>, lines: Vec<String>) -> Result<(), Error> {
|
||||||
|
for line in lines {
|
||||||
|
create_db_entity(db, line.to_string()).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> Result<(), Error> {
|
||||||
thread::sleep(Duration::from_secs(10));
|
thread::sleep(Duration::from_secs(10));
|
||||||
|
|
||||||
let mut compleated = 0;
|
|
||||||
let total_size = 113_000_000;
|
let total_size = 113_000_000;
|
||||||
|
|
||||||
let pb = ProgressBar::new(total_size);
|
let pb = ProgressBar::new(total_size);
|
||||||
pb.set_style(
|
pb.set_style(
|
||||||
ProgressStyle::with_template(
|
ProgressStyle::with_template(
|
||||||
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} {percent} ETA:{eta}",
|
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {human_pos}/{human_len} ETA:[{eta}]",
|
||||||
)?
|
)?
|
||||||
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
|
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
|
||||||
let sec = state.eta().as_secs();
|
let sec = state.eta().as_secs();
|
||||||
|
@ -79,31 +113,43 @@ async fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?;
|
let reader = File_Format::new(&WIKIDATA_FILE_FORMAT).reader(&WIKIDATA_FILE_NAME)?;
|
||||||
|
|
||||||
for line in reader.lines() {
|
if !*THREADED_REQUESTS {
|
||||||
let line = line?.trim().trim_end_matches(',').to_string();
|
let counter = 0;
|
||||||
if line == "[" || line == "]" {
|
for line in reader.lines() {
|
||||||
continue;
|
create_db_entity(&db, line?).await?;
|
||||||
|
if counter % 100 == 0 {
|
||||||
|
pb.inc(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let mut futures = Vec::new();
|
||||||
|
let mut chunk = Vec::new();
|
||||||
|
let mut chunk_counter: i32 = 0;
|
||||||
|
const BATCH_AMMOUNT: u16 = 50;
|
||||||
|
|
||||||
|
for line in reader.lines() {
|
||||||
|
chunk.push(line.unwrap());
|
||||||
|
|
||||||
|
if chunk.len() >= BATCH_AMMOUNT.try_into().unwrap() {
|
||||||
|
let db = db.clone();
|
||||||
|
let lines = chunk.clone();
|
||||||
|
let pb = pb.clone();
|
||||||
|
|
||||||
|
futures.push(tokio::spawn(async move {
|
||||||
|
create_db_entities(&db, lines).await.unwrap();
|
||||||
|
pb.inc(BATCH_AMMOUNT.try_into().unwrap());
|
||||||
|
}));
|
||||||
|
chunk_counter += 1;
|
||||||
|
chunk.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
if chunk_counter >= 50 {
|
||||||
|
join_all(futures).await;
|
||||||
|
futures = Vec::new();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let json: Value = from_str(&line)?;
|
join_all(futures).await;
|
||||||
let data = Entity::from_json(json).expect("Failed to parse JSON");
|
|
||||||
|
|
||||||
let (mut claims, mut data) = EntityMini::from_entity(data);
|
|
||||||
|
|
||||||
let id = data.id.clone().expect("No ID");
|
|
||||||
data.id = None;
|
|
||||||
let _: Option<EntityMini> = db.delete(&id).await?;
|
|
||||||
let _: Option<EntityMini> = db.create(&id).content(data.clone()).await?;
|
|
||||||
|
|
||||||
let id = claims.id.clone().expect("No ID");
|
|
||||||
claims.id = None;
|
|
||||||
let _: Option<Claims> = db.delete(&id).await?;
|
|
||||||
let _: Option<Claims> = db.create(&id).content(claims).await?;
|
|
||||||
|
|
||||||
compleated += 1;
|
|
||||||
if compleated % 1000 == 0 {
|
|
||||||
pb.set_position(compleated);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pb.finish_with_message("Done parsing Wikidata");
|
pb.finish_with_message("Done parsing Wikidata");
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue