2022-02-19 17:33:00 +00:00
|
|
|
import datetime
|
|
|
|
from dataclasses import dataclass
|
2022-02-20 17:19:45 +00:00
|
|
|
from numbers import Number
|
2022-02-20 06:06:56 +00:00
|
|
|
from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
|
2022-02-19 17:33:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
2022-02-20 13:30:39 +00:00
|
|
|
class FincalOptions:
|
2022-02-19 17:33:00 +00:00
|
|
|
date_format: str = '%Y-%m-%d'
|
|
|
|
closest: str = 'before' # after
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
|
class Frequency:
|
|
|
|
name: str
|
|
|
|
freq_type: str
|
|
|
|
value: int
|
|
|
|
days: int
|
2022-02-20 10:36:34 +00:00
|
|
|
symbol: str
|
2022-02-19 17:33:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
class AllFrequencies:
|
2022-02-20 10:36:34 +00:00
|
|
|
D = Frequency('daily', 'days', 1, 1, 'D')
|
|
|
|
W = Frequency('weekly', 'days', 7, 7, 'W')
|
|
|
|
M = Frequency('monthly', 'months', 1, 30, 'M')
|
|
|
|
Q = Frequency('quarterly', 'months', 3, 91, 'Q')
|
|
|
|
H = Frequency('half-yearly', 'months', 6, 182, 'H')
|
|
|
|
Y = Frequency('annual', 'years', 1, 365, 'Y')
|
2022-02-19 17:33:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
def _preprocess_timeseries(
|
|
|
|
data: Union[
|
2022-02-20 04:10:45 +00:00
|
|
|
Sequence[Iterable[Union[str, datetime.datetime, float]]],
|
|
|
|
Sequence[Mapping[str, Union[float, datetime.datetime]]],
|
|
|
|
Sequence[Mapping[Union[str, datetime.datetime], float]],
|
|
|
|
Mapping[Union[str, datetime.datetime], float]
|
2022-02-19 17:33:00 +00:00
|
|
|
],
|
|
|
|
date_format: str
|
|
|
|
) -> List[Tuple[datetime.datetime, float]]:
|
|
|
|
"""Converts any type of list to the correct type"""
|
|
|
|
|
2022-02-20 04:10:45 +00:00
|
|
|
if isinstance(data, Sequence):
|
|
|
|
if isinstance(data[0], Mapping):
|
2022-02-19 17:33:00 +00:00
|
|
|
if len(data[0].keys()) == 2:
|
|
|
|
current_data = [tuple(i.values()) for i in data]
|
|
|
|
elif len(data[0].keys()) == 1:
|
|
|
|
current_data = [tuple(*i.items()) for i in data]
|
|
|
|
else:
|
|
|
|
raise TypeError("Could not parse the data")
|
|
|
|
current_data = _preprocess_timeseries(current_data, date_format)
|
|
|
|
|
2022-02-20 04:10:45 +00:00
|
|
|
elif isinstance(data[0], Sequence):
|
2022-02-19 17:33:00 +00:00
|
|
|
if isinstance(data[0][0], str):
|
|
|
|
current_data = []
|
|
|
|
for i in data:
|
|
|
|
row = datetime.datetime.strptime(i[0], date_format), i[1]
|
|
|
|
current_data.append(row)
|
|
|
|
elif isinstance(data[0][0], datetime.datetime):
|
|
|
|
current_data = [(i, j) for i, j in data]
|
|
|
|
else:
|
|
|
|
raise TypeError("Could not parse the data")
|
|
|
|
else:
|
|
|
|
raise TypeError("Could not parse the data")
|
|
|
|
|
2022-02-20 04:10:45 +00:00
|
|
|
elif isinstance(data, Mapping):
|
2022-02-19 17:33:00 +00:00
|
|
|
current_data = [(k, v) for k, v in data.items()]
|
|
|
|
current_data = _preprocess_timeseries(current_data, date_format)
|
|
|
|
|
|
|
|
else:
|
|
|
|
raise TypeError("Could not parse the data")
|
|
|
|
current_data.sort()
|
|
|
|
return current_data
|
|
|
|
|
|
|
|
|
|
|
|
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
|
|
|
|
"""Checks the arguments and returns appropriate timedelta objects"""
|
|
|
|
|
|
|
|
deltas = {'exact': 0, 'previous': -1, 'next': 1}
|
|
|
|
if closest not in deltas.keys():
|
|
|
|
raise ValueError(f"Invalid closest argument: {closest}")
|
|
|
|
|
|
|
|
as_on_match = closest if as_on_match == 'closest' else as_on_match
|
|
|
|
prior_match = closest if prior_match == 'closest' else prior_match
|
|
|
|
|
|
|
|
if as_on_match in deltas.keys():
|
|
|
|
as_on_delta = datetime.timedelta(days=deltas[as_on_match])
|
|
|
|
else:
|
|
|
|
raise ValueError(f"Invalid as_on_match argument: {as_on_match}")
|
|
|
|
|
|
|
|
if prior_match in deltas.keys():
|
|
|
|
prior_delta = datetime.timedelta(days=deltas[prior_match])
|
|
|
|
else:
|
|
|
|
raise ValueError(f"Invalid prior_match argument: {prior_match}")
|
|
|
|
|
|
|
|
return as_on_delta, prior_delta
|
|
|
|
|
|
|
|
|
2022-02-21 02:56:29 +00:00
|
|
|
class _IndexSlicer:
|
2022-02-20 13:30:39 +00:00
|
|
|
def __init__(self, parent_obj):
|
|
|
|
self.parent = parent_obj
|
|
|
|
|
|
|
|
def __getitem__(self, n):
|
|
|
|
all_keys = list(self.parent.time_series)
|
|
|
|
if isinstance(n, int):
|
|
|
|
keys = [all_keys[n]]
|
|
|
|
else:
|
|
|
|
keys = all_keys[n]
|
|
|
|
item = [(key, self.parent.time_series[key]) for key in keys]
|
|
|
|
if len(item) == 1:
|
|
|
|
return item[0]
|
|
|
|
|
|
|
|
return item
|
|
|
|
|
|
|
|
|
2022-02-20 17:19:45 +00:00
|
|
|
class Series:
|
|
|
|
def __init__(self, data):
|
|
|
|
if not isinstance(data, Sequence):
|
|
|
|
raise TypeError("Series only supports creation using Sequence types")
|
|
|
|
|
|
|
|
if isinstance(data[0], bool):
|
|
|
|
self.data = data
|
|
|
|
self.dtype = bool
|
|
|
|
elif isinstance(data[0], Number):
|
|
|
|
self.dtype = float
|
|
|
|
self.data = [float(i) for i in data]
|
|
|
|
elif isinstance(data[0], str):
|
|
|
|
try:
|
|
|
|
data = [datetime.datetime.strptime(i, FincalOptions.date_format) for i in data]
|
|
|
|
self.dtype = datetime.datetime
|
|
|
|
except ValueError:
|
2022-02-21 02:56:29 +00:00
|
|
|
raise TypeError("Series does not support string data type except dates.\n"
|
|
|
|
"Hint: Try setting the date format using FincalOptions.date_format")
|
2022-02-20 17:19:45 +00:00
|
|
|
elif isinstance(data[0], datetime.datetime):
|
|
|
|
self.dtype = datetime.datetime
|
|
|
|
self.data = data
|
|
|
|
else:
|
|
|
|
raise TypeError(f"Cannot create series object from {type(data).__name__} of {type(data[0]).__name__}")
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return f"{self.__class__.__name__}({self.data})"
|
|
|
|
|
|
|
|
def __getitem__(self, n):
|
|
|
|
return self.data[n]
|
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
return len(self.data)
|
|
|
|
|
|
|
|
def __gt__(self, other):
|
|
|
|
if self.dtype == bool:
|
|
|
|
raise TypeError("> not supported for boolean series")
|
|
|
|
|
|
|
|
if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype):
|
|
|
|
gt = Series([i > other for i in self.data])
|
|
|
|
else:
|
|
|
|
raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")
|
|
|
|
|
|
|
|
return gt
|
|
|
|
|
|
|
|
def __lt__(self, other):
|
|
|
|
if self.dtype == bool:
|
|
|
|
raise TypeError("< not supported for boolean series")
|
|
|
|
|
|
|
|
if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype):
|
|
|
|
lt = Series([i < other for i in self.data])
|
|
|
|
else:
|
|
|
|
raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")
|
|
|
|
return lt
|
|
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
|
if self.dtype == float and isinstance(other, Number) or isinstance(other, self.dtype):
|
|
|
|
eq = Series([i == other for i in self.data])
|
|
|
|
else:
|
|
|
|
raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")
|
|
|
|
return eq
|
|
|
|
|
|
|
|
|
2022-02-19 17:33:00 +00:00
|
|
|
class TimeSeriesCore:
|
|
|
|
"""Defines the core building blocks of a TimeSeries object"""
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
data: List[Iterable],
|
2022-02-20 10:36:34 +00:00
|
|
|
frequency: Literal['D', 'W', 'M', 'Q', 'H', 'Y'],
|
|
|
|
date_format: str = "%Y-%m-%d"
|
2022-02-19 17:33:00 +00:00
|
|
|
):
|
|
|
|
"""Instantiate a TimeSeries object
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
data : List[tuple]
|
|
|
|
Time Series data in the form of list of tuples.
|
|
|
|
The first element of each tuple should be a date and second element should be a value.
|
|
|
|
|
|
|
|
date_format : str, optional, default "%Y-%m-%d"
|
|
|
|
Specify the format of the date
|
|
|
|
Required only if the first argument of tuples is a string. Otherwise ignored.
|
|
|
|
|
|
|
|
frequency : str, optional, default "infer"
|
|
|
|
The frequency of the time series. Default is infer.
|
|
|
|
The class will try to infer the frequency automatically and adjust to the closest member.
|
|
|
|
Note that inferring frequencies can fail if the data is too irregular.
|
|
|
|
Valid values are {D, W, M, Q, H, Y}
|
|
|
|
"""
|
|
|
|
|
|
|
|
data = _preprocess_timeseries(data, date_format=date_format)
|
|
|
|
|
|
|
|
self.time_series = dict(data)
|
2022-02-20 03:49:43 +00:00
|
|
|
if len(self.time_series) != len(data):
|
2022-02-19 17:33:00 +00:00
|
|
|
print("Warning: The input data contains duplicate dates which have been ignored.")
|
|
|
|
self.frequency = getattr(AllFrequencies, frequency)
|
2022-02-20 16:06:44 +00:00
|
|
|
self.iter_num = -1
|
2022-02-20 16:22:33 +00:00
|
|
|
self._dates = None
|
|
|
|
self._values = None
|
|
|
|
self._start_date = None
|
|
|
|
self._end_date = None
|
|
|
|
|
|
|
|
@property
|
|
|
|
def dates(self):
|
|
|
|
if self._dates is None or len(self._dates) != len(self.time_series):
|
|
|
|
self._dates = list(self.time_series.keys())
|
|
|
|
|
2022-02-20 17:19:45 +00:00
|
|
|
return Series(self._dates)
|
2022-02-20 16:22:33 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def values(self):
|
|
|
|
if self._values is None or len(self._values) != len(self.time_series):
|
|
|
|
self._values = list(self.time_series.values())
|
|
|
|
|
2022-02-20 17:19:45 +00:00
|
|
|
return Series(self._values)
|
2022-02-20 16:22:33 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def start_date(self):
|
|
|
|
return self.dates[0]
|
|
|
|
|
|
|
|
@property
|
|
|
|
def end_date(self):
|
|
|
|
return self.dates[-1]
|
2022-02-19 17:33:00 +00:00
|
|
|
|
2022-02-20 13:30:39 +00:00
|
|
|
def _get_printable_slice(self, n: int):
|
2022-02-20 10:36:34 +00:00
|
|
|
"""Returns a slice of the dataframe from beginning and end"""
|
|
|
|
|
|
|
|
printable = {}
|
|
|
|
iter_f = iter(self.time_series)
|
|
|
|
first_n = [next(iter_f) for i in range(n//2)]
|
|
|
|
|
|
|
|
iter_b = reversed(self.time_series)
|
|
|
|
last_n = [next(iter_b) for i in range(n//2)]
|
|
|
|
last_n.sort()
|
|
|
|
|
|
|
|
printable['start'] = [str((i, self.time_series[i])) for i in first_n]
|
|
|
|
printable['end'] = [str((i, self.time_series[i])) for i in last_n]
|
|
|
|
return printable
|
|
|
|
|
2022-02-19 17:33:00 +00:00
|
|
|
def __repr__(self):
|
|
|
|
if len(self.time_series) > 6:
|
2022-02-20 13:30:39 +00:00
|
|
|
printable = self._get_printable_slice(6)
|
2022-02-20 10:36:34 +00:00
|
|
|
printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format(
|
|
|
|
self.__class__.__name__,
|
|
|
|
',\n\t '.join(printable['start']),
|
|
|
|
',\n\t '.join(printable['end']),
|
|
|
|
repr(self.frequency.symbol)
|
2022-02-19 17:33:00 +00:00
|
|
|
)
|
|
|
|
else:
|
2022-02-20 10:36:34 +00:00
|
|
|
printable_str = "{}([{}], frequency={})".format(
|
|
|
|
self.__class__.__name__,
|
|
|
|
',\n\t'.join([str(i) for i in self.time_series.items()]),
|
|
|
|
repr(self.frequency.symbol)
|
|
|
|
)
|
2022-02-19 17:33:00 +00:00
|
|
|
return printable_str
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
if len(self.time_series) > 6:
|
2022-02-20 13:30:39 +00:00
|
|
|
printable = self._get_printable_slice(6)
|
2022-02-19 17:33:00 +00:00
|
|
|
printable_str = "[{}\n ...\n {}]".format(
|
2022-02-20 10:36:34 +00:00
|
|
|
',\n '.join(printable['start']),
|
|
|
|
',\n '.join(printable['end']),
|
2022-02-19 17:33:00 +00:00
|
|
|
)
|
|
|
|
else:
|
2022-02-20 10:36:34 +00:00
|
|
|
printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()]))
|
2022-02-19 17:33:00 +00:00
|
|
|
return printable_str
|
|
|
|
|
2022-02-20 13:30:39 +00:00
|
|
|
def __getitem__(self, key):
|
2022-02-20 17:19:45 +00:00
|
|
|
if isinstance(key, Series):
|
|
|
|
if not key.dtype == bool:
|
|
|
|
raise ValueError(f"Cannot slice {self.__class__.__name__} using a Series of {key.dtype.__name__}")
|
|
|
|
elif len(key) != len(self.dates):
|
|
|
|
raise Exception(f"Length of Series: {len(key)} did not match length of object: {len(self.dates)}")
|
|
|
|
else:
|
|
|
|
dates_to_return = [self.dates[i] for i, j in enumerate(key) if j]
|
|
|
|
data_to_return = [(key, self.time_series[key]) for key in dates_to_return]
|
|
|
|
return TimeSeriesCore(data_to_return)
|
|
|
|
|
2022-02-20 13:30:39 +00:00
|
|
|
if isinstance(key, int):
|
|
|
|
raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]")
|
|
|
|
elif isinstance(key, datetime.datetime):
|
|
|
|
item = (key, self.time_series[key])
|
|
|
|
if isinstance(key, str):
|
2022-02-20 16:22:33 +00:00
|
|
|
if key == 'dates':
|
|
|
|
return self.dates
|
|
|
|
elif key == 'values':
|
2022-02-21 02:56:29 +00:00
|
|
|
return self.values
|
2022-02-20 13:30:39 +00:00
|
|
|
try:
|
|
|
|
dt_key = datetime.datetime.strptime(key, FincalOptions.date_format)
|
|
|
|
item = (dt_key, self.time_series[dt_key])
|
|
|
|
except ValueError:
|
|
|
|
raise KeyError(f"{repr(key)}. If you passed a date as a string, "
|
|
|
|
"try setting the date format using Fincal.Options.date_format")
|
|
|
|
except KeyError:
|
|
|
|
raise KeyError(f"{repr(key)}. This date is not available.")
|
|
|
|
elif isinstance(key, Sequence):
|
|
|
|
item = [(k, self.time_series[k]) for k in key]
|
2022-02-19 17:33:00 +00:00
|
|
|
else:
|
2022-02-20 13:30:39 +00:00
|
|
|
raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")
|
2022-02-19 17:33:00 +00:00
|
|
|
return item
|
|
|
|
|
|
|
|
def __len__(self):
|
2022-02-20 06:06:56 +00:00
|
|
|
return len(self.time_series)
|
2022-02-19 17:33:00 +00:00
|
|
|
|
2022-02-20 16:06:44 +00:00
|
|
|
def __iter__(self):
|
|
|
|
self.n = 0
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __next__(self):
|
|
|
|
if self.n >= len(self.dates):
|
|
|
|
raise StopIteration
|
|
|
|
else:
|
|
|
|
key = self.dates[self.n]
|
|
|
|
self.n += 1
|
|
|
|
return key, self.time_series[key]
|
|
|
|
|
2022-02-19 17:33:00 +00:00
|
|
|
def head(self, n: int = 6):
|
2022-02-20 13:30:39 +00:00
|
|
|
"""Returns the first n items of the TimeSeries object"""
|
|
|
|
|
2022-02-19 17:33:00 +00:00
|
|
|
keys = list(self.time_series.keys())
|
|
|
|
keys = keys[:n]
|
|
|
|
result = [(key, self.time_series[key]) for key in keys]
|
|
|
|
return result
|
|
|
|
|
|
|
|
def tail(self, n: int = 6):
|
2022-02-20 13:30:39 +00:00
|
|
|
"""Returns the last n items of the TimeSeries object"""
|
|
|
|
|
2022-02-19 17:33:00 +00:00
|
|
|
keys = list(self.time_series.keys())
|
|
|
|
keys = keys[-n:]
|
|
|
|
result = [(key, self.time_series[key]) for key in keys]
|
|
|
|
return result
|
2022-02-20 13:30:39 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def iloc(self):
|
|
|
|
"""Returns an item or a set of items based on index"""
|
|
|
|
|
2022-02-21 02:56:29 +00:00
|
|
|
return _IndexSlicer(self)
|