This commit is contained in:
Elijah McMorris 2023-06-07 02:23:14 +00:00
parent be08848b0d
commit 4074a97ae2
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8
2 changed files with 135 additions and 127 deletions

View file

@ -8,7 +8,6 @@
mod util;
use util::*;
// #[tokio::main]
// async fn main() {
// let mut scheduler = AsyncScheduler::new();
@ -38,13 +37,13 @@ use util::*;
// }
fn main() {
let dfn = df_format(Ticker::ARKF, read_parquet(Ticker::ARKF).unwrap()).unwrap();
let dfn = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap();
println!("{:#?}", dfn);
// update_parquet(Ticker::ARKF).unwrap();
let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap();
// update_parquet(Ticker::ARKVC).unwrap();
// let update = df_format(Ticker::ARKF, get_csv(Ticker::ARKF).unwrap()).unwrap();
// let update = get_csv(Ticker::ARKF).unwrap().collect().unwrap();
// let x = read_parquet(Ticker::ARKF).unwrap().collect().unwrap();
println!("{:#?}", update);
// let x = df_format(read_parquet(Ticker::ARKVC).unwrap()).unwrap();
// println!("{:#?}", x);
}

View file

@ -44,7 +44,7 @@ pub fn merge_csv_to_parquet(folder: Ticker) -> Result<(), Box<dyn Error>> {
let df = concat(dfs, false, true)?;
write_parquet(folder, df_format(folder, df)?)?;
write_parquet(folder, df_format(df)?)?;
Ok(())
}
@ -53,7 +53,12 @@ pub fn update_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> {
let mut df = read_parquet(ticker)?;
df = concat(vec![df, update], false, true)?.unique_stable(None, UniqueKeepStrategy::First);
df = concat(
vec![df_format(df)?.lazy(), df_format(update)?.lazy()],
false,
true,
)?
.unique_stable(None, UniqueKeepStrategy::First);
write_parquet(ticker, df.collect()?)?;
Ok(())
@ -74,127 +79,131 @@ pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box<dyn Er
Ok(())
}
pub fn df_format(ticker: Ticker, mut dfl: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
match ticker {
Ticker::ARKVC => {
dfl = dfl.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"]);
pub fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
let mut df = df.collect()?;
let df = dfl
.with_columns(vec![
col("date").str().strptime(StrpTimeOptions {
date_dtype: DataType::Date,
fmt: Some("%m/%d/%Y".into()),
strict: false,
exact: true,
cache: false,
tz_aware: false,
utc: false,
}),
col("weight")
.str()
.extract(r"[0-9]*\.[0-9]+", 0)
.cast(DataType::Float64),
])
.filter(col("date").is_not_null())
.collect()?;
Ok(df)
}
_ => {
let mut df = dfl.collect()?;
let col_names = df.get_column_names();
if df.rename("market_value_($)", "market_value").is_ok() {}
if df.rename("market value ($)", "market_value").is_ok() {}
if df.rename("weight_(%)", "weight").is_ok() {}
if df.rename("weight (%)", "weight").is_ok() {}
if df.get_column_names().contains(&"fund") {
_ = df.drop_in_place("fund");
}
if df.get_column_names().contains(&"weight_rank") {
_ = df.drop_in_place("weight_rank");
}
let mut expressions: Vec<Expr> = vec![];
if !df.fields().contains(&Field::new("date", DataType::Date)) {
expressions.push(col("date").str().strptime(StrpTimeOptions {
date_dtype: DataType::Date,
fmt: Some("%m/%d/%Y".into()),
strict: false,
exact: true,
cache: false,
tz_aware: false,
utc: false,
}));
}
if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
expressions.push(
col("weight")
.str()
.replace(lit("%"), lit(""), true)
.cast(DataType::Float64),
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Utf8))
{
expressions.push(
col("market_value")
.str()
.replace(lit("$"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Float64)
.cast(DataType::Int64),
);
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Int64),
);
}
df = df
.lazy()
.with_columns(expressions)
.filter(col("date").is_not_null())
.collect()?;
if !df.get_column_names().contains(&"share_price") {
df = df
.lazy()
.with_column(
(col("market_value").cast(DataType::Float64)
/ col("shares").cast(DataType::Float64))
.alias("share_price")
.cast(DataType::Float64)
.round(2),
)
.collect()?;
}
df = df.select([
"date",
"ticker",
"cusip",
"company",
"market_value",
"shares",
"share_price",
"weight",
])?;
Ok(df)
}
if df.get_column_names().contains(&"market_value_($)") {
df = df
.lazy()
.rename(
vec!["market_value_($), weight_(%)"],
vec!["market_value, weight"],
)
.collect()?;
}
if df.get_column_names().contains(&"market value ($)") {
df = df
.lazy()
.rename(
vec!["market value ($), weight (%)"],
vec!["market_value, weight"],
)
.collect()?;
}
if df.get_column_names().contains(&"CUSIP") {
df = df
.lazy()
.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
.collect()?;
}
// if df.rename("market_value_($)", "market_value").is_ok() {}
// if df.rename("market value ($)", "market_value").is_ok() {}
// if df.rename("weight_(%)", "weight").is_ok() {}
// if df.rename("weight (%)", "weight").is_ok() {}
// if df.rename("CUSIP", "cusip").is_ok() {}
if df.get_column_names().contains(&"fund") {
_ = df.drop_in_place("fund");
}
if df.get_column_names().contains(&"weight_rank") {
_ = df.drop_in_place("weight_rank");
}
let mut expressions: Vec<Expr> = vec![];
if !df.fields().contains(&Field::new("date", DataType::Date)) {
expressions.push(col("date").str().strptime(StrpTimeOptions {
date_dtype: DataType::Date,
fmt: Some("%m/%d/%Y".into()),
strict: false,
exact: true,
cache: false,
tz_aware: false,
utc: false,
}));
}
if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
expressions.push(
col("weight")
.str()
.replace(lit("%"), lit(""), true)
.cast(DataType::Float64),
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Utf8))
{
expressions.push(
col("market_value")
.str()
.replace(lit("$"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Float64)
.cast(DataType::Int64),
);
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Int64),
);
}
df = df
.lazy()
.with_columns(expressions)
.filter(col("date").is_not_null())
.collect()?;
if !df.get_column_names().contains(&"share_price")
&& df.get_column_names().contains(&"market_value")
{
df = df
.lazy()
.with_column(
(col("market_value").cast(DataType::Float64)
/ col("shares").cast(DataType::Float64))
.alias("share_price")
.cast(DataType::Float64)
.round(2),
)
.collect()?
}
if df.get_column_names().contains(&"share_price") {
df = df.select([
"date",
"ticker",
"cusip",
"company",
"market_value",
"shares",
"share_price",
"weight",
])?;
} else {
df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
}
Ok(df)
}
pub fn get_csv(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {