This commit is contained in:
Elijah McMorris 2023-07-27 07:23:38 +00:00 committed by NexVeridian@gmail.com
parent 7391a9fb31
commit 1eda4729d4
6 changed files with 91 additions and 35 deletions

View file

@@ -1,5 +1,6 @@
 /.devcontainer
 /.vscode
+/data
 /target
 .dockerignore
 Cargo.lock

View file

@@ -4,7 +4,7 @@ Fetches and caches ETF data daily, from csv download or api, and saves the data
 Not affiliated with Ark Invest
-# Install for csv download
+# Install
 Copy docker-compose.yml
 Create data folder next to docker-compose.yml
@@ -16,6 +16,26 @@ Create data folder next to docker-compose.yml
 `docker compose up --pull always`
+# Changing the data source
+In docker-compose.yml, set the data source with the environment variable
+```
+environment:
+  - ARK_SOURCE=ApiIncremental
+```
+The env string ARK_SOURCE must be a variant of the enum Source
+```
+pub enum Source {
+    // Reads Parquet file if exists
+    Read,
+    // From ARK Invest
+    Ark,
+    // From api.NexVeridian.com (Default)
+    ApiIncremental,
+    // From api.NexVeridian.com, not usually necessary, use ApiIncremental
+    ApiFull,
+}
+```
 # Dev Install
 ## Dev Containers
 Install docker, vscode and the [Dev Containers Extension](https://marketplace.visualstudio.com/items?itemName=ms-vscode-remote.remote-containers)
@@ -31,15 +51,7 @@ Run tests with `cargo t`
 ## Docker Compose
 `git clone`
-`docker compose build && docker compose up`
+`docker compose -f docker-compose.dev.yml build && docker compose -f docker-compose.dev.yml up`
 Remove the cargo cache for buildkit with `docker builder prune --filter type=exec.cachemount`
-# Install for api
-`git clone`
-in main.rs change `Source::Ark` to `Source::ApiIncremental` or `Source::ApiFull` for first run
-in docker-compose.yml remove this line`image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest`
-uncomment everything else
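For reference, the lazy_static block in main.rs (diffed below) resolves this variable roughly as follows. A minimal standalone sketch, assuming strum's `EnumString` derive as used in util.rs, with the unset case falling back to `ApiIncremental`:

```
use std::{env, str::FromStr};
use strum_macros::EnumString;

#[derive(EnumString, Clone, Copy, Debug)]
pub enum Source {
    Read,
    Ark,
    ApiIncremental,
    ApiFull,
}

fn main() {
    // Unset ARK_SOURCE falls back to the default; an unknown string panics
    let source = match env::var("ARK_SOURCE") {
        Ok(val) => Source::from_str(&val).expect("Env string ARK_SOURCE is not in enum Source"),
        Err(_e) => Source::ApiIncremental,
    };
    println!("Using source: {source:?}");
}
```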

docker-compose.dev.yml Normal file
View file

@@ -0,0 +1,30 @@
+version: "3"
+services:
+  ark-invest-api-rust-data-test:
+    container_name: ark-invest-api-rust-data-test
+    build:
+      context: .
+      target: test
+      args:
+        DOCKER_BUILDKIT: 1
+    restart: "no"
+  ark-invest-api-rust-data:
+    # image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest
+    image: ark-invest-api-rust-data
+    container_name: ark-invest-api-rust-data
+    build:
+      context: .
+      target: main
+      args:
+        DOCKER_BUILDKIT: 1
+    restart: "no"
+    environment:
+      - ARK_SOURCE=Ark
+      # - STARTUP_CSV_MERGE=true
+      # - STARTUP_ARK_ETF=true
+    volumes:
+      - ./data:/ark-invest-api-rust-data/data
+volumes:
+  data:
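With this file in place, `docker compose -f docker-compose.dev.yml up ark-invest-api-rust-data-test` (standard compose service selection) runs only the test stage, while the README's `docker compose -f docker-compose.dev.yml build && docker compose -f docker-compose.dev.yml up` runs both services.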

View file

@@ -2,7 +2,6 @@ version: "3"
 services:
   ark-invest-api-rust-data:
     image: ghcr.io/NexVeridian/ark-invest-api-rust-data:latest
-    # image: ark-invest-api-rust-data
     container_name: ark-invest-api-rust-data
     restart: unless-stopped
     # environment:
@@ -12,14 +11,5 @@ services:
     volumes:
       - ./data:/ark-invest-api-rust-data/data
-  # ark-invest-api-rust-data-test:
-  #   container_name: ark-invest-api-rust-data-test
-  #   build:
-  #     context: .
-  #     target: test
-  #     args:
-  #       DOCKER_BUILDKIT: 1
-  #   restart: no
 volumes:
   data:

View file

@@ -1,6 +1,7 @@
 use clokwerk::{AsyncScheduler, Job, TimeUnits};
 use futures::future::join_all;
 use lazy_static::lazy_static;
+use polars::prelude::DataFrame;
 use rand::Rng;
 use std::env;
 use std::error::Error;
@@ -16,19 +17,29 @@ use util::*;
 lazy_static! {
     static ref SOURCE: Source = match env::var("ARK_SOURCE") {
-        Ok(val) => Source::from_str(val.as_str()).expect("Env string SOURCE is not in enum Source"),
-        Err(_e) => Source::Ark,
+        Ok(val) =>
+            Source::from_str(val.as_str()).expect("Env string ARK_SOURCE is not in enum Source"),
+        Err(_e) => Source::ApiIncremental,
     };
 }
+
+fn print_df(ticker: &Ticker, df: &DataFrame) {
+    println!(
+        "Ticker: {:#?}\nShape: {:?}\n{:#?}",
+        ticker,
+        df.shape(),
+        df.tail(Some(1))
+    );
+}
+
 fn csv_merge() -> Result<(), Box<dyn Error>> {
     for ticker in Ticker::iter() {
         let df = Ark::merge_old_csv_to_parquet(ticker, None)?
             .format()?
             .sort()?
             .write_parquet()?
-            .collect();
-        println!("Ticker: {:#?}\n{:#?}", ticker, df);
+            .collect()?;
+        print_df(&ticker, &df);
     }
     Ok(())
 }
@@ -44,7 +55,7 @@ fn ark_plan(ticker: Ticker) -> Result<(), Box<dyn Error>> {
         .write_parquet()?
         .collect()?;
-    println!("Ticker: {:#?}\n{:#?}", ticker, df.tail(Some(1)));
+    print_df(&ticker, &df);
     Ok(())
 }
@@ -73,7 +84,7 @@ async fn main() {
         .map(|v| v == "true")
         .unwrap_or(false)
     {
-        print!("Merging CSVs to Parquet...");
+        println!("Merging CSVs to Parquet");
         csv_merge().unwrap();
     }
@@ -84,7 +95,7 @@ async fn main() {
         ark_etf().await;
     }
-    scheduler.every(1.day()).at("11:30 pm").run(ark_etf);
+    scheduler.every(1.day()).at("10:00 am").run(ark_etf);
     scheduler
         .every(5.day())
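The last hunk moves the daily fetch from 11:30 pm to 10:00 am. For context, a minimal sketch of the clokwerk loop this line lives in, assuming a tokio runtime; the closure body is a stand-in for the repo's `ark_etf`:

```
use clokwerk::{AsyncScheduler, Job, TimeUnits};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut scheduler = AsyncScheduler::new();
    // Daily job at the new time; the real code runs ark_etf here
    scheduler.every(1.day()).at("10:00 am").run(|| async {
        println!("fetching ETF data");
    });
    // Poll the scheduler so pending jobs actually fire
    loop {
        scheduler.run_pending().await;
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
}
```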

View file

@@ -82,9 +82,13 @@ impl DFS for Vec<DF> {
 #[derive(EnumString, Clone, Copy)]
 pub enum Source {
+    // Reads Parquet file if exists
     Read,
+    // From ARK Invest
     Ark,
+    // From api.NexVeridian.com
     ApiIncremental,
+    // From api.NexVeridian.com, not usually necessary, use ApiIncremental
     ApiFull,
 }
 
 pub struct Ark {
@@ -259,25 +263,34 @@ impl Ark {
         }
         if !df.fields().contains(&Field::new("date", DataType::Date)) {
-            let date_format = |df: DataFrame, format: &str| -> Result<DataFrame, Box<dyn Error>> {
-                Ok(df
+            let date_format = |mut df: DataFrame, format: Option<String>| -> Result<DataFrame, Box<dyn Error>> {
+                df = df
                     .lazy()
                     .with_column(col("date").str().strptime(
                         DataType::Date,
                         StrptimeOptions {
-                            format: Some(format.into()),
+                            format,
                             strict: false,
                             exact: true,
                             cache: true,
                         },
                     ))
-                    .collect()?)
+                    .collect()?;
+                if df.column("date").unwrap().null_count() > df.height() / 10 {
+                    return Err("wrong date format".into());
+                }
+                Ok(df)
             };
-            if let Ok(x) = date_format(df.clone(), "%m/%d/%Y") {
+            if let Ok(x) = date_format(df.clone(), Some("%m/%d/%Y".into())) {
                 df = x
             }
-            if let Ok(x) = date_format(df.clone(), "%Y/%m/%d") {
+            else if let Ok(x) = date_format(df.clone(), Some("%Y/%m/%d".into())) {
+                df = x
+            }
+            else if let Ok(x) = date_format(df.clone(), None) {
                 df = x
             }
         }
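The reworked closure trial-parses the `date` column and treats a result where more than a tenth of the rows come back null as the wrong format, which is what lets the `else if` chain fall through to the next candidate. A standalone sketch of that pattern, assuming the polars version this diff targets (where `str().strptime` takes a `DataType` and `StrptimeOptions`); `try_date_format` is a hypothetical name:

```
use polars::prelude::*;
use std::error::Error;

// Trial-parse the `date` column with one format; treat >10% nulls as a wrong format
fn try_date_format(df: DataFrame, format: Option<String>) -> Result<DataFrame, Box<dyn Error>> {
    let df = df
        .lazy()
        .with_column(col("date").str().strptime(
            DataType::Date,
            StrptimeOptions {
                format,
                strict: false,
                exact: true,
                cache: true,
            },
        ))
        .collect()?;
    if df.column("date")?.null_count() > df.height() / 10 {
        return Err("wrong date format".into());
    }
    Ok(df)
}
```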
@@ -397,9 +410,8 @@ impl Ark {
         if Self::read_parquet(ticker, path.clone()).is_ok() {
             let df_old = Self::read_parquet(ticker, path.clone())?;
-            df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?
+            df = Self::concat_df(vec![Self::df_format(df_old)?, Self::df_format(df)?])?;
         }
         Ok(Self { df, ticker, path })
     }
 }
} }