mirror of
https://github.com/NexVeridian/wikidata-to-surrealdb.git
synced 2025-09-02 09:59:13 +00:00
Add insert filter
This commit is contained in:
parent
93cfa42984
commit
5c6963c8ff
9 changed files with 248 additions and 264 deletions
46
src/utils.rs
46
src/utils.rs
|
@ -3,6 +3,7 @@ use bzip2::read::MultiBzDecoder;
|
|||
use futures::future::join_all;
|
||||
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
|
||||
use lazy_static::lazy_static;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde_json::{from_str, Value};
|
||||
use std::{
|
||||
env,
|
||||
|
@ -10,7 +11,10 @@ use std::{
|
|||
io::{BufRead, BufReader},
|
||||
};
|
||||
use surrealdb::{
|
||||
engine::remote::ws::{Client, Ws},
|
||||
engine::{
|
||||
local::{Db, Mem},
|
||||
remote::ws::{Client, Ws},
|
||||
},
|
||||
opt::auth::Root,
|
||||
Connection, Surreal,
|
||||
};
|
||||
|
@ -29,6 +33,8 @@ lazy_static! {
|
|||
static ref DB_PASSWORD: String = env::var("DB_PASSWORD").expect("DB_PASSWORD not set");
|
||||
static ref WIKIDATA_DB_PORT: String =
|
||||
env::var("WIKIDATA_DB_PORT").expect("WIKIDATA_DB_PORT not set");
|
||||
static ref FILTER_PATH: String =
|
||||
env::var("FILTER_PATH").unwrap_or("../filter.surql".to_string());
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
|
@ -159,10 +165,39 @@ pub async fn create_db_entities_bulk(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn create_db_entities_bulk_filter(
|
||||
db: &Surreal<impl Connection>,
|
||||
lines: &[String],
|
||||
pb: &Option<ProgressBar>,
|
||||
batch_size: usize,
|
||||
) -> Result<(), Error> {
|
||||
let db_mem = create_db_mem().await?;
|
||||
create_db_entities_bulk(&db_mem, lines, pb, batch_size).await?;
|
||||
|
||||
let filter = tokio::fs::read_to_string(&*FILTER_PATH).await?;
|
||||
db_mem.query(filter).await?;
|
||||
|
||||
let file_name: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(30)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
|
||||
db_mem
|
||||
.export(format!("../data/temp/{}.surql", file_name))
|
||||
.await?;
|
||||
db.import(format!("../data/temp{}.surql", file_name))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum CreateVersion {
|
||||
Single,
|
||||
Bulk,
|
||||
// must create a filter.surql file in the root directory
|
||||
BulkFilter,
|
||||
}
|
||||
impl CreateVersion {
|
||||
pub async fn run(
|
||||
|
@ -177,6 +212,9 @@ impl CreateVersion {
|
|||
CreateVersion::Bulk => create_db_entities_bulk(db, chunk, pb, batch_size)
|
||||
.await
|
||||
.is_ok(),
|
||||
CreateVersion::BulkFilter => create_db_entities_bulk_filter(db, chunk, pb, batch_size)
|
||||
.await
|
||||
.is_ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -276,6 +314,12 @@ pub async fn create_db_ws() -> Result<Surreal<Client>, Error> {
|
|||
Ok(db)
|
||||
}
|
||||
|
||||
pub async fn create_db_mem() -> Result<Surreal<Db>, Error> {
|
||||
let db = Surreal::new::<Mem>(()).await?;
|
||||
db.use_ns("wikidata").use_db("wikidata").await?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
pub async fn create_pb() -> ProgressBar {
|
||||
let total_size = 110_000_000;
|
||||
let pb = ProgressBar::new(total_size);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue