diff --git a/Cargo.toml b/Cargo.toml index 82c4f68..5e67de5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,9 @@ polars = { version = "0.30", features = [ "parquet", "round_series", "lazy_regex", + "json", + "object", + "dtype-struct", ] } reqwest = { version = "0.11", features = ["blocking"] } glob = { version = "0.3" } @@ -18,3 +21,5 @@ strum_macros = "0.24" strum = "0.24" tokio = { version = "1.26", features = ["full"] } openssl = { version = "0.10", features = ["vendored"] } +chrono = { version = "0.4", features = ["serde"] } +serde_json = "1.0" diff --git a/src/main.rs b/src/main.rs index 6aa2c06..fa3b2f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,14 +37,17 @@ use util::*; // } fn main() { - let dfn = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap(); + let dfn = df_format(read_parquet(Ticker::ARKK).unwrap()).unwrap(); println!("{:#?}", dfn); - // let update = df_format(get_csv(Ticker::ARKF).unwrap()).unwrap(); + let api = df_format(get_api(Ticker::ARKK, None).unwrap()).unwrap(); + println!("{:#?}", api); + + // let update = df_format(get_csv_ark(Ticker::ARKK).unwrap()).unwrap(); // println!("{:#?}", update); - // update_parquet(Ticker::ARKVC).unwrap(); - // let x = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap(); + // update_parquet(Ticker::ARKK).unwrap(); + // let x = df_format(read_parquet(Ticker::ARKK).unwrap()).unwrap(); // println!("{:#?}", x); // merge_csv_to_parquet(Ticker::ARKVC).unwrap(); diff --git a/src/util.rs b/src/util.rs index 1ae543d..f358070 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,8 +1,10 @@ +use chrono::NaiveDate; use glob::glob; use polars::datatypes::DataType; use polars::prelude::*; use polars::prelude::{DataFrame, StrptimeOptions, UniqueKeepStrategy}; use reqwest::blocking::Client; +use serde_json::Value; use std::error::Error; use std::fs::File; use std::io::Cursor; @@ -34,23 +36,23 @@ impl Ticker { } } +#[derive(Clone, Copy)] +pub enum Source { + Ark, + ApiIncremental, + ApiFull, +} + pub fn merge_csv_to_parquet(ticker: Ticker) -> Result<(), Box> { let mut dfs = vec![]; - for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { dfs.push(LazyCsvReader::new(x).finish()?); } - let mut df = concat(dfs, false, true)?; if read_parquet(ticker).is_ok() { let df_old = read_parquet(ticker)?; - df = concat( - vec![df_format(df_old)?.lazy(), df_format(df)?.lazy()], - false, - true, - )? - .unique_stable(None, UniqueKeepStrategy::First); + df = concat_df(vec![df_old, df])?; write_parquet(ticker, df_sort(df.collect()?)?)?; } else { write_parquet(ticker, df_format(df)?)?; @@ -59,22 +61,31 @@ pub fn merge_csv_to_parquet(ticker: Ticker) -> Result<(), Box> { Ok(()) } -pub fn update_parquet(ticker: Ticker) -> Result<(), Box> { - let update = get_csv(ticker)?; - +pub fn update_parquet(ticker: Ticker, source: Source) -> Result<(), Box> { let mut df = read_parquet(ticker)?; + let last_day = df.clone().collect()?.column("date").unwrap().max(); - df = concat( - vec![df_format(df)?.lazy(), df_format(update)?.lazy()], - false, - true, - )? - .unique_stable(None, UniqueKeepStrategy::First); + let update = match source { + Source::Ark => get_csv_ark(ticker)?, + Source::ApiIncremental => get_api(ticker, last_day)?, + Source::ApiFull => get_api(ticker, None)?, + }; + df = concat_df(vec![df, update])?; write_parquet(ticker, df_sort(df.collect()?)?)?; Ok(()) } +fn concat_df(mut dfs: Vec) -> Result> { + // with dedupe and format + for x in &mut dfs { + *x = df_format(x.to_owned())?.lazy(); + } + let mut df = concat(dfs, false, true)?; + df = df.unique_stable(None, UniqueKeepStrategy::First); + Ok(df) +} + pub fn read_parquet(ticker: Ticker) -> Result> { let df = LazyFrame::scan_parquet( format!("data/parquet/{}.parquet", ticker), @@ -86,12 +97,12 @@ pub fn read_parquet(ticker: Ticker) -> Result> { pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box> { ParquetWriter::new(File::create(format!("data/parquet/{}.parquet", ticker))?) .finish(&mut df)?; - Ok(()) } pub fn df_sort(df: DataFrame) -> Result> { - Ok(df.sort(["date", "weight"], vec![false, true])?) + let df = df.sort(["date", "weight"], vec![false, true])?; + Ok(df) } pub fn df_format(df: LazyFrame) -> Result> { @@ -141,10 +152,11 @@ pub fn df_format(df: LazyFrame) -> Result> { expressions.push(col("date").str().strptime( DataType::Date, StrptimeOptions { - format: Some("%m/%d/%Y".into()), + // format: Some("%m/%d/%Y".into()), + format: None, strict: false, exact: true, - cache: false, + cache: true, }, )); } @@ -221,7 +233,45 @@ pub fn df_format(df: LazyFrame) -> Result> { Ok(df) } -pub fn get_csv(ticker: Ticker) -> Result> { +pub fn get_api(ticker: Ticker, last_day: Option) -> Result> { + let url = match (ticker, last_day) { + (Ticker::ARKVC, Some(last_day)) => format!( + "https://api.nexveridian.com/arkvc_holdings?end={}", + NaiveDate::from_num_days_from_ce_opt(last_day).unwrap() + ), + (ticker, Some(last_day)) => format!( + "https://api.nexveridian.com/ark_holdings?ticker={}&end={}", + ticker, + NaiveDate::from_num_days_from_ce_opt(last_day).unwrap() + ), + (Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(), + (ticker, None) => { + format!("https://api.nexveridian.com/ark_holdings?ticker={}", ticker) + } + }; + let response = Client::builder() + .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3") + .build()?.get(url).send()?; + + if !response.status().is_success() { + return Err(format!( + "HTTP request failed with status code: {:?}", + response.status() + ) + .into()); + } + let data = response.text()?.into_bytes(); + + let json_string = String::from_utf8(data)?; + let json: Value = serde_json::from_str(&json_string)?; + let df = JsonReader::new(Cursor::new(json.to_string())) + .finish()? + .lazy(); + + Ok(df) +} + +pub fn get_csv_ark(ticker: Ticker) -> Result> { let url = match ticker { Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(), _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", ticker.value(), ticker),