|
|
@ -29,6 +29,32 @@ class AllFrequencies: |
|
|
|
Y = Frequency("annual", "years", 1, 365, "Y") |
|
|
|
|
|
|
|
|
|
|
|
class DateNotFoundError(Exception): |
|
|
|
"""Exception to be raised when date is not found""" |
|
|
|
|
|
|
|
def __init__(self, message, date): |
|
|
|
message = f"{message}: {date}" |
|
|
|
super().__init__(message) |
|
|
|
|
|
|
|
|
|
|
|
def _parse_date(date: str, date_format: str = None): |
|
|
|
"""Parses date and handles errors""" |
|
|
|
|
|
|
|
if isinstance(date, (datetime.datetime, datetime.date)): |
|
|
|
return datetime.datetime.fromordinal(date.toordinal()) |
|
|
|
|
|
|
|
if date_format is None: |
|
|
|
date_format = FincalOptions.date_format |
|
|
|
|
|
|
|
try: |
|
|
|
date = datetime.datetime.strptime(date, date_format) |
|
|
|
except TypeError: |
|
|
|
raise ValueError("Date does not seem to be valid date-like string") |
|
|
|
except ValueError: |
|
|
|
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?") |
|
|
|
return date |
|
|
|
|
|
|
|
|
|
|
|
def _preprocess_timeseries( |
|
|
|
data: Union[ |
|
|
|
Sequence[Iterable[Union[str, datetime.datetime, float]]], |
|
|
@ -40,37 +66,26 @@ def _preprocess_timeseries( |
|
|
|
) -> List[Tuple[datetime.datetime, float]]: |
|
|
|
"""Converts any type of list to the correct type""" |
|
|
|
|
|
|
|
if isinstance(data, Sequence): |
|
|
|
if isinstance(data[0], Mapping): |
|
|
|
if len(data[0].keys()) == 2: |
|
|
|
current_data = [tuple(i.values()) for i in data] |
|
|
|
elif len(data[0].keys()) == 1: |
|
|
|
current_data = [tuple(*i.items()) for i in data] |
|
|
|
else: |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
current_data = _preprocess_timeseries(current_data, date_format) |
|
|
|
|
|
|
|
elif isinstance(data[0], Sequence): |
|
|
|
if isinstance(data[0][0], str): |
|
|
|
current_data = [] |
|
|
|
for i in data: |
|
|
|
row = datetime.datetime.strptime(i[0], date_format), i[1] |
|
|
|
current_data.append(row) |
|
|
|
elif isinstance(data[0][0], datetime.datetime): |
|
|
|
current_data = [(i, j) for i, j in data] |
|
|
|
else: |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
else: |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
|
|
|
|
elif isinstance(data, Mapping): |
|
|
|
if isinstance(data, Mapping): |
|
|
|
current_data = [(k, v) for k, v in data.items()] |
|
|
|
current_data = _preprocess_timeseries(current_data, date_format) |
|
|
|
return _preprocess_timeseries(current_data, date_format) |
|
|
|
|
|
|
|
if not isinstance(data, Sequence): |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
|
|
|
|
if isinstance(data[0], Sequence): |
|
|
|
return sorted([(_parse_date(i, date_format), j) for i, j in data]) |
|
|
|
|
|
|
|
if not isinstance(data[0], Mapping): |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
|
|
|
|
if len(data[0]) == 1: |
|
|
|
current_data = [tuple(*i.items()) for i in data] |
|
|
|
elif len(data[0]) == 2: |
|
|
|
current_data = [tuple(i.values()) for i in data] |
|
|
|
else: |
|
|
|
raise TypeError("Could not parse the data") |
|
|
|
current_data.sort() |
|
|
|
return current_data |
|
|
|
return _preprocess_timeseries(current_data, date_format) |
|
|
|
|
|
|
|
|
|
|
|
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: |
|
|
@ -78,7 +93,7 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) |
|
|
|
|
|
|
|
deltas = {"exact": 0, "previous": -1, "next": 1} |
|
|
|
if closest not in deltas.keys(): |
|
|
|
raise ValueError(f"Invalid closest argument: {closest}") |
|
|
|
raise ValueError(f"Invalid argument for closest: {closest}") |
|
|
|
|
|
|
|
as_on_match = closest if as_on_match == "closest" else as_on_match |
|
|
|
prior_match = closest if prior_match == "closest" else prior_match |
|
|
@ -96,33 +111,29 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) |
|
|
|
return as_on_delta, prior_delta |
|
|
|
|
|
|
|
|
|
|
|
def _parse_date(date: str, date_format: str = None): |
|
|
|
"""Parses date and handles errors""" |
|
|
|
def _find_closest_date(data, date, delta, if_not_found): |
|
|
|
"""Helper function to find data for the closest available date""" |
|
|
|
|
|
|
|
if isinstance(date, (datetime.datetime, datetime.date)): |
|
|
|
return datetime.datetime.fromordinal(date.toordinal()) |
|
|
|
row = data.get(date, None) |
|
|
|
if row is not None: |
|
|
|
return date, row |
|
|
|
|
|
|
|
if date_format is None: |
|
|
|
date_format = FincalOptions.date_format |
|
|
|
if delta: |
|
|
|
return _find_closest_date(data, date + delta, delta, if_not_found) |
|
|
|
|
|
|
|
try: |
|
|
|
date = datetime.datetime.strptime(date, date_format) |
|
|
|
except TypeError: |
|
|
|
raise ValueError("Date does not seem to be valid date-like string") |
|
|
|
except ValueError: |
|
|
|
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?") |
|
|
|
return date |
|
|
|
if if_not_found == "fail": |
|
|
|
raise DateNotFoundError("Data not found for date", date) |
|
|
|
if if_not_found == "nan": |
|
|
|
return date, float("NaN") |
|
|
|
|
|
|
|
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}") |
|
|
|
|
|
|
|
|
|
|
|
def _interval_to_years(interval_type: Literal['years', 'months', 'day'], interval_value: int) -> int: |
|
|
|
def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> int: |
|
|
|
"""Converts any time period to years for use with compounding functions""" |
|
|
|
|
|
|
|
day_conversion_factor = { |
|
|
|
'years': 1, |
|
|
|
'months': 12, |
|
|
|
'days': 365 |
|
|
|
} |
|
|
|
years = interval_value/day_conversion_factor[interval_type] |
|
|
|
year_conversion_factor = {"years": 1, "months": 12, "days": 365} |
|
|
|
years = interval_value / year_conversion_factor[interval_type] |
|
|
|
return years |
|
|
|
|
|
|
|
|
|
|
|