diff --git a/src/util.rs b/src/util.rs index 21db624..e709951 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,19 +1,19 @@ use anyhow::{anyhow, Error, Result}; use chrono::{Duration, NaiveDate}; +use data_reader::Reader; use df::{DF, DFS}; use glob::glob; use polars::datatypes::DataType; use polars::lazy::dsl::StrptimeOptions; use polars::prelude::*; -use reqwest::blocking::Client; -use serde_json::Value; use std::fs::{create_dir_all, File}; -use std::io::Cursor; use std::path::Path; use strum_macros::EnumString; use ticker::{DataSource, Ticker}; +pub mod data_reader; pub mod df; +mod df_format; pub mod ticker; #[derive(Debug, Default, EnumString, Clone, Copy, PartialEq)] @@ -154,154 +154,13 @@ impl Ark { Ok(self) } - fn df_format_21shares(df: DF) -> Result { - let mut df = df.collect()?; - - if df.get_column_names().contains(&"Weightings") { - df = df - .lazy() - .rename( - vec![ - "Date", - "StockTicker", - "CUSIP", - "SecurityName", - "Shares", - "Price", - "MarketValue", - "Weightings", - ], - vec![ - "date", - "ticker", - "cusip", - "company", - "shares", - "share_price", - "market_value", - "weight", - ], - ) - .collect()?; - - _ = df.drop_in_place("Account"); - _ = df.drop_in_place("NetAssets"); - _ = df.drop_in_place("SharesOutstanding"); - _ = df.drop_in_place("CreationUnits"); - _ = df.drop_in_place("MoneyMarketFlag"); - } - - Ok(df.into()) - } - - fn df_format_arkvx(df: DF) -> Result { - let mut df = df.collect()?; - - if df.get_column_names().contains(&"CUSIP") { - df = df - .lazy() - .rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]) - .collect()?; - } - if df.get_column_names().contains(&"weight (%)") { - df = df - .lazy() - .rename(vec!["weight (%)"], vec!["weight"]) - .collect()?; - } - - if !df.get_column_names().contains(&"market_value") { - df = df - .lazy() - .with_columns([ - Series::new("market_value", [None::]).lit(), - Series::new("shares", [None::]).lit(), - Series::new("share_price", [None::]).lit(), - ]) - .collect()?; - } - - Ok(df.into()) - } - - fn df_format_europe(df: DF) -> Result { - let mut df = df.collect()?; - - if df.get_column_names().contains(&"Currency") { - _ = df.drop_in_place("Currency"); - - df = df - .lazy() - .rename( - vec!["name", "ISIN", "Weight"], - vec!["company", "cusip", "weight"], - ) - .with_columns([ - Series::new("date", [chrono::Local::now().date_naive()]).lit(), - Series::new("ticker", [None::]).lit(), - Series::new("market_value", [None::]).lit(), - Series::new("shares", [None::]).lit(), - Series::new("share_price", [None::]).lit(), - ]) - .collect()?; - } - - Ok(df.into()) - } - - fn df_format_europe_arkfundsio(df: DF) -> Result { - let mut df = df.collect()?; - - if df - .get_column_names() - .eq(&["company", "cusip", "date", "fund", "weight", "weight_rank"]) - { - _ = df.drop_in_place("fund"); - _ = df.drop_in_place("weight_rank"); - - df = df - .lazy() - .with_columns([ - Series::new("ticker", [None::]).lit(), - Series::new("market_value", [None::]).lit(), - Series::new("shares", [None::]).lit(), - Series::new("share_price", [None::]).lit(), - ]) - .collect()?; - } - Ok(df.into()) - } - - fn df_format_europe_csv(df: DF) -> Result { - let mut df = df.collect()?; - - if df.get_column_names().contains(&"_duplicated_0") { - df = df.slice(2, df.height()); - - df = df - .clone() - .lazy() - .rename(df.get_column_names(), ["company", "cusip", "weight"]) - .with_columns([ - Series::new("date", [chrono::Local::now().date_naive()]).lit(), - Series::new("ticker", [None::]).lit(), - Series::new("market_value", [None::]).lit(), - Series::new("shares", [None::]).lit(), - Series::new("share_price", [None::]).lit(), - ]) - .collect()?; - } - - Ok(df.into()) - } - pub fn df_format(df: DF) -> Result { let mut df = df.collect()?; - df = Self::df_format_europe_csv(df.into())?.collect()?; - df = Self::df_format_europe_arkfundsio(df.into())?.collect()?; - df = Self::df_format_21shares(df.into())?.collect()?; - df = Self::df_format_arkvx(df.into())?.collect()?; - df = Self::df_format_europe(df.into())?.collect()?; + df = df_format::df_format_europe_csv(df.into())?.collect()?; + df = df_format::df_format_europe_arkfundsio(df.into())?.collect()?; + df = df_format::df_format_21shares(df.into())?.collect()?; + df = df_format::df_format_arkvx(df.into())?.collect()?; + df = df_format::df_format_europe(df.into())?.collect()?; if df.get_column_names().contains(&"market_value_($)") { df = df @@ -706,44 +565,6 @@ impl Ark { } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum Reader { - Csv, - Json, -} - -impl Reader { - pub fn get_data_url(&self, url: String) -> Result { - let response = Client::builder() - .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36") - .gzip(true) - .build()? - .get(url) - .send()?; - - if !response.status().is_success() { - return Err(anyhow!( - "HTTP request failed with status code: {:?}", - response.status() - )); - } - - let data = response.text()?.into_bytes(); - - let df = match self { - Self::Csv => CsvReader::new(Cursor::new(data)) - .has_header(true) - .finish()?, - Self::Json => { - let json_string = String::from_utf8(data)?; - let json: Value = serde_json::from_str(&json_string)?; - JsonReader::new(Cursor::new(json.to_string())).finish()? - } - }; - Ok(df) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/util/data_reader.rs b/src/util/data_reader.rs new file mode 100644 index 0000000..61787df --- /dev/null +++ b/src/util/data_reader.rs @@ -0,0 +1,45 @@ +use anyhow::{anyhow, Error}; +use polars::frame::DataFrame; +use polars::io::SerReader; +use polars::prelude::{CsvReader, JsonReader}; +use reqwest::blocking::Client; +use serde_json::Value; +use std::io::Cursor; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum Reader { + Csv, + Json, +} + +impl Reader { + pub fn get_data_url(&self, url: String) -> anyhow::Result { + let response = Client::builder() + .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36") + .gzip(true) + .build()? + .get(url) + .send()?; + + if !response.status().is_success() { + return Err(anyhow!( + "HTTP request failed with status code: {:?}", + response.status() + )); + } + + let data = response.text()?.into_bytes(); + + let df = match self { + Self::Csv => CsvReader::new(Cursor::new(data)) + .has_header(true) + .finish()?, + Self::Json => { + let json_string = String::from_utf8(data)?; + let json: Value = serde_json::from_str(&json_string)?; + JsonReader::new(Cursor::new(json.to_string())).finish()? + } + }; + Ok(df) + } +} diff --git a/src/util/df.rs b/src/util/df.rs index fe0fb4e..c84526f 100644 --- a/src/util/df.rs +++ b/src/util/df.rs @@ -35,7 +35,7 @@ impl DF { } } -#[allow(clippy::upper_case_acronyms)] +#[allow(clippy::upper_case_acronyms, dead_code)] pub trait DFS { fn lazy(self) -> Vec; fn collect(self) -> Vec; diff --git a/src/util/df_format.rs b/src/util/df_format.rs new file mode 100644 index 0000000..e18b178 --- /dev/null +++ b/src/util/df_format.rs @@ -0,0 +1,145 @@ +use anyhow::{Error, Result}; +use polars::prelude::*; + +use crate::util::df::DF; + +pub fn df_format_21shares(df: DF) -> Result { + let mut df = df.collect()?; + + if df.get_column_names().contains(&"Weightings") { + df = df + .lazy() + .rename( + vec![ + "Date", + "StockTicker", + "CUSIP", + "SecurityName", + "Shares", + "Price", + "MarketValue", + "Weightings", + ], + vec![ + "date", + "ticker", + "cusip", + "company", + "shares", + "share_price", + "market_value", + "weight", + ], + ) + .collect()?; + + _ = df.drop_in_place("Account"); + _ = df.drop_in_place("NetAssets"); + _ = df.drop_in_place("SharesOutstanding"); + _ = df.drop_in_place("CreationUnits"); + _ = df.drop_in_place("MoneyMarketFlag"); + } + + Ok(df.into()) +} + +pub fn df_format_arkvx(df: DF) -> Result { + let mut df = df.collect()?; + + if df.get_column_names().contains(&"CUSIP") { + df = df + .lazy() + .rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]) + .collect()?; + } + if df.get_column_names().contains(&"weight (%)") { + df = df + .lazy() + .rename(vec!["weight (%)"], vec!["weight"]) + .collect()?; + } + + if !df.get_column_names().contains(&"market_value") { + df = df + .lazy() + .with_columns([ + Series::new("market_value", [None::]).lit(), + Series::new("shares", [None::]).lit(), + Series::new("share_price", [None::]).lit(), + ]) + .collect()?; + } + + Ok(df.into()) +} + +pub fn df_format_europe(df: DF) -> Result { + let mut df = df.collect()?; + + if df.get_column_names().contains(&"Currency") { + _ = df.drop_in_place("Currency"); + + df = df + .lazy() + .rename( + vec!["name", "ISIN", "Weight"], + vec!["company", "cusip", "weight"], + ) + .with_columns([ + Series::new("date", [chrono::Local::now().date_naive()]).lit(), + Series::new("ticker", [None::]).lit(), + Series::new("market_value", [None::]).lit(), + Series::new("shares", [None::]).lit(), + Series::new("share_price", [None::]).lit(), + ]) + .collect()?; + } + + Ok(df.into()) +} + +pub fn df_format_europe_arkfundsio(df: DF) -> Result { + let mut df = df.collect()?; + + if df + .get_column_names() + .eq(&["company", "cusip", "date", "fund", "weight", "weight_rank"]) + { + _ = df.drop_in_place("fund"); + _ = df.drop_in_place("weight_rank"); + + df = df + .lazy() + .with_columns([ + Series::new("ticker", [None::]).lit(), + Series::new("market_value", [None::]).lit(), + Series::new("shares", [None::]).lit(), + Series::new("share_price", [None::]).lit(), + ]) + .collect()?; + } + Ok(df.into()) +} + +pub fn df_format_europe_csv(df: DF) -> Result { + let mut df = df.collect()?; + + if df.get_column_names().contains(&"_duplicated_0") { + df = df.slice(2, df.height()); + + df = df + .clone() + .lazy() + .rename(df.get_column_names(), ["company", "cusip", "weight"]) + .with_columns([ + Series::new("date", [chrono::Local::now().date_naive()]).lit(), + Series::new("ticker", [None::]).lit(), + Series::new("market_value", [None::]).lit(), + Series::new("shares", [None::]).lit(), + Series::new("share_price", [None::]).lit(), + ]) + .collect()?; + } + + Ok(df.into()) +}