1018 lines
35 KiB
Python
1018 lines
35 KiB
Python
from __future__ import annotations
|
|
|
|
import datetime
|
|
import inspect
|
|
import warnings
|
|
from collections import UserList
|
|
from dataclasses import dataclass
|
|
from numbers import Number
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Iterable,
|
|
List,
|
|
Literal,
|
|
Mapping,
|
|
Sequence,
|
|
Tuple,
|
|
Type,
|
|
)
|
|
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
from .utils import PyfactsOptions, _parse_date, _preprocess_timeseries
|
|
|
|
|
|
@dataclass(frozen=True)
class Frequency:
    """Immutable record describing one time-series frequency.

    Instances are built positionally as
    ``Frequency(name, freq_type, value, days, symbol)``; the dataclass is
    frozen, so fields cannot be reassigned after construction.
    """

    # Human readable label, e.g. "daily".
    name: str
    # Unit the frequency is expressed in, e.g. "days", "months" or "years".
    freq_type: str
    # Number of ``freq_type`` units that make up one period.
    value: int
    # Length of one period expressed in days (e.g. 30 for monthly).
    days: int
    # Short code identifying the frequency, e.g. "D".
    symbol: str
|
|
|
|
|
|
def date_parser(*pos):
    """Decorator factory that parses date arguments before the function runs.

    Accepts the 0-indexed positions of the parameters for which date parsing
    needs to be done. Each position is resolved to a parameter name through
    the function signature, so parsing works whether the value is passed
    positionally or by keyword. Arguments that are missing or ``None`` are
    left untouched.

    If the call carries a ``date_format`` keyword argument, it is forwarded to
    ``_parse_date`` (and is still passed on to the wrapped function).

    Example:
    --------
    >>> @date_parser(0, 1)
    ... def calculate_difference(date1, date2, return_type="int"):
    ...     diff = date2 - date1
    ...     if return_type == "int":
    ...         return diff.days
    ...     return diff
    ...
    >>> calculate_difference(date1="2019-01-01", date2="2020-01-01")
    365

    Each of the dates is parsed by ``_parse_date`` before the function body
    runs.
    """
    # Local import so that this block does not depend on a file-level import.
    import functools

    def parse_dates(func):
        # Parameter names only depend on the function, so resolve them once at
        # decoration time instead of on every call.
        params: list = list(inspect.signature(func).parameters)

        @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
        def wrapper_func(*args, **kwargs):
            date_format: str = kwargs.get("date_format", None)
            args: list = list(args)

            for j in pos:
                kwarg: str = params[j]
                date = kwargs.get(kwarg, None)
                in_args: bool = False
                if date is None:
                    # Not supplied by keyword: fall back to the positional slot.
                    try:
                        date = args[j]
                    except IndexError:
                        pass
                    in_args = True

                if date is None:
                    # Argument not supplied at all (or explicitly None).
                    continue

                parsed_date: datetime.datetime = _parse_date(date, date_format)
                if not in_args:
                    kwargs[kwarg] = parsed_date
                else:
                    args[j] = parsed_date
            return func(*args, **kwargs)

        return wrapper_func

    return parse_dates
|
|
|
|
|
|
class AllFrequencies:
    """Namespace holding one :class:`Frequency` per supported symbol.

    Attributes are looked up by symbol elsewhere in this module, e.g.
    ``getattr(AllFrequencies, "M")``.
    """

    D = Frequency(name="daily", freq_type="days", value=1, days=1, symbol="D")
    W = Frequency(name="weekly", freq_type="days", value=7, days=7, symbol="W")
    M = Frequency(name="monthly", freq_type="months", value=1, days=30, symbol="M")
    Q = Frequency(name="quarterly", freq_type="months", value=3, days=91, symbol="Q")
    H = Frequency(name="half-yearly", freq_type="months", value=6, days=182, symbol="H")
    Y = Frequency(name="annual", freq_type="years", value=1, days=365, symbol="Y")
|
|
|
|
|
|
class _IndexSlicer:
|
|
"""Class to create a slice using iloc in TimeSeriesCore"""
|
|
|
|
def __init__(self, parent_obj: object):
|
|
self.parent = parent_obj
|
|
|
|
def __getitem__(self, n):
|
|
if isinstance(n, int):
|
|
keys: list = [self.parent.dates[n]]
|
|
else:
|
|
keys: list = self.parent.dates[n]
|
|
item = [(key, self.parent.data[key]) for key in keys]
|
|
if len(item) == 1:
|
|
return item[0]
|
|
|
|
return self.parent.__class__(item, self.parent.frequency.symbol)
|
|
|
|
def __setitem__(self, key, value):
|
|
raise NotImplementedError(
|
|
"iloc cannot be used for setting a value as value will always be inserted in order of date"
|
|
)
|
|
|
|
|
|
class Series(UserList):
    """Container for a series of objects, all objects must be of the same type

    The element type is stored in ``dtype`` and is one of
    ``datetime.datetime``, ``float`` or ``bool``. Comparison and logical
    operators work element-wise and return a boolean ``Series``.
    """

    def __init__(
        self,
        data: Sequence,
        dtype: Literal["date", "number", "bool"] = None,
        date_format: str = None,
    ):
        """Build a Series from a sequence.

        Parameters
        ----------
        data : Sequence
            Values to store. Every value is converted to the target type.

        dtype : str, optional
            Name of the data type. When omitted it is inferred from the class
            name of the first element.

        date_format : str, optional
            Passed to ``_parse_date`` when the series holds dates.

        Raises
        ------
        TypeError
            If ``data`` is not a Sequence.

        ValueError
            If the data type is unsupported or cannot be inferred (including
            an empty sequence without an explicit ``dtype``).
        """
        types_dict: dict = {
            "date": datetime.datetime,
            "datetime": datetime.datetime,
            "datetime.datetime": datetime.datetime,
            "float": float,
            "int": float,
            "number": float,
            "bool": bool,
            # NOTE(review): mapping "Decimal" to bool looks unintended (float?);
            # kept unchanged until confirmed.
            "Decimal": bool,
        }

        if not isinstance(data, Sequence):
            raise TypeError("Series object can only be created using Sequence types")

        # Inference needs a first element; an empty sequence falls through to
        # the "unsupported" error below instead of raising IndexError.
        if dtype is None and len(data) > 0:
            if isinstance(data[0], (Number, datetime.datetime, datetime.date, bool)):
                dtype = data[0].__class__.__name__.lower()

        if dtype not in types_dict:
            raise ValueError("Unsupported value for data type")

        if dtype in ["date", "datetime", "datetime.datetime"]:
            data = [_parse_date(i, date_format) for i in data]
        else:
            func: Callable = types_dict[dtype]
            data: list = [func(i) for i in data]

        self.dtype: Type = types_dict[dtype]
        self.data: Sequence = data

    def __repr__(self):
        return f"{self.__class__.__name__}({self.data}, data_type='{self.dtype.__name__}')"

    def __getitem__(self, i):
        """Return one element, or a new Series of the same dtype for a slice."""
        if isinstance(i, slice):
            return self.__class__(self.data[i], str(self.dtype.__name__))
        return self.data[i]

    def _comparison_validator(self, other, skip_bool: bool = False):
        """Validates other before making comparison

        Strings and date objects are parsed with ``_parse_date`` and returned
        immediately. ``skip_bool`` disables the rejection of boolean series
        (used by the logical operators).

        Raises
        ------
        TypeError
            For comparisons on a boolean series (unless ``skip_bool``), or
            when a non-float series is compared with a number.

        ValueError
            When compared with a Series of a different length.
        """
        if isinstance(other, (str, datetime.datetime, datetime.date)):
            return _parse_date(other)

        if self.dtype == bool and not skip_bool:
            raise TypeError("Comparison operation not supported for boolean series")

        elif isinstance(other, Series):
            if len(self) != len(other):
                raise ValueError("Length of Series must be same for comparison")

        elif self.dtype != float and isinstance(other, Number):
            # TypeError is a subclass of Exception, so callers catching the
            # generic Exception raised previously keep working.
            raise TypeError(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")

        return other

    def _elementwise_bool(self, other, op, skip_bool: bool = False):
        """Apply ``op(own_value, other_value)`` element-wise and return a boolean Series."""
        other = self._comparison_validator(other, skip_bool=skip_bool)

        if isinstance(other, Series):
            return Series([op(j, other[i]) for i, j in enumerate(self)], "bool")

        return Series([op(i, other) for i in self.data], "bool")

    def __gt__(self, other):
        return self._elementwise_bool(other, lambda a, b: a > b)

    def __ge__(self, other):
        return self._elementwise_bool(other, lambda a, b: a >= b)

    def __lt__(self, other):
        return self._elementwise_bool(other, lambda a, b: a < b)

    def __le__(self, other):
        return self._elementwise_bool(other, lambda a, b: a <= b)

    def __eq__(self, other):
        return self._elementwise_bool(other, lambda a, b: a == b)

    def __ne__(self, other):
        return self._elementwise_bool(other, lambda a, b: a != b)

    def __and__(self, other):
        return self._elementwise_bool(other, lambda a, b: a and b, skip_bool=True)

    def __or__(self, other):
        return self._elementwise_bool(other, lambda a, b: a or b, skip_bool=True)

    def _math_validator(self, other):
        """Validate the operand of an arithmetic operation.

        Returns ``NotImplemented`` for unsupported operand types, ``None``
        when ``other`` is a valid Series, and ``other`` itself otherwise.

        Raises
        ------
        ValueError
            If ``other`` is a Series of a different length.

        TypeError
            If the combination of data types is not supported.
        """
        if not isinstance(other, (Series, Number, datetime.timedelta, relativedelta, datetime.datetime, datetime.date)):
            return NotImplemented

        if isinstance(other, Series):
            if len(self) != len(other):
                raise ValueError("Arithmatic operations cannot be performed on objects of different lengths.")

            if self.dtype == bool or other.dtype == bool:
                raise TypeError("Arithmatic operations cannot be performed on boolean series.")

            if self.dtype == float and not other.dtype == float:
                raise TypeError(
                    "Arithmatic operation cannot be performed between "
                    f"'{self.dtype.__name__}' and '{other.dtype.__name__}'"
                )

            if self.dtype == datetime.datetime:
                # Message fixed: the original emitted a doubled opening quote.
                raise TypeError(
                    "Arithmatic operation cannot be performed between "
                    f"'{self.dtype.__name__}' and '{other.dtype.__name__}'"
                )

            return

        elif self.dtype == float and not isinstance(other, Number):
            # Message fixed: show the type name rather than "<class 'float'>".
            raise TypeError(
                f"Arithmatic operation cannot be performed between '{self.dtype.__name__}' "
                f"and '{other.__class__.__name__}'"
            )

        elif self.dtype == datetime.datetime and not isinstance(other, (datetime.timedelta, relativedelta)):
            raise TypeError(
                f"Arithmatic operation cannot be performed between '{self.dtype.__name__}' and "
                f"'{other.__class__.__name__}'\nHint: Try using timedelta or relativedelta objects."
            )

        return other

    def __add__(self, other):
        """Element-wise addition with a Series, number, timedelta or relativedelta."""
        # Identity check: ``==`` would dispatch to an operand's own __eq__.
        if self._math_validator(other) is NotImplemented:
            return NotImplemented

        if isinstance(other, Series):
            return self.__class__([j + other[i] for i, j in enumerate(self)], self.dtype.__name__)

        if isinstance(other, (Number, datetime.timedelta, relativedelta)):
            return self.__class__([i + other for i in self], self.dtype.__name__)

        # Validated but unsupported operand (e.g. a date added to a boolean
        # series): signal it instead of silently returning None.
        return NotImplemented
|
|
|
|
|
|
def _validate_frequency(
|
|
data: List[Tuple[datetime.datetime, float]],
|
|
provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
|
|
raise_error: bool = True,
|
|
):
|
|
"""Checks the data and returns the expected frequency."""
|
|
if provided_frequency is not None:
|
|
provided_frequency = getattr(AllFrequencies, provided_frequency)
|
|
start_date = data[0][0]
|
|
end_date = data[-1][0]
|
|
overall_gap = (end_date - start_date).days
|
|
num_data_points = len(data)
|
|
# days_per_data = num_data_points / overall_gap
|
|
|
|
expected_data_points = {
|
|
"D": (round(overall_gap * 0.6, 0), round(overall_gap * 1.05 + 1, 0)),
|
|
"W": (round(overall_gap / 7 * 0.7, 0), round(overall_gap / 7 * 1.05 + 1, 0)),
|
|
"M": (round(overall_gap / 30 * 0.8, 0), round(overall_gap / 30 * 1.05 + 1, 0)),
|
|
"Q": (round(overall_gap / 92 * 0.85, 0), round(overall_gap / 92 * 1.05 + 1, 0)),
|
|
"H": (round(overall_gap / 182 * 0.85, 0), round(overall_gap / 182 * 1.05 + 1, 0)),
|
|
"A": (round(overall_gap / 365 * 0.85, 0), round(overall_gap / 365 * 1.05 + 1, 0)),
|
|
}
|
|
|
|
for frequency, (min, max) in expected_data_points.items():
|
|
if min <= num_data_points <= max:
|
|
expected_frequency = frequency
|
|
break
|
|
else:
|
|
if raise_error:
|
|
raise ValueError("Data does not match any known frequency. Perhaps you have too many missing data points.")
|
|
else:
|
|
expected_frequency = provided_frequency.symbol
|
|
|
|
expected_data_points = expected_data_points[expected_frequency]
|
|
if provided_frequency is None:
|
|
frequency_match = None
|
|
elif provided_frequency.symbol == expected_frequency:
|
|
frequency_match = True
|
|
else:
|
|
frequency_match = False
|
|
|
|
return {
|
|
"gap": overall_gap,
|
|
"expected_data_points": expected_data_points,
|
|
"actual_data_points": num_data_points,
|
|
"expected_frequency": expected_frequency,
|
|
"frequency_match": frequency_match,
|
|
}
|
|
|
|
|
|
@Mapping.register
class TimeSeriesCore:
    """Defines the core building blocks of a TimeSeries object

    Data is held in ``self.data``, a dict keyed by date. Comparison and
    arithmetic operators work value-wise and return a new object of the same
    class with the same frequency.
    """

    def __init__(
        self,
        ts_data: List[Iterable] | Mapping,
        frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
        validate_frequency: bool = True,
        date_format: str = "%Y-%m-%d",
    ):
        """Instantiate a TimeSeriesCore object

        Parameters
        ----------
        ts_data : List[Iterable] | Mapping
            Time Series data in the form of list of tuples or dictionary.
            The first element of each tuple should be a date and second element should be a value.
            In case of dictionary, the key should be the date.

        frequency : str, optional
            The frequency of the time series.
            Valid values are {D, W, M, Q, H, Y}
            If no frequency is provided, it will be inferred from the data.
            Frequency assignment uses approximation and hence the assignment may be incorrect if
            there are fewer than 12 data points.

        validate_frequency: boolean, default True
            Whether the provided frequency should be validated against the data.
            When set to True, if the number of data points is not within the expected limits,
            it will raise an Exception and object creation will fail.
            A mismatch between the provided and the detected frequency is reported only if the
            data contains at least 12 data points, as a fewer number of data points are not
            sufficient to determine the frequency correctly.
            refer core._validate_frequency for more details.

        date_format : str, optional, default "%Y-%m-%d"
            Specify the format of the date
            Required only if the first argument of tuples is a string. Otherwise ignored.
        """

        ts_data = _preprocess_timeseries(ts_data, date_format=date_format)

        validation = _validate_frequency(data=ts_data, provided_frequency=frequency, raise_error=validate_frequency)
        if frequency is None:
            frequency = validation["expected_frequency"]

        self.frequency = getattr(AllFrequencies, frequency)

        if validate_frequency and len(ts_data) >= 12:
            # frequency_match is None when no frequency was provided.
            if validation["frequency_match"] is not None and not validation["frequency_match"]:
                raise ValueError(
                    f"Data appears to be of frquency {validation['expected_frequency']!r}, "
                    f"but {frequency!r} was provided. Pass the correct frequency."
                    "\nPass validate_frequency=False to disable this validation."
                )

        self.data = dict(ts_data)
        if len(self.data) != len(ts_data):
            warnings.warn("The input data contains duplicate dates which have been ignored.")
        self.iter_num: int = -1
        self._dates: list = None
        self._values: list = None
        self._start_date: datetime.datetime = None
        self._end_date: datetime.datetime = None

    @property
    def dates(self) -> Series:
        """Get a list of all the dates in the TimeSeries object"""

        # Rebuilt on every access: a length-based cache goes stale when a date
        # is replaced by another one without changing the size.
        self._dates = list(self.data.keys())
        return Series(self._dates, "date")

    @property
    def values(self) -> Series:
        """Get a list of all the Values in the TimeSeries object"""

        # Rebuilt on every access: overwriting the value of an existing date
        # keeps the length unchanged, so a length-based cache returned old values.
        self._values = list(self.data.values())
        return Series(self._values, "number")

    @property
    def start_date(self) -> datetime.datetime:
        """The first date in the TimeSeries object"""

        return self.dates[0]

    @property
    def end_date(self) -> datetime.datetime:
        """The last date in the TimeSeries object"""

        return self.dates[-1]

    def _get_printable_slice(self, n: int):
        """Helper function for __repr__ and __str__

        Returns the first and last ``n // 2`` items as strings, under the
        keys ``"start"`` and ``"end"``.
        """

        half = n // 2
        iter_f = iter(self.data)
        first_n = [next(iter_f) for _ in range(half)]

        iter_b = reversed(self.data)
        last_n = [next(iter_b) for _ in range(half)]
        last_n.sort()

        return {
            "start": [str((i, self.data[i])) for i in first_n],
            "end": [str((i, self.data[i])) for i in last_n],
        }

    def __repr__(self):
        if len(self.data) > 6:
            printable = self._get_printable_slice(6)
            printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format(
                self.__class__.__name__,
                ",\n\t ".join(printable["start"]),
                ",\n\t ".join(printable["end"]),
                repr(self.frequency.symbol),
            )
        else:
            printable_str = "{}([{}], frequency={})".format(
                self.__class__.__name__,
                ",\n\t".join([str(i) for i in self.data.items()]),
                repr(self.frequency.symbol),
            )
        return printable_str

    def __str__(self):
        if len(self.data) > 6:
            printable = self._get_printable_slice(6)
            printable_str = "[{}\n ...\n {}]".format(
                ",\n ".join(printable["start"]),
                ",\n ".join(printable["end"]),
            )
        else:
            printable_str = "[{}]".format(",\n ".join([str(i) for i in self.data.items()]))
        return printable_str

    @date_parser(1)
    def _get_item_from_date(self, date: str | datetime.datetime):
        """Helper function to retrieve item using a date"""

        return self.get(date, raise_error=True)

    def _get_item_from_key(self, key: str | datetime.datetime):
        """Helper function to implement special keys

        ``"dates"`` and ``"values"`` return the matching property; integers
        are rejected with a hint to use ``iloc``; anything else is treated as
        a date.
        """

        if isinstance(key, int):
            raise KeyError(f"{key}. \nHint: use .iloc[{key}] for index based slicing.")

        if key in ["dates", "values"]:
            return getattr(self, key)

        return self._get_item_from_date(key)

    def _get_item_from_list(self, date_list: Sequence[str | datetime.datetime]):
        """Helper function to retrieve items using a list"""

        data_to_return = [self._get_item_from_key(key) for key in date_list]
        return self.__class__(data_to_return, frequency=self.frequency.symbol, validate_frequency=False)

    def _get_item_from_series(self, series: Series):
        """Helper function to retrieve item using a Series object

        A Series of type bool of equal length to the time series can be used.
        A Series of dates can be used to filter out a set of dates.
        """
        if series.dtype == bool:
            if len(series) != len(self.dates):
                raise ValueError(f"Length of Series: {len(series)} did not match length of object: {len(self.dates)}")
            dates_to_return = [self.dates[i] for i, j in enumerate(series) if j]
        elif series.dtype == datetime.datetime:
            dates_to_return = list(series)
        else:
            raise TypeError(f"Cannot slice {self.__class__.__name__} using a Series of {series.dtype.__name__}")

        return self._get_item_from_list(dates_to_return)

    def __getitem__(self, key):
        if isinstance(key, (int, str, datetime.datetime, datetime.date)):
            return self._get_item_from_key(key)

        if isinstance(key, Series):
            return self._get_item_from_series(key)

        if isinstance(key, Sequence):
            return self._get_item_from_list(key)

        raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")

    @date_parser(1)
    def __setitem__(self, key: str | datetime.datetime, value: Number) -> None:
        if not isinstance(value, Number):
            raise TypeError("Only numerical values can be stored in TimeSeries")

        is_new_date = key not in self.data
        self.data[key] = float(value)
        if is_new_date:
            # New dates are appended by dict, so restore date order.
            self.data = dict(sorted(self.data.items()))

    @date_parser(1)
    def __delitem__(self, key):
        del self.data[key]

    def _comparison_validator(self, other):
        """Validates the data before comparison is performed"""

        if not isinstance(other, (Number, Series, TimeSeriesCore)):
            raise TypeError(
                f"Comparison cannot be performed between '{self.__class__.__name__}' and '{other.__class__.__name__}'"
            )

        if isinstance(other, TimeSeriesCore):
            if any(self.dates != other.dates):
                raise ValueError(
                    "Only objects with same set of dates can be compared.\n"
                    "Hint: use TimeSeries.sync() method to sync dates of two TimeSeries objects."
                )

        if isinstance(other, Series):
            if other.dtype != float:
                raise TypeError("Only Series of type float can be used for comparison")

            if len(self) != len(other):
                raise ValueError("Length of series does not match length of object")

    def _compare(self, other, op):
        """Build a new object holding ``op(value, other_value)`` for every date.

        ``other`` may be a number (applied to every value), a Series (matched
        by position) or another time series (matched by date).
        """
        self._comparison_validator(other)

        if isinstance(other, Series):
            data = {dt: op(val, other[i]) for i, (dt, val) in enumerate(self.data.items())}
        elif isinstance(other, TimeSeriesCore):
            data = {dt: op(val, other[dt][1]) for dt, val in self.data.items()}
        else:
            data = {dt: op(val, other) for dt, val in self.data.items()}

        return self.__class__(data, frequency=self.frequency.symbol)

    def __gt__(self, other):
        return self._compare(other, lambda val, oth: val > oth)

    def __ge__(self, other):
        return self._compare(other, lambda val, oth: val >= oth)

    def __lt__(self, other):
        return self._compare(other, lambda val, oth: val < oth)

    def __le__(self, other):
        return self._compare(other, lambda val, oth: val <= oth)

    def __eq__(self, other):
        return self._compare(other, lambda val, oth: val == oth)

    def __ne__(self, other):
        return self._compare(other, lambda val, oth: val != oth)

    def __iter__(self):
        # Iteration state lives on the instance, so nested iteration over the
        # same object is not supported.
        self.n = 0
        return self

    def __next__(self):
        if self.n >= len(self.data):
            raise StopIteration

        key = self.dates[self.n]
        self.n += 1
        return key, self.data[key]

    def __len__(self):
        return len(self.data)

    @date_parser(1)
    def __contains__(self, key: object) -> bool:
        return key in self.data

    def _arithmatic_validator(self, other):
        """Validates input data before performing math operations"""

        if not isinstance(other, (Number, Series, TimeSeriesCore)):
            raise TypeError(
                "Cannot perform mathematical operations between "
                f"'{self.__class__.__name__}' and '{other.__class__.__name__}'"
            )

        if isinstance(other, TimeSeriesCore):
            if len(other) != len(self):
                raise ValueError("Can only perform mathematical operations between objects of same length.")
            if any(self.dates != other.dates):
                raise ValueError("Can only perform mathematical operations between objects having same dates.")

        if isinstance(other, Series):
            if other.dtype != float:
                raise TypeError(
                    "Cannot perform mathematical operations with "
                    f"'{other.__class__.__name__}' of type '{other.dtype}'"
                )
            if len(other) != len(self):
                raise ValueError("Can only perform mathematical operations between objects of same length.")

    def _arithmetic(self, other, op):
        """Build a new object holding ``op(value, other_value)`` for every date.

        ``other`` may be a number, a Series of floats or another time series
        with the same dates; a time series is reduced to its values and then
        matched by position.
        """
        self._arithmatic_validator(other)

        if isinstance(other, TimeSeriesCore):
            other = other.values

        if isinstance(other, Series):
            data = {dt: op(val, other[i]) for i, (dt, val) in enumerate(self.data.items())}
        elif isinstance(other, Number):
            data = {dt: op(val, other) for dt, val in self.data.items()}

        return self.__class__(data, self.frequency.symbol)

    def __add__(self, other):
        return self._arithmetic(other, lambda val, oth: val + oth)

    def __sub__(self, other):
        return self._arithmetic(other, lambda val, oth: val - oth)

    def __truediv__(self, other):
        return self._arithmetic(other, lambda val, oth: val / oth)

    def __floordiv__(self, other):
        return self._arithmetic(other, lambda val, oth: val // oth)

    def __mul__(self, other):
        return self._arithmetic(other, lambda val, oth: val * oth)

    def __mod__(self, other):
        return self._arithmetic(other, lambda val, oth: val % oth)

    def __pow__(self, other):
        return self._arithmetic(other, lambda val, oth: val**oth)

    def __radd__(self, other):
        return self._arithmetic(other, lambda val, oth: val + oth)

    def __rsub__(self, other):
        return self._arithmetic(other, lambda val, oth: oth - val)

    def __rtruediv__(self, other):
        return self._arithmetic(other, lambda val, oth: oth / val)

    def __rfloordiv__(self, other):
        return self._arithmetic(other, lambda val, oth: oth // val)

    def __rmul__(self, other):
        return self._arithmetic(other, lambda val, oth: val * oth)

    def __rpow__(self, _):
        raise NotImplementedError("This operation is not supported.")

    @date_parser(1)
    def get(
        self,
        date: str | datetime.datetime,
        default: Any = None,
        closest: Literal["previous", "next"] = None,
        limit: int = 1000,
        raise_error: bool = False,
    ) -> tuple | Any:
        """Get a value for a particular key. Return a default value on KeyError

        Parameters
        ----------
        date:
            Date for which the value needs to be fetched.

        default: Optional, Default None
            Default value to be returned in case the date is not found. Default None.

        closest:
            Look for previous or next value when date is not found.
            Valid values are "exact", "previous" and "next".
            If not specified, the value set in PyfactsOptions.get_closest is used

        limit:
            Maximum number of days to look for the closest available date.
            If exceeded without finding a date, default value will be returned.

        raise_error : bool, optional
            Whether to raise an error and ignore the default value.
            Meant for use with __getitem__.

        Returns
        -------
        tuple | Any
            A ``(date, value)`` tuple for the date that was found,
            otherwise ``default``.

        Raises
        ------
        ValueError
            If the argument for closest is not valid.

        KeyError
            if raise_error is true and date is not found
        """

        if closest is None:
            closest = PyfactsOptions.get_closest

        time_delta_dict = {"exact": 0, "previous": -1, "next": 1}

        if closest not in time_delta_dict:
            raise ValueError(f"Invalid argument from closest {closest!r}")
        delta = relativedelta(days=time_delta_dict[closest])

        for _ in range(limit):
            try:
                return date, self.data[date]
            except KeyError:
                # A zero delta ("exact") is falsy: no neighbouring date to try.
                if not delta:
                    break
                date += delta

        if raise_error:
            raise KeyError(date)
        return default

    @property
    def iloc(self) -> Mapping:
        """Returns an item or a set of items based on index

        supports slicing using numerical index.
        Accepts integers or Python slice objects

        Usage
        -----
        >>> ts = TimeSeries(data, frequency='D')
        >>> ts.iloc[0]  # get the first value
        >>> ts.iloc[-1]  # get the last value
        >>> ts.iloc[:3]  # get the first 3 values
        >>> ts.iloc[-3:]  # get the last 3 values
        >>> ts.iloc[5:10]  # get five values starting from the sixth value
        >>> ts.iloc[::2]  # get every alternate date
        """

        return _IndexSlicer(self)

    def head(self, n: int = 6) -> TimeSeriesCore:
        """Returns the first n items of the TimeSeries object"""

        return self.iloc[:n]

    def tail(self, n: int = 6) -> TimeSeriesCore:
        """Returns the last n items of the TimeSeries object"""

        return self.iloc[-n:]

    def items(self):
        """Return the ``(date, value)`` items view of the underlying dict."""
        return self.data.items()

    def update(self, items: dict):
        """Set every ``date: value`` pair of ``items`` on this object."""
        for k, v in items.items():
            self[k] = v

    def to_dict(self, date_as_string: bool = False, string_date_format: str = "default") -> dict:
        """Convert time series to a dictionary

        Parameters
        ----------
        date_as_string: boolean, default False
            Whether date should be converted to string.
            If False, then the output will contain datetime.datetime objects
            (the internal dictionary itself is returned, not a copy).

        string_date_format: datetime library compatible format string
            If date is to be output as string, pass the format here.
            If it is left as default, PyfactsOptions.date_format will be used.
        """

        if not date_as_string:
            return self.data

        if string_date_format == "default":
            string_date_format = PyfactsOptions.date_format

        data = {datetime.datetime.strftime(dt, string_date_format): val for dt, val in self.data.items()}
        return data

    def to_list(self, date_as_string: bool = False, string_date_format: str = "default") -> List[tuple]:
        """Convert time series to a list of tuples

        Parameters
        ----------
        date_as_string: boolean, optional
            Whether date should be converted to string.
            If False, then the output will contain datetime.datetime objects

        string_date_format : str, optional
            If date is to be output as string, pass the format here.
            If it is left as default, PyfactsOptions.date_format will be used.
        """

        if not date_as_string:
            return list(self.data.items())

        if string_date_format == "default":
            string_date_format = PyfactsOptions.date_format

        data = [(datetime.datetime.strftime(dt, string_date_format), val) for dt, val in self.data.items()]
        return data
|