diff --git a/Cargo.toml b/Cargo.toml index 4c295c0..f2ca07b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ chrono = { version = "0.4", features = ["serde"] } serde_json = "1.0" rand = "0.8" futures = "0.3" +lazy_static = "1.4.0" [dev-dependencies] serial_test = "*" diff --git a/docker-compose.yml b/docker-compose.yml index f45ba81..675e2e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,14 +4,14 @@ services: image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest # image: ark-invest-api-rust-data container_name: ark-invest-api-rust-data - build: - context: . - target: test - args: - DOCKER_BUILDKIT: 1 restart: unless-stopped + # environment: + # - ARK_SOURCE=ApiIncremental + # - STARTUP_CSV_MERGE=true + # - STARTUP_ARK_ETF=true volumes: - ./data:/ark-invest-api-rust-data/data + # ark-invest-api-rust-data-test: # container_name: ark-invest-api-rust-data-test # build: diff --git a/src/main.rs b/src/main.rs index 50e8d0a..2b72ccd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,53 +1,89 @@ use clokwerk::{AsyncScheduler, Job, TimeUnits}; use futures::future::join_all; +use lazy_static::lazy_static; use rand::Rng; +use std::env; use std::error::Error; use std::result::Result; +use std::str::FromStr; use std::thread; use strum::IntoEnumIterator; use tokio::task; -use tokio::time::{sleep, Duration}; +use tokio::time::Duration; mod util; use util::*; +lazy_static! { + static ref SOURCE: Source = match env::var("ARK_SOURCE") { + Ok(val) => Source::from_str(val.as_str()).expect("Env string SOURCE is not in enum Source"), + Err(_e) => Source::Ark, + }; +} + +fn csv_merge() -> Result<(), Box<dyn Error>> { + for ticker in Ticker::iter() { + let df = Ark::merge_old_csv_to_parquet(ticker, None)? + .format()? + .sort()? + .write_parquet()? 
+ .collect(); + println!("Ticker: {:#?}\n{:#?}", ticker, df); + } + Ok(()) +} + +fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { + println!("Starting: {:#?}", ticker); + let sec = Duration::from_secs(rand::thread_rng().gen_range(5 * 60..=30 * 60)); + // sleep(sec).await; + thread::sleep(sec); + + let df = Ark::new(*SOURCE, ticker, None)? + .format()? + .write_parquet()? + .collect()?; + + println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1))); + Ok(()) +} + +async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { + task::spawn_blocking(move || ark_plan(ticker).unwrap()) + .await + .unwrap(); + Ok(()) +} + +async fn ark_etf() { + let futures = Ticker::iter() + .filter(|&x| x != Ticker::ARKVC) + .map(spawn_ark_plan) + .collect::<Vec<_>>(); + + join_all(futures).await; +} + #[tokio::main] async fn main() { let mut scheduler = AsyncScheduler::new(); println!("Scheduler Started"); - fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { - println!("Starting: {:#?}", ticker); - let sec = Duration::from_secs(rand::thread_rng().gen_range(5 * 60..=30 * 60)); - // sleep(sec).await; - thread::sleep(sec); - - let df = Ark::new(Source::Ark, ticker, None)? - .format()? - .write_parquet()? 
- .collect()?; - - println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1))); - Ok(()) + if env::var("STARTUP_CSV_MERGE") + .map(|v| v == "true") + .unwrap_or(false) + { + print!("Merging CSVs to Parquet..."); + csv_merge().unwrap(); } - async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { - task::spawn_blocking(move || ark_plan(ticker).unwrap()) - .await - .unwrap(); - Ok(()) + if env::var("STARTUP_ARK_ETF") + .map(|v| v == "true") + .unwrap_or(false) + { + ark_etf().await; } - async fn ark_etf() { - let futures = Ticker::iter() - .filter(|&x| x != Ticker::ARKVC) - .map(spawn_ark_plan) - .collect::<Vec<_>>(); - - join_all(futures).await; - } - - // ark_etf().await; scheduler.every(1.day()).at("11:30 pm").run(ark_etf); scheduler diff --git a/src/util.rs b/src/util.rs index b286c6c..cc91477 100644 --- a/src/util.rs +++ b/src/util.rs @@ -12,7 +12,7 @@ use std::fs::{create_dir_all, File}; use std::io::Cursor; use std::path::Path; use std::result::Result; -use strum_macros::EnumIter; +use strum_macros::{EnumIter, EnumString}; #[derive(strum_macros::Display, EnumIter, Clone, Copy, PartialEq, Debug)] pub enum Ticker { @@ -80,6 +80,7 @@ impl DFS for Vec { } } +#[derive(EnumString, Clone, Copy)] pub enum Source { Read, Ark,