diff --git a/fincal/fincal.py b/fincal/fincal.py index ae263d9..c060383 100644 --- a/fincal/fincal.py +++ b/fincal/fincal.py @@ -1,6 +1,6 @@ import datetime from dataclasses import dataclass -from typing import List +from typing import Dict, Iterable, List, Literal, Tuple, Union from dateutil.relativedelta import relativedelta @@ -46,10 +46,59 @@ def create_date_series( return dates +def _preprocess_timeseries( + data: Union[ + List[Iterable[Union[str, datetime.datetime, float]]], + List[Dict[str, Union[float, datetime.datetime]]], + List[Dict[Union[str, datetime.datetime], float]], + Dict[Union[str, datetime.datetime], float] + ], + date_format: str +) -> List[Tuple[datetime.datetime, float]]: + """Converts any type of list to the correct type""" + + if isinstance(data, list): + if isinstance(data[0], dict): + if len(data[0].keys()) == 2: + current_data = [tuple(i.values()) for i in data] + elif len(data[0].keys()) == 1: + current_data = [tuple(*i.items()) for i in data] + else: + raise TypeError("Could not parse the data") + current_data = _preprocess_timeseries(current_data, date_format) + + elif isinstance(data[0], Iterable): + if isinstance(data[0][0], str): + current_data = [] + for i in data: + row = datetime.datetime.strptime(i[0], date_format), i[1] + current_data.append(row) + elif isinstance(data[0][0], datetime.datetime): + current_data = [(i, j) for i, j in data] + else: + raise TypeError("Could not parse the data") + else: + raise TypeError("Could not parse the data") + + elif isinstance(data, dict): + current_data = [(k, v) for k, v in data.items()] + current_data = _preprocess_timeseries(current_data, date_format) + + else: + raise TypeError("Could not parse the data") + current_data.sort() + return current_data + + class TimeSeries: """Container for TimeSeries objects""" - def __init__(self, data: List[tuple], date_format: str = "%Y-%m-%d", frequency="D"): + def __init__( + self, + data: List[Iterable], + date_format: str = "%Y-%m-%d", + frequency=Literal['D', 'W', 'M', 'Q', 'H', 'Y'] + ): """Instantiate a TimeSeries object Parameters @@ -69,11 +118,11 @@ class TimeSeries: Valid values are {D, W, M, Q, H, Y} """ - time_series = [(datetime.datetime.strptime(i[0], date_format), i[1]) for i in data] - time_series.sort() - self.time_series = dict(time_series) + data = _preprocess_timeseries(data, date_format=date_format) + + self.time_series = dict(data) self.dates = set(list(self.time_series)) - if len(self.dates) != len(time_series): + if len(self.dates) != len(data): print("Warning: The input data contains duplicate dates which have been ignored.") self.start_date = list(self.time_series)[0] self.end_date = list(self.time_series)[-1] @@ -123,7 +172,7 @@ class TimeSeries: cur_val = self.time_series[cur_date] except KeyError: pass - new_ts.update({cur_date: cur_val}) # type: ignore + new_ts.update({cur_date: cur_val}) if inplace: self.time_series = new_ts @@ -141,7 +190,7 @@ class TimeSeries: cur_val = self.time_series[cur_date] except KeyError: pass - new_ts.update({cur_date: cur_val}) # type: ignore + new_ts.update({cur_date: cur_val}) if inplace: self.time_series = new_ts @@ -151,7 +200,7 @@ class TimeSeries: def calculate_returns( self, as_on: datetime.datetime, closest: str = "previous", compounding: bool = True, years: int = 1 - ) -> int: + ) -> float: """Method to calculate returns for a certain time-period as on a particular date >>> calculate_returns(datetime.date(2020, 1, 1), years=1) """ @@ -167,7 +216,7 @@ class TimeSeries: elif closest == "next": delta = 1 else: - raise ValueError(f"Invalid value for closes parameter: {closest}") + raise ValueError(f"Invalid value for closest parameter: {closest}") while True: try: