From 4074a97ae2acd0a81a1893b8a7646f92d5145a94 Mon Sep 17 00:00:00 2001 From: NexVeridian Date: Wed, 7 Jun 2023 02:23:14 +0000 Subject: [PATCH] 0.2.1 --- src/main.rs | 11 ++- src/util.rs | 251 +++++++++++++++++++++++++++------------------------- 2 files changed, 135 insertions(+), 127 deletions(-) diff --git a/src/main.rs b/src/main.rs index 532ce52..8fff30c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,6 @@ mod util; use util::*; - // #[tokio::main] // async fn main() { // let mut scheduler = AsyncScheduler::new(); @@ -38,13 +37,13 @@ use util::*; // } fn main() { - let dfn = df_format(Ticker::ARKF, read_parquet(Ticker::ARKF).unwrap()).unwrap(); + let dfn = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap(); println!("{:#?}", dfn); - // update_parquet(Ticker::ARKF).unwrap(); - let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap(); + // update_parquet(Ticker::ARKVC).unwrap(); + // let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap(); // let update = get_csv(Ticker::ARKF).unwrap().collect().unwrap(); - // let x = read_parquet(Ticker::ARKF).unwrap().collect().unwrap(); - println!("{:#?}", update); + // let x = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap(); + // println!("{:#?}", x); } diff --git a/src/util.rs b/src/util.rs index 140d84e..b6476c7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -44,7 +44,7 @@ pub fn merge_csv_to_parquet(folder: Ticker) -> Result<(), Box> { let df = concat(dfs, false, true)?; - write_parquet(folder, df_format(folder, df)?)?; + write_parquet(folder, df_format(df)?)?; Ok(()) } @@ -53,7 +53,12 @@ pub fn update_parquet(ticker: Ticker) -> Result<(), Box> { let mut df = read_parquet(ticker)?; - df = concat(vec![df, update], false, true)?.unique_stable(None, UniqueKeepStrategy::First); + df = concat( + vec![df_format(df)?.lazy(), df_format(update)?.lazy()], + false, + true, + )? + .unique_stable(None, UniqueKeepStrategy::First); write_parquet(ticker, df.collect()?)?; Ok(()) @@ -74,127 +79,131 @@ pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box Result> { - match ticker { - Ticker::ARKVC => { - dfl = dfl.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]); +pub fn df_format(df: LazyFrame) -> Result> { + let mut df = df.collect()?; - let df = dfl - .with_columns(vec![ - col("date").str().strptime(StrpTimeOptions { - date_dtype: DataType::Date, - fmt: Some("%m/%d/%Y".into()), - strict: false, - exact: true, - cache: false, - tz_aware: false, - utc: false, - }), - col("weight") - .str() - .extract(r"[0-9]*\.[0-9]+", 0) - .cast(DataType::Float64), - ]) - .filter(col("date").is_not_null()) - .collect()?; - - Ok(df) - } - _ => { - let mut df = dfl.collect()?; - let col_names = df.get_column_names(); - - if df.rename("market_value_($)", "market_value").is_ok() {} - if df.rename("market value ($)", "market_value").is_ok() {} - if df.rename("weight_(%)", "weight").is_ok() {} - if df.rename("weight (%)", "weight").is_ok() {} - if df.get_column_names().contains(&"fund") { - _ = df.drop_in_place("fund"); - } - if df.get_column_names().contains(&"weight_rank") { - _ = df.drop_in_place("weight_rank"); - } - - let mut expressions: Vec = vec![]; - - if !df.fields().contains(&Field::new("date", DataType::Date)) { - expressions.push(col("date").str().strptime(StrpTimeOptions { - date_dtype: DataType::Date, - fmt: Some("%m/%d/%Y".into()), - strict: false, - exact: true, - cache: false, - tz_aware: false, - utc: false, - })); - } - - if df.fields().contains(&Field::new("weight", DataType::Utf8)) { - expressions.push( - col("weight") - .str() - .replace(lit("%"), lit(""), true) - .cast(DataType::Float64), - ); - } - - if df - .fields() - .contains(&Field::new("market_value", DataType::Utf8)) - { - expressions.push( - col("market_value") - .str() - .replace(lit("$"), lit(""), true) - .str() - .replace_all(lit(","), lit(""), true) - .cast(DataType::Float64) - .cast(DataType::Int64), - ); - } - - if df.fields().contains(&Field::new("shares", DataType::Utf8)) { - expressions.push( - col("shares") - .str() - .replace_all(lit(","), lit(""), true) - .cast(DataType::Int64), - ); - } - - df = df - .lazy() - .with_columns(expressions) - .filter(col("date").is_not_null()) - .collect()?; - - if !df.get_column_names().contains(&"share_price") { - df = df - .lazy() - .with_column( - (col("market_value").cast(DataType::Float64) - / col("shares").cast(DataType::Float64)) - .alias("share_price") - .cast(DataType::Float64) - .round(2), - ) - .collect()?; - } - - df = df.select([ - "date", - "ticker", - "cusip", - "company", - "market_value", - "shares", - "share_price", - "weight", - ])?; - - Ok(df) - } + if df.get_column_names().contains(&"market_value_($)") { + df = df + .lazy() + .rename( + vec!["market_value_($), weight_(%)"], + vec!["market_value, weight"], + ) + .collect()?; } + if df.get_column_names().contains(&"market value ($)") { + df = df + .lazy() + .rename( + vec!["market value ($), weight (%)"], + vec!["market_value, weight"], + ) + .collect()?; + } + if df.get_column_names().contains(&"CUSIP") { + df = df + .lazy() + .rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]) + .collect()?; + } + + // if df.rename("market_value_($)", "market_value").is_ok() {} + // if df.rename("market value ($)", "market_value").is_ok() {} + // if df.rename("weight_(%)", "weight").is_ok() {} + // if df.rename("weight (%)", "weight").is_ok() {} + // if df.rename("CUSIP", "cusip").is_ok() {} + + if df.get_column_names().contains(&"fund") { + _ = df.drop_in_place("fund"); + } + if df.get_column_names().contains(&"weight_rank") { + _ = df.drop_in_place("weight_rank"); + } + + let mut expressions: Vec = vec![]; + + if !df.fields().contains(&Field::new("date", DataType::Date)) { + expressions.push(col("date").str().strptime(StrpTimeOptions { + date_dtype: DataType::Date, + fmt: Some("%m/%d/%Y".into()), + strict: false, + exact: true, + cache: false, + tz_aware: false, + utc: false, + })); + } + + if df.fields().contains(&Field::new("weight", DataType::Utf8)) { + expressions.push( + col("weight") + .str() + .replace(lit("%"), lit(""), true) + .cast(DataType::Float64), + ); + } + + if df + .fields() + .contains(&Field::new("market_value", DataType::Utf8)) + { + expressions.push( + col("market_value") + .str() + .replace(lit("$"), lit(""), true) + .str() + .replace_all(lit(","), lit(""), true) + .cast(DataType::Float64) + .cast(DataType::Int64), + ); + } + + if df.fields().contains(&Field::new("shares", DataType::Utf8)) { + expressions.push( + col("shares") + .str() + .replace_all(lit(","), lit(""), true) + .cast(DataType::Int64), + ); + } + + df = df + .lazy() + .with_columns(expressions) + .filter(col("date").is_not_null()) + .collect()?; + + if !df.get_column_names().contains(&"share_price") + && df.get_column_names().contains(&"market_value") + { + df = df + .lazy() + .with_column( + (col("market_value").cast(DataType::Float64) + / col("shares").cast(DataType::Float64)) + .alias("share_price") + .cast(DataType::Float64) + .round(2), + ) + .collect()? + } + + if df.get_column_names().contains(&"share_price") { + df = df.select([ + "date", + "ticker", + "cusip", + "company", + "market_value", + "shares", + "share_price", + "weight", + ])?; + } else { + df = df.select(["date", "ticker", "cusip", "company", "weight"])?; + } + + Ok(df) } pub fn get_csv(ticker: Ticker) -> Result> {