This commit is contained in:
Elijah McMorris 2023-06-14 08:05:42 +00:00
parent 6e4c536f68
commit 712ba4058a
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8

View file

@ -63,6 +63,18 @@ impl DF {
} }
} }
} }
trait DFS {
fn lazy(self) -> Vec<LazyFrame>;
fn collect(self) -> Vec<DataFrame>;
}
impl DFS for Vec<DF> {
fn lazy(self) -> Vec<LazyFrame> {
self.into_iter().map(|df| df.lazy()).collect()
}
fn collect(self) -> Vec<DataFrame> {
self.into_iter().map(|df| df.collect().unwrap()).collect()
}
}
pub enum Source { pub enum Source {
Read, Read,
@ -77,7 +89,7 @@ struct Ark {
impl Ark { impl Ark {
pub fn new(source: Source, ticker: Ticker) -> Result<Self, Box<dyn Error>> { pub fn new(source: Source, ticker: Ticker) -> Result<Self, Box<dyn Error>> {
let mut ark = Self { let mut ark = Self {
df: Self::read_parquet(ticker)?.into(), df: Self::read_parquet(ticker)?,
ticker, ticker,
}; };
@ -98,12 +110,18 @@ impl Ark {
}; };
if let Some(update) = update { if let Some(update) = update {
ark.df = Self::concat_df(vec![ark.df, update.into()])?.into(); ark.df = Self::concat_df(vec![
Self::df_format(ark.df)?,
Self::df_format(update.into())?,
])?
.into();
} }
Ok(ark) Ok(ark)
} }
pub fn write_parquet(mut self) -> Result<Self, Box<dyn Error>> { pub fn write_parquet(&self) -> Result<&Self, Box<dyn Error>> {
// with format
self = self.format()?;
ParquetWriter::new(File::create(format!( ParquetWriter::new(File::create(format!(
"data/parquet/{}.parquet", "data/parquet/{}.parquet",
self.ticker self.ticker
@ -112,34 +130,44 @@ impl Ark {
Ok(self) Ok(self)
} }
pub fn df_sort(&self) -> Result<&Self, Box<dyn Error>> { fn sort(&self) -> Result<&Self, Box<dyn Error>> {
let df = self self.df = Self::df_sort(self.df)?;
.df
.collect()?
.sort(["date", "weight"], vec![false, true])?;
self.df = df.into();
Ok(self) Ok(self)
} }
fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> { fn df_sort(df: DF) -> Result<DF, Box<dyn Error>> {
let df = df.collect()?.sort(["date", "weight"], vec![false, true])?;
Ok(df.into())
}
fn read_parquet(ticker: Ticker) -> Result<DF, Box<dyn Error>> {
let df = LazyFrame::scan_parquet( let df = LazyFrame::scan_parquet(
format!("data/parquet/{}.parquet", ticker), format!("data/parquet/{}.parquet", ticker),
ScanArgsParquet::default(), ScanArgsParquet::default(),
)?; )?;
Ok(df)
}
fn concat_df(mut dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
// with dedupe and format
// for x in &mut dfs {
// *x = Self::df_format(x.to_owned())?.lazy();
// }
let mut df = concat(dfs, false, true)?;
df = df.unique_stable(None, UniqueKeepStrategy::First);
Ok(df.into()) Ok(df.into())
} }
fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> { fn concat_df(mut dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
// with dedupe
let mut df = concat(dfs.lazy(), false, true)?;
Ok(Self::dedupe(df.into())?)
}
pub fn dedupe(mut df: DF) -> Result<DF, Box<dyn Error>> {
df = df
.lazy()
.unique_stable(None, UniqueKeepStrategy::First)
.into();
Ok(df)
}
pub fn format(&self) -> Result<&Self, Box<dyn Error>> {
self.df = Self::df_format(self.df)?.into();
Ok(self)
}
fn df_format(df: DF) -> Result<DF, Box<dyn Error>> {
let mut df = df.collect()?; let mut df = df.collect()?;
if df.get_column_names().contains(&"market_value_($)") { if df.get_column_names().contains(&"market_value_($)") {
@ -267,11 +295,11 @@ impl Ark {
df = df.select(["date", "ticker", "cusip", "company", "weight"])?; df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
} }
Ok(df) Ok(df.into())
} }
pub fn get_api(self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> { pub fn get_api(self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> {
let tic = self.ticker; let tic: Ticker = self.ticker;
let url = match (tic, last_day) { let url = match (tic, last_day) {
(self::Ticker::ARKVC, Some(last_day)) => format!( (self::Ticker::ARKVC, Some(last_day)) => format!(
"https://api.nexveridian.com/arkvc_holdings?end={}", "https://api.nexveridian.com/arkvc_holdings?end={}",
@ -301,22 +329,22 @@ impl Ark {
Reader::Csv.get_data_url(url) Reader::Csv.get_data_url(url)
} }
pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> { pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<&'static Self, Box<dyn Error>> {
let mut dfs = vec![]; let mut dfs = vec![];
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?); dfs.push(LazyCsvReader::new(x).finish()?);
} }
let mut df = concat(dfs, false, true)?; let mut df = concat(dfs, false, true)?.into();
if Self::read_parquet(ticker).is_ok() { let ark = if Self::read_parquet(ticker).is_ok() {
let df_old = Self::read_parquet(ticker)?; let df_old = Self::read_parquet(ticker)?;
df = Self::concat_df(vec![df_old, df])?; df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?;
write_parquet(ticker, df_sort(df.collect()?)?)?; Self { df, ticker }.sort()?.write_parquet()?
} else { } else {
write_parquet(ticker, df_format(df)?)?; Self { df, ticker }.sort()?.write_parquet()?
} };
Ok(()) Ok(ark)
} }
} }