diff --git a/fincal/core.py b/fincal/core.py index a6d1772..3b1db0a 100644 --- a/fincal/core.py +++ b/fincal/core.py @@ -29,71 +29,12 @@ class AllFrequencies: Y = Frequency("annual", "years", 1, 365, "Y") -def _preprocess_timeseries( - data: Union[ - Sequence[Iterable[Union[str, datetime.datetime, float]]], - Sequence[Mapping[str, Union[float, datetime.datetime]]], - Sequence[Mapping[Union[str, datetime.datetime], float]], - Mapping[Union[str, datetime.datetime], float], - ], - date_format: str, -) -> List[Tuple[datetime.datetime, float]]: - """Converts any type of list to the correct type""" +class DateNotFoundError(Exception): + """Exception to be raised when date is not found""" - if isinstance(data, Sequence): - if isinstance(data[0], Mapping): - if len(data[0].keys()) == 2: - current_data = [tuple(i.values()) for i in data] - elif len(data[0].keys()) == 1: - current_data = [tuple(*i.items()) for i in data] - else: - raise TypeError("Could not parse the data") - current_data = _preprocess_timeseries(current_data, date_format) - - elif isinstance(data[0], Sequence): - if isinstance(data[0][0], str): - current_data = [] - for i in data: - row = datetime.datetime.strptime(i[0], date_format), i[1] - current_data.append(row) - elif isinstance(data[0][0], datetime.datetime): - current_data = [(i, j) for i, j in data] - else: - raise TypeError("Could not parse the data") - else: - raise TypeError("Could not parse the data") - - elif isinstance(data, Mapping): - current_data = [(k, v) for k, v in data.items()] - current_data = _preprocess_timeseries(current_data, date_format) - - else: - raise TypeError("Could not parse the data") - current_data.sort() - return current_data - - -def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: - """Checks the arguments and returns appropriate timedelta objects""" - - deltas = {"exact": 0, "previous": -1, "next": 1} - if closest not in deltas.keys(): - raise ValueError(f"Invalid closest argument: {closest}") - - as_on_match = closest if as_on_match == "closest" else as_on_match - prior_match = closest if prior_match == "closest" else prior_match - - if as_on_match in deltas.keys(): - as_on_delta = datetime.timedelta(days=deltas[as_on_match]) - else: - raise ValueError(f"Invalid as_on_match argument: {as_on_match}") - - if prior_match in deltas.keys(): - prior_delta = datetime.timedelta(days=deltas[prior_match]) - else: - raise ValueError(f"Invalid prior_match argument: {prior_match}") - - return as_on_delta, prior_delta + def __init__(self, message, date): + message = f"{message}: {date}" + super().__init__(message) def _parse_date(date: str, date_format: str = None): @@ -114,15 +55,85 @@ def _parse_date(date: str, date_format: str = None): return date -def _interval_to_years(interval_type: Literal['years', 'months', 'day'], interval_value: int) -> int: +def _preprocess_timeseries( + data: Union[ + Sequence[Iterable[Union[str, datetime.datetime, float]]], + Sequence[Mapping[str, Union[float, datetime.datetime]]], + Sequence[Mapping[Union[str, datetime.datetime], float]], + Mapping[Union[str, datetime.datetime], float], + ], + date_format: str, +) -> List[Tuple[datetime.datetime, float]]: + """Converts any type of list to the correct type""" + + if isinstance(data, Mapping): + current_data = [(k, v) for k, v in data.items()] + return _preprocess_timeseries(current_data, date_format) + + if not isinstance(data, Sequence): + raise TypeError("Could not parse the data") + + if isinstance(data[0], Sequence): + return sorted([(_parse_date(i, date_format), j) for i, j in data]) + + if not isinstance(data[0], Mapping): + raise TypeError("Could not parse the data") + + if len(data[0]) == 1: + current_data = [tuple(*i.items()) for i in data] + elif len(data[0]) == 2: + current_data = [tuple(i.values()) for i in data] + else: + raise TypeError("Could not parse the data") + return _preprocess_timeseries(current_data, date_format) + + +def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: + """Checks the arguments and returns appropriate timedelta objects""" + + deltas = {"exact": 0, "previous": -1, "next": 1} + if closest not in deltas.keys(): + raise ValueError(f"Invalid argument for closest: {closest}") + + as_on_match = closest if as_on_match == "closest" else as_on_match + prior_match = closest if prior_match == "closest" else prior_match + + if as_on_match in deltas.keys(): + as_on_delta = datetime.timedelta(days=deltas[as_on_match]) + else: + raise ValueError(f"Invalid as_on_match argument: {as_on_match}") + + if prior_match in deltas.keys(): + prior_delta = datetime.timedelta(days=deltas[prior_match]) + else: + raise ValueError(f"Invalid prior_match argument: {prior_match}") + + return as_on_delta, prior_delta + + +def _find_closest_date(data, date, delta, if_not_found): + """Helper function to find data for the closest available date""" + + row = data.get(date, None) + if row is not None: + return date, row + + if delta: + return _find_closest_date(data, date + delta, delta, if_not_found) + + if if_not_found == "fail": + raise DateNotFoundError("Data not found for date", date) + if if_not_found == "nan": + return date, float("NaN") + + raise ValueError(f"Invalid argument for if_not_found: {if_not_found}") + + +def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> int: """Converts any time period to years for use with compounding functions""" - day_conversion_factor = { - 'years': 1, - 'months': 12, - 'days': 365 - } - years = interval_value/day_conversion_factor[interval_type] + year_conversion_factor = {"years": 1, "months": 12, "days": 365} + years = interval_value / year_conversion_factor[interval_type] return years