From 4e260f2fa2eb200b764df6eff4578ef0407f0fe8 Mon Sep 17 00:00:00 2001 From: NexVeridian Date: Sun, 9 Jun 2024 16:09:17 -0700 Subject: [PATCH] fix csv merge, fix ark europe csv --- src/main.rs | 4 ++++ src/util.rs | 37 ++++++++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4ea4338..6d1f8a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,6 +33,10 @@ fn print_df(ticker: &Ticker, df: &DataFrame) { fn csv_merge() -> Result<(), Error> { for ticker in Ticker::iter() { + if !std::path::Path::new(&format!("./data/csv/{}", ticker)).exists() { + continue; + } + let df = Ark::merge_old_csv_to_parquet(ticker, None)? .format()? .sort()? diff --git a/src/util.rs b/src/util.rs index 2459aad..9a8c4a9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -347,7 +347,7 @@ impl Ark { vec!["company", "cusip", "weight"], ) .with_columns([ - Series::new("date", [chrono::Local::now().naive_local()]).lit(), + Series::new("date", [chrono::Local::now().date_naive()]).lit(), Series::new("ticker", [None::]).lit(), Series::new("market_value", [None::]).lit(), Series::new("shares", [None::]).lit(), @@ -382,8 +382,33 @@ impl Ark { Ok(df.into()) } + fn df_format_europe_csv(df: DF) -> Result { + let mut df = df.collect()?; + + if df.get_column_names().contains(&"_duplicated_0") { + df = df.slice(2, df.height()); + + df = df + .clone() + .lazy() + .rename(df.get_column_names(), ["company", "cusip", "weight"]) + .with_columns([ + Series::new("date", [chrono::Local::now().date_naive()]).lit(), + Series::new("ticker", [None::]).lit(), + Series::new("market_value", [None::]).lit(), + Series::new("shares", [None::]).lit(), + Series::new("share_price", [None::]).lit(), + ]) + .collect()?; + } + + Ok(df.into()) + } + pub fn df_format(df: DF) -> Result { - let mut df = Self::df_format_europe_arkfundsio(df)?.collect()?; + let mut df = df.collect()?; + df = Self::df_format_europe_csv(df.into())?.collect()?; + df = Self::df_format_europe_arkfundsio(df.into())?.collect()?; df = Self::df_format_21shares(df.into())?.collect()?; df = Self::df_format_arkvx(df.into())?.collect()?; df = Self::df_format_europe(df.into())?.collect()?; @@ -500,6 +525,13 @@ impl Ark { ); } + if df.fields().contains(&Field::new( + "date", + DataType::Datetime(TimeUnit::Milliseconds, None), + )) { + expressions.push(col("date").cast(DataType::Date)); + } + if df .fields() .contains(&Field::new("market_value", DataType::Utf8)) @@ -836,7 +868,6 @@ impl Reader { JsonReader::new(Cursor::new(json.to_string())).finish()? } }; - Ok(df) } }