This commit is contained in:
Elijah McMorris 2023-07-27 01:21:34 +00:00
parent 9ddfb2a563
commit 7391a9fb31
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
4 changed files with 73 additions and 35 deletions

View file

@ -25,6 +25,7 @@ chrono = { version = "0.4", features = ["serde"] }
serde_json = "1.0" serde_json = "1.0"
rand = "0.8" rand = "0.8"
futures = "0.3" futures = "0.3"
lazy_static = "1.4.0"
[dev-dependencies] [dev-dependencies]
serial_test = "*" serial_test = "*"

View file

@ -4,14 +4,14 @@ services:
image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest
# image: ark-invest-api-rust-data # image: ark-invest-api-rust-data
container_name: ark-invest-api-rust-data container_name: ark-invest-api-rust-data
build:
context: .
target: test
args:
DOCKER_BUILDKIT: 1
restart: unless-stopped restart: unless-stopped
# environment:
# - ARK_SOURCE=ApiIncremental
# - STARTUP_CSV_MERGE=true
# - STARTUP_ARK_ETF=true
volumes: volumes:
- ./data:/ark-invest-api-rust-data/data - ./data:/ark-invest-api-rust-data/data
# ark-invest-api-rust-data-test: # ark-invest-api-rust-data-test:
# container_name: ark-invest-api-rust-data-test # container_name: ark-invest-api-rust-data-test
# build: # build:

View file

@ -1,53 +1,89 @@
use clokwerk::{AsyncScheduler, Job, TimeUnits}; use clokwerk::{AsyncScheduler, Job, TimeUnits};
use futures::future::join_all; use futures::future::join_all;
use lazy_static::lazy_static;
use rand::Rng; use rand::Rng;
use std::env;
use std::error::Error; use std::error::Error;
use std::result::Result; use std::result::Result;
use std::str::FromStr;
use std::thread; use std::thread;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::task; use tokio::task;
use tokio::time::{sleep, Duration}; use tokio::time::Duration;
mod util; mod util;
use util::*; use util::*;
#[tokio::main] lazy_static! {
async fn main() { static ref SOURCE: Source = match env::var("ARK_SOURCE") {
let mut scheduler = AsyncScheduler::new(); Ok(val) => Source::from_str(val.as_str()).expect("Env string SOURCE is not in enum Source"),
println!("Scheduler Started"); Err(_e) => Source::Ark,
};
}
fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> { fn csv_merge() -> Result<(), Box<dyn Error>> {
for ticker in Ticker::iter() {
let df = Ark::merge_old_csv_to_parquet(ticker, None)?
.format()?
.sort()?
.write_parquet()?
.collect();
println!("Ticker: {:#?}\n{:#?}", ticker, df);
}
Ok(())
}
fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> {
println!("Starting: {:#?}", ticker); println!("Starting: {:#?}", ticker);
let sec = Duration::from_secs(rand::thread_rng().gen_range(5 * 60..=30 * 60)); let sec = Duration::from_secs(rand::thread_rng().gen_range(5 * 60..=30 * 60));
// sleep(sec).await; // sleep(sec).await;
thread::sleep(sec); thread::sleep(sec);
let df = Ark::new(Source::Ark, ticker, None)? let df = Ark::new(*SOURCE, ticker, None)?
.format()? .format()?
.write_parquet()? .write_parquet()?
.collect()?; .collect()?;
println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1))); println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1)));
Ok(()) Ok(())
} }
async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error + Send>> { async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error + Send>> {
task::spawn_blocking(move || ark_plan(ticker).unwrap()) task::spawn_blocking(move || ark_plan(ticker).unwrap())
.await .await
.unwrap(); .unwrap();
Ok(()) Ok(())
} }
async fn ark_etf() { async fn ark_etf() {
let futures = Ticker::iter() let futures = Ticker::iter()
.filter(|&x| x != Ticker::ARKVC) .filter(|&x| x != Ticker::ARKVC)
.map(spawn_ark_plan) .map(spawn_ark_plan)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
join_all(futures).await; join_all(futures).await;
}
#[tokio::main]
async fn main() {
let mut scheduler = AsyncScheduler::new();
println!("Scheduler Started");
if env::var("STARTUP_CSV_MERGE")
.map(|v| v == "true")
.unwrap_or(false)
{
print!("Merging CSVs to Parquet...");
csv_merge().unwrap();
}
if env::var("STARTUP_ARK_ETF")
.map(|v| v == "true")
.unwrap_or(false)
{
ark_etf().await;
} }
// ark_etf().await;
scheduler.every(1.day()).at("11:30 pm").run(ark_etf); scheduler.every(1.day()).at("11:30 pm").run(ark_etf);
scheduler scheduler

View file

@ -12,7 +12,7 @@ use std::fs::{create_dir_all, File};
use std::io::Cursor; use std::io::Cursor;
use std::path::Path; use std::path::Path;
use std::result::Result; use std::result::Result;
use strum_macros::EnumIter; use strum_macros::{EnumIter, EnumString};
#[derive(strum_macros::Display, EnumIter, Clone, Copy, PartialEq, Debug)] #[derive(strum_macros::Display, EnumIter, Clone, Copy, PartialEq, Debug)]
pub enum Ticker { pub enum Ticker {
@ -80,6 +80,7 @@ impl DFS for Vec<DF> {
} }
} }
#[derive(EnumString, Clone, Copy)]
pub enum Source { pub enum Source {
Read, Read,
Ark, Ark,