fix csv merge, fix ark europe csv

This commit is contained in:
Elijah McMorris 2024-06-09 16:09:17 -07:00
parent 5575b2c073
commit 4e260f2fa2
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
2 changed files with 38 additions and 3 deletions

View file

@ -33,6 +33,10 @@ fn print_df(ticker: &Ticker, df: &DataFrame) {
fn csv_merge() -> Result<(), Error> { fn csv_merge() -> Result<(), Error> {
for ticker in Ticker::iter() { for ticker in Ticker::iter() {
if !std::path::Path::new(&format!("./data/csv/{}", ticker)).exists() {
continue;
}
let df = Ark::merge_old_csv_to_parquet(ticker, None)? let df = Ark::merge_old_csv_to_parquet(ticker, None)?
.format()? .format()?
.sort()? .sort()?

View file

@ -347,7 +347,7 @@ impl Ark {
vec!["company", "cusip", "weight"], vec!["company", "cusip", "weight"],
) )
.with_columns([ .with_columns([
Series::new("date", [chrono::Local::now().naive_local()]).lit(), Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(), Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(), Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(), Series::new("shares", [None::<i64>]).lit(),
@ -382,8 +382,33 @@ impl Ark {
Ok(df.into()) Ok(df.into())
} }
fn df_format_europe_csv(df: DF) -> Result<DF, Error> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"_duplicated_0") {
df = df.slice(2, df.height());
df = df
.clone()
.lazy()
.rename(df.get_column_names(), ["company", "cusip", "weight"])
.with_columns([
Series::new("date", [chrono::Local::now().date_naive()]).lit(),
Series::new("ticker", [None::<String>]).lit(),
Series::new("market_value", [None::<i64>]).lit(),
Series::new("shares", [None::<i64>]).lit(),
Series::new("share_price", [None::<f64>]).lit(),
])
.collect()?;
}
Ok(df.into())
}
pub fn df_format(df: DF) -> Result<DF, Error> { pub fn df_format(df: DF) -> Result<DF, Error> {
let mut df = Self::df_format_europe_arkfundsio(df)?.collect()?; let mut df = df.collect()?;
df = Self::df_format_europe_csv(df.into())?.collect()?;
df = Self::df_format_europe_arkfundsio(df.into())?.collect()?;
df = Self::df_format_21shares(df.into())?.collect()?; df = Self::df_format_21shares(df.into())?.collect()?;
df = Self::df_format_arkvx(df.into())?.collect()?; df = Self::df_format_arkvx(df.into())?.collect()?;
df = Self::df_format_europe(df.into())?.collect()?; df = Self::df_format_europe(df.into())?.collect()?;
@ -500,6 +525,13 @@ impl Ark {
); );
} }
if df.fields().contains(&Field::new(
"date",
DataType::Datetime(TimeUnit::Milliseconds, None),
)) {
expressions.push(col("date").cast(DataType::Date));
}
if df if df
.fields() .fields()
.contains(&Field::new("market_value", DataType::Utf8)) .contains(&Field::new("market_value", DataType::Utf8))
@ -836,7 +868,6 @@ impl Reader {
JsonReader::new(Cursor::new(json.to_string())).finish()? JsonReader::new(Cursor::new(json.to_string())).finish()?
} }
}; };
Ok(df) Ok(df)
} }
} }