This commit is contained in:
Elijah McMorris 2023-06-06 15:27:46 -07:00
parent 7e12fc1e65
commit be08848b0d
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
6 changed files with 162 additions and 72 deletions

2
.cargo/config.toml Normal file
View file

@ -0,0 +1,2 @@
# [target.x86_64-unknown-linux-gnu]
# rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"]

View file

@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/rust:bullseye
RUN rustup target add x86_64-unknown-linux-musl && \ RUN rustup target add x86_64-unknown-linux-musl && \
apt-get update && \ apt-get update && \
apt install -y build-essential xz-utils musl-tools musl-dev gcc-multilib && \ apt install -y build-essential xz-utils musl-tools musl-dev gcc-multilib pkg-config libssl-dev && \
rustup update rustup update
RUN LAZYGIT_VERSION=$(curl -s "https://api.github.com/repos/jesseduffield/lazygit/releases/latest" | grep -Po '"tag_name": "v\K[0-9.]+') && \ RUN LAZYGIT_VERSION=$(curl -s "https://api.github.com/repos/jesseduffield/lazygit/releases/latest" | grep -Po '"tag_name": "v\K[0-9.]+') && \

View file

@ -14,7 +14,6 @@
"--name", "--name",
"devcontainer-${containerWorkspaceFolderBasename}" "devcontainer-${containerWorkspaceFolderBasename}"
], ],
"initializeCommand": "docker rm -f devcontainer-${containerWorkspaceFolderBasename} || true",
// Use 'mounts' to make the cargo cache persistent in a Docker Volume. // Use 'mounts' to make the cargo cache persistent in a Docker Volume.
"mounts": [ "mounts": [
{ {
@ -29,8 +28,8 @@
"ghcr.io/devcontainers/features/git:1": {} "ghcr.io/devcontainers/features/git:1": {}
// "ghcr.io/devcontainers/features/nix:1": { // "ghcr.io/devcontainers/features/nix:1": {
// "packages": [ // "packages": [
// "btop" // "btop",
// // "lazygit" // "lazygit"
// ] // ]
// } // }
}, },
@ -41,7 +40,8 @@
// }, // },
"postAttachCommand": { "postAttachCommand": {
"AddGitSafeDir": "git config --global --add safe.directory /workspaces/${containerWorkspaceFolderBasename}", "AddGitSafeDir": "git config --global --add safe.directory /workspaces/${containerWorkspaceFolderBasename}",
"clippy": "cargo clippy" "cargo_update": "cargo update",
"clippy": "cargo clippy --fix --allow-dirty"
}, },
// Configure tool-specific properties. // Configure tool-specific properties.
// "customizations": {}, // "customizations": {},

View file

@ -9,6 +9,7 @@ polars = { version = "0.28", features = [
"strings", "strings",
"parquet", "parquet",
"round_series", "round_series",
"lazy_regex",
] } ] }
reqwest = { version = "0.11", features = ["blocking"] } reqwest = { version = "0.11", features = ["blocking"] }
glob = { version = "0.3" } glob = { version = "0.3" }

View file

@ -1,39 +1,50 @@
use clokwerk::Interval::*; // use clokwerk::{AsyncScheduler, Job, TimeUnits};
use clokwerk::{AsyncScheduler, Job, TimeUnits}; // use polars::prelude::LazyFrame;
use polars::prelude::LazyFrame; // use polars::prelude::*;
use polars::prelude::*; // use std::error::Error;
use std::error::Error; // use std::result::Result;
use std::result::Result; // use std::time::Duration;
use std::time::Duration; // use strum::IntoEnumIterator;
use strum::IntoEnumIterator;
mod util; mod util;
use util::*; use util::*;
#[tokio::main] // #[tokio::main]
async fn main() { // async fn main() {
let mut scheduler = AsyncScheduler::new(); // let mut scheduler = AsyncScheduler::new();
scheduler.every(1.day()).at("11:30 pm").run(|| async { // scheduler.every(1.day()).at("11:30 pm").run(|| async {
for x in Ticker::iter() { // for x in Ticker::iter() {
let plan = || -> Result<(), Box<dyn Error>> { // let plan = || -> Result<(), Box<dyn Error>> {
let df = LazyFrame::scan_parquet( // let df = LazyFrame::scan_parquet(
format!("data/old/{}/part.0.parquet", x), // format!("data/old/{}/part.0.parquet", x),
ScanArgsParquet::default(), // ScanArgsParquet::default(),
)?; // )?;
let df = df_format(x, df)?; // let df = df_format(x, df)?;
write_parquet(x, df)?; // write_parquet(x, df)?;
Ok(()) // Ok(())
}; // };
if let Ok(_) = plan() {} // if plan().is_ok() {}
} // }
}); // });
let dfn = read_parquet(Ticker::ARKF).unwrap().collect().unwrap(); // let dfn = read_parquet(Ticker::ARKF).unwrap().collect().unwrap();
// println!("{:#?}", dfn);
// loop {
// scheduler.run_pending().await;
// // tokio::time::sleep(Duration::from_millis(10)).await;
// tokio::time::sleep(Duration::from_secs(1)).await;
// }
// }
fn main() {
let dfn = df_format(Ticker::ARKF, read_parquet(Ticker::ARKF).unwrap()).unwrap();
println!("{:#?}", dfn); println!("{:#?}", dfn);
loop {
scheduler.run_pending().await; // update_parquet(Ticker::ARKF).unwrap();
// tokio::time::sleep(Duration::from_millis(10)).await; let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await; // let update = get_csv(Ticker::ARKF).unwrap().collect().unwrap();
}
// let x = read_parquet(Ticker::ARKF).unwrap().collect().unwrap();
println!("{:#?}", update);
} }

View file

@ -38,7 +38,7 @@ impl Ticker {
pub fn merge_csv_to_parquet(folder: Ticker) -> Result<(), Box<dyn Error>> { pub fn merge_csv_to_parquet(folder: Ticker) -> Result<(), Box<dyn Error>> {
let mut dfs = vec![]; let mut dfs = vec![];
for x in glob(&format!("data/csv/{}/*", folder.to_string()))?.filter_map(Result::ok) { for x in glob(&format!("data/csv/{}/*", folder))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?); dfs.push(LazyCsvReader::new(x).finish()?);
} }
@ -61,24 +61,21 @@ pub fn update_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> {
pub fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> { pub fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
let df = LazyFrame::scan_parquet( let df = LazyFrame::scan_parquet(
format!("data/parquet/{}.parquet", ticker.to_string()), format!("data/parquet/{}.parquet", ticker),
ScanArgsParquet::default(), ScanArgsParquet::default(),
)?; )?;
Ok(df) Ok(df)
} }
pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box<dyn Error>> { pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box<dyn Error>> {
ParquetWriter::new(File::create(format!( ParquetWriter::new(File::create(format!("data/parquet/{}.parquet", ticker))?)
"data/parquet/{}.parquet",
ticker.to_string()
))?)
.finish(&mut df)?; .finish(&mut df)?;
Ok(()) Ok(())
} }
pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result<DataFrame, Box<dyn Error>> { pub fn df_format(ticker: Ticker, mut dfl: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
match folder { match ticker {
Ticker::ARKVC => { Ticker::ARKVC => {
dfl = dfl.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]); dfl = dfl.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]);
@ -105,26 +102,95 @@ pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result<DataFrame, Box<dy
} }
_ => { _ => {
let mut df = dfl.collect()?; let mut df = dfl.collect()?;
let col_names = df.get_column_names();
if let Ok(_) = df.rename("market_value_($)", "market_value") {} if df.rename("market_value_($)", "market_value").is_ok() {}
if let Ok(_) = df.rename("weight_(%)", "weight") {} if df.rename("market value ($)", "market_value").is_ok() {}
if df.rename("weight_(%)", "weight").is_ok() {}
if let Ok(x) = df if df.rename("weight (%)", "weight").is_ok() {}
.clone() if df.get_column_names().contains(&"fund") {
.lazy() _ = df.drop_in_place("fund");
.with_column(col("date").cast(DataType::Date))
.filter(col("date").is_not_null())
.collect()
{
df = x
} else if let Ok(x) = df
.clone()
.lazy()
.filter(col("date").is_not_null())
.collect()
{
df = x
} }
if df.get_column_names().contains(&"weight_rank") {
_ = df.drop_in_place("weight_rank");
}
let mut expressions: Vec<Expr> = vec![];
if !df.fields().contains(&Field::new("date", DataType::Date)) {
expressions.push(col("date").str().strptime(StrpTimeOptions {
date_dtype: DataType::Date,
fmt: Some("%m/%d/%Y".into()),
strict: false,
exact: true,
cache: false,
tz_aware: false,
utc: false,
}));
}
if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
expressions.push(
col("weight")
.str()
.replace(lit("%"), lit(""), true)
.cast(DataType::Float64),
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Utf8))
{
expressions.push(
col("market_value")
.str()
.replace(lit("$"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Float64)
.cast(DataType::Int64),
);
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Int64),
);
}
df = df
.lazy()
.with_columns(expressions)
.filter(col("date").is_not_null())
.collect()?;
if !df.get_column_names().contains(&"share_price") {
df = df
.lazy()
.with_column(
(col("market_value").cast(DataType::Float64)
/ col("shares").cast(DataType::Float64))
.alias("share_price")
.cast(DataType::Float64)
.round(2),
)
.collect()?;
}
df = df.select([
"date",
"ticker",
"cusip",
"company",
"market_value",
"shares",
"share_price",
"weight",
])?;
Ok(df) Ok(df)
} }
@ -132,22 +198,32 @@ pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result<DataFrame, Box<dy
} }
pub fn get_csv(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> { pub fn get_csv(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
let data: Vec<u8>; let url = match ticker {
let request;
match ticker {
Ticker::ARKVC => { Ticker::ARKVC => {
request = Client::new() "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned()
.get("https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv")
} }
_ => { _ => {
request = Client::new().get(format!( format!(
"https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", "https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv",
ticker.value(), ticker.value(),
ticker.to_string() ticker
)) )
} }
};
let response = Client::builder()
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3")
.build()?.get(url).send()?;
if !response.status().is_success() {
return Err(format!(
"HTTP request failed with status code: {:?}",
response.status()
)
.into());
} }
data = request.send()?.text()?.bytes().collect();
let data: Vec<u8> = response.text()?.bytes().collect();
let df = CsvReader::new(Cursor::new(data)) let df = CsvReader::new(Cursor::new(data))
.has_header(true) .has_header(true)