diff --git a/src/util.rs b/src/util.rs
index 8ec0d2b..cc48488 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -21,7 +21,6 @@ pub enum Ticker {
     ARKW,
     ARKX,
 }
-
 impl Ticker {
     pub fn value(&self) -> &str {
         match *self {
@@ -36,234 +35,289 @@ impl Ticker {
     }
 }
-pub fn merge_csv_to_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> {
-    let mut dfs = vec![];
-    for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
-        dfs.push(LazyCsvReader::new(x).finish()?);
+pub enum DF {
+    LazyFrame(LazyFrame),
+    DataFrame(DataFrame),
+}
+impl From<LazyFrame> for DF {
+    fn from(lf: LazyFrame) -> Self {
+        DF::LazyFrame(lf)
     }
-    let mut df = concat(dfs, false, true)?;
-
-    if read_parquet(ticker).is_ok() {
-        let df_old = read_parquet(ticker)?;
-        df = concat_df(vec![df_old, df])?;
-        write_parquet(ticker, df_sort(df.collect()?)?)?;
-    } else {
-        write_parquet(ticker, df_format(df)?)?;
+}
+impl From<DataFrame> for DF {
+    fn from(df: DataFrame) -> Self {
+        DF::DataFrame(df)
+    }
+}
+impl DF {
+    pub fn collect(self) -> Result<DataFrame, Box<dyn Error>> {
+        match self {
+            DF::LazyFrame(x) => Ok(x.collect()?),
+            DF::DataFrame(x) => Ok(x),
+        }
+    }
+    pub fn lazy(self) -> LazyFrame {
+        match self {
+            DF::LazyFrame(x) => x,
+            DF::DataFrame(x) => x.lazy(),
+        }
     }
-
-    Ok(())
 }
 
 pub enum Source {
+    Read,
     Ark,
     ApiIncremental,
     ApiFull,
 }
+struct Ark {
+    df: DF,
+    ticker: Ticker,
+}
+impl Ark {
+    pub fn new(source: Source, ticker: Ticker) -> Result<Self, Box<dyn Error>> {
+        let mut ark = Self {
+            df: Self::read_parquet(ticker)?.into(),
+            ticker,
+        };
 
-pub fn update_parquet(ticker: Ticker, source: Source) -> Result<(), Box<dyn Error>> {
-    let mut df = read_parquet(ticker)?;
+        let update = match source {
+            Source::Read => None,
+            Source::Ark => Some(ark.get_csv_ark()?),
+            Source::ApiIncremental => {
+                let last_day = ark
+                    .df
+                    .collect()?
+                    .column("date")
+                    .unwrap()
+                    .max()
+                    .and_then(NaiveDate::from_num_days_from_ce_opt);
+                Some(ark.get_api(last_day)?)
+            }
+            Source::ApiFull => Some(ark.get_api(None)?),
+        };
 
-    let update = match source {
-        Source::Ark => get_csv_ark(ticker)?,
-        Source::ApiIncremental => {
-            let last_day = df
-                .clone()
-                .collect()?
-                .column("date")
-                .unwrap()
-                .max()
-                .and_then(NaiveDate::from_num_days_from_ce_opt);
-            get_api(ticker, last_day)?
+        if let Some(update) = update {
+            ark.df = Self::concat_df(vec![ark.df, update.into()])?.into();
         }
-        Source::ApiFull => get_api(ticker, None)?,
-    };
-
-    df = concat_df(vec![df, update])?;
-    write_parquet(ticker, df_sort(df.collect()?)?)?;
-    Ok(())
-}
-
-fn concat_df(mut dfs: Vec<LazyFrame>) -> Result<LazyFrame, Box<dyn Error>> {
-    // with dedupe and format
-    for x in &mut dfs {
-        *x = df_format(x.to_owned())?.lazy();
-    }
-    let mut df = concat(dfs, false, true)?;
-    df = df.unique_stable(None, UniqueKeepStrategy::First);
-    Ok(df)
-}
-
-pub fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
-    let df = LazyFrame::scan_parquet(
-        format!("data/parquet/{}.parquet", ticker),
-        ScanArgsParquet::default(),
-    )?;
-    Ok(df)
-}
-
-pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box<dyn Error>> {
-    ParquetWriter::new(File::create(format!("data/parquet/{}.parquet", ticker))?)
-        .finish(&mut df)?;
-    Ok(())
-}
-
-pub fn df_sort(df: DataFrame) -> Result<DataFrame, Box<dyn Error>> {
-    let df = df.sort(["date", "weight"], vec![false, true])?;
-    Ok(df)
-}
-
-pub fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
-    let mut df = df.collect()?;
-
-    if df.get_column_names().contains(&"market_value_($)") {
-        df = df
-            .lazy()
-            .rename(
-                vec!["market_value_($)", "weight_(%)"],
-                vec!["market_value", "weight"],
-            )
-            .collect()?;
-    }
-    if df.get_column_names().contains(&"market value ($)") {
-        df = df
-            .lazy()
-            .rename(
-                vec!["market value ($)", "weight (%)"],
-                vec!["market_value", "weight"],
-            )
-            .collect()?;
-    }
-    if df.get_column_names().contains(&"CUSIP") {
-        df = df
-            .lazy()
-            .rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
-            .collect()?;
+        Ok(ark)
     }
-    // if df.rename("market_value_($)", "market_value").is_ok() {}
-    // if df.rename("market value ($)", "market_value").is_ok() {}
-    // if df.rename("weight_(%)", "weight").is_ok() {}
-    // if df.rename("weight (%)", "weight").is_ok() {}
-    // if df.rename("CUSIP", "cusip").is_ok() {}
-
-    if df.get_column_names().contains(&"fund") {
-        _ = df.drop_in_place("fund");
-    }
-    if df.get_column_names().contains(&"weight_rank") {
-        _ = df.drop_in_place("weight_rank");
+    pub fn write_parquet(mut self) -> Result<Self, Box<dyn Error>> {
+        let mut df = self.df.collect()?;
+        ParquetWriter::new(File::create(format!(
+            "data/parquet/{}.parquet",
+            self.ticker
+        ))?)
+        .finish(&mut df)?;
+        self.df = df.into();
+        Ok(self)
     }
 
-    let mut expressions: Vec<Expr> = vec![];
-
-    if !df.fields().contains(&Field::new("date", DataType::Date)) {
-        expressions.push(col("date").str().strptime(
-            DataType::Date,
-            StrptimeOptions {
-                // format: Some("%m/%d/%Y".into()),
-                format: None,
-                strict: false,
-                exact: true,
-                cache: true,
-            },
-        ));
-    }
-
-    if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
-        expressions.push(
-            col("weight")
-                .str()
-                .replace(lit("%"), lit(""), true)
-                .cast(DataType::Float64),
-        );
-    }
-
-    if df
-        .fields()
-        .contains(&Field::new("market_value", DataType::Utf8))
-    {
-        expressions.push(
-            col("market_value")
-                .str()
-                .replace(lit("$"), lit(""), true)
-                .str()
-                .replace_all(lit(","), lit(""), true)
-                .cast(DataType::Float64)
-                .cast(DataType::Int64),
-        );
-    }
-
-    if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
-        expressions.push(
-            col("shares")
-                .str()
-                .replace_all(lit(","), lit(""), true)
-                .cast(DataType::Int64),
-        );
-    }
-
-    df = df
-        .lazy()
-        .with_columns(expressions)
-        .filter(col("date").is_not_null())
-        .collect()?;
-
-    if !df.get_column_names().contains(&"share_price")
-        && df.get_column_names().contains(&"market_value")
-    {
-        df = df
-            .lazy()
-            .with_column(
-                (col("market_value").cast(DataType::Float64)
-                    / col("shares").cast(DataType::Float64))
-                .alias("share_price")
-                .cast(DataType::Float64)
-                .round(2),
-            )
+    pub fn df_sort(mut self) -> Result<Self, Box<dyn Error>> {
+        let df = self
+            .df
             .collect()?
+            .sort(["date", "weight"], vec![false, true])?;
+        self.df = df.into();
+        Ok(self)
     }
 
-    if df.get_column_names().contains(&"share_price") {
-        df = df.select([
-            "date",
-            "ticker",
-            "cusip",
-            "company",
-            "market_value",
-            "shares",
-            "share_price",
-            "weight",
-        ])?;
-    } else {
-        df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
+    fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
+        let df = LazyFrame::scan_parquet(
+            format!("data/parquet/{}.parquet", ticker),
+            ScanArgsParquet::default(),
+        )?;
+        Ok(df)
     }
 
-    Ok(df)
-}
+    fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
+        // with dedupe and format
+        // for x in &mut dfs {
+        //     *x = Self::df_format(x.to_owned())?.lazy();
+        // }
+        let dfs: Vec<LazyFrame> = dfs.into_iter().map(DF::lazy).collect();
+        let mut df = concat(dfs, false, true)?;
+        df = df.unique_stable(None, UniqueKeepStrategy::First);
+        Ok(df.into())
+    }
 
-pub fn get_api(ticker: Ticker, last_day: Option<NaiveDate>) -> Result<DataFrame, Box<dyn Error>> {
-    let url = match (ticker, last_day) {
-        (Ticker::ARKVC, Some(last_day)) => format!(
-            "https://api.nexveridian.com/arkvc_holdings?end={}",
-            last_day
-        ),
-        (ticker, Some(last_day)) => format!(
-            "https://api.nexveridian.com/ark_holdings?ticker={}&end={}",
-            ticker, last_day
-        ),
-        (Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(),
-        (ticker, None) => {
-            format!("https://api.nexveridian.com/ark_holdings?ticker={}", ticker)
+    fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
+        let mut df = df.collect()?;
+
+        if df.get_column_names().contains(&"market_value_($)") {
+            df = df
+                .lazy()
+                .rename(
+                    vec!["market_value_($)", "weight_(%)"],
+                    vec!["market_value", "weight"],
+                )
+                .collect()?;
+        }
+        if df.get_column_names().contains(&"market value ($)") {
+            df = df
+                .lazy()
+                .rename(
+                    vec!["market value ($)", "weight (%)"],
+                    vec!["market_value", "weight"],
+                )
+                .collect()?;
+        }
+        if df.get_column_names().contains(&"CUSIP") {
+            df = df
+                .lazy()
+                .rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
+                .collect()?;
         }
-    };
-    Reader::Json.get_data_url(url)
-}
-
-pub fn get_csv_ark(ticker: Ticker) -> Result<DataFrame, Box<dyn Error>> {
-    let url = match ticker {
-        Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
-        _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", ticker.value(), ticker),
-    };
-    Reader::Csv.get_data_url(url)
+        // if df.rename("market_value_($)", "market_value").is_ok() {}
+        // if df.rename("market value ($)", "market_value").is_ok() {}
+        // if df.rename("weight_(%)", "weight").is_ok() {}
+        // if df.rename("weight (%)", "weight").is_ok() {}
+        // if df.rename("CUSIP", "cusip").is_ok() {}
+
+        if df.get_column_names().contains(&"fund") {
+            _ = df.drop_in_place("fund");
+        }
+        if df.get_column_names().contains(&"weight_rank") {
+            _ = df.drop_in_place("weight_rank");
+        }
+
+        let mut expressions: Vec<Expr> = vec![];
+
+        if !df.fields().contains(&Field::new("date", DataType::Date)) {
+            expressions.push(col("date").str().strptime(
+                DataType::Date,
+                StrptimeOptions {
+                    // format: Some("%m/%d/%Y".into()),
+                    format: None,
+                    strict: false,
+                    exact: true,
+                    cache: true,
+                },
+            ));
+        }
+
+        if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
+            expressions.push(
+                col("weight")
+                    .str()
+                    .replace(lit("%"), lit(""), true)
+                    .cast(DataType::Float64),
+            );
+        }
+
+        if df
+            .fields()
+            .contains(&Field::new("market_value", DataType::Utf8))
+        {
+            expressions.push(
+                col("market_value")
+                    .str()
+                    .replace(lit("$"), lit(""), true)
+                    .str()
+                    .replace_all(lit(","), lit(""), true)
+                    .cast(DataType::Float64)
+                    .cast(DataType::Int64),
+            );
+        }
+
+        if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
+            expressions.push(
+                col("shares")
+                    .str()
+                    .replace_all(lit(","), lit(""), true)
+                    .cast(DataType::Int64),
+            );
+        }
+
+        df = df
+            .lazy()
+            .with_columns(expressions)
+            .filter(col("date").is_not_null())
+            .collect()?;
+
+        if !df.get_column_names().contains(&"share_price")
+            && df.get_column_names().contains(&"market_value")
+        {
+            df = df
+                .lazy()
+                .with_column(
+                    (col("market_value").cast(DataType::Float64)
+                        / col("shares").cast(DataType::Float64))
+                    .alias("share_price")
+                    .cast(DataType::Float64)
+                    .round(2),
+                )
+                .collect()?
+        }
+
+        if df.get_column_names().contains(&"share_price") {
+            df = df.select([
+                "date",
+                "ticker",
+                "cusip",
+                "company",
+                "market_value",
+                "shares",
+                "share_price",
+                "weight",
+            ])?;
+        } else if !df
+            .get_column_names()
+            .eq(&["date", "ticker", "cusip", "company", "weight"])
+        {
+            df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
+        }
+
+        Ok(df)
+    }
+
+    pub fn get_api(self, last_day: Option<NaiveDate>) -> Result<DataFrame, Box<dyn Error>> {
+        let tic = self.ticker;
+        let url = match (tic, last_day) {
+            (self::Ticker::ARKVC, Some(last_day)) => format!(
+                "https://api.nexveridian.com/arkvc_holdings?end={}",
+                last_day
+            ),
+            (tic, Some(last_day)) => format!(
+                "https://api.nexveridian.com/ark_holdings?ticker={}&end={}",
+                tic.value(),
+                last_day
+            ),
+            (self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(),
+            (tic, None) => {
+                format!(
+                    "https://api.nexveridian.com/ark_holdings?ticker={}",
+                    tic.value()
+                )
+            }
+        };
+        Reader::Json.get_data_url(url)
+    }
+
+    pub fn get_csv_ark(self) -> Result<DataFrame, Box<dyn Error>> {
+        let url = match self.ticker {
+            self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
+            _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker),
+        };
+        Reader::Csv.get_data_url(url)
+    }
+
+    pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> {
+        let mut dfs = vec![];
+        for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
+            dfs.push(LazyCsvReader::new(x).finish()?);
+        }
+        let mut df = concat(dfs, false, true)?;
+
+        if let Ok(df_old) = Self::read_parquet(ticker) {
+            df = Self::concat_df(vec![df_old.into(), df.into()])?.lazy();
+        }
+
+        Self {
+            df: Self::df_format(df)?.into(),
+            ticker,
+        }
+        .df_sort()?
+        .write_parquet()?;
+
+        Ok(())
+    }
 }
 
 pub enum Reader {
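A minimal usage sketch of the builder-style API this diff introduces (hypothetical call site, not part of the patch; it assumes the `df_sort`/`write_parquet` signatures shown above, that `ARKK` is one of the `Ticker` variants, and that data/parquet/ARKK.parquet already exists for the incremental read):

    // Read the stored parquet, fetch only holdings newer than its last date,
    // then sort the merged frame and write it back to data/parquet/ARKK.parquet.
    Ark::new(Source::ApiIncremental, Ticker::ARKK)?
        .df_sort()?
        .write_parquet()?;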