This commit is contained in:
Elijah McMorris 2023-06-14 07:05:49 +00:00
parent 5b05079edb
commit 6e4c536f68
Signed by: NexVeridian
SSH key fingerprint: SHA256:bsA1SKZxuEcEVHAy3gY1HUeM5ykRJl0U0kQHQn0hMg8

View file

@ -21,7 +21,6 @@ pub enum Ticker {
ARKW, ARKW,
ARKX, ARKX,
} }
impl Ticker { impl Ticker {
pub fn value(&self) -> &str { pub fn value(&self) -> &str {
match *self { match *self {
@ -36,234 +35,289 @@ impl Ticker {
} }
} }
pub fn merge_csv_to_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> { pub enum DF {
let mut dfs = vec![]; LazyFrame(LazyFrame),
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) { DataFrame(DataFrame),
dfs.push(LazyCsvReader::new(x).finish()?); }
impl From<LazyFrame> for DF {
fn from(lf: LazyFrame) -> Self {
DF::LazyFrame(lf)
} }
let mut df = concat(dfs, false, true)?; }
impl From<DataFrame> for DF {
if read_parquet(ticker).is_ok() { fn from(df: DataFrame) -> Self {
let df_old = read_parquet(ticker)?; DF::DataFrame(df)
df = concat_df(vec![df_old, df])?; }
write_parquet(ticker, df_sort(df.collect()?)?)?; }
} else { impl DF {
write_parquet(ticker, df_format(df)?)?; pub fn collect(self) -> Result<DataFrame, Box<dyn Error>> {
match self {
DF::LazyFrame(x) => Ok(x.collect()?),
DF::DataFrame(x) => Ok(x),
}
}
pub fn lazy(self) -> LazyFrame {
match self {
DF::LazyFrame(x) => x,
DF::DataFrame(x) => x.lazy(),
}
} }
Ok(())
} }
pub enum Source { pub enum Source {
Read,
Ark, Ark,
ApiIncremental, ApiIncremental,
ApiFull, ApiFull,
} }
struct Ark {
df: DF,
ticker: Ticker,
}
impl Ark {
pub fn new(source: Source, ticker: Ticker) -> Result<Self, Box<dyn Error>> {
let mut ark = Self {
df: Self::read_parquet(ticker)?.into(),
ticker,
};
pub fn update_parquet(ticker: Ticker, source: Source) -> Result<(), Box<dyn Error>> { let update = match source {
let mut df = read_parquet(ticker)?; Source::Read => None,
Source::Ark => Some(ark.get_csv_ark()?),
Source::ApiIncremental => {
let last_day = ark
.df
.collect()?
.column("date")
.unwrap()
.max()
.and_then(NaiveDate::from_num_days_from_ce_opt);
Some(ark.get_api(last_day)?)
}
Source::ApiFull => Some(ark.get_api(None)?),
};
let update = match source { if let Some(update) = update {
Source::Ark => get_csv_ark(ticker)?, ark.df = Self::concat_df(vec![ark.df, update.into()])?.into();
Source::ApiIncremental => {
let last_day = df
.clone()
.collect()?
.column("date")
.unwrap()
.max()
.and_then(NaiveDate::from_num_days_from_ce_opt);
get_api(ticker, last_day)?
} }
Source::ApiFull => get_api(ticker, None)?, Ok(ark)
};
df = concat_df(vec![df, update])?;
write_parquet(ticker, df_sort(df.collect()?)?)?;
Ok(())
}
fn concat_df(mut dfs: Vec<LazyFrame>) -> Result<LazyFrame, Box<dyn Error>> {
// with dedupe and format
for x in &mut dfs {
*x = df_format(x.to_owned())?.lazy();
}
let mut df = concat(dfs, false, true)?;
df = df.unique_stable(None, UniqueKeepStrategy::First);
Ok(df)
}
pub fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
let df = LazyFrame::scan_parquet(
format!("data/parquet/{}.parquet", ticker),
ScanArgsParquet::default(),
)?;
Ok(df)
}
pub fn write_parquet(ticker: Ticker, mut df: DataFrame) -> Result<(), Box<dyn Error>> {
ParquetWriter::new(File::create(format!("data/parquet/{}.parquet", ticker))?)
.finish(&mut df)?;
Ok(())
}
pub fn df_sort(df: DataFrame) -> Result<DataFrame, Box<dyn Error>> {
let df = df.sort(["date", "weight"], vec![false, true])?;
Ok(df)
}
pub fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
let mut df = df.collect()?;
if df.get_column_names().contains(&"market_value_($)") {
df = df
.lazy()
.rename(
vec!["market_value_($)", "weight_(%)"],
vec!["market_value", "weight"],
)
.collect()?;
}
if df.get_column_names().contains(&"market value ($)") {
df = df
.lazy()
.rename(
vec!["market value ($)", "weight (%)"],
vec!["market_value", "weight"],
)
.collect()?;
}
if df.get_column_names().contains(&"CUSIP") {
df = df
.lazy()
.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
.collect()?;
} }
// if df.rename("market_value_($)", "market_value").is_ok() {} pub fn write_parquet(mut self) -> Result<Self, Box<dyn Error>> {
// if df.rename("market value ($)", "market_value").is_ok() {} ParquetWriter::new(File::create(format!(
// if df.rename("weight_(%)", "weight").is_ok() {} "data/parquet/{}.parquet",
// if df.rename("weight (%)", "weight").is_ok() {} self.ticker
// if df.rename("CUSIP", "cusip").is_ok() {} ))?)
.finish(&mut self.df.collect()?)?;
if df.get_column_names().contains(&"fund") { Ok(self)
_ = df.drop_in_place("fund");
}
if df.get_column_names().contains(&"weight_rank") {
_ = df.drop_in_place("weight_rank");
} }
let mut expressions: Vec<Expr> = vec![]; pub fn df_sort(&self) -> Result<&Self, Box<dyn Error>> {
let df = self
if !df.fields().contains(&Field::new("date", DataType::Date)) { .df
expressions.push(col("date").str().strptime(
DataType::Date,
StrptimeOptions {
// format: Some("%m/%d/%Y".into()),
format: None,
strict: false,
exact: true,
cache: true,
},
));
}
if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
expressions.push(
col("weight")
.str()
.replace(lit("%"), lit(""), true)
.cast(DataType::Float64),
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Utf8))
{
expressions.push(
col("market_value")
.str()
.replace(lit("$"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Float64)
.cast(DataType::Int64),
);
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Int64),
);
}
df = df
.lazy()
.with_columns(expressions)
.filter(col("date").is_not_null())
.collect()?;
if !df.get_column_names().contains(&"share_price")
&& df.get_column_names().contains(&"market_value")
{
df = df
.lazy()
.with_column(
(col("market_value").cast(DataType::Float64)
/ col("shares").cast(DataType::Float64))
.alias("share_price")
.cast(DataType::Float64)
.round(2),
)
.collect()? .collect()?
.sort(["date", "weight"], vec![false, true])?;
self.df = df.into();
Ok(self)
} }
if df.get_column_names().contains(&"share_price") { fn read_parquet(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> {
df = df.select([ let df = LazyFrame::scan_parquet(
"date", format!("data/parquet/{}.parquet", ticker),
"ticker", ScanArgsParquet::default(),
"cusip", )?;
"company", Ok(df)
"market_value",
"shares",
"share_price",
"weight",
])?;
} else {
df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
} }
Ok(df) fn concat_df(mut dfs: Vec<DF>) -> Result<DF, Box<dyn Error>> {
} // with dedupe and format
// for x in &mut dfs {
// *x = Self::df_format(x.to_owned())?.lazy();
// }
let mut df = concat(dfs, false, true)?;
df = df.unique_stable(None, UniqueKeepStrategy::First);
Ok(df.into())
}
pub fn get_api(ticker: Ticker, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> { fn df_format(df: LazyFrame) -> Result<DataFrame, Box<dyn Error>> {
let url = match (ticker, last_day) { let mut df = df.collect()?;
(Ticker::ARKVC, Some(last_day)) => format!(
"https://api.nexveridian.com/arkvc_holdings?end={}", if df.get_column_names().contains(&"market_value_($)") {
last_day df = df
), .lazy()
(ticker, Some(last_day)) => format!( .rename(
"https://api.nexveridian.com/ark_holdings?ticker={}&end={}", vec!["market_value_($)", "weight_(%)"],
ticker, last_day vec!["market_value", "weight"],
), )
(Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(), .collect()?;
(ticker, None) => { }
format!("https://api.nexveridian.com/ark_holdings?ticker={}", ticker) if df.get_column_names().contains(&"market value ($)") {
df = df
.lazy()
.rename(
vec!["market value ($)", "weight (%)"],
vec!["market_value", "weight"],
)
.collect()?;
}
if df.get_column_names().contains(&"CUSIP") {
df = df
.lazy()
.rename(vec!["CUSIP", "weight (%)"], vec!["cusip", "weight"])
.collect()?;
} }
};
Reader::Json.get_data_url(url)
}
pub fn get_csv_ark(ticker: Ticker) -> Result<LazyFrame, Box<dyn Error>> { // if df.rename("market_value_($)", "market_value").is_ok() {}
let url = match ticker { // if df.rename("market value ($)", "market_value").is_ok() {}
Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(), // if df.rename("weight_(%)", "weight").is_ok() {}
_ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", ticker.value(), ticker), // if df.rename("weight (%)", "weight").is_ok() {}
}; // if df.rename("CUSIP", "cusip").is_ok() {}
Reader::Csv.get_data_url(url)
if df.get_column_names().contains(&"fund") {
_ = df.drop_in_place("fund");
}
if df.get_column_names().contains(&"weight_rank") {
_ = df.drop_in_place("weight_rank");
}
let mut expressions: Vec<Expr> = vec![];
if !df.fields().contains(&Field::new("date", DataType::Date)) {
expressions.push(col("date").str().strptime(
DataType::Date,
StrptimeOptions {
// format: Some("%m/%d/%Y".into()),
format: None,
strict: false,
exact: true,
cache: true,
},
));
}
if df.fields().contains(&Field::new("weight", DataType::Utf8)) {
expressions.push(
col("weight")
.str()
.replace(lit("%"), lit(""), true)
.cast(DataType::Float64),
);
}
if df
.fields()
.contains(&Field::new("market_value", DataType::Utf8))
{
expressions.push(
col("market_value")
.str()
.replace(lit("$"), lit(""), true)
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Float64)
.cast(DataType::Int64),
);
}
if df.fields().contains(&Field::new("shares", DataType::Utf8)) {
expressions.push(
col("shares")
.str()
.replace_all(lit(","), lit(""), true)
.cast(DataType::Int64),
);
}
df = df
.lazy()
.with_columns(expressions)
.filter(col("date").is_not_null())
.collect()?;
if !df.get_column_names().contains(&"share_price")
&& df.get_column_names().contains(&"market_value")
{
df = df
.lazy()
.with_column(
(col("market_value").cast(DataType::Float64)
/ col("shares").cast(DataType::Float64))
.alias("share_price")
.cast(DataType::Float64)
.round(2),
)
.collect()?
}
if df.get_column_names().contains(&"share_price") {
df = df.select([
"date",
"ticker",
"cusip",
"company",
"market_value",
"shares",
"share_price",
"weight",
])?;
} else if !df
.get_column_names()
.eq(&["date", "ticker", "cusip", "company", "weight"])
{
df = df.select(["date", "ticker", "cusip", "company", "weight"])?;
}
Ok(df)
}
pub fn get_api(self, last_day: Option<NaiveDate>) -> Result<LazyFrame, Box<dyn Error>> {
let tic = self.ticker;
let url = match (tic, last_day) {
(self::Ticker::ARKVC, Some(last_day)) => format!(
"https://api.nexveridian.com/arkvc_holdings?end={}",
last_day
),
(tic, Some(last_day)) => format!(
"https://api.nexveridian.com/ark_holdings?ticker={}&end={}",
tic.value(),
last_day
),
(self::Ticker::ARKVC, None) => "https://api.nexveridian.com/arkvc_holdings".to_owned(),
(tic, None) => {
format!(
"https://api.nexveridian.com/ark_holdings?ticker={}",
tic.value()
)
}
};
Reader::Json.get_data_url(url)
}
pub fn get_csv_ark(self) -> Result<LazyFrame, Box<dyn Error>> {
let url = match self.ticker {
self::Ticker::ARKVC => "https://ark-ventures.com/wp-content/uploads/funds-etf-csv/ARK_VENTURE_FUND_HOLDINGS.csv".to_owned(),
_ => format!("https://ark-funds.com/wp-content/uploads/funds-etf-csv/ARK_{}_ETF_{}_HOLDINGS.csv", self.ticker.value(), self.ticker),
};
Reader::Csv.get_data_url(url)
}
pub fn merge_old_csv_to_parquet(ticker: Ticker) -> Result<(), Box<dyn Error>> {
let mut dfs = vec![];
for x in glob(&format!("data/csv/{}/*", ticker))?.filter_map(Result::ok) {
dfs.push(LazyCsvReader::new(x).finish()?);
}
let mut df = concat(dfs, false, true)?;
if Self::read_parquet(ticker).is_ok() {
let df_old = Self::read_parquet(ticker)?;
df = Self::concat_df(vec![df_old, df])?;
write_parquet(ticker, df_sort(df.collect()?)?)?;
} else {
write_parquet(ticker, df_format(df)?)?;
}
Ok(())
}
} }
pub enum Reader { pub enum Reader {