Elijah McMorris 2023-11-21 19:07:47 +00:00
parent 4063fa3c0f
commit acf6858c6c
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
5 changed files with 40 additions and 85 deletions

View file

@@ -63,7 +63,9 @@
       "christian-kohler.path-intellisense",
       "Gruntfuggly.todo-tree",
       "ms-azuretools.vscode-docker",
-      "redhat.vscode-yaml"
+      "redhat.vscode-yaml",
+      "GitHub.copilot",
+      "GitHub.copilot-chat"
     ]
   }
 }

View file

@@ -16,7 +16,7 @@ env:

 jobs:
   run-tests:
-    name: Run tests
+    name: run tests
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
@@ -31,7 +31,7 @@ jobs:
         run: cargo nextest run -E "all() - test(get_api) - kind(bin)"

   clippy:
-    name: Clippy
+    name: clippy
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3

View file

@@ -5,7 +5,7 @@ version = "1.0.0"
 edition = "2021"

 [dependencies]
-polars = { version = "0.32", features = [
+polars = { version = "0.35", features = [
     "lazy",
     "strings",
     "parquet",
@@ -27,6 +27,7 @@ serde_json = "1.0"
 rand = "0.8"
 futures = "0.3"
 lazy_static = "1.4"
+anyhow = "1.0"

 [dev-dependencies]
 serial_test = "*"
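The polars bump from 0.32 to 0.35 is not a drop-in upgrade: as the diffs below show, rstrip becomes strip_chars and strptime grows a third argument for resolving ambiguous datetimes. A minimal sketch of date parsing under polars 0.35, mirroring the options this commit passes (the column name and format string here are illustrative, not taken from the repo):

use polars::prelude::*;

// Parse a string column into dates; strict: false leaves unparseable rows
// null instead of erroring. The trailing lit("earliest") is the `ambiguous`
// argument added in polars 0.35, picking the earliest candidate when a
// local time is ambiguous.
fn parse_dates(df: DataFrame) -> PolarsResult<DataFrame> {
    df.lazy()
        .with_column(col("date").str().strptime(
            DataType::Date,
            StrptimeOptions {
                format: Some("%m/%d/%Y".into()),
                strict: false,
                ..Default::default()
            },
            lit("earliest"),
        ))
        .collect()
}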

View file

@ -1,11 +1,10 @@
use anyhow::{Error, Result};
use clokwerk::{AsyncScheduler, Job, TimeUnits}; use clokwerk::{AsyncScheduler, Job, TimeUnits};
use futures::future::join_all; use futures::future::join_all;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use polars::prelude::DataFrame; use polars::prelude::DataFrame;
use rand::Rng; use rand::Rng;
use std::env; use std::env;
use std::error::Error;
use std::result::Result;
use std::str::FromStr; use std::str::FromStr;
use std::thread; use std::thread;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
@ -32,7 +31,7 @@ fn print_df(ticker: &Ticker, df: &DataFrame) {
); );
} }
fn csv_merge() -> Result<(), Box<dyn Error>> { fn csv_merge() -> Result<(), Error> {
for ticker in Ticker::iter() { for ticker in Ticker::iter() {
let df = Ark::merge_old_csv_to_parquet(ticker, None)? let df = Ark::merge_old_csv_to_parquet(ticker, None)?
.format()? .format()?
@ -44,7 +43,7 @@ fn csv_merge() -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { fn ark_plan(ticker: Ticker) -> Result<(), Error> {
println!("Starting: {:#?}", ticker); println!("Starting: {:#?}", ticker);
let sec = Duration::from_secs(rand::thread_rng().gen_range(30 * 60..=4 * 60 * 60)); let sec = Duration::from_secs(rand::thread_rng().gen_range(30 * 60..=4 * 60 * 60));
// sleep(sec).await; // sleep(sec).await;
@ -59,7 +58,7 @@ fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error + Send>> { async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Error> {
task::spawn_blocking(move || ark_plan(ticker).unwrap()) task::spawn_blocking(move || ark_plan(ticker).unwrap())
.await .await
.unwrap(); .unwrap();
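Replacing Box<dyn Error> with anyhow::Error shortens these signatures and also removes the awkward Box<dyn Error + Send> bound on spawn_ark_plan, since anyhow::Error is Send + Sync by construction. A minimal before/after sketch of the pattern, with a hypothetical fetch helper standing in for the real functions:

use anyhow::{anyhow, Result};

// Hypothetical fallible helper, for illustration only.
fn fetch(url: &str) -> Result<String> {
    if url.is_empty() {
        // anyhow! builds an ad-hoc error, replacing the old
        // `return Err("...".into())` idiom on Box<dyn Error>.
        return Err(anyhow!("empty url"));
    }
    Ok(format!("fetched {url}"))
}

fn main() -> Result<()> {
    // `?` converts any std::error::Error into anyhow::Error automatically.
    let body = fetch("https://example.com")?;
    println!("{body}");
    Ok(())
}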

View file

@@ -1,3 +1,4 @@
+use anyhow::{anyhow, Error, Result};
 use chrono::{Duration, NaiveDate};
 use glob::glob;
 use polars::datatypes::DataType;
@@ -7,12 +8,9 @@ use reqwest::blocking::Client;
 use reqwest::header;
 use reqwest::header::{HeaderMap, HeaderValue};
 use serde_json::Value;
-use std::error::Error;
 use std::fs::{create_dir_all, File};
 use std::io::Cursor;
 use std::path::Path;
-use std::result::Result;
-
 use strum_macros::{EnumIter, EnumString};

 #[derive(Debug, Default, strum_macros::Display, EnumIter, Clone, Copy, PartialEq)]
@@ -56,7 +54,7 @@ impl From<DataFrame> for DF {
     }
 }
 impl DF {
-    pub fn collect(self) -> Result<DataFrame, Box<dyn Error>> {
+    pub fn collect(self) -> Result<DataFrame, Error> {
         match self {
             DF::LazyFrame(x) => Ok(x.collect()?),
             DF::DataFrame(x) => Ok(x),
@@ -106,11 +104,7 @@ pub struct Ark {
     path: Option<String>,
 }
 impl Ark {
-    pub fn new(
-        source: Source,
-        ticker: Ticker,
-        path: Option<String>,
-    ) -> Result<Self, Box<dyn Error>> {
+    pub fn new(source: Source, ticker: Ticker, path: Option<String>) -> Result<Self, Error> {
         let existing_file = Self::read_parquet(&ticker, path.as_ref()).is_ok();

         let mut ark = Self {
@@ -150,11 +144,11 @@ impl Ark {
         Ok(ark)
     }

-    pub fn collect(self) -> Result<DataFrame, Box<dyn Error>> {
+    pub fn collect(self) -> Result<DataFrame, Error> {
         self.df.collect()
     }

-    pub fn write_parquet(self) -> Result<Self, Box<dyn Error>> {
+    pub fn write_parquet(self) -> Result<Self, Error> {
         // with format df
         let ark = self.format()?;
         Self::write_df_parquet(
@@ -167,7 +161,7 @@ impl Ark {
         Ok(ark)
     }

-    fn write_df_parquet(path: String, df: DF) -> Result<(), Box<dyn Error>> {
+    fn write_df_parquet(path: String, df: DF) -> Result<(), Error> {
         if let Some(parent) = Path::new(&path).parent() {
             if !parent.exists() {
                 create_dir_all(parent)?;
@@ -177,7 +171,7 @@ impl Ark {
         Ok(())
     }

-    fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result<DF, Box<dyn Error>> {
+    fn read_parquet(ticker: &Ticker, path: Option<&String>) -> Result<DF, Error> {
         let df = LazyFrame::scan_parquet(
             match path {
                 Some(p) => format!("{}/{}.parquet", p, ticker),
@@ -188,19 +182,19 @@ impl Ark {
         Ok(df.into())
     }

-    pub fn sort(mut self) -> Result<Self, Box<dyn Error>> {
+    pub fn sort(mut self) -> Result<Self, Error> {
         self.df = Self::df_sort(self.df)?;
         Ok(self)
     }

-    pub fn df_sort(df: DF) -> Result<DF, Box<dyn Error>> {
+    pub fn df_sort(df: DF) -> Result<DF, Error> {
         Ok(df
             .collect()?
             .sort(["date", "weight"], vec![false, true], false)?
             .into())
     }

-    fn concat_df(dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
+    fn concat_df(dfs: Vec<DF>) -> Result<DF, Error> {
         // with dedupe
         let df = concat(
             dfs.lazy(),
@@ -211,7 +205,7 @@ impl Ark {
         Self::dedupe(df.into())
     }

-    pub fn dedupe(mut df: DF) -> Result<DF, Box<dyn Error>> {
+    pub fn dedupe(mut df: DF) -> Result<DF, Error> {
         df = df
             .lazy()
             .unique_stable(None, UniqueKeepStrategy::First)
@@ -219,12 +213,12 @@ impl Ark {
         Ok(df)
     }

-    pub fn format(mut self) -> Result<Self, Box<dyn Error>> {
+    pub fn format(mut self) -> Result<Self, Error> {
         self.df = Self::df_format(self.df)?;
         Ok(self)
     }

-    pub fn df_format(df: DF) -> Result<DF, Box<dyn Error>> {
+    pub fn df_format(df: DF) -> Result<DF, Error> {
         let mut df = df.collect()?;

         if df.get_column_names().contains(&"market_value_($)") {
@@ -272,7 +266,7 @@ impl Ark {

         if !df.fields().contains(&Field::new("date", DataType::Date)) {
             let date_format =
-                |mut df: DataFrame, format: Option<String>| -> Result<DataFrame, Box<dyn Error>> {
+                |mut df: DataFrame, format: Option<String>| -> Result<DataFrame, Error> {
                     df = df
                         .lazy()
                         .with_column(col("date").str().strptime(
@@ -282,11 +276,12 @@
                                 strict: false,
                                 ..Default::default()
                             },
+                            lit("earliest"),
                         ))
                         .collect()?;

                     if df.column("date").unwrap().null_count() > df.height() / 10 {
-                        return Err("wrong date format".into());
+                        return Err(anyhow!("wrong date format"));
                     }

                     Ok(df)
@@ -347,62 +342,24 @@ impl Ark {
         expressions.push(
             col("ticker")
                 .str()
-                .replace_all(lit(" FP"), lit(""), true)
-                .str()
-                .replace_all(lit(" UQ"), lit(""), true)
-                .str()
-                .replace_all(lit(" UF"), lit(""), true)
-                .str()
-                .replace_all(lit(" UN"), lit(""), true)
-                .str()
-                .replace_all(lit(" UW"), lit(""), true)
+                .replace_all(lit("(?i) fp| uq| un| uw"), lit(""), true)
                 .str()
                 .replace(lit("DKNN"), lit("DKNG"), true)
                 .str()
-                .rstrip(None),
+                .strip_chars(lit("None")),
         );
         expressions.push(
             col("company")
                 .str()
-                .replace_all(lit("-A"), lit(""), true)
-                .str()
-                .replace_all(lit("- A"), lit(""), true)
-                .str()
-                .replace_all(lit("CL A"), lit(""), true)
-                .str()
-                .replace_all(lit("CLASS A"), lit(""), true)
-                .str()
-                .replace_all(lit("Inc"), lit(""), true)
-                .str()
-                .replace_all(lit("INC"), lit(""), true)
-                .str()
-                .replace_all(lit("incorporated"), lit(""), true)
-                .str()
-                .replace_all(lit("LTD"), lit(""), true)
-                .str()
-                .replace_all(lit("CORP"), lit(""), true)
-                .str()
-                .replace_all(lit("CORPORATION"), lit(""), true)
-                .str()
-                .replace_all(lit("Corporation"), lit(""), true)
-                .str()
-                .replace_all(lit("- C"), lit(""), true)
-                .str()
-                .replace_all(lit("-"), lit(""), true)
-                .str()
-                .replace_all(lit(","), lit(""), true)
-                .str()
-                .replace_all(lit("."), lit(""), true)
-                .str()
-                .replace(lit("COINBASE GLOBAL"), lit("COINBASE"), true)
-                .str()
-                .replace(lit("Coinbase Global"), lit("Coinbase"), true)
+                .replace_all(lit("(?i) a| cl| class| inc| ltd| corp| corporation| c| cl| se| hold| holdings| international|-|,|."), lit(""), true)
+                .str()
+                .replace(lit("(?i)Coinbase Global"), lit("Coinbase"), true)
                 .str()
                 .replace(lit("Blackdaemon"), lit("Blockdaemon"), true)
                 .str()
                 .replace(lit("DISCOVERY"), lit("Dassault Systemes"), true)
                 .str()
-                .rstrip(None),
+                .strip_chars(lit("None")),
         );

         // run expressions
@@ -452,7 +409,7 @@ impl Ark {
         &self,
         last_day: Option<NaiveDate>,
         source: Option<&Source>,
-    ) -> Result<DataFrame, Box<dyn Error>> {
+    ) -> Result<DataFrame, Error> {
         let url = match (&self.ticker, last_day) {
             (self::Ticker::ARKVC, Some(last_day)) => format!(
                 "https://api.nexveridian.com/arkvc_holdings?start={}",
@@ -496,7 +453,7 @@ impl Ark {
         Ok(df)
     }

-    pub fn get_csv_ark(&self) -> Result<DataFrame, Box<dyn Error>> {
+    pub fn get_csv_ark(&self) -> Result<DataFrame, Error> {
         let url = match self.ticker {
             self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
             _ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker),
@@ -504,10 +461,7 @@ impl Ark {
         Reader::Csv.get_data_url(url)
     }

-    pub fn merge_old_csv_to_parquet(
-        ticker: Ticker,
-        path: Option<String>,
-    ) -> Result<Self, Box<dyn Error>> {
+    pub fn merge_old_csv_to_parquet(ticker: Ticker, path: Option<String>) -> Result<Self, Error> {
         let mut dfs = vec![];
         for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
             dfs.push(LazyCsvReader::new(x).finish()?);
@@ -535,7 +489,7 @@ pub enum Reader {
 }

 impl Reader {
-    pub fn get_data_url(&self, url: String) -> Result<DataFrame, Box<dyn Error>> {
+    pub fn get_data_url(&self, url: String) -> Result<DataFrame, Error> {
         let mut headers = HeaderMap::new();
         headers.insert(
             header::USER_AGENT,
@@ -559,11 +513,10 @@ impl Reader {
             .send()?;

         if !response.status().is_success() {
-            return Err(format!(
+            return Err(anyhow!(
                 "HTTP request failed with status code: {:?}",
                 response.status()
-            )
-            .into());
+            ));
         }

         let data = response.text()?.into_bytes();
@@ -591,7 +544,7 @@ mod tests {
     #[test]
     #[serial]
-    fn read_write_parquet() -> Result<(), Box<dyn Error>> {
+    fn read_write_parquet() -> Result<(), Error> {
         let test_df = df![
             "date" => ["2023-01-01"],
             "ticker" => ["TSLA"],