diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..08f4cbb --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +# [target.x86_64-unknown-linux-gnu] +# rustflags = ["-C", "link-arg=-fuse-ld=/usr/local/bin/mold"] diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 99521d0..510a66b 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/rust:bullseye RUN rustup target add x86_64-unknown-linux-musl && \ apt-get update && \ - apt install -y build-essential xz-utils musl-tools musl-dev gcc-multilib && \ + apt install -y build-essential xz-utils musl-tools musl-dev gcc-multilib pkg-config libssl-dev && \ rustup update RUN LAZYGIT_VERSION=$(curl -s "https://api.github.com/repos/jesseduffield/lazygit/releases/latest" | grep -Po '"tag_name": "v\K[0-9.]+') && \ diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 764ef89..74d2c75 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -14,7 +14,6 @@ "--name", "devcontainer-${containerWorkspaceFolderBasename}" ], - "initializeCommand": "docker rm -f devcontainer-${containerWorkspaceFolderBasename} || true", // Use 'mounts' to make the cargo cache persistent in a Docker Volume. "mounts": [ { @@ -29,8 +28,8 @@ "ghcr.io/devcontainers/features/git:1": {} // "ghcr.io/devcontainers/features/nix:1": { // "packages": [ - // "btop" - // // "lazygit" + // "btop", + // "lazygit" // ] // } }, @@ -41,7 +40,8 @@ // }, "postAttachCommand": { "AddGitSafeDir": "git config --global --add safe.directory /workspaces/${containerWorkspaceFolderBasename}", - "clippy": "cargo clippy" + "cargo_update": "cargo update", + "clippy": "cargo clippy --fix --allow-dirty" }, // Configure tool-specific properties. // "customizations": {}, diff --git a/Cargo.toml b/Cargo.toml index 361df35..3944ec3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ polars = { version = "0.28", features = [ "strings", "parquet", "round_series", + "lazy_regex", ] } reqwest = { version = "0.11", features = ["blocking"] } glob = { version = "0.3" } diff --git a/src/main.rs b/src/main.rs index 6f64ce7..532ce52 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,39 +1,50 @@ -use clokwerk::Interval::*; -use clokwerk::{AsyncScheduler, Job, TimeUnits}; -use polars::prelude::LazyFrame; -use polars::prelude::*; -use std::error::Error; -use std::result::Result; -use std::time::Duration; -use strum::IntoEnumIterator; +// use clokwerk::{AsyncScheduler, Job, TimeUnits}; +// use polars::prelude::LazyFrame; +// use polars::prelude::*; +// use std::error::Error; +// use std::result::Result; +// use std::time::Duration; +// use strum::IntoEnumIterator; mod util; use util::*; -#[tokio::main] -async fn main() { - let mut scheduler = AsyncScheduler::new(); - scheduler.every(1.day()).at("11:30 pm").run(|| async { - for x in Ticker::iter() { - let plan = || -> Result<(), Box> { - let df = LazyFrame::scan_parquet( - format!("data/old/{}/part.0.parquet", x), - ScanArgsParquet::default(), - )?; - let df = df_format(x, df)?; - write_parquet(x, df)?; - Ok(()) - }; +// #[tokio::main] +// async fn main() { +// let mut scheduler = AsyncScheduler::new(); +// scheduler.every(1.day()).at("11:30 pm").run(|| async { +// for x in Ticker::iter() { +// let plan = || -> Result<(), Box> { +// let df = LazyFrame::scan_parquet( +// format!("data/old/{}/part.0.parquet", x), +// ScanArgsParquet::default(), +// )?; +// let df = df_format(x, df)?; +// write_parquet(x, df)?; +// Ok(()) +// }; - if let Ok(_) = plan() {} - } - }); +// if plan().is_ok() {} +// } +// }); - let dfn = read_parquet(Ticker::ARKF).unwrap().collect().unwrap(); +// let dfn = read_parquet(Ticker::ARKF).unwrap().collect().unwrap(); +// println!("{:#?}", dfn); +// loop { +// scheduler.run_pending().await; +// // tokio::time::sleep(Duration::from_millis(10)).await; +// tokio::time::sleep(Duration::from_secs(1)).await; +// } +// } + +fn main() { + let dfn = df_format(Ticker::ARKF, read_parquet(Ticker::ARKF).unwrap()).unwrap(); println!("{:#?}", dfn); - loop { - scheduler.run_pending().await; - // tokio::time::sleep(Duration::from_millis(10)).await; - tokio::time::sleep(Duration::from_secs(1)).await; - } + + // update_parquet(Ticker::ARKF).unwrap(); + let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap(); + // let update = get_csv(Ticker::ARKF).unwrap().collect().unwrap(); + + // let x = read_parquet(Ticker::ARKF).unwrap().collect().unwrap(); + println!("{:#?}", update); } diff --git a/src/util.rs b/src/util.rs index c20ae74..140d84e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -38,7 +38,7 @@ impl Ticker { pub fn merge_csv_to_parquet(folder: Ticker) -> Result<(), Box> { let mut dfs = vec![]; - for x in glob(&format!("data/csv/{}/*", folder.to_string()))?.filter_map(Result::ok) { + for x in glob(&format!("data/csv/{}/*", folder))?.filter_map(Result::ok) { dfs.push(LazyCsvReader::new(x).finish()?); } @@ -61,24 +61,21 @@ pub fn update_parquet(ticker: Ticker) -> Result<(), Box> { pub fn read_parquet(ticker: Ticker) -> Result> { let df = LazyFrame::scan_parquet( - format!("data/parquet/{}.parquet", ticker.to_string()), + format!("data/parquet/{}.parquet", ticker), ScanArgsParquet::default(), )?; Ok(df) } pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box> { - ParquetWriter::new(File::create(format!( - "data/parquet/{}.parquet", - ticker.to_string() - ))?) - .finish(&mut df)?; + ParquetWriter::new(File::create(format!("data/parquet/{}.parquet", ticker))?) + .finish(&mut df)?; Ok(()) } -pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result> { - match folder { +pub fn df_format(ticker: Ticker, mut dfl: LazyFrame) -> Result> { + match ticker { Ticker::ARKVC => { dfl = dfl.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]); @@ -105,26 +102,95 @@ pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result { let mut df = dfl.collect()?; + let col_names = df.get_column_names(); - if let Ok(_) = df.rename("market_value_($)", "market_value") {} - if let Ok(_) = df.rename("weight_(%)", "weight") {} - - if let Ok(x) = df - .clone() - .lazy() - .with_column(col("date").cast(DataType::Date)) - .filter(col("date").is_not_null()) - .collect() - { - df = x - } else if let Ok(x) = df - .clone() - .lazy() - .filter(col("date").is_not_null()) - .collect() - { - df = x + if df.rename("market_value_($)", "market_value").is_ok() {} + if df.rename("market value ($)", "market_value").is_ok() {} + if df.rename("weight_(%)", "weight").is_ok() {} + if df.rename("weight (%)", "weight").is_ok() {} + if df.get_column_names().contains(&"fund") { + _ = df.drop_in_place("fund"); } + if df.get_column_names().contains(&"weight_rank") { + _ = df.drop_in_place("weight_rank"); + } + + let mut expressions: Vec = vec![]; + + if !df.fields().contains(&Field::new("date", DataType::Date)) { + expressions.push(col("date").str().strptime(StrpTimeOptions { + date_dtype: DataType::Date, + fmt: Some("%m/%d/%Y".into()), + strict: false, + exact: true, + cache: false, + tz_aware: false, + utc: false, + })); + } + + if df.fields().contains(&Field::new("weight", DataType::Utf8)) { + expressions.push( + col("weight") + .str() + .replace(lit("%"), lit(""), true) + .cast(DataType::Float64), + ); + } + + if df + .fields() + .contains(&Field::new("market_value", DataType::Utf8)) + { + expressions.push( + col("market_value") + .str() + .replace(lit("$"), lit(""), true) + .str() + .replace_all(lit(","), lit(""), true) + .cast(DataType::Float64) + .cast(DataType::Int64), + ); + } + + if df.fields().contains(&Field::new("shares", DataType::Utf8)) { + expressions.push( + col("shares") + .str() + .replace_all(lit(","), lit(""), true) + .cast(DataType::Int64), + ); + } + + df = df + .lazy() + .with_columns(expressions) + .filter(col("date").is_not_null()) + .collect()?; + + if !df.get_column_names().contains(&"share_price") { + df = df + .lazy() + .with_column( + (col("market_value").cast(DataType::Float64) + / col("shares").cast(DataType::Float64)) + .alias("share_price") + .cast(DataType::Float64) + .round(2), + ) + .collect()?; + } + + df = df.select([ + "date", + "ticker", + "cusip", + "company", + "market_value", + "shares", + "share_price", + "weight", + ])?; Ok(df) } @@ -132,22 +198,32 @@ pub fn df_format(folder: Ticker, mut dfl: LazyFrame) -> Result Result> { - let data: Vec; - let request; - match ticker { + let url = match ticker { Ticker::ARKVC => { - request = Client::new() - .get("https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv") + "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned() } _ => { - request = Client::new().get(format!( + format!( "https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", ticker.value(), - ticker.to_string() - )) + ticker + ) } + }; + + let response = Client::builder() + .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3") + .build()?.get(url).send()?; + + if !response.status().is_success() { + return Err(format!( + "HTTP request failed with status code: {:?}", + response.status() + ) + .into()); } - data = request.send()?.text()?.bytes().collect(); + + let data: Vec = response.text()?.bytes().collect(); let df = CsvReader::new(Cursor::new(data)) .has_header(true)