This commit is contained in:
Elijah McMorris 2023-08-10 10:10:45 +00:00 committed by NexVeridian@gmail.com
parent 1eda4729d4
commit fdc0515939
5 changed files with 183 additions and 82 deletions

View file

@ -1,4 +1,4 @@
use chrono::NaiveDate;
use chrono::{Duration, NaiveDate};
use glob::glob;
use polars::datatypes::DataType;
use polars::lazy::dsl::StrptimeOptions;
@ -10,15 +10,17 @@ use serde_json::Value;
use std::error::Error;
use std::fs::{create_dir_all, File};
use std::io::Cursor;
use std::path::Path;
use std::result::Result;
use strum_macros::{EnumIter, EnumString};
#[derive(strum_macros::Display, EnumIter, Clone, Copy, PartialEq, Debug)]
#[derive(Debug, Default, strum_macros::Display, EnumIter, Clone, Copy, PartialEq)]
pub enum Ticker {
ARKVC,
ARKF,
ARKG,
#[default]
ARKK,
ARKQ,
ARKW,
@ -80,17 +82,24 @@ impl DFS for Vec<DF> {
}
}
#[derive(EnumString, Clone, Copy)]
#[derive(Debug, Default, EnumString, Clone, Copy, PartialEq)]
pub enum Source {
// Reads Parquet file if exists
Read,
// From ARK Invest
Ark,
// From api.NexVeridian.com
#[default]
ApiIncremental,
// From api.NexVeridian.com, not usually necessary, use ApiIncremental
ApiFull,
// From arkfunds.io/api, avoid using, use ApiIncremental instead
ArkFundsIoIncremental,
// From arkfunds.io/api, avoid using, use ApiFull instead
ArkFundsIoFull,
}
#[derive(Clone)]
pub struct Ark {
pub df: DF,
ticker: Ticker,
@ -102,11 +111,11 @@ impl Ark {
ticker: Ticker,
path: Option<String>,
) -> Result<Self, Box<dyn Error>> {
let existing_file = Self::read_parquet(ticker, path.clone()).is_ok();
let existing_file = Self::read_parquet(&ticker, path.as_ref()).is_ok();
let mut ark = Self {
df: match existing_file {
true => Self::read_parquet(ticker, path.clone())?,
true => Self::read_parquet(&ticker, path.as_ref())?,
false => DF::DataFrame(df!["date" => [""],]?),
},
ticker,
@ -115,22 +124,16 @@ impl Ark {
let update = match (source, existing_file) {
(Source::Read, false) => {
panic!("Can not read from file, file is empty, does not exist, or is locked")
panic!("Can not read from file. file is empty, does not exist, or is locked")
}
(Source::Read, true) => None,
(Source::Ark, _) => Some(ark.get_csv_ark()?),
(Source::ApiIncremental, true) => {
let last_day = ark
.df
.clone()
.collect()?
.column("date")
.unwrap()
.max()
.and_then(NaiveDate::from_num_days_from_ce_opt);
Some(ark.get_api(last_day)?)
(Source::ApiIncremental, true) | (Source::ArkFundsIoIncremental, true) => {
let last_day = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()
+ Duration::days(ark.df.clone().collect()?.column("date")?.max().unwrap());
Some(ark.get_api(Some(last_day), Some(&source))?)
}
(Source::ApiIncremental, false) | (Source::ApiFull, _) => Some(ark.get_api(None)?),
_ => Some(ark.get_api(None, Some(&source))?),
};
if let Some(update) = update {
@ -174,7 +177,7 @@ impl Ark {
Ok(())
}
fn read_parquet(ticker: Ticker, path: Option<String>) -> Result<DF, Box<dyn Error>> {
fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result<DF, Box<dyn Error>> {
let df = LazyFrame::scan_parquet(
match path {
Some(p) => format!("{}/{}.parquet", p, ticker),
@ -186,20 +189,25 @@ impl Ark {
}
pub fn sort(mut self) -> Result<Self, Box<dyn Error>> {
self.df = Self::df_sort(self.df.clone())?;
self.df = Self::df_sort(self.df)?;
Ok(self)
}
pub fn df_sort(df: DF) -> Result<DF, Box<dyn Error>> {
Ok(df
.collect()?
.sort(["date", "weight"], vec![false, true])?
.sort(["date", "weight"], vec![false, true], false)?
.into())
}
fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
// with dedupe
let df = concat(dfs.lazy(), false, true)?;
let df = concat(
dfs.lazy(),
UnionArgs {
..Default::default()
},
)?;
Self::dedupe(df.into())
}
@ -212,7 +220,7 @@ impl Ark {
}
pub fn format(mut self) -> Result<Self, Box<dyn Error>> {
self.df = Self::df_format(self.df.clone())?;
self.df = Self::df_format(self.df)?;
Ok(self)
}
@ -263,34 +271,32 @@ impl Ark {
}
if !df.fields().contains(&Field::new("date", DataType::Date)) {
let date_format = |mut df: DataFrame, format:Option<String>| -> Result<DataFrame, Box<dyn Error>> {
df = df
.lazy()
.with_column(col("date").str().strptime(
DataType::Date,
StrptimeOptions {
format,
strict: false,
exact: true,
cache: true,
},
))
.collect()?;
if df.column("date").unwrap().null_count() > df.height() / 10 {
return Err("wrong date format".into());
}
let date_format =
|mut df: DataFrame, format: Option<String>| -> Result<DataFrame, Box<dyn Error>> {
df = df
.lazy()
.with_column(col("date").str().strptime(
DataType::Date,
StrptimeOptions {
format,
strict: false,
..Default::default()
},
))
.collect()?;
Ok(df)
};
if df.column("date").unwrap().null_count() > df.height() / 10 {
return Err("wrong date format".into());
}
Ok(df)
};
if let Ok(x) = date_format(df.clone(), Some("%m/%d/%Y".into())) {
df = x
}
else if let Ok(x) = date_format(df.clone(), Some("%Y/%m/%d".into())) {
} else if let Ok(x) = date_format(df.clone(), Some("%Y/%m/%d".into())) {
df = x
}
else if let Ok(x) = date_format(df.clone(), None) {
} else if let Ok(x) = date_format(df.clone(), None) {
df = x
}
}
@ -321,6 +327,13 @@ impl Ark {
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Float64))
{
expressions.push(col("market_value").cast(DataType::Int64));
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
@ -330,6 +343,53 @@ impl Ark {
);
}
// rename values
expressions.push(
col("ticker")
.str()
.replace(lit("DKNG UW"), lit("DKNN"), true)
.str()
.replace(lit("NU UN"), lit("NU"), true)
.str()
.replace(lit("DSY"), lit("DSY FP"), true)
.str()
.replace(lit("GRMN UN"), lit("GRMN"), true)
.str()
.replace(lit("ARCT UQ"), lit("ARCT"), true)
.str()
.replace(lit("PRNT UF"), lit("PRNT"), true),
);
expressions.push(
col("company")
.str()
.replace_all(lit("-A"), lit(""), true)
.str()
.replace_all(lit("- A"), lit(""), true)
.str()
.replace_all(lit("-CL A"), lit(""), true)
.str()
.replace_all(lit("-CLASS A"), lit(""), true)
.str()
.replace_all(lit("Inc"), lit(""), true)
.str()
.replace_all(lit("INC"), lit(""), true)
.str()
.replace_all(lit("LTD"), lit(""), true)
.str()
.replace_all(lit("CORP"), lit(""), true)
.str()
.replace_all(lit("CORPORATION"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.str()
.replace_all(lit("."), lit(""), true)
.str()
.replace(lit("Blackdaemon"), lit("Blockdaemon"), true)
.str()
.rstrip(None),
);
// run expressions
df = df
.lazy()
.with_columns(expressions)
@ -372,25 +432,55 @@ impl Ark {
Ok(df.into())
}
pub fn get_api(&self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> {
let url = match (self.ticker, last_day) {
pub fn get_api(
&self,
last_day: Option<NaiveDate>,
source: Option<&Source>,
) -> Result<DataFrame, Box<dyn Error>> {
let url = match (&self.ticker, last_day) {
(self::Ticker::ARKVC, Some(last_day)) => format!(
"https://api.nexveridian.com/arkvc_holdings?start={}",
last_day
),
(tic, Some(last_day)) => format!(
"https://api.nexveridian.com/ark_holdings?ticker={}&start={}",
tic, last_day
),
(tic, Some(last_day)) => match source {
Some(Source::ArkFundsIoIncremental) => format!(
"https://arkfunds.io/api/v2/etf/holdings?symbol={}&date_from={}",
tic, last_day
),
_ => format!(
"https://api.nexveridian.com/ark_holdings?ticker={}&start={}",
tic, last_day
),
},
(self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(),
(tic, None) => {
format!("https://api.nexveridian.com/ark_holdings?ticker={}", tic)
}
(tic, None) => match source {
Some(Source::ArkFundsIoFull) => {
format!("https://arkfunds.io/api/v2/etf/holdings?symbol={}", tic)
}
_ => {
format!("https://api.nexveridian.com/ark_holdings?ticker={}", tic)
}
},
};
Reader::Json.get_data_url(url)
let mut df = Reader::Json.get_data_url(url)?;
df = match source {
Some(Source::ArkFundsIoIncremental) | Some(Source::ArkFundsIoFull) => {
df = df
.column("holdings")?
.clone()
.explode()?
.struct_()?
.clone()
.unnest();
df
}
_ => df,
};
Ok(df)
}
pub fn get_csv_ark(&self) -> Result<LazyFrame, Box<dyn Error>> {
pub fn get_csv_ark(&self) -> Result<DataFrame, Box<dyn Error>> {
let url = match self.ticker {
self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
_ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker),
@ -406,31 +496,38 @@ impl Ark {
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?);
}
let mut df = concat(dfs, false, true)?.into();
let mut df = concat(
dfs,
UnionArgs {
..Default::default()
},
)?
.into();
if Self::read_parquet(ticker, path.clone()).is_ok() {
let df_old = Self::read_parquet(ticker, path.clone())?;
if Self::read_parquet(&ticker, path.as_ref()).is_ok() {
let df_old = Self::read_parquet(&ticker, path.as_ref())?;
df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?;
}
Ok(Self { df, ticker, path })
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Reader {
Csv,
Json,
}
impl Reader {
pub fn get_data_url(&self, url: String) -> Result<LazyFrame, Box<dyn Error>> {
pub fn get_data_url(&self, url: String) -> Result<DataFrame, Box<dyn Error>> {
let mut headers = HeaderMap::new();
headers.insert(
header::USER_AGENT,
header::USER_AGENT,
HeaderValue::from_static("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"),
);
headers.insert(
header::ACCEPT,
header::ACCEPT,
HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8"),
);
headers.insert(
@ -439,9 +536,11 @@ impl Reader {
);
let response = Client::builder()
.default_headers(headers)
.gzip(true)
.build()?.get(url).send()?;
.default_headers(headers)
.gzip(true)
.build()?
.get(url)
.send()?;
if !response.status().is_success() {
return Err(format!(
@ -453,17 +552,14 @@ impl Reader {
let data = response.text()?.into_bytes();
let df: LazyFrame = match self {
let df = match self {
Self::Csv => CsvReader::new(Cursor::new(data))
.has_header(true)
.finish()?
.lazy(),
.finish()?,
Self::Json => {
let json_string = String::from_utf8(data)?;
let json: Value = serde_json::from_str(&json_string)?;
JsonReader::new(Cursor::new(json.to_string()))
.finish()?
.lazy()
JsonReader::new(Cursor::new(json.to_string())).finish()?
}
};