From cbace875c14d24837d657505ae7b63395b82f4fa Mon Sep 17 00:00:00 2001 From: Gourav Kumar Date: Mon, 21 Feb 2022 13:09:58 +0530 Subject: [PATCH] implemented _parse_date function to formalise date parsing made Series a subclass of UserList to improve compatibility --- fincal/core.py | 131 +++++++++++++++++++++++++++---------------------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/fincal/core.py b/fincal/core.py index 6f8bbab..915eb6a 100644 --- a/fincal/core.py +++ b/fincal/core.py @@ -1,4 +1,5 @@ import datetime +from collections import UserList from dataclasses import dataclass from numbers import Number from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union @@ -6,8 +7,8 @@ from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union @dataclass class FincalOptions: - date_format: str = '%Y-%m-%d' - closest: str = 'before' # after + date_format: str = "%Y-%m-%d" + closest: str = "before" # after @dataclass(frozen=True) @@ -20,12 +21,12 @@ class Frequency: class AllFrequencies: - D = Frequency('daily', 'days', 1, 1, 'D') - W = Frequency('weekly', 'days', 7, 7, 'W') - M = Frequency('monthly', 'months', 1, 30, 'M') - Q = Frequency('quarterly', 'months', 3, 91, 'Q') - H = Frequency('half-yearly', 'months', 6, 182, 'H') - Y = Frequency('annual', 'years', 1, 365, 'Y') + D = Frequency("daily", "days", 1, 1, "D") + W = Frequency("weekly", "days", 7, 7, "W") + M = Frequency("monthly", "months", 1, 30, "M") + Q = Frequency("quarterly", "months", 3, 91, "Q") + H = Frequency("half-yearly", "months", 6, 182, "H") + Y = Frequency("annual", "years", 1, 365, "Y") def _preprocess_timeseries( @@ -33,9 +34,9 @@ def _preprocess_timeseries( Sequence[Iterable[Union[str, datetime.datetime, float]]], Sequence[Mapping[str, Union[float, datetime.datetime]]], Sequence[Mapping[Union[str, datetime.datetime], float]], - Mapping[Union[str, datetime.datetime], float] + Mapping[Union[str, datetime.datetime], float], ], - date_format: str + date_format: str, ) -> List[Tuple[datetime.datetime, float]]: """Converts any type of list to the correct type""" @@ -75,12 +76,12 @@ def _preprocess_timeseries( def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: """Checks the arguments and returns appropriate timedelta objects""" - deltas = {'exact': 0, 'previous': -1, 'next': 1} + deltas = {"exact": 0, "previous": -1, "next": 1} if closest not in deltas.keys(): raise ValueError(f"Invalid closest argument: {closest}") - as_on_match = closest if as_on_match == 'closest' else as_on_match - prior_match = closest if prior_match == 'closest' else prior_match + as_on_match = closest if as_on_match == "closest" else as_on_match + prior_match = closest if prior_match == "closest" else prior_match if as_on_match in deltas.keys(): as_on_delta = datetime.timedelta(days=deltas[as_on_match]) @@ -95,6 +96,24 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) return as_on_delta, prior_delta +def _parse_date(date: str, date_format: str = None): + """Parses date and handles errors""" + + if isinstance(date, (datetime.datetime, datetime.date)): + return datetime.datetime.fromordinal(date.toordinal()) + + if date_format is None: + date_format = FincalOptions.date_format + + try: + date = datetime.datetime.strptime(date, date_format) + except TypeError: + raise Exception("Date does not seem to be valid date-like string") + except ValueError: + raise Exception("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?") + return date + + class _IndexSlicer: def __init__(self, parent_obj): self.parent = parent_obj @@ -112,7 +131,7 @@ class _IndexSlicer: return item -class Series: +class Series(UserList): def __init__(self, data): if not isinstance(data, Sequence): raise TypeError("Series only supports creation using Sequence types") @@ -128,27 +147,26 @@ class Series: data = [datetime.datetime.strptime(i, FincalOptions.date_format) for i in data] self.dtype = datetime.datetime except ValueError: - raise TypeError("Series does not support string data type except dates.\n" - "Hint: Try setting the date format using FincalOptions.date_format") - elif isinstance(data[0], datetime.datetime): + raise TypeError( + "Series does not support string data type except dates.\n" + "Hint: Try setting the date format using FincalOptions.date_format" + ) + elif isinstance(data[0], (datetime.datetime, datetime.date)): self.dtype = datetime.datetime - self.data = data + self.data = [_parse_date(i) for i in data] else: raise TypeError(f"Cannot create series object from {type(data).__name__} of {type(data[0]).__name__}") def __repr__(self): return f"{self.__class__.__name__}({self.data})" - def __getitem__(self, n): - return self.data[n] - - def __len__(self): - return len(self.data) - def __gt__(self, other): if self.dtype == bool: raise TypeError("> not supported for boolean series") + if isinstance(other, (str, datetime.datetime, datetime.date)): + other = _parse_date(other) + if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype): gt = Series([i > other for i in self.data]) else: @@ -178,10 +196,7 @@ class TimeSeriesCore: """Defines the core building blocks of a TimeSeries object""" def __init__( - self, - data: List[Iterable], - frequency: Literal['D', 'W', 'M', 'Q', 'H', 'Y'], - date_format: str = "%Y-%m-%d" + self, data: List[Iterable], frequency: Literal["D", "W", "M", "Q", "H", "Y"], date_format: str = "%Y-%m-%d" ): """Instantiate a TimeSeries object @@ -241,42 +256,42 @@ class TimeSeriesCore: printable = {} iter_f = iter(self.time_series) - first_n = [next(iter_f) for i in range(n//2)] + first_n = [next(iter_f) for i in range(n // 2)] iter_b = reversed(self.time_series) - last_n = [next(iter_b) for i in range(n//2)] + last_n = [next(iter_b) for i in range(n // 2)] last_n.sort() - printable['start'] = [str((i, self.time_series[i])) for i in first_n] - printable['end'] = [str((i, self.time_series[i])) for i in last_n] + printable["start"] = [str((i, self.time_series[i])) for i in first_n] + printable["end"] = [str((i, self.time_series[i])) for i in last_n] return printable def __repr__(self): if len(self.time_series) > 6: printable = self._get_printable_slice(6) printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format( - self.__class__.__name__, - ',\n\t '.join(printable['start']), - ',\n\t '.join(printable['end']), - repr(self.frequency.symbol) - ) + self.__class__.__name__, + ",\n\t ".join(printable["start"]), + ",\n\t ".join(printable["end"]), + repr(self.frequency.symbol), + ) else: printable_str = "{}([{}], frequency={})".format( - self.__class__.__name__, - ',\n\t'.join([str(i) for i in self.time_series.items()]), - repr(self.frequency.symbol) - ) + self.__class__.__name__, + ",\n\t".join([str(i) for i in self.time_series.items()]), + repr(self.frequency.symbol), + ) return printable_str def __str__(self): if len(self.time_series) > 6: printable = self._get_printable_slice(6) printable_str = "[{}\n ...\n {}]".format( - ',\n '.join(printable['start']), - ',\n '.join(printable['end']), - ) + ",\n ".join(printable["start"]), + ",\n ".join(printable["end"]), + ) else: - printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()])) + printable_str = "[{}]".format(",\n ".join([str(i) for i in self.time_series.items()])) return printable_str def __getitem__(self, key): @@ -288,27 +303,25 @@ class TimeSeriesCore: else: dates_to_return = [self.dates[i] for i, j in enumerate(key) if j] data_to_return = [(key, self.time_series[key]) for key in dates_to_return] - return TimeSeriesCore(data_to_return) + return TimeSeriesCore(data_to_return, frequency=self.frequency.symbol) if isinstance(key, int): raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]") - elif isinstance(key, datetime.datetime): + elif isinstance(key, (datetime.datetime, datetime.date)): + key = _parse_date(key) item = (key, self.time_series[key]) - if isinstance(key, str): - if key == 'dates': + elif isinstance(key, str): + if key == "dates": return self.dates - elif key == 'values': + elif key == "values": return self.values - try: - dt_key = datetime.datetime.strptime(key, FincalOptions.date_format) - item = (dt_key, self.time_series[dt_key]) - except ValueError: - raise KeyError(f"{repr(key)}. If you passed a date as a string, " - "try setting the date format using Fincal.Options.date_format") - except KeyError: - raise KeyError(f"{repr(key)}. This date is not available.") + + dt_key = _parse_date(key) + item = (dt_key, self.time_series[dt_key]) + elif isinstance(key, Sequence): - item = [(k, self.time_series[k]) for k in key] + keys = [_parse_date(i) for i in key] + item = [(k, self.time_series[k]) for k in keys] else: raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.") return item