diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 7aa0a93..9c37c5f 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -63,7 +63,9 @@ "christian-kohler.path-intellisense", "Gruntfuggly.todo-tree", "ms-azuretools.vscode-docker", - "redhat.vscode-yaml" + "redhat.vscode-yaml", + "GitHub.copilot", + "GitHub.copilot-chat" ] } } diff --git a/.github/workflows/nextest.yml b/.github/workflows/nextest.yml index f1c8056..2de7bac 100644 --- a/.github/workflows/nextest.yml +++ b/.github/workflows/nextest.yml @@ -16,7 +16,7 @@ env: jobs: run-tests: - name: Run tests + name: run tests runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -31,7 +31,7 @@ jobs: run: cargo nextest run -E "all() - test(get_api) - kind(bin)" clippy: - name: Clippy + name: clippy runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Cargo.toml b/Cargo.toml index deecd18..d2414c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ version = "1.0.0" edition = "2021" [dependencies] -polars = { version = "0.32", features = [ +polars = { version = "0.35", features = [ "lazy", "strings", "parquet", @@ -27,6 +27,7 @@ serde_json = "1.0" rand = "0.8" futures = "0.3" lazy_static = "1.4" +anyhow = "1.0" [dev-dependencies] serial_test = "*" diff --git a/src/main.rs b/src/main.rs index 47bfcb9..49c7cb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,10 @@ +use anyhow::{Error, Result}; use clokwerk::{AsyncScheduler, Job, TimeUnits}; use futures::future::join_all; use lazy_static::lazy_static; use polars::prelude::DataFrame; use rand::Rng; use std::env; -use std::error::Error; -use std::result::Result; use std::str::FromStr; use std::thread; use strum::IntoEnumIterator; @@ -32,7 +31,7 @@ fn print_df(ticker: &Ticker, df: &DataFrame) { ); } -fn csv_merge() -> Result<(), Box> { +fn csv_merge() -> Result<(), Error> { for ticker in Ticker::iter() { let df = Ark::merge_old_csv_to_parquet(ticker, None)? .format()? @@ -44,9 +43,9 @@ fn csv_merge() -> Result<(), Box> { Ok(()) } -fn ark_plan(ticker: Ticker) -> Result<(), Box> { +fn ark_plan(ticker: Ticker) -> Result<(), Error> { println!("Starting: {:#?}", ticker); - let sec = Duration::from_secs(rand::thread_rng().gen_range(30 * 60..=4* 60 * 60)); + let sec = Duration::from_secs(rand::thread_rng().gen_range(30 * 60..=4 * 60 * 60)); // sleep(sec).await; thread::sleep(sec); @@ -59,7 +58,7 @@ fn ark_plan(ticker: Ticker) -> Result<(), Box> { Ok(()) } -async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box> { +async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Error> { task::spawn_blocking(move || ark_plan(ticker).unwrap()) .await .unwrap(); diff --git a/src/util.rs b/src/util.rs index 23bcf70..3bfdab7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Error, Result}; use chrono::{Duration, NaiveDate}; use glob::glob; use polars::datatypes::DataType; @@ -7,12 +8,9 @@ use reqwest::blocking::Client; use reqwest::header; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::Value; -use std::error::Error; use std::fs::{create_dir_all, File}; use std::io::Cursor; - use std::path::Path; -use std::result::Result; use strum_macros::{EnumIter, EnumString}; #[derive(Debug, Default, strum_macros::Display, EnumIter, Clone, Copy, PartialEq)] @@ -56,7 +54,7 @@ impl From for DF { } } impl DF { - pub fn collect(self) -> Result> { + pub fn collect(self) -> Result { match self { DF::LazyFrame(x) => Ok(x.collect()?), DF::DataFrame(x) => Ok(x), @@ -106,11 +104,7 @@ pub struct Ark { path: Option, } impl Ark { - pub fn new( - source: Source, - ticker: Ticker, - path: Option, - ) -> Result> { + pub fn new(source: Source, ticker: Ticker, path: Option) -> Result { let existing_file = Self::read_parquet(&ticker, path.as_ref()).is_ok(); let mut ark = Self { @@ -150,11 +144,11 @@ impl Ark { Ok(ark) } - pub fn collect(self) -> Result> { + pub fn collect(self) -> Result { self.df.collect() } - pub fn write_parquet(self) -> Result> { + pub fn write_parquet(self) -> Result { // with format df let ark = self.format()?; Self::write_df_parquet( @@ -167,7 +161,7 @@ impl Ark { Ok(ark) } - fn write_df_parquet(path: String, df: DF) -> Result<(), Box> { + fn write_df_parquet(path: String, df: DF) -> Result<(), Error> { if let Some(parent) = Path::new(&path).parent() { if !parent.exists() { create_dir_all(parent)?; @@ -177,7 +171,7 @@ impl Ark { Ok(()) } - fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result> { + fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result { let df = LazyFrame::scan_parquet( match path { Some(p) => format!("{}/{}.parquet", p, ticker), @@ -188,19 +182,19 @@ impl Ark { Ok(df.into()) } - pub fn sort(mut self) -> Result> { + pub fn sort(mut self) -> Result { self.df = Self::df_sort(self.df)?; Ok(self) } - pub fn df_sort(df: DF) -> Result> { + pub fn df_sort(df: DF) -> Result { Ok(df .collect()? .sort(["date", "weight"], vec![false, true], false)? .into()) } - fn concat_df(dfs: Vec) -> Result> { + fn concat_df(dfs: Vec) -> Result { // with dedupe let df = concat( dfs.lazy(), @@ -211,7 +205,7 @@ impl Ark { Self::dedupe(df.into()) } - pub fn dedupe(mut df: DF) -> Result> { + pub fn dedupe(mut df: DF) -> Result { df = df .lazy() .unique_stable(None, UniqueKeepStrategy::First) @@ -219,12 +213,12 @@ impl Ark { Ok(df) } - pub fn format(mut self) -> Result> { + pub fn format(mut self) -> Result { self.df = Self::df_format(self.df)?; Ok(self) } - pub fn df_format(df: DF) -> Result> { + pub fn df_format(df: DF) -> Result { let mut df = df.collect()?; if df.get_column_names().contains(&"market_value_($)") { @@ -272,7 +266,7 @@ impl Ark { if !df.fields().contains(&Field::new("date", DataType::Date)) { let date_format = - |mut df: DataFrame, format: Option| -> Result> { + |mut df: DataFrame, format: Option| -> Result { df = df .lazy() .with_column(col("date").str().strptime( @@ -282,11 +276,12 @@ impl Ark { strict: false, ..Default::default() }, + lit("earliest"), )) .collect()?; if df.column("date").unwrap().null_count() > df.height() / 10 { - return Err("wrong date format".into()); + return Err(anyhow!("wrong date format")); } Ok(df) @@ -347,62 +342,24 @@ impl Ark { expressions.push( col("ticker") .str() - .replace_all(lit(" FP"), lit(""), true) - .str() - .replace_all(lit(" UQ"), lit(""), true) - .str() - .replace_all(lit(" UF"), lit(""), true) - .str() - .replace_all(lit(" UN"), lit(""), true) - .str() - .replace_all(lit(" UW"), lit(""), true) + .replace_all(lit("(?i) fp| uq| un| uw"), lit(""), true) .str() .replace(lit("DKNN"), lit("DKNG"), true) .str() - .rstrip(None), + .strip_chars(lit("None")), ); expressions.push( col("company") .str() - .replace_all(lit("-A"), lit(""), true) + .replace_all(lit("(?i) a| cl| class| inc| ltd| corp| corporation| c| cl| se| hold| holdings| international|-|,|."), lit(""), true) .str() - .replace_all(lit("- A"), lit(""), true) - .str() - .replace_all(lit("CL A"), lit(""), true) - .str() - .replace_all(lit("CLASS A"), lit(""), true) - .str() - .replace_all(lit("Inc"), lit(""), true) - .str() - .replace_all(lit("INC"), lit(""), true) - .str() - .replace_all(lit("incorporated"), lit(""), true) - .str() - .replace_all(lit("LTD"), lit(""), true) - .str() - .replace_all(lit("CORP"), lit(""), true) - .str() - .replace_all(lit("CORPORATION"), lit(""), true) - .str() - .replace_all(lit("Corporation"), lit(""), true) - .str() - .replace_all(lit("- C"), lit(""), true) - .str() - .replace_all(lit("-"), lit(""), true) - .str() - .replace_all(lit(","), lit(""), true) - .str() - .replace_all(lit("."), lit(""), true) - .str() - .replace(lit("COINBASE GLOBAL"), lit("COINBASE"), true) - .str() - .replace(lit("Coinbase Global"), lit("Coinbase"), true) + .replace(lit("(?i)Coinbase Global"), lit("Coinbase"), true) .str() .replace(lit("Blackdaemon"), lit("Blockdaemon"), true) .str() .replace(lit("DISCOVERY"), lit("Dassault Systemes"), true) .str() - .rstrip(None), + .strip_chars(lit("None")), ); // run expressions @@ -452,7 +409,7 @@ impl Ark { &self, last_day: Option, source: Option<&Source>, - ) -> Result> { + ) -> Result { let url = match (&self.ticker, last_day) { (self::Ticker::ARKVC, Some(last_day)) => format!( "https://api.nexveridian.com/arkvc_holdings?start={}", @@ -496,7 +453,7 @@ impl Ark { Ok(df) } - pub fn get_csv_ark(&self) -> Result> { + pub fn get_csv_ark(&self) -> Result { let url = match self.ticker { self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(), _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker), @@ -504,10 +461,7 @@ impl Ark { Reader::Csv.get_data_url(url) } - pub fn merge_old_csv_to_parquet( - ticker: Ticker, - path: Option, - ) -> Result> { + pub fn merge_old_csv_to_parquet(ticker: Ticker, path: Option) -> Result { let mut dfs = vec![]; for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { dfs.push(LazyCsvReader::new(x).finish()?); @@ -535,7 +489,7 @@ pub enum Reader { } impl Reader { - pub fn get_data_url(&self, url: String) -> Result> { + pub fn get_data_url(&self, url: String) -> Result { let mut headers = HeaderMap::new(); headers.insert( header::USER_AGENT, @@ -559,11 +513,10 @@ impl Reader { .send()?; if !response.status().is_success() { - return Err(format!( + return Err(anyhow!( "HTTP request failed with status code: {:?}", response.status() - ) - .into()); + )); } let data = response.text()?.into_bytes(); @@ -591,7 +544,7 @@ mod tests { #[test] #[serial] - fn read_write_parquet() -> Result<(), Box> { + fn read_write_parquet() -> Result<(), Error> { let test_df = df![ "date" => ["2023-01-01"], "ticker" => ["TSLA"],