This commit is contained in:
Elijah McMorris 2023-06-14 08:51:43 +00:00
parent 712ba4058a
commit bb47fb8fb0
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
2 changed files with 43 additions and 32 deletions

View file

@ -37,10 +37,16 @@ use util::*;
// } // }
fn main() { fn main() {
let dfn = df_format(read_parquet(Ticker::ARKK).unwrap()).unwrap(); let read = Ark::new(Source::Read, Ticker::ARKK)
println!("{:#?}", dfn); .unwrap()
.collect()
.unwrap();
println!("{:#?}", read);
let api = df_format(get_api(Ticker::ARKK, None).unwrap()).unwrap(); let api = Ark::new(Source::ApiFull, Ticker::ARKK)
.unwrap()
.collect()
.unwrap();
println!("{:#?}", api); println!("{:#?}", api);
// let update = df_format(get_csv_ark(Ticker::ARKK).unwrap()).unwrap(); // let update = df_format(get_csv_ark(Ticker::ARKK).unwrap()).unwrap();

View file

@ -35,6 +35,7 @@ impl Ticker {
} }
} }
#[derive(Clone)]
pub enum DF { pub enum DF {
LazyFrame(LazyFrame), LazyFrame(LazyFrame),
DataFrame(DataFrame), DataFrame(DataFrame),
@ -82,7 +83,7 @@ pub enum Source {
ApiIncremental, ApiIncremental,
ApiFull, ApiFull,
} }
struct Ark { pub struct Ark {
df: DF, df: DF,
ticker: Ticker, ticker: Ticker,
} }
@ -99,6 +100,7 @@ impl Ark {
Source::ApiIncremental => { Source::ApiIncremental => {
let last_day = ark let last_day = ark
.df .df
.clone()
.collect()? .collect()?
.column("date") .column("date")
.unwrap() .unwrap()
@ -119,25 +121,31 @@ impl Ark {
Ok(ark) Ok(ark)
} }
pub fn write_parquet(&self) -> Result<&Self, Box<dyn Error>> { pub fn collect(self) -> Result<DataFrame, Box<dyn Error>> {
// with format self.df.collect()
self = self.format()?;
ParquetWriter::new(File::create(format!(
"data/parquet/{}.parquet",
self.ticker
))?)
.finish(&mut self.df.collect()?)?;
Ok(self)
} }
fn sort(&self) -> Result<&Self, Box<dyn Error>> { pub fn write_parquet(&mut self) -> Result<&Self, Box<dyn Error>> {
self.df = Self::df_sort(self.df)?; // with format
let ark = self.format()?;
ParquetWriter::new(File::create(format!(
"data/parquet/{}.parquet",
ark.ticker
))?)
.finish(&mut ark.df.clone().collect()?)?;
Ok(ark)
}
fn sort(mut self) -> Result<Self, Box<dyn Error>> {
self.df = Self::df_sort(self.df.clone())?;
Ok(self) Ok(self)
} }
fn df_sort(df: DF) -> Result<DF, Box<dyn Error>> { fn df_sort(df: DF) -> Result<DF, Box<dyn Error>> {
let df = df.collect()?.sort(["date", "weight"], vec![false, true])?; Ok(df
Ok(df.into()) .collect()?
.sort(["date", "weight"], vec![false, true])?
.into())
} }
fn read_parquet(ticker: Ticker) -> Result<DF, Box<dyn Error>> { fn read_parquet(ticker: Ticker) -> Result<DF, Box<dyn Error>> {
@ -148,9 +156,9 @@ impl Ark {
Ok(df.into()) Ok(df.into())
} }
fn concat_df(mut dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> { fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
// with dedupe // with dedupe
let mut df = concat(dfs.lazy(), false, true)?; let df = concat(dfs.lazy(), false, true)?;
Ok(Self::dedupe(df.into())?) Ok(Self::dedupe(df.into())?)
} }
@ -162,12 +170,12 @@ impl Ark {
Ok(df) Ok(df)
} }
pub fn format(&self) -> Result<&Self, Box<dyn Error>> { pub fn format(&mut self) -> Result<&Self, Box<dyn Error>> {
self.df = Self::df_format(self.df)?.into(); self.df = Self::df_format(self.df.clone())?.into();
Ok(self) Ok(self)
} }
fn df_format(df: DF) -> Result<DF, Box<dyn Error>> { fn df_format(mut df: DF) -> Result<DF, Box<dyn Error>> {
let mut df = df.collect()?; let mut df = df.collect()?;
if df.get_column_names().contains(&"market_value_($)") { if df.get_column_names().contains(&"market_value_($)") {
@ -298,7 +306,7 @@ impl Ark {
Ok(df.into()) Ok(df.into())
} }
pub fn get_api(self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> { pub fn get_api(&self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> {
let tic: Ticker = self.ticker; let tic: Ticker = self.ticker;
let url = match (tic, last_day) { let url = match (tic, last_day) {
(self::Ticker::ARKVC, Some(last_day)) => format!( (self::Ticker::ARKVC, Some(last_day)) => format!(
@ -321,7 +329,7 @@ impl Ark {
Reader::Json.get_data_url(url) Reader::Json.get_data_url(url)
} }
pub fn get_csv_ark(self) -> Result<LazyFrame, Box<dyn Error>> { pub fn get_csv_ark(&self) -> Result<LazyFrame, Box<dyn Error>> {
let url = match self.ticker { let url = match self.ticker {
self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(), self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
_ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker), _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker),
@ -329,22 +337,19 @@ impl Ark {
Reader::Csv.get_data_url(url) Reader::Csv.get_data_url(url)
} }
pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<&'static Self, Box<dyn Error>> { pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<Self, Box<dyn Error>> {
let mut dfs = vec![]; let mut dfs = vec![];
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?); dfs.push(LazyCsvReader::new(x).finish()?);
} }
let mut df = concat(dfs, false, true)?.into(); let mut df = concat(dfs, false, true)?.into();
let ark = if Self::read_parquet(ticker).is_ok() { if Self::read_parquet(ticker).is_ok() {
let df_old = Self::read_parquet(ticker)?; let df_old = Self::read_parquet(ticker)?;
df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?; df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?
Self { df, ticker }.sort()?.write_parquet()? }
} else {
Self { df, ticker }.sort()?.write_parquet()?
};
Ok(ark) Ok(Self { df, ticker })
} }
} }