implemented _parse_date function to formalise date parsing

made Series a subclass of UserList to improve compatibility
This commit is contained in:
Gourav Kumar 2022-02-21 13:09:58 +05:30
parent 053a93900a
commit cbace875c1

View File

@ -1,4 +1,5 @@
import datetime import datetime
from collections import UserList
from dataclasses import dataclass from dataclasses import dataclass
from numbers import Number from numbers import Number
from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
@ -6,8 +7,8 @@ from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
@dataclass @dataclass
class FincalOptions: class FincalOptions:
date_format: str = '%Y-%m-%d' date_format: str = "%Y-%m-%d"
closest: str = 'before' # after closest: str = "before" # after
@dataclass(frozen=True) @dataclass(frozen=True)
@ -20,12 +21,12 @@ class Frequency:
class AllFrequencies: class AllFrequencies:
D = Frequency('daily', 'days', 1, 1, 'D') D = Frequency("daily", "days", 1, 1, "D")
W = Frequency('weekly', 'days', 7, 7, 'W') W = Frequency("weekly", "days", 7, 7, "W")
M = Frequency('monthly', 'months', 1, 30, 'M') M = Frequency("monthly", "months", 1, 30, "M")
Q = Frequency('quarterly', 'months', 3, 91, 'Q') Q = Frequency("quarterly", "months", 3, 91, "Q")
H = Frequency('half-yearly', 'months', 6, 182, 'H') H = Frequency("half-yearly", "months", 6, 182, "H")
Y = Frequency('annual', 'years', 1, 365, 'Y') Y = Frequency("annual", "years", 1, 365, "Y")
def _preprocess_timeseries( def _preprocess_timeseries(
@ -33,9 +34,9 @@ def _preprocess_timeseries(
Sequence[Iterable[Union[str, datetime.datetime, float]]], Sequence[Iterable[Union[str, datetime.datetime, float]]],
Sequence[Mapping[str, Union[float, datetime.datetime]]], Sequence[Mapping[str, Union[float, datetime.datetime]]],
Sequence[Mapping[Union[str, datetime.datetime], float]], Sequence[Mapping[Union[str, datetime.datetime], float]],
Mapping[Union[str, datetime.datetime], float] Mapping[Union[str, datetime.datetime], float],
], ],
date_format: str date_format: str,
) -> List[Tuple[datetime.datetime, float]]: ) -> List[Tuple[datetime.datetime, float]]:
"""Converts any type of list to the correct type""" """Converts any type of list to the correct type"""
@ -75,12 +76,12 @@ def _preprocess_timeseries(
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
"""Checks the arguments and returns appropriate timedelta objects""" """Checks the arguments and returns appropriate timedelta objects"""
deltas = {'exact': 0, 'previous': -1, 'next': 1} deltas = {"exact": 0, "previous": -1, "next": 1}
if closest not in deltas.keys(): if closest not in deltas.keys():
raise ValueError(f"Invalid closest argument: {closest}") raise ValueError(f"Invalid closest argument: {closest}")
as_on_match = closest if as_on_match == 'closest' else as_on_match as_on_match = closest if as_on_match == "closest" else as_on_match
prior_match = closest if prior_match == 'closest' else prior_match prior_match = closest if prior_match == "closest" else prior_match
if as_on_match in deltas.keys(): if as_on_match in deltas.keys():
as_on_delta = datetime.timedelta(days=deltas[as_on_match]) as_on_delta = datetime.timedelta(days=deltas[as_on_match])
@ -95,6 +96,24 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
return as_on_delta, prior_delta return as_on_delta, prior_delta
def _parse_date(
    date: Union[str, datetime.datetime, datetime.date],
    date_format: Union[str, None] = None,
) -> datetime.datetime:
    """Convert a date-like value into a ``datetime.datetime``.

    Parameters
    ----------
    date:
        Either a ``datetime.date``/``datetime.datetime`` instance or a string
        to be parsed with ``date_format``.
    date_format:
        ``strptime`` format used for string input. When ``None``, the
        module-wide ``FincalOptions.date_format`` is used.

    Returns
    -------
    datetime.datetime
        For ``date``/``datetime`` input, a datetime rebuilt from the ordinal
        day, i.e. any time-of-day component is dropped. For string input,
        whatever ``datetime.datetime.strptime`` produces for the format.

    Raises
    ------
    TypeError
        If ``strptime`` rejects the argument types (e.g. ``date`` is not a
        string).
    ValueError
        If the string does not match ``date_format``.

    Both are subclasses of ``Exception``, so callers that caught the
    generic ``Exception`` keep working; the original error is attached as
    ``__cause__``.
    """
    if isinstance(date, (datetime.datetime, datetime.date)):
        # Round-tripping through the ordinal normalises plain dates and
        # datetimes alike to a datetime at midnight.
        return datetime.datetime.fromordinal(date.toordinal())

    if date_format is None:
        date_format = FincalOptions.date_format

    try:
        return datetime.datetime.strptime(date, date_format)
    except TypeError as err:
        raise TypeError("Date does not seem to be valid date-like string") from err
    except ValueError as err:
        raise ValueError(
            "Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?"
        ) from err
class _IndexSlicer: class _IndexSlicer:
def __init__(self, parent_obj): def __init__(self, parent_obj):
self.parent = parent_obj self.parent = parent_obj
@ -112,7 +131,7 @@ class _IndexSlicer:
return item return item
class Series: class Series(UserList):
def __init__(self, data): def __init__(self, data):
if not isinstance(data, Sequence): if not isinstance(data, Sequence):
raise TypeError("Series only supports creation using Sequence types") raise TypeError("Series only supports creation using Sequence types")
@ -128,27 +147,26 @@ class Series:
data = [datetime.datetime.strptime(i, FincalOptions.date_format) for i in data] data = [datetime.datetime.strptime(i, FincalOptions.date_format) for i in data]
self.dtype = datetime.datetime self.dtype = datetime.datetime
except ValueError: except ValueError:
raise TypeError("Series does not support string data type except dates.\n" raise TypeError(
"Hint: Try setting the date format using FincalOptions.date_format") "Series does not support string data type except dates.\n"
elif isinstance(data[0], datetime.datetime): "Hint: Try setting the date format using FincalOptions.date_format"
)
elif isinstance(data[0], (datetime.datetime, datetime.date)):
self.dtype = datetime.datetime self.dtype = datetime.datetime
self.data = data self.data = [_parse_date(i) for i in data]
else: else:
raise TypeError(f"Cannot create series object from {type(data).__name__} of {type(data[0]).__name__}") raise TypeError(f"Cannot create series object from {type(data).__name__} of {type(data[0]).__name__}")
def __repr__(self): def __repr__(self):
return f"{self.__class__.__name__}({self.data})" return f"{self.__class__.__name__}({self.data})"
def __getitem__(self, n):
return self.data[n]
def __len__(self):
return len(self.data)
def __gt__(self, other): def __gt__(self, other):
if self.dtype == bool: if self.dtype == bool:
raise TypeError("> not supported for boolean series") raise TypeError("> not supported for boolean series")
if isinstance(other, (str, datetime.datetime, datetime.date)):
other = _parse_date(other)
if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype): if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype):
gt = Series([i > other for i in self.data]) gt = Series([i > other for i in self.data])
else: else:
@ -178,10 +196,7 @@ class TimeSeriesCore:
"""Defines the core building blocks of a TimeSeries object""" """Defines the core building blocks of a TimeSeries object"""
def __init__( def __init__(
self, self, data: List[Iterable], frequency: Literal["D", "W", "M", "Q", "H", "Y"], date_format: str = "%Y-%m-%d"
data: List[Iterable],
frequency: Literal['D', 'W', 'M', 'Q', 'H', 'Y'],
date_format: str = "%Y-%m-%d"
): ):
"""Instantiate a TimeSeries object """Instantiate a TimeSeries object
@ -241,42 +256,42 @@ class TimeSeriesCore:
printable = {} printable = {}
iter_f = iter(self.time_series) iter_f = iter(self.time_series)
first_n = [next(iter_f) for i in range(n//2)] first_n = [next(iter_f) for i in range(n // 2)]
iter_b = reversed(self.time_series) iter_b = reversed(self.time_series)
last_n = [next(iter_b) for i in range(n//2)] last_n = [next(iter_b) for i in range(n // 2)]
last_n.sort() last_n.sort()
printable['start'] = [str((i, self.time_series[i])) for i in first_n] printable["start"] = [str((i, self.time_series[i])) for i in first_n]
printable['end'] = [str((i, self.time_series[i])) for i in last_n] printable["end"] = [str((i, self.time_series[i])) for i in last_n]
return printable return printable
def __repr__(self): def __repr__(self):
if len(self.time_series) > 6: if len(self.time_series) > 6:
printable = self._get_printable_slice(6) printable = self._get_printable_slice(6)
printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format( printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format(
self.__class__.__name__, self.__class__.__name__,
',\n\t '.join(printable['start']), ",\n\t ".join(printable["start"]),
',\n\t '.join(printable['end']), ",\n\t ".join(printable["end"]),
repr(self.frequency.symbol) repr(self.frequency.symbol),
) )
else: else:
printable_str = "{}([{}], frequency={})".format( printable_str = "{}([{}], frequency={})".format(
self.__class__.__name__, self.__class__.__name__,
',\n\t'.join([str(i) for i in self.time_series.items()]), ",\n\t".join([str(i) for i in self.time_series.items()]),
repr(self.frequency.symbol) repr(self.frequency.symbol),
) )
return printable_str return printable_str
def __str__(self): def __str__(self):
if len(self.time_series) > 6: if len(self.time_series) > 6:
printable = self._get_printable_slice(6) printable = self._get_printable_slice(6)
printable_str = "[{}\n ...\n {}]".format( printable_str = "[{}\n ...\n {}]".format(
',\n '.join(printable['start']), ",\n ".join(printable["start"]),
',\n '.join(printable['end']), ",\n ".join(printable["end"]),
) )
else: else:
printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()])) printable_str = "[{}]".format(",\n ".join([str(i) for i in self.time_series.items()]))
return printable_str return printable_str
def __getitem__(self, key): def __getitem__(self, key):
@ -288,27 +303,25 @@ class TimeSeriesCore:
else: else:
dates_to_return = [self.dates[i] for i, j in enumerate(key) if j] dates_to_return = [self.dates[i] for i, j in enumerate(key) if j]
data_to_return = [(key, self.time_series[key]) for key in dates_to_return] data_to_return = [(key, self.time_series[key]) for key in dates_to_return]
return TimeSeriesCore(data_to_return) return TimeSeriesCore(data_to_return, frequency=self.frequency.symbol)
if isinstance(key, int): if isinstance(key, int):
raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]") raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]")
elif isinstance(key, datetime.datetime): elif isinstance(key, (datetime.datetime, datetime.date)):
key = _parse_date(key)
item = (key, self.time_series[key]) item = (key, self.time_series[key])
if isinstance(key, str): elif isinstance(key, str):
if key == 'dates': if key == "dates":
return self.dates return self.dates
elif key == 'values': elif key == "values":
return self.values return self.values
try:
dt_key = datetime.datetime.strptime(key, FincalOptions.date_format) dt_key = _parse_date(key)
item = (dt_key, self.time_series[dt_key]) item = (dt_key, self.time_series[dt_key])
except ValueError:
raise KeyError(f"{repr(key)}. If you passed a date as a string, "
"try setting the date format using Fincal.Options.date_format")
except KeyError:
raise KeyError(f"{repr(key)}. This date is not available.")
elif isinstance(key, Sequence): elif isinstance(key, Sequence):
item = [(k, self.time_series[k]) for k in key] keys = [_parse_date(i) for i in key]
item = [(k, self.time_series[k]) for k in keys]
else: else:
raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.") raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")
return item return item