diff --git a/src/util.rs b/src/util.rs index cc48488..da4f8d6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -63,6 +63,18 @@ impl DF { } } } +trait DFS { + fn lazy(self) -> Vec; + fn collect(self) -> Vec; +} +impl DFS for Vec { + fn lazy(self) -> Vec { + self.into_iter().map(|df| df.lazy()).collect() + } + fn collect(self) -> Vec { + self.into_iter().map(|df| df.collect().unwrap()).collect() + } +} pub enum Source { Read, @@ -77,7 +89,7 @@ struct Ark { impl Ark { pub fn new(source: Source, ticker: Ticker) -> Result> { let mut ark = Self { - df: Self::read_parquet(ticker)?.into(), + df: Self::read_parquet(ticker)?, ticker, }; @@ -98,12 +110,18 @@ impl Ark { }; if let Some(update) = update { - ark.df = Self::concat_df(vec![ark.df, update.into()])?.into(); + ark.df = Self::concat_df(vec![ + Self::df_format(ark.df)?, + Self::df_format(update.into())?, + ])? + .into(); } Ok(ark) } - pub fn write_parquet(mut self) -> Result> { + pub fn write_parquet(&self) -> Result<&Self, Box> { + // with format + self = self.format()?; ParquetWriter::new(File::create(format!( "data/parquet/{}.parquet", self.ticker @@ -112,34 +130,44 @@ impl Ark { Ok(self) } - pub fn df_sort(&self) -> Result<&Self, Box> { - let df = self - .df - .collect()? - .sort(["date", "weight"], vec![false, true])?; - self.df = df.into(); + fn sort(&self) -> Result<&Self, Box> { + self.df = Self::df_sort(self.df)?; Ok(self) } - fn read_parquet(ticker: Ticker) -> Result> { + fn df_sort(df: DF) -> Result> { + let df = df.collect()?.sort(["date", "weight"], vec![false, true])?; + Ok(df.into()) + } + + fn read_parquet(ticker: Ticker) -> Result> { let df = LazyFrame::scan_parquet( format!("data/parquet/{}.parquet", ticker), ScanArgsParquet::default(), )?; - Ok(df) - } - - fn concat_df(mut dfs: Vec) -> Result> { - // with dedupe and format - // for x in &mut dfs { - // *x = Self::df_format(x.to_owned())?.lazy(); - // } - let mut df = concat(dfs, false, true)?; - df = df.unique_stable(None, UniqueKeepStrategy::First); Ok(df.into()) } - fn df_format(df: LazyFrame) -> Result> { + fn concat_df(mut dfs: Vec) -> Result> { + // with dedupe + let mut df = concat(dfs.lazy(), false, true)?; + Ok(Self::dedupe(df.into())?) + } + + pub fn dedupe(mut df: DF) -> Result> { + df = df + .lazy() + .unique_stable(None, UniqueKeepStrategy::First) + .into(); + Ok(df) + } + + pub fn format(&self) -> Result<&Self, Box> { + self.df = Self::df_format(self.df)?.into(); + Ok(self) + } + + fn df_format(df: DF) -> Result> { let mut df = df.collect()?; if df.get_column_names().contains(&"market_value_($)") { @@ -267,11 +295,11 @@ impl Ark { df = df.select(["date", "ticker", "cusip", "company", "weight"])?; } - Ok(df) + Ok(df.into()) } pub fn get_api(self, last_day: Option) -> Result> { - let tic = self.ticker; + let tic: Ticker = self.ticker; let url = match (tic, last_day) { (self::Ticker::ARKVC, Some(last_day)) => format!( "https://api.nexveridian.com/arkvc_holdings?end={}", @@ -301,22 +329,22 @@ impl Ark { Reader::Csv.get_data_url(url) } - pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<(), Box> { + pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<&'static Self, Box> { let mut dfs = vec![]; for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { dfs.push(LazyCsvReader::new(x).finish()?); } - let mut df = concat(dfs, false, true)?; + let mut df = concat(dfs, false, true)?.into(); - if Self::read_parquet(ticker).is_ok() { + let ark = if Self::read_parquet(ticker).is_ok() { let df_old = Self::read_parquet(ticker)?; - df = Self::concat_df(vec![df_old, df])?; - write_parquet(ticker, df_sort(df.collect()?)?)?; + df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?; + Self { df, ticker }.sort()?.write_parquet()? } else { - write_parquet(ticker, df_format(df)?)?; - } + Self { df, ticker }.sort()?.write_parquet()? + }; - Ok(()) + Ok(ark) } }