This commit is contained in:
Elijah McMorris 2023-06-16 02:38:36 +00:00
parent 5d5fb5cd94
commit 0aff65bd51
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
4 changed files with 209 additions and 38 deletions

View file

@ -1,2 +1,5 @@
# [target.x86_64-unknown-linux-gnu] # [target.x86_64-unknown-linux-gnu]
# rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"] # rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]
[alias]
t = "nextest run"

View file

@ -23,3 +23,6 @@ tokio = { version = "1.26", features = ["full"] }
openssl = { version = "0.10", features = ["vendored"] } openssl = { version = "0.10", features = ["vendored"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
serde_json = "1.0" serde_json = "1.0"
[dev-dependencies]
serial_test = "*"

View file

@ -37,25 +37,26 @@ use util::*;
// } // }
fn main() { fn main() {
let read = Ark::new(Source::Read, Ticker::ARKK) let read = Ark::new(Source::Read, Ticker::ARKK, None)
.unwrap() .unwrap()
.collect() .collect()
.unwrap(); .unwrap();
println!("{:#?}", read.dtypes());
println!("{:#?}", read); println!("{:#?}", read);
let api = Ark::new(Source::ApiFull, Ticker::ARKK) let api = Ark::new(Source::ApiFull, Ticker::ARKK, None)
.unwrap() .unwrap()
.collect() .collect()
.unwrap(); .unwrap();
println!("{:#?}", api); println!("{:#?}", api);
// let ark = Ark::new(Source::Ark, Ticker::ARKK) // let ark = Ark::new(Source::Ark, Ticker::ARKK, None)
// .unwrap() // .unwrap()
// .collect() // .collect()
// .unwrap(); // .unwrap();
// println!("{:#?}", ark); // println!("{:#?}", ark);
// let ark = Ark::new(Source::Ark, Ticker::ARKVC) // let ark = Ark::new(Source::Ark, Ticker::ARKVC, None)
// .unwrap() // .unwrap()
// .collect() // .collect()
// .unwrap(); // .unwrap();

View file

@ -1,6 +1,7 @@
use chrono::NaiveDate; use chrono::NaiveDate;
use glob::glob; use glob::glob;
use polars::datatypes::DataType; use polars::datatypes::DataType;
use polars::prelude::*; use polars::prelude::*;
use polars::prelude::{DataFrame, StrptimeOptions, UniqueKeepStrategy}; use polars::prelude::{DataFrame, StrptimeOptions, UniqueKeepStrategy};
use reqwest::blocking::Client; use reqwest::blocking::Client;
@ -8,6 +9,7 @@ use serde_json::Value;
use std::error::Error; use std::error::Error;
use std::fs::File; use std::fs::File;
use std::io::Cursor; use std::io::Cursor;
use std::result::Result; use std::result::Result;
use strum_macros::EnumIter; use strum_macros::EnumIter;
@ -84,20 +86,34 @@ pub enum Source {
ApiFull, ApiFull,
} }
pub struct Ark { pub struct Ark {
df: DF, pub df: DF,
ticker: Ticker, ticker: Ticker,
path: Option<String>,
} }
impl Ark { impl Ark {
pub fn new(source: Source, ticker: Ticker) -> Result<Self, Box<dyn Error>> { pub fn new(
source: Source,
ticker: Ticker,
path: Option<String>,
) -> Result<Self, Box<dyn Error>> {
let existing_file = Self::read_parquet(ticker, path.clone()).is_ok();
let mut ark = Self { let mut ark = Self {
df: Self::read_parquet(ticker)?, df: match existing_file {
true => Self::read_parquet(ticker, path.clone())?,
false => DF::DataFrame(df!["date" => [""],]?),
},
ticker, ticker,
path,
}; };
let update = match source { let update = match (source, existing_file) {
Source::Read => None, (Source::Read, false) => {
Source::Ark => Some(ark.get_csv_ark()?), panic!("Can not read from file, file is empty or does not exist")
Source::ApiIncremental => { }
(Source::Read, true) => None,
(Source::Ark, _) => Some(ark.get_csv_ark()?),
(Source::ApiIncremental, true) => {
let last_day = ark let last_day = ark
.df .df
.clone() .clone()
@ -108,15 +124,20 @@ impl Ark {
.and_then(NaiveDate::from_num_days_from_ce_opt); .and_then(NaiveDate::from_num_days_from_ce_opt);
Some(ark.get_api(last_day)?) Some(ark.get_api(last_day)?)
} }
Source::ApiFull => Some(ark.get_api(None)?), (Source::ApiIncremental, false) | (Source::ApiFull, _) => Some(ark.get_api(None)?),
}; };
if let Some(update) = update { if let Some(update) = update {
if existing_file {
ark.df = Self::concat_df(vec![ ark.df = Self::concat_df(vec![
Self::df_format(ark.df)?, Self::df_format(ark.df)?,
Self::df_format(update.into())?, Self::df_format(update.into())?,
])?; ])?;
} else {
ark.df = Self::df_format(update.into())?;
} }
}
Ok(ark) Ok(ark)
} }
@ -124,17 +145,35 @@ impl Ark {
self.df.collect() self.df.collect()
} }
pub fn write_parquet(&mut self) -> Result<&Self, Box<dyn Error>> { pub fn write_parquet(self) -> Result<Self, Box<dyn Error>> {
// with format // with format df
let ark = self.format()?; let ark = self.format()?;
ParquetWriter::new(File::create(format!( Self::write_df_parquet(
"data/parquet/{}.parquet", match &ark.path {
ark.ticker Some(path) => format!("{}/{}.parquet", path, ark.ticker),
))?) None => format!("data/parquet/{}.parquet", ark.ticker),
.finish(&mut ark.df.clone().collect()?)?; },
ark.df.clone(),
)?;
Ok(ark) Ok(ark)
} }
fn write_df_parquet(path: String, df: DF) -> Result<(), Box<dyn Error>> {
ParquetWriter::new(File::create(path)?).finish(&mut df.collect()?)?;
Ok(())
}
fn read_parquet(ticker: Ticker, path: Option<String>) -> Result<DF, Box<dyn Error>> {
let df = LazyFrame::scan_parquet(
match path {
Some(p) => format!("{}/{}.parquet", p, ticker),
None => format!("data/parquet/{}.parquet", ticker),
},
ScanArgsParquet::default(),
)?;
Ok(df.into())
}
fn sort(mut self) -> Result<Self, Box<dyn Error>> { fn sort(mut self) -> Result<Self, Box<dyn Error>> {
self.df = Self::df_sort(self.df.clone())?; self.df = Self::df_sort(self.df.clone())?;
Ok(self) Ok(self)
@ -147,14 +186,6 @@ impl Ark {
.into()) .into())
} }
fn read_parquet(ticker: Ticker) -> Result<DF, Box<dyn Error>> {
let df = LazyFrame::scan_parquet(
format!("data/parquet/{}.parquet", ticker),
ScanArgsParquet::default(),
)?;
Ok(df.into())
}
fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> { fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
// with dedupe // with dedupe
let df = concat(dfs.lazy(), false, true)?; let df = concat(dfs.lazy(), false, true)?;
@ -169,7 +200,7 @@ impl Ark {
Ok(df) Ok(df)
} }
pub fn format(&mut self) -> Result<&Self, Box<dyn Error>> { pub fn format(mut self) -> Result<Self, Box<dyn Error>> {
self.df = Self::df_format(self.df.clone())?; self.df = Self::df_format(self.df.clone())?;
Ok(self) Ok(self)
} }
@ -307,11 +338,11 @@ impl Ark {
pub fn get_api(&self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> { pub fn get_api(&self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> {
let url = match (self.ticker, last_day) { let url = match (self.ticker, last_day) {
(self::Ticker::ARKVC, Some(last_day)) => format!( (self::Ticker::ARKVC, Some(last_day)) => format!(
"https://api.nexveridian.com/arkvc_holdings?end={}", "https://api.nexveridian.com/arkvc_holdings?start={}",
last_day last_day
), ),
(tic, Some(last_day)) => format!( (tic, Some(last_day)) => format!(
"https://api.nexveridian.com/ark_holdings?ticker={}&end={}", "https://api.nexveridian.com/ark_holdings?ticker={}&start={}",
tic, last_day tic, last_day
), ),
(self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(), (self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(),
@ -330,19 +361,22 @@ impl Ark {
Reader::Csv.get_data_url(url) Reader::Csv.get_data_url(url)
} }
pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<Self, Box<dyn Error>> { pub fn merge_old_csv_to_parquet(
ticker: Ticker,
path: Option<String>,
) -> Result<Self, Box<dyn Error>> {
let mut dfs = vec![]; let mut dfs = vec![];
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?); dfs.push(LazyCsvReader::new(x).finish()?);
} }
let mut df = concat(dfs, false, true)?.into(); let mut df = concat(dfs, false, true)?.into();
if Self::read_parquet(ticker).is_ok() { if Self::read_parquet(ticker, path.clone()).is_ok() {
let df_old = Self::read_parquet(ticker)?; let df_old = Self::read_parquet(ticker, path.clone())?;
df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])? df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?
} }
Ok(Self { df, ticker }) Ok(Self { df, ticker, path })
} }
} }
@ -384,3 +418,133 @@ impl Reader {
Ok(df) Ok(df)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::fs;
fn write_df_parquet(df: DF) -> Result<(), Box<dyn Error>> {
Ark::write_df_parquet("data/test/ARKK.parquet".into(), df)?;
Ok(())
}
#[test]
#[serial]
fn read_write_parquet() -> Result<(), Box<dyn Error>> {
let test_df = df![
"date" => ["2023-01-01"],
"ticker" => ["TSLA"],
"cusip" => ["123abc"],
"company" => ["Tesla"],
"market_value" => [100],
"shares" => [10],
"share_price" => [10],
"weight" => [10.00]
]?;
write_df_parquet(test_df.clone().into())?;
let read = Ark::new(Source::Read, Ticker::ARKK, Some("data/test".to_owned()))?.collect()?;
fs::remove_file("data/test/ARKK.parquet")?;
assert_eq!(read, test_df);
Ok(())
}
#[test]
#[serial]
fn get_api_arkk() -> Result<(), Box<dyn Error>> {
let df = Ark::new(
Source::ApiIncremental,
Ticker::ARKK,
Some("data/test".to_owned()),
)?
.get_api(NaiveDate::from_ymd_opt(2023, 5, 18))?
.collect()?;
assert_eq!(
df.get_column_names(),
[
"company",
"cusip",
"date",
"market_value",
"share_price",
"shares",
"ticker",
"weight",
"weight_rank"
]
);
Ok(())
}
#[test]
#[serial]
fn get_api_format_arkk() -> Result<(), Box<dyn Error>> {
let dfl = Ark::new(
Source::ApiIncremental,
Ticker::ARKK,
Some("data/test".to_owned()),
)?
.get_api(NaiveDate::from_ymd_opt(2023, 5, 18))?;
let df = Ark::df_format(dfl.into())?.collect()?;
assert_eq!(
(df.get_column_names(), df.dtypes(), df.shape().1 > 1),
(
vec![
"date",
"ticker",
"cusip",
"company",
"market_value",
"shares",
"share_price",
"weight",
],
vec![
DataType::Date,
DataType::Utf8,
DataType::Utf8,
DataType::Utf8,
DataType::Int64,
DataType::Int64,
DataType::Float64,
DataType::Float64,
],
true
)
);
Ok(())
}
#[test]
#[serial]
fn get_api_format_arkvc() -> Result<(), Box<dyn Error>> {
let dfl = Ark::new(
Source::ApiIncremental,
Ticker::ARKVC,
Some("data/test".to_owned()),
)?
.get_api(NaiveDate::from_ymd_opt(2023, 1, 1))?;
let df = Ark::df_format(dfl.into())?.collect()?;
assert_eq!(
(df.get_column_names(), df.dtypes(), df.shape().1 > 1),
(
vec!["date", "ticker", "cusip", "company", "weight"],
vec![
DataType::Date,
DataType::Utf8,
DataType::Utf8,
DataType::Utf8,
DataType::Float64
],
true
)
);
Ok(())
}
}