refactor: move format and data reader

Elijah McMorris 2024-06-14 22:08:38 -07:00
parent 817f3c9baf
commit b344b9ac24
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
4 changed files with 199 additions and 188 deletions


@@ -1,19 +1,19 @@
use anyhow::{anyhow, Error, Result};
use chrono::{Duration, NaiveDate};
use data_reader::Reader;
use df::{DF, DFS};
use glob::glob;
use polars::datatypes::DataType;
use polars::lazy::dsl::StrptimeOptions;
use polars::prelude::*;
use reqwest::blocking::Client;
use serde_json::Value;
use std::fs::{create_dir_all, File};
use std::io::Cursor;
use std::path::Path;
use strum_macros::EnumString;
use ticker::{DataSource, Ticker};
pub mod data_reader;
pub mod df;
mod df_format;
pub mod ticker;
#[derive(Debug, Default, EnumString, Clone, Copy, PartialEq)]
@@ -154,154 +154,13 @@ impl Ark {
Ok(self)
}
fn df_format_21shares(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"Weightings") {
df = df
.lazy()
.rename(
vec![
"Date",
"StockTicker",
"CUSIP",
"SecurityName",
"Shares",
"Price",
"MarketValue",
"Weightings",
],
vec![
"date",
"ticker",
"cusip",
"company",
"shares",
"share_price",
"market_value",
"weight",
],
)
.collect()?;
_ = df.drop_in_place("Account");
_ = df.drop_in_place("NetAssets");
_ = df.drop_in_place("SharesOutstanding");
_ = df.drop_in_place("CreationUnits");
_ = df.drop_in_place("MoneyMarketFlag");
}
Ok(df.into())
}
fn df_format_arkvx(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"CUSIP") {
df = df
.lazy()
.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
.collect()?;
}
if df.get_column_names().contains(&"weight (%)") {
df = df
.lazy()
.rename(vec!["weight (%)"], vec!["weight"])
.collect()?;
}
if !df.get_column_names().contains(&"market_value") {
df = df
.lazy()
.with_columns([
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
fn df_format_europe(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"Currency") {
_ = df.drop_in_place("Currency");
df = df
.lazy()
.rename(
vec!["name", "ISIN", "Weight"],
vec!["company", "cusip", "weight"],
)
.with_columns([
Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
fn df_format_europe_arkfundsio(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df
.get_column_names()
.eq(&["company", "cusip", "date", "fund", "weight", "weight_rank"])
{
_ = df.drop_in_place("fund");
_ = df.drop_in_place("weight_rank");
df = df
.lazy()
.with_columns([
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
fn df_format_europe_csv(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"_duplicated_0") {
df = df.slice(2, df.height());
df = df
.clone()
.lazy()
.rename(df.get_column_names(), ["company", "cusip", "weight"])
.with_columns([
Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
pub fn df_format(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
- df = Self::df_format_europe_csv(df.into())?.collect()?;
- df = Self::df_format_europe_arkfundsio(df.into())?.collect()?;
- df = Self::df_format_21shares(df.into())?.collect()?;
- df = Self::df_format_arkvx(df.into())?.collect()?;
- df = Self::df_format_europe(df.into())?.collect()?;
+ df = df_format::df_format_europe_csv(df.into())?.collect()?;
+ df = df_format::df_format_europe_arkfundsio(df.into())?.collect()?;
+ df = df_format::df_format_21shares(df.into())?.collect()?;
+ df = df_format::df_format_arkvx(df.into())?.collect()?;
+ df = df_format::df_format_europe(df.into())?.collect()?;
if df.get_column_names().contains(&"market_value_($)") {
df = df
@@ -706,44 +565,6 @@ impl Ark {
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Reader {
Csv,
Json,
}
impl Reader {
pub fn get_data_url(&self, url: String) -> Result<DataFrame, Error> {
let response = Client::builder()
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36")
.gzip(true)
.build()?
.get(url)
.send()?;
if !response.status().is_success() {
return Err(anyhow!(
"HTTP request failed with status code: {:?}",
response.status()
));
}
let data = response.text()?.into_bytes();
let df = match self {
Self::Csv => CsvReader::new(Cursor::new(data))
.has_header(true)
.finish()?,
Self::Json => {
let json_string = String::from_utf8(data)?;
let json: Value = serde_json::from_str(&json_string)?;
JsonReader::new(Cursor::new(json.to_string())).finish()?
}
};
Ok(df)
}
}
#[cfg(test)]
mod tests {
use super::*;

src/util/data_reader.rs Normal file · 45 additions

@@ -0,0 +1,45 @@
use anyhow::{anyhow, Error};
use polars::frame::DataFrame;
use polars::io::SerReader;
use polars::prelude::{CsvReader, JsonReader};
use reqwest::blocking::Client;
use serde_json::Value;
use std::io::Cursor;
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Reader {
Csv,
Json,
}
impl Reader {
pub fn get_data_url(&self, url: String) -> anyhow::Result<DataFrame, Error> {
let response = Client::builder()
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36")
.gzip(true)
.build()?
.get(url)
.send()?;
if !response.status().is_success() {
return Err(anyhow!(
"HTTP request failed with status code: {:?}",
response.status()
));
}
let data = response.text()?.into_bytes();
let df = match self {
Self::Csv => CsvReader::new(Cursor::new(data))
.has_header(true)
.finish()?,
Self::Json => {
let json_string = String::from_utf8(data)?;
let json: Value = serde_json::from_str(&json_string)?;
JsonReader::new(Cursor::new(json.to_string())).finish()?
}
};
Ok(df)
}
}
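A brief usage sketch, not part of the commit, showing how the relocated Reader might be called; it assumes the enum is reachable as crate::util::data_reader::Reader, and the URL is a placeholder.

use crate::util::data_reader::Reader;

// Download a CSV endpoint into a polars DataFrame via the blocking client.
// The URL below is hypothetical; real callers pass a fund's holdings endpoint.
fn fetch_holdings() -> anyhow::Result<()> {
    let df = Reader::Csv.get_data_url("https://example.com/holdings.csv".to_string())?;
    println!("fetched {} rows x {} columns", df.height(), df.width());
    Ok(())
}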


@@ -35,7 +35,7 @@ impl DF {
}
}
- #[allow(clippy::upper_case_acronyms)]
+ #[allow(clippy::upper_case_acronyms, dead_code)]
pub trait DFS {
fn lazy(self) -> Vec<LazyFrame>;
fn collect(self) -> Vec<DataFrame>;
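For orientation, a minimal round-trip sketch of the DF wrapper that the formatters pass around; it assumes From<DataFrame> for DF and a DF::collect() returning Result<DataFrame>, which is how df_format.rs below uses it (the exact signatures are not shown in this diff).

use crate::util::df::DF;
use polars::prelude::*;

fn roundtrip() -> anyhow::Result<()> {
    // Wrap an eager frame in DF, then collect it back out, mirroring the
    // `df.collect()?` / `Ok(df.into())` pattern used by every formatter.
    let frame = polars::df!("ticker" => ["ARKK"], "weight" => [1.0])?;
    let wrapped: DF = frame.into();
    let eager: DataFrame = wrapped.collect()?;
    assert_eq!(eager.height(), 1);
    Ok(())
}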

src/util/df_format.rs Normal file · 145 additions

@@ -0,0 +1,145 @@
use anyhow::{Error, Result};
use polars::prelude::*;
use crate::util::df::DF;
pub fn df_format_21shares(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"Weightings") {
df = df
.lazy()
.rename(
vec![
"Date",
"StockTicker",
"CUSIP",
"SecurityName",
"Shares",
"Price",
"MarketValue",
"Weightings",
],
vec![
"date",
"ticker",
"cusip",
"company",
"shares",
"share_price",
"market_value",
"weight",
],
)
.collect()?;
_ = df.drop_in_place("Account");
_ = df.drop_in_place("NetAssets");
_ = df.drop_in_place("SharesOutstanding");
_ = df.drop_in_place("CreationUnits");
_ = df.drop_in_place("MoneyMarketFlag");
}
Ok(df.into())
}
pub fn df_format_arkvx(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"CUSIP") {
df = df
.lazy()
.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
.collect()?;
}
if df.get_column_names().contains(&"weight (%)") {
df = df
.lazy()
.rename(vec!["weight (%)"], vec!["weight"])
.collect()?;
}
if !df.get_column_names().contains(&"market_value") {
df = df
.lazy()
.with_columns([
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
pub fn df_format_europe(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"Currency") {
_ = df.drop_in_place("Currency");
df = df
.lazy()
.rename(
vec!["name", "ISIN", "Weight"],
vec!["company", "cusip", "weight"],
)
.with_columns([
Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
pub fn df_format_europe_arkfundsio(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df
.get_column_names()
.eq(&["company", "cusip", "date", "fund", "weight", "weight_rank"])
{
_ = df.drop_in_place("fund");
_ = df.drop_in_place("weight_rank");
df = df
.lazy()
.with_columns([
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
pub fn df_format_europe_csv(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"_duplicated_0") {
df = df.slice(2, df.height());
df = df
.clone()
.lazy()
.rename(df.get_column_names(), ["company", "cusip", "weight"])
.with_columns([
Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
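Finally, a composition sketch, not part of the commit, mirroring how Ark::df_format chains these helpers in the lib.rs hunk above; it assumes the code sits in the module that declares the private mod df_format, with df and df_format in scope.

use anyhow::{Error, Result};
use polars::prelude::*;

// Each helper only rewrites frames whose column layout it recognizes and
// passes everything else through unchanged, so the chain is safe for any source.
fn normalize(raw: DataFrame) -> Result<DataFrame, Error> {
    let mut df: df::DF = raw.into();
    df = df_format::df_format_europe_csv(df)?;
    df = df_format::df_format_europe_arkfundsio(df)?;
    df = df_format::df_format_21shares(df)?;
    df = df_format::df_format_arkvx(df)?;
    df = df_format::df_format_europe(df)?;
    Ok(df.collect()?)
}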