This commit is contained in:
Elijah McMorris 2023-06-30 05:31:02 +00:00 committed by NexVeridian
parent ffb24e7943
commit 9ddfb2a563
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
10 changed files with 119 additions and 56 deletions

View file

@ -1,38 +1,59 @@
use clokwerk::{AsyncScheduler, Job, TimeUnits};
// use polars::prelude::LazyFrame;
// use polars::prelude::*;
use futures::future::join_all;
use rand::Rng;
use std::error::Error;
use std::result::Result;
use std::time::Duration;
use std::thread;
use strum::IntoEnumIterator;
use tokio::task;
use tokio::time::{sleep, Duration};
mod util;
use util::*;
#[tokio::main]
async fn main() {
let mut scheduler = AsyncScheduler::new();
println!("Scheduler Started");
scheduler.every(1.day()).at("11:30 pm").run(|| async {
for x in Ticker::iter() {
if x == Ticker::ARKVC {
continue;
}
let plan = || -> Result<(), Box<dyn Error>> {
let df = Ark::new(Source::Ark, x, None)?
.format()?
.write_parquet()?
.collect()?;
println!("{:#?}", df.head(Some(1)));
Ok(())
};
fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> {
println!("Starting: {:#?}", ticker);
let sec = Duration::from_secs(rand::thread_rng().gen_range(5 * 60..=30 * 60));
// sleep(sec).await;
thread::sleep(sec);
if plan().is_ok() {}
let sec = rand::thread_rng().gen_range(10..=30);
tokio::time::sleep(Duration::from_secs(sec)).await;
}
});
let df = Ark::new(Source::Ark, ticker, None)?
.format()?
.write_parquet()?
.collect()?;
println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1)));
Ok(())
}
async fn spawn_ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error + Send>> {
task::spawn_blocking(move || ark_plan(ticker).unwrap())
.await
.unwrap();
Ok(())
}
async fn ark_etf() {
let futures = Ticker::iter()
.filter(|&x| x != Ticker::ARKVC)
.map(spawn_ark_plan)
.collect::<Vec<_>>();
join_all(futures).await;
}
// ark_etf().await;
scheduler.every(1.day()).at("11:30 pm").run(ark_etf);
scheduler
.every(5.day())
.at("11:30 pm")
.run(|| async { if spawn_ark_plan(Ticker::ARKVC).await.is_ok() {} });
loop {
scheduler.run_pending().await;
@ -40,24 +61,3 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
// fn main() -> Result<(), Box<dyn std::error::Error>> {
// let csv = Ark::merge_old_csv_to_parquet(Ticker::ARKK, None)?
// .format()?
// .write_parquet()?
// .collect()?;
// println!("{:#?}", csv);
// let read = Ark::new(Source::Read, Ticker::ARKK, None)?.collect()?;
// println!("{:#?}", read.dtypes());
// println!("{:#?}", read.get_column_names());
// println!("{:#?}", read);
// let api = Ark::new(Source::ApiFull, Ticker::ARKK, None)?.collect()?;
// println!("{:#?}", api);
// let ark = Ark::new(Source::Ark, Ticker::ARKK, None)?.collect()?;
// println!("{:#?}", ark);
// let ark = Ark::new(Source::Ark, Ticker::ARKVC, None)?.collect()?;
// println!("{:#?}", ark);
// Ok(())
// }