From fdc051593934dfef6d23f3ca44d3c7da2bd8e192 Mon Sep 17 00:00:00 2001 From: NexVeridian Date: Thu, 10 Aug 2023 10:10:45 +0000 Subject: [PATCH] 1.1.2 --- Cargo.toml | 9 +- README.md | 9 +- src/main.rs | 2 +- src/util.rs | 238 ++++++++++++++++++++++++++++++------------- tests/integration.rs | 7 +- 5 files changed, 183 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2ca07b..deecd18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,11 @@ [package] name = "ark-invest-api-rust-data" -version = "0.1.0" +license = "Apache-2.0" +version = "1.0.0" edition = "2021" [dependencies] -polars = { version = "0.30", features = [ +polars = { version = "0.32", features = [ "lazy", "strings", "parquet", @@ -16,7 +17,7 @@ polars = { version = "0.30", features = [ ] } reqwest = { version = "0.11", features = ["blocking", "gzip"] } glob = { version = "0.3" } -clokwerk = "0.4.0" +clokwerk = "0.4" strum_macros = "0.25" strum = "0.25" tokio = { version = "1.26", features = ["full"] } @@ -25,7 +26,7 @@ chrono = { version = "0.4", features = ["serde"] } serde_json = "1.0" rand = "0.8" futures = "0.3" -lazy_static = "1.4.0" +lazy_static = "1.4" [dev-dependencies] serial_test = "*" diff --git a/README.md b/README.md index ea95519..2b68da7 100644 --- a/README.md +++ b/README.md @@ -29,10 +29,15 @@ pub enum Source { Read, // From ARK Invest Ark, - // From api.NexVeridian.com (Default) + // From api.NexVeridian.com + #[default] ApiIncremental, - // From api.NexVeridian.com, not usually nessisary, use ApiIncremental + // From api.NexVeridian.com, not usually nessisary, use ApiIncremental ApiFull, + // From arkfunds.io/api, avoid using, use ApiIncremental instead + ArkFundsIoIncremental, + // From arkfunds.io/api, avoid using, use ApiFull instead + ArkFundsIoFull, } ``` diff --git a/src/main.rs b/src/main.rs index b25ee31..c128f52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ lazy_static! { static ref SOURCE: Source = match env::var("ARK_SOURCE") { Ok(val) => Source::from_str(val.as_str()).expect("Env string ARK_SOURCE is not in enum Source"), - Err(_e) => Source::ApiIncremental, + Err(_) => Source::ApiIncremental, }; } diff --git a/src/util.rs b/src/util.rs index 171a753..63fdff9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,4 @@ -use chrono::NaiveDate; +use chrono::{Duration, NaiveDate}; use glob::glob; use polars::datatypes::DataType; use polars::lazy::dsl::StrptimeOptions; @@ -10,15 +10,17 @@ use serde_json::Value; use std::error::Error; use std::fs::{create_dir_all, File}; use std::io::Cursor; + use std::path::Path; use std::result::Result; use strum_macros::{EnumIter, EnumString}; -#[derive(strum_macros::Display, EnumIter, Clone, Copy, PartialEq, Debug)] +#[derive(Debug, Default, strum_macros::Display, EnumIter, Clone, Copy, PartialEq)] pub enum Ticker { ARKVC, ARKF, ARKG, + #[default] ARKK, ARKQ, ARKW, @@ -80,17 +82,24 @@ impl DFS for Vec { } } -#[derive(EnumString, Clone, Copy)] +#[derive(Debug, Default, EnumString, Clone, Copy, PartialEq)] pub enum Source { // Reads Parquet file if exists Read, // From ARK Invest Ark, // From api.NexVeridian.com + #[default] ApiIncremental, // From api.NexVeridian.com, not usually nessisary, use ApiIncremental ApiFull, + // From arkfunds.io/api, avoid using, use ApiIncremental instead + ArkFundsIoIncremental, + // From arkfunds.io/api, avoid using, use ApiFull instead + ArkFundsIoFull, } + +#[derive(Clone)] pub struct Ark { pub df: DF, ticker: Ticker, @@ -102,11 +111,11 @@ impl Ark { ticker: Ticker, path: Option, ) -> Result> { - let existing_file = Self::read_parquet(ticker, path.clone()).is_ok(); + let existing_file = Self::read_parquet(&ticker, path.as_ref()).is_ok(); let mut ark = Self { df: match existing_file { - true => Self::read_parquet(ticker, path.clone())?, + true => Self::read_parquet(&ticker, path.as_ref())?, false => DF::DataFrame(df!["date" => [""],]?), }, ticker, @@ -115,22 +124,16 @@ impl Ark { let update = match (source, existing_file) { (Source::Read, false) => { - panic!("Can not read from file, file is empty, does not exist, or is locked") + panic!("Can not read from file. file is empty, does not exist, or is locked") } (Source::Read, true) => None, (Source::Ark, _) => Some(ark.get_csv_ark()?), - (Source::ApiIncremental, true) => { - let last_day = ark - .df - .clone() - .collect()? - .column("date") - .unwrap() - .max() - .and_then(NaiveDate::from_num_days_from_ce_opt); - Some(ark.get_api(last_day)?) + (Source::ApiIncremental, true) | (Source::ArkFundsIoIncremental, true) => { + let last_day = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() + + Duration::days(ark.df.clone().collect()?.column("date")?.max().unwrap()); + Some(ark.get_api(Some(last_day), Some(&source))?) } - (Source::ApiIncremental, false) | (Source::ApiFull, _) => Some(ark.get_api(None)?), + _ => Some(ark.get_api(None, Some(&source))?), }; if let Some(update) = update { @@ -174,7 +177,7 @@ impl Ark { Ok(()) } - fn read_parquet(ticker: Ticker, path: Option) -> Result> { + fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result> { let df = LazyFrame::scan_parquet( match path { Some(p) => format!("{}/{}.parquet", p, ticker), @@ -186,20 +189,25 @@ impl Ark { } pub fn sort(mut self) -> Result> { - self.df = Self::df_sort(self.df.clone())?; + self.df = Self::df_sort(self.df)?; Ok(self) } pub fn df_sort(df: DF) -> Result> { Ok(df .collect()? - .sort(["date", "weight"], vec![false, true])? + .sort(["date", "weight"], vec![false, true], false)? .into()) } fn concat_df(dfs: Vec) -> Result> { // with dedupe - let df = concat(dfs.lazy(), false, true)?; + let df = concat( + dfs.lazy(), + UnionArgs { + ..Default::default() + }, + )?; Self::dedupe(df.into()) } @@ -212,7 +220,7 @@ impl Ark { } pub fn format(mut self) -> Result> { - self.df = Self::df_format(self.df.clone())?; + self.df = Self::df_format(self.df)?; Ok(self) } @@ -263,34 +271,32 @@ impl Ark { } if !df.fields().contains(&Field::new("date", DataType::Date)) { - let date_format = |mut df: DataFrame, format:Option| -> Result> { - df = df - .lazy() - .with_column(col("date").str().strptime( - DataType::Date, - StrptimeOptions { - format, - strict: false, - exact: true, - cache: true, - }, - )) - .collect()?; - - if df.column("date").unwrap().null_count() > df.height() / 10 { - return Err("wrong date format".into()); - } + let date_format = + |mut df: DataFrame, format: Option| -> Result> { + df = df + .lazy() + .with_column(col("date").str().strptime( + DataType::Date, + StrptimeOptions { + format, + strict: false, + ..Default::default() + }, + )) + .collect()?; - Ok(df) - }; + if df.column("date").unwrap().null_count() > df.height() / 10 { + return Err("wrong date format".into()); + } + + Ok(df) + }; if let Ok(x) = date_format(df.clone(), Some("%m/%d/%Y".into())) { df = x - } - else if let Ok(x) = date_format(df.clone(), Some("%Y/%m/%d".into())) { + } else if let Ok(x) = date_format(df.clone(), Some("%Y/%m/%d".into())) { df = x - } - else if let Ok(x) = date_format(df.clone(), None) { + } else if let Ok(x) = date_format(df.clone(), None) { df = x } } @@ -321,6 +327,13 @@ impl Ark { ); } + if df + .fields() + .contains(&Field::new("market_value", DataType::Float64)) + { + expressions.push(col("market_value").cast(DataType::Int64)); + } + if df.fields().contains(&Field::new("shares", DataType::Utf8)) { expressions.push( col("shares") @@ -330,6 +343,53 @@ impl Ark { ); } + // rename values + expressions.push( + col("ticker") + .str() + .replace(lit("DKNG UW"), lit("DKNN"), true) + .str() + .replace(lit("NU UN"), lit("NU"), true) + .str() + .replace(lit("DSY"), lit("DSY FP"), true) + .str() + .replace(lit("GRMN UN"), lit("GRMN"), true) + .str() + .replace(lit("ARCT UQ"), lit("ARCT"), true) + .str() + .replace(lit("PRNT UF"), lit("PRNT"), true), + ); + expressions.push( + col("company") + .str() + .replace_all(lit("-A"), lit(""), true) + .str() + .replace_all(lit("- A"), lit(""), true) + .str() + .replace_all(lit("-CL A"), lit(""), true) + .str() + .replace_all(lit("-CLASS A"), lit(""), true) + .str() + .replace_all(lit("Inc"), lit(""), true) + .str() + .replace_all(lit("INC"), lit(""), true) + .str() + .replace_all(lit("LTD"), lit(""), true) + .str() + .replace_all(lit("CORP"), lit(""), true) + .str() + .replace_all(lit("CORPORATION"), lit(""), true) + .str() + .replace_all(lit(","), lit(""), true) + .str() + .replace_all(lit("."), lit(""), true) + .str() + .replace(lit("Blackdaemon"), lit("Blockdaemon"), true) + .str() + .rstrip(None), + ); + + // run expressions df = df .lazy() .with_columns(expressions) @@ -372,25 +432,55 @@ impl Ark { Ok(df.into()) } - pub fn get_api(&self, last_day: Option) -> Result> { - let url = match (self.ticker, last_day) { + pub fn get_api( + &self, + last_day: Option, + source: Option<&Source>, + ) -> Result> { + let url = match (&self.ticker, last_day) { (self::Ticker::ARKVC, Some(last_day)) => format!( "https://api.nexveridian.com/arkvc_holdings?start={}", last_day ), - (tic, Some(last_day)) => format!( - "https://api.nexveridian.com/ark_holdings?ticker={}&start={}", - tic, last_day - ), + (tic, Some(last_day)) => match source { + Some(Source::ArkFundsIoIncremental) => format!( + "https://arkfunds.io/api/v2/etf/holdings?symbol={}&date_from={}", + tic, last_day + ), + _ => format!( + "https://api.nexveridian.com/ark_holdings?ticker={}&start={}", + tic, last_day + ), + }, (self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(), - (tic, None) => { - format!("https://api.nexveridian.com/ark_holdings?ticker={}", tic) - } + (tic, None) => match source { + Some(Source::ArkFundsIoFull) => { + format!("https://arkfunds.io/api/v2/etf/holdings?symbol={}", tic) + } + _ => { + format!("https://api.nexveridian.com/ark_holdings?ticker={}", tic) + } + }, }; - Reader::Json.get_data_url(url) + + let mut df = Reader::Json.get_data_url(url)?; + df = match source { + Some(Source::ArkFundsIoIncremental) | Some(Source::ArkFundsIoFull) => { + df = df + .column("holdings")? + .clone() + .explode()? + .struct_()? + .clone() + .unnest(); + df + } + _ => df, + }; + Ok(df) } - pub fn get_csv_ark(&self) -> Result> { + pub fn get_csv_ark(&self) -> Result> { let url = match self.ticker { self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(), _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker), @@ -406,31 +496,38 @@ impl Ark { for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { dfs.push(LazyCsvReader::new(x).finish()?); } - let mut df = concat(dfs, false, true)?.into(); + let mut df = concat( + dfs, + UnionArgs { + ..Default::default() + }, + )? + .into(); - if Self::read_parquet(ticker, path.clone()).is_ok() { - let df_old = Self::read_parquet(ticker, path.clone())?; + if Self::read_parquet(&ticker, path.as_ref()).is_ok() { + let df_old = Self::read_parquet(&ticker, path.as_ref())?; df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?; } Ok(Self { df, ticker, path }) } } +#[derive(Debug, Clone, Copy, PartialEq)] pub enum Reader { Csv, Json, } impl Reader { - pub fn get_data_url(&self, url: String) -> Result> { + pub fn get_data_url(&self, url: String) -> Result> { let mut headers = HeaderMap::new(); headers.insert( - header::USER_AGENT, + header::USER_AGENT, HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"), ); headers.insert( - header::ACCEPT, + header::ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8"), ); headers.insert( @@ -439,9 +536,11 @@ impl Reader { ); let response = Client::builder() - .default_headers(headers) - .gzip(true) - .build()?.get(url).send()?; + .default_headers(headers) + .gzip(true) + .build()? + .get(url) + .send()?; if !response.status().is_success() { return Err(format!( @@ -453,17 +552,14 @@ impl Reader { let data = response.text()?.into_bytes(); - let df: LazyFrame = match self { + let df = match self { Self::Csv => CsvReader::new(Cursor::new(data)) .has_header(true) - .finish()? - .lazy(), + .finish()?, Self::Json => { let json_string = String::from_utf8(data)?; let json: Value = serde_json::from_str(&json_string)?; - JsonReader::new(Cursor::new(json.to_string())) - .finish()? - .lazy() + JsonReader::new(Cursor::new(json.to_string())).finish()? } }; diff --git a/tests/integration.rs b/tests/integration.rs index cab530d..342b2dc 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -12,8 +12,7 @@ fn get_api_arkk() -> Result<(), Box> { Ticker::ARKK, Some("data/test".to_owned()), )? - .get_api(NaiveDate::from_ymd_opt(2023, 5, 18))? - .collect()?; + .get_api(NaiveDate::from_ymd_opt(2023, 5, 18), None)?; let expected = [ "company", @@ -43,7 +42,7 @@ fn get_api_format_arkk() -> Result<(), Box> { Ticker::ARKK, Some("data/test".to_owned()), )? - .get_api(NaiveDate::from_ymd_opt(2023, 5, 18))?; + .get_api(NaiveDate::from_ymd_opt(2023, 5, 18), None)?; let df = Ark::df_format(dfl.into())?.collect()?; assert_eq!( @@ -83,7 +82,7 @@ fn get_api_format_arkvc() -> Result<(), Box> { Ticker::ARKVC, Some("data/test".to_owned()), )? - .get_api(NaiveDate::from_ymd_opt(2023, 1, 1))?; + .get_api(NaiveDate::from_ymd_opt(2023, 1, 1), None)?; let df = Ark::df_format(dfl.into())?.collect()?; assert_eq!(