This commit is contained in:
Elijah McMorris 2023-12-17 18:12:56 -08:00
parent 2edaeef042
commit e37d413372
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
14 changed files with 525 additions and 250 deletions

2
src/lib.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod utils;
pub use utils::*;

View file

@ -1,24 +1,8 @@
use anyhow::{Error, Ok, Result};
use bzip2::read::MultiBzDecoder;
use futures::future::join_all;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use lazy_static::lazy_static;
use serde_json::{from_str, Value};
use std::{
env,
fmt::Write,
fs::File,
io::{BufRead, BufReader},
thread,
time::Duration,
};
use surrealdb::{
engine::remote::ws::{Client, Ws},
opt::auth::Root,
Surreal,
};
use wikidata::Entity;
use std::{env, fmt::Write, io::BufRead, thread, time::Duration};
use surrealdb::{engine::remote::ws::Ws, opt::auth::Root, Surreal};
mod utils;
use utils::*;
@ -32,71 +16,6 @@ lazy_static! {
static ref THREADED_REQUESTS: bool = env::var("THREADED_REQUESTS").expect("THREADED_REQUESTS not set").parse().expect("Failed to parse THREADED_REQUESTS");
}
/// Input formats the importer can read, selected by name via `File_Format::new`.
// The lowercase variant names mirror the accepted format strings, hence the lint allow.
#[allow(non_camel_case_types)]
enum File_Format {
// Plain-text file, read directly through a `BufReader`.
json,
// bzip2-compressed file, read through a `MultiBzDecoder`.
bz2,
}
impl File_Format {
    /// Maps a format name (`"json"` or `"bz2"`) to its variant.
    ///
    /// # Panics
    /// Panics with "Unknown file format" for any other name.
    fn new(file: &str) -> Self {
        if file == "json" {
            Self::json
        } else if file == "bz2" {
            Self::bz2
        } else {
            panic!("Unknown file format")
        }
    }

    /// Opens the file at path `file` and returns a buffered line reader for it.
    ///
    /// `bz2` input is decompressed on the fly; `json` input is read as-is.
    ///
    /// # Errors
    /// Returns the I/O error if the file cannot be opened.
    fn reader(self, file: &str) -> Result<Box<dyn BufRead>, Error> {
        let handle = File::open(file)?;
        let source: Box<dyn BufRead> = match self {
            Self::json => Box::new(BufReader::new(handle)),
            Self::bz2 => Box::new(BufReader::new(MultiBzDecoder::new(handle))),
        };
        Ok(source)
    }
}
async fn create_db_entity(db: &Surreal<Client>, line: String) -> Result<(), Error> {
let line = line.trim().trim_end_matches(',').to_string();
if line == "[" || line == "]" {
return Ok(());
}
let json: Value = from_str(&line)?;
let data = Entity::from_json(json).expect("Failed to parse JSON");
let (mut claims, mut data) = EntityMini::from_entity(data);
let id = data.id.clone().expect("No ID");
data.id = None;
let _ = db.create::<Option<EntityMini>>(&id).await.is_err();
{
db.update::<Option<EntityMini>>(&id).content(data).await?;
};
let id = claims.id.clone().expect("No ID");
claims.id = None;
let _ = db.create::<Option<Claims>>(&id).await.is_err();
{
db.update::<Option<Claims>>(&id).content(claims).await?;
}
Ok(())
}
/// Imports a batch of dump lines sequentially, advancing the progress bar.
///
/// Each line is handed to `create_db_entity`. Progress is reported in steps of
/// 100 lines, and any remainder smaller than a full step is reported once the
/// batch is done so the bar reflects every processed line.
///
/// # Errors
/// Stops at, and returns, the first error produced by `create_db_entity`.
async fn create_db_entities(
    db: &Surreal<Client>,
    lines: Vec<String>,
    pb: ProgressBar,
) -> Result<(), Error> {
    const PROGRESS_STEP: u64 = 100;

    let mut unreported: u64 = 0;
    for line in lines {
        // `line` is already an owned String; pass it on without re-allocating.
        create_db_entity(db, line).await?;
        unreported += 1;
        if unreported == PROGRESS_STEP {
            pb.inc(PROGRESS_STEP);
            unreported = 0;
        }
    }
    // Report the trailing partial step (batch length not a multiple of 100).
    if unreported > 0 {
        pb.inc(unreported);
    }
    Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
thread::sleep(Duration::from_secs(10));
@ -136,35 +55,7 @@ async fn main() -> Result<(), Error> {
}
}
} else {
let mut futures = Vec::new();
let mut chunk = Vec::new();
let mut chunk_counter = 0;
const BATCH_SIZE: usize = 1000;
const BATCH_NUM: usize = 100;
for line in reader.lines() {
chunk.push(line.unwrap());
if chunk.len() >= BATCH_SIZE {
let db = db.clone();
let lines = chunk.clone();
let pb = pb.clone();
futures.push(tokio::spawn(async move {
create_db_entities(&db, lines, pb).await.unwrap();
}));
chunk_counter += 1;
chunk.clear();
}
if chunk_counter >= BATCH_NUM {
join_all(futures).await;
futures = Vec::new();
chunk_counter = 0;
}
}
join_all(futures).await;
create_db_entities_threaded(&db, reader, Some(pb.clone()), 1000, 100).await?;
}
pb.finish();

View file

@ -1,138 +1,119 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::env;
use surrealdb::sql::Thing;
use wikidata::{ClaimValue, ClaimValueData, Entity, Lang, Pid, WikiId};
use anyhow::{Error, Ok, Result};
use bzip2::read::MultiBzDecoder;
use futures::future::join_all;
use indicatif::ProgressBar;
use serde_json::{from_str, Value};
use std::{
fs::File,
io::{BufRead, BufReader},
};
use surrealdb::{Connection, Surreal};
use wikidata::Entity;
lazy_static! {
static ref WIKIDATA_LANG: String = env::var("WIKIDATA_LANG")
.expect("WIKIDATA_LANG not set")
.to_string();
mod tables;
use tables::*;
/// Input formats the importer can read, selected by name via `File_Format::new`.
// The lowercase variant names mirror the accepted format strings, hence the lint allow.
#[allow(non_camel_case_types)]
pub enum File_Format {
// Plain-text file, read directly through a `BufReader`.
json,
// bzip2-compressed file, read through a `MultiBzDecoder`.
bz2,
}
/// Value stored for a single claim or qualifier.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ClaimData {
// Reference to another record; `from_cvd` produces this for Item, Property and Lexeme values.
Thing(Thing),
// Any other Wikidata value, kept unchanged.
ClaimValueData(ClaimValueData),
}
impl ClaimData {
fn from_cvd(cvd: ClaimValueData) -> Self {
match cvd {
ClaimValueData::Item(qid) => ClaimData::Thing(Thing {
id: qid.0.into(),
tb: "Entity".to_string(),
}),
ClaimValueData::Property(pid) => ClaimData::Thing(Thing {
id: pid.0.into(),
tb: "Property".to_string(),
}),
ClaimValueData::Lexeme(lid) => ClaimData::Thing(Thing {
id: lid.0.into(),
tb: "Lexeme".to_string(),
}),
_ => ClaimData::ClaimValueData(cvd),
impl File_Format {
    /// Maps a format name (`"json"` or `"bz2"`) to its variant.
    ///
    /// # Panics
    /// Panics with "Unknown file format" for any other name.
    pub fn new(file: &str) -> Self {
        if file == "json" {
            Self::json
        } else if file == "bz2" {
            Self::bz2
        } else {
            panic!("Unknown file format")
        }
    }

    /// Opens the file at path `file` and returns a buffered line reader for it.
    ///
    /// `bz2` input is decompressed on the fly; `json` input is read as-is.
    ///
    /// # Errors
    /// Returns the I/O error if the file cannot be opened.
    pub fn reader(self, file: &str) -> Result<Box<dyn BufRead>, Error> {
        let handle = File::open(file)?;
        let source: Box<dyn BufRead> = match self {
            Self::json => Box::new(BufReader::new(handle)),
            Self::bz2 => Box::new(BufReader::new(MultiBzDecoder::new(handle))),
        };
        Ok(source)
    }
}
/// All flattened claims of one entity, stored as a single record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Claims {
// Table: Claims
// Record id; `EntityMini::from_entity` sets it to the entity's id in the "Claims" table.
pub id: Option<Thing>,
// Main claims and their qualifiers, in the order `flatten_claims` emits them.
pub claims: Vec<Claim>,
}
/// One property/value pair taken from an entity's claims or qualifiers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Claim {
// Table: Claim
// The property this claim is about; `flatten_claims` builds it as a "Property" record id.
pub id: Thing,
// The claimed value, converted by `ClaimData::from_cvd`.
pub value: ClaimData,
}
/// Reduced form of a Wikidata entity: id, label, description and a link to its claims.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EntityMini {
// Table: Entity, Property, Lexeme
pub id: Option<Thing>,
// Label in the `WIKIDATA_LANG` language; empty when the entity has none.
pub label: String,
// Id of the matching record in the "Claims" table.
pub claims: Thing,
// Description in the `WIKIDATA_LANG` language; empty when the entity has none.
pub description: String,
}
impl EntityMini {
pub fn from_entity(entity: Entity) -> (Claims, Self) {
let thing_claim = Thing {
id: get_id_entity(&entity).id,
tb: "Claims".to_string(),
};
(
Claims {
id: Some(thing_claim.clone()),
..Self::flatten_claims(entity.claims.clone())
},
Self {
id: Some(get_id_entity(&entity)),
label: get_name(&entity),
claims: thing_claim,
description: get_description(&entity),
},
)
pub async fn create_db_entity<C: Connection>(db: &Surreal<C>, line: String) -> Result<(), Error> {
let line = line.trim().trim_end_matches(',').to_string();
if line == "[" || line == "]" {
return Ok(());
}
fn flatten_claims(claims: Vec<(Pid, ClaimValue)>) -> Claims {
Claims {
id: None,
claims: claims
.iter()
.flat_map(|(pid, claim_value)| {
let mut flattened = vec![Claim {
id: Thing {
id: pid.0.into(),
tb: "Property".to_string(),
},
value: ClaimData::from_cvd(claim_value.data.clone()),
}];
let json: Value = from_str(&line)?;
let data = Entity::from_json(json).expect("Failed to parse JSON");
flattened.extend(claim_value.qualifiers.iter().map(
|(qualifier_pid, qualifier_value)| Claim {
id: Thing {
id: qualifier_pid.0.into(),
tb: "Property".to_string(),
},
value: ClaimData::from_cvd(qualifier_value.clone()),
},
));
flattened
})
.collect(),
}
}
}
let (mut claims, mut data) = EntityMini::from_entity(data);
fn get_id_entity(entity: &Entity) -> Thing {
let (id, tb) = match entity.id {
WikiId::EntityId(qid) => (qid.0, "Entity".to_string()),
WikiId::PropertyId(pid) => (pid.0, "Property".to_string()),
WikiId::LexemeId(lid) => (lid.0, "Lexeme".to_string()),
_ => todo!("Not implemented"),
let id = data.id.clone().expect("No ID");
data.id = None;
let _ = db.create::<Option<EntityMini>>(&id).await.is_err();
{
db.update::<Option<EntityMini>>(&id).content(data).await?;
};
Thing { id: id.into(), tb }
let id = claims.id.clone().expect("No ID");
claims.id = None;
let _ = db.create::<Option<Claims>>(&id).await.is_err();
{
db.update::<Option<Claims>>(&id).content(claims).await?;
}
Ok(())
}
fn get_name(entity: &Entity) -> String {
entity
.labels
.get(&Lang(WIKIDATA_LANG.to_string()))
.map(|label| label.to_string())
.unwrap_or_default()
/// Imports a batch of dump lines sequentially, advancing the optional progress bar.
///
/// Each line is handed to `create_db_entity`. When `pb` is `Some`, progress is
/// reported in steps of 100 lines, and any remainder smaller than a full step
/// is reported once the batch is done so the bar reflects every processed line.
///
/// # Errors
/// Stops at, and returns, the first error produced by `create_db_entity`.
pub async fn create_db_entities<C: Connection>(
    db: &Surreal<C>,
    lines: Vec<String>,
    pb: Option<ProgressBar>,
) -> Result<(), Error> {
    const PROGRESS_STEP: u64 = 100;

    let mut unreported: u64 = 0;
    for line in lines {
        // `line` is already an owned String; pass it on without re-allocating.
        create_db_entity(db, line).await?;
        unreported += 1;
        if unreported == PROGRESS_STEP {
            if let Some(bar) = pb.as_ref() {
                bar.inc(PROGRESS_STEP);
            }
            unreported = 0;
        }
    }
    // Report the trailing partial step (batch length not a multiple of 100).
    if unreported > 0 {
        if let Some(bar) = pb.as_ref() {
            bar.inc(unreported);
        }
    }
    Ok(())
}
fn get_description(entity: &Entity) -> String {
entity
.descriptions
.get(&Lang(WIKIDATA_LANG.to_string()))
.cloned()
.unwrap_or_default()
/// Imports every line of `reader`, processing batches concurrently on spawned tasks.
///
/// Lines are collected into chunks of `batch_size`. Each full chunk is handed
/// to a spawned task running `create_db_entities`. Once `batch_num` tasks are
/// in flight they are all awaited before more input is read. The final,
/// partially filled chunk is processed on the calling task.
///
/// # Errors
/// Returns the first I/O error hit while reading a line, the first error
/// returned by a batch, or the join error of a batch task that panicked or was
/// cancelled. On an early return, tasks that were already spawned are not
/// awaited.
pub async fn create_db_entities_threaded<C: Connection>(
    db: &Surreal<C>,
    reader: Box<dyn BufRead>,
    pb: Option<ProgressBar>,
    batch_size: usize,
    batch_num: usize,
) -> Result<(), Error> {
    let mut futures = Vec::new();
    let mut chunk = Vec::new();

    for line in reader.lines() {
        chunk.push(line?);

        if chunk.len() >= batch_size {
            let db = db.clone();
            // Move the collected lines into the task, leaving an empty chunk behind.
            let lines = std::mem::take(&mut chunk);
            let pb = pb.clone();
            futures.push(tokio::spawn(async move {
                create_db_entities(&db, lines, pb).await
            }));
        }

        // One task is spawned per full chunk, so the number of pending handles
        // is the number of batches currently in flight.
        if futures.len() >= batch_num {
            for finished in join_all(std::mem::take(&mut futures)).await {
                // Outer `?`: task panicked or was cancelled; inner `?`: batch failed.
                finished??;
            }
        }
    }

    create_db_entities(db, chunk, pb).await?;
    for finished in join_all(futures).await {
        finished??;
    }
    Ok(())
}

137
src/utils/tables.rs Normal file
View file

@ -0,0 +1,137 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::env;
use surrealdb::sql::Thing;
use wikidata::{ClaimValue, ClaimValueData, Entity, Lang, Pid, WikiId};
lazy_static! {
    /// Language code used to look up labels and descriptions, read from the
    /// `WIKIDATA_LANG` environment variable on first access.
    ///
    /// The first access panics with "WIKIDATA_LANG not set" if the variable
    /// cannot be read.
    static ref WIKIDATA_LANG: String = {
        let configured = env::var("WIKIDATA_LANG");
        configured.expect("WIKIDATA_LANG not set")
    };
}
/// Value stored for a single claim or qualifier.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ClaimData {
// Reference to another record; `from_cvd` produces this for Item, Property and Lexeme values.
Thing(Thing),
// Any other Wikidata value, kept unchanged.
ClaimValueData(ClaimValueData),
}
impl ClaimData {
    /// Converts a Wikidata claim value into its stored form.
    ///
    /// Item, Property and Lexeme values become record links into the
    /// "Entity", "Property" and "Lexeme" tables respectively; every other
    /// value is wrapped unchanged.
    fn from_cvd(cvd: ClaimValueData) -> Self {
        let (table, number) = match cvd {
            ClaimValueData::Item(qid) => ("Entity", qid.0),
            ClaimValueData::Property(pid) => ("Property", pid.0),
            ClaimValueData::Lexeme(lid) => ("Lexeme", lid.0),
            other => return ClaimData::ClaimValueData(other),
        };
        ClaimData::Thing(Thing {
            id: number.into(),
            tb: table.to_string(),
        })
    }
}
/// All flattened claims of one entity, stored as a single record.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Claims {
// Record id; `EntityMini::from_entity` sets it to the entity's id in the "Claims" table.
pub id: Option<Thing>,
// Main claims and their qualifiers, in the order `flatten_claims` emits them.
pub claims: Vec<Claim>,
}
/// One property/value pair taken from an entity's claims or qualifiers.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Claim {
// The property this claim is about; `flatten_claims` builds it as a "Property" record id.
pub id: Thing,
// The claimed value, converted by `ClaimData::from_cvd`.
pub value: ClaimData,
}
/// Reduced form of a Wikidata entity: id, label, description and a link to its claims.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EntityMini {
// Table: Entity, Property, Lexeme
pub id: Option<Thing>,
// Label in the `WIKIDATA_LANG` language; empty when the entity has none.
pub label: String,
// Claims Table
// Id of the matching `Claims` record (same id value as the entity, table "Claims").
pub claims: Thing,
// Description in the `WIKIDATA_LANG` language; empty when the entity has none.
pub description: String,
}
impl EntityMini {
    /// Splits a Wikidata entity into its claims record and its reduced entity record.
    ///
    /// The `Claims` record gets the entity's id value in the "Claims" table,
    /// and the returned `EntityMini` links to it through its `claims` field.
    ///
    /// # Panics
    /// Panics if `get_id_entity` does not support the entity's id kind, or if
    /// `WIKIDATA_LANG` cannot be read on first use.
    pub fn from_entity(entity: Entity) -> (Claims, Self) {
        // Resolve the id once and reuse it for both records.
        let entity_thing = get_id_entity(&entity);
        let thing_claim = Thing {
            id: entity_thing.id.clone(),
            tb: "Claims".to_string(),
        };

        // Build the borrowing part first so the claims can then be moved out
        // of `entity` instead of being deep-cloned.
        let mini = Self {
            id: Some(entity_thing),
            label: get_name(&entity),
            claims: thing_claim.clone(),
            description: get_description(&entity),
        };
        let claims = Claims {
            id: Some(thing_claim),
            ..Self::flatten_claims(entity.claims)
        };

        (claims, mini)
    }

    /// Flattens claims into one list: each main claim followed by its qualifiers.
    ///
    /// Every resulting `Claim` is keyed by its property as a "Property" record
    /// id. The returned `Claims` has no id; the caller fills it in.
    fn flatten_claims(claims: Vec<(Pid, ClaimValue)>) -> Claims {
        let claims = claims
            .into_iter()
            .flat_map(|(pid, claim_value)| {
                let main_claim = Claim {
                    id: Thing {
                        id: pid.0.into(),
                        tb: "Property".to_string(),
                    },
                    value: ClaimData::from_cvd(claim_value.data),
                };

                let qualifier_claims = claim_value.qualifiers.into_iter().map(
                    |(qualifier_pid, qualifier_value)| Claim {
                        id: Thing {
                            id: qualifier_pid.0.into(),
                            tb: "Property".to_string(),
                        },
                        value: ClaimData::from_cvd(qualifier_value),
                    },
                );

                // Main claim first, then its qualifiers, with no temporary Vec.
                std::iter::once(main_claim).chain(qualifier_claims)
            })
            .collect();

        Claims { id: None, claims }
    }
}
/// Builds the record id for an entity: its numeric id in the table matching its kind
/// ("Entity", "Property" or "Lexeme").
///
/// # Panics
/// Hits `todo!("Not implemented")` for any other kind of Wikidata id.
fn get_id_entity(entity: &Entity) -> Thing {
    let (table, number) = match entity.id {
        WikiId::EntityId(qid) => ("Entity", qid.0),
        WikiId::PropertyId(pid) => ("Property", pid.0),
        WikiId::LexemeId(lid) => ("Lexeme", lid.0),
        _ => todo!("Not implemented"),
    };
    Thing {
        id: number.into(),
        tb: table.to_string(),
    }
}
/// Returns the entity's label in the `WIKIDATA_LANG` language, or an empty
/// string when the entity has no label in that language.
fn get_name(entity: &Entity) -> String {
    let lang = Lang(WIKIDATA_LANG.to_string());
    match entity.labels.get(&lang) {
        Some(label) => label.to_string(),
        None => String::default(),
    }
}
/// Returns the entity's description in the `WIKIDATA_LANG` language, or an
/// empty string when the entity has no description in that language.
fn get_description(entity: &Entity) -> String {
    let lang = Lang(WIKIDATA_LANG.to_string());
    if let Some(text) = entity.descriptions.get(&lang) {
        text.clone()
    } else {
        String::default()
    }
}