Browse Source

Added custom error, refactored preprocess_timeseries

Added _find_closes_date function
switch-to-decimal
Gourav Kumar 2 years ago
parent
commit
1be38ce7d4
  1. 107
      fincal/core.py

107
fincal/core.py

@ -29,6 +29,32 @@ class AllFrequencies:
Y = Frequency("annual", "years", 1, 365, "Y") Y = Frequency("annual", "years", 1, 365, "Y")
class DateNotFoundError(Exception):
"""Exception to be raised when date is not found"""
def __init__(self, message, date):
message = f"{message}: {date}"
super().__init__(message)
def _parse_date(date: str, date_format: str = None):
"""Parses date and handles errors"""
if isinstance(date, (datetime.datetime, datetime.date)):
return datetime.datetime.fromordinal(date.toordinal())
if date_format is None:
date_format = FincalOptions.date_format
try:
date = datetime.datetime.strptime(date, date_format)
except TypeError:
raise ValueError("Date does not seem to be valid date-like string")
except ValueError:
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?")
return date
def _preprocess_timeseries( def _preprocess_timeseries(
data: Union[ data: Union[
Sequence[Iterable[Union[str, datetime.datetime, float]]], Sequence[Iterable[Union[str, datetime.datetime, float]]],
@ -40,37 +66,26 @@ def _preprocess_timeseries(
) -> List[Tuple[datetime.datetime, float]]: ) -> List[Tuple[datetime.datetime, float]]:
"""Converts any type of list to the correct type""" """Converts any type of list to the correct type"""
if isinstance(data, Sequence): if isinstance(data, Mapping):
if isinstance(data[0], Mapping):
if len(data[0].keys()) == 2:
current_data = [tuple(i.values()) for i in data]
elif len(data[0].keys()) == 1:
current_data = [tuple(*i.items()) for i in data]
else:
raise TypeError("Could not parse the data")
current_data = _preprocess_timeseries(current_data, date_format)
elif isinstance(data[0], Sequence):
if isinstance(data[0][0], str):
current_data = []
for i in data:
row = datetime.datetime.strptime(i[0], date_format), i[1]
current_data.append(row)
elif isinstance(data[0][0], datetime.datetime):
current_data = [(i, j) for i, j in data]
else:
raise TypeError("Could not parse the data")
else:
raise TypeError("Could not parse the data")
elif isinstance(data, Mapping):
current_data = [(k, v) for k, v in data.items()] current_data = [(k, v) for k, v in data.items()]
current_data = _preprocess_timeseries(current_data, date_format) return _preprocess_timeseries(current_data, date_format)
if not isinstance(data, Sequence):
raise TypeError("Could not parse the data")
if isinstance(data[0], Sequence):
return sorted([(_parse_date(i, date_format), j) for i, j in data])
if not isinstance(data[0], Mapping):
raise TypeError("Could not parse the data")
if len(data[0]) == 1:
current_data = [tuple(*i.items()) for i in data]
elif len(data[0]) == 2:
current_data = [tuple(i.values()) for i in data]
else: else:
raise TypeError("Could not parse the data") raise TypeError("Could not parse the data")
current_data.sort() return _preprocess_timeseries(current_data, date_format)
return current_data
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
@ -78,7 +93,7 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
deltas = {"exact": 0, "previous": -1, "next": 1} deltas = {"exact": 0, "previous": -1, "next": 1}
if closest not in deltas.keys(): if closest not in deltas.keys():
raise ValueError(f"Invalid closest argument: {closest}") raise ValueError(f"Invalid argument for closest: {closest}")
as_on_match = closest if as_on_match == "closest" else as_on_match as_on_match = closest if as_on_match == "closest" else as_on_match
prior_match = closest if prior_match == "closest" else prior_match prior_match = closest if prior_match == "closest" else prior_match
@ -96,33 +111,29 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
return as_on_delta, prior_delta return as_on_delta, prior_delta
def _parse_date(date: str, date_format: str = None): def _find_closest_date(data, date, delta, if_not_found):
"""Parses date and handles errors""" """Helper function to find data for the closest available date"""
if isinstance(date, (datetime.datetime, datetime.date)): row = data.get(date, None)
return datetime.datetime.fromordinal(date.toordinal()) if row is not None:
return date, row
if date_format is None: if delta:
date_format = FincalOptions.date_format return _find_closest_date(data, date + delta, delta, if_not_found)
try: if if_not_found == "fail":
date = datetime.datetime.strptime(date, date_format) raise DateNotFoundError("Data not found for date", date)
except TypeError: if if_not_found == "nan":
raise ValueError("Date does not seem to be valid date-like string") return date, float("NaN")
except ValueError:
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?") raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
return date
def _interval_to_years(interval_type: Literal['years', 'months', 'day'], interval_value: int) -> int: def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> int:
"""Converts any time period to years for use with compounding functions""" """Converts any time period to years for use with compounding functions"""
day_conversion_factor = { year_conversion_factor = {"years": 1, "months": 12, "days": 365}
'years': 1, years = interval_value / year_conversion_factor[interval_type]
'months': 12,
'days': 365
}
years = interval_value/day_conversion_factor[interval_type]
return years return years

Loading…
Cancel
Save