|
|
@ -6,7 +6,17 @@ import warnings |
|
|
|
from collections import UserList |
|
|
|
from dataclasses import dataclass |
|
|
|
from numbers import Number |
|
|
|
from typing import Any, Callable, Iterable, List, Literal, Mapping, Sequence, Type |
|
|
|
from typing import ( |
|
|
|
Any, |
|
|
|
Callable, |
|
|
|
Iterable, |
|
|
|
List, |
|
|
|
Literal, |
|
|
|
Mapping, |
|
|
|
Sequence, |
|
|
|
Tuple, |
|
|
|
Type, |
|
|
|
) |
|
|
|
|
|
|
|
from dateutil.relativedelta import relativedelta |
|
|
|
|
|
|
@ -289,6 +299,51 @@ class Series(UserList): |
|
|
|
return self.__class__([i + other for i in self], self.dtype.__name__) |
|
|
|
|
|
|
|
|
|
|
|
def _validate_frequency( |
|
|
|
data: List[Tuple[datetime.datetime, float]], provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None |
|
|
|
): |
|
|
|
"""Checks the data and returns the expected frequency.""" |
|
|
|
if provided_frequency is not None: |
|
|
|
provided_frequency = getattr(AllFrequencies, provided_frequency) |
|
|
|
start_date = data[0][0] |
|
|
|
end_date = data[-1][0] |
|
|
|
overall_gap = (end_date - start_date).days |
|
|
|
num_data_points = len(data) |
|
|
|
# days_per_data = num_data_points / overall_gap |
|
|
|
|
|
|
|
expected_data_points = { |
|
|
|
"D": (round(overall_gap * 0.6, 0), round(overall_gap * 1.05 + 1, 0)), |
|
|
|
"W": (round(overall_gap / 7 * 0.7, 0), round(overall_gap / 7 * 1.05 + 1, 0)), |
|
|
|
"M": (round(overall_gap / 30 * 0.8, 0), round(overall_gap / 30 * 1.05 + 1, 0)), |
|
|
|
"Q": (round(overall_gap / 92 * 0.85, 0), round(overall_gap / 92 * 1.05 + 1, 0)), |
|
|
|
"H": (round(overall_gap / 182 * 0.85, 0), round(overall_gap / 182 * 1.05 + 1, 0)), |
|
|
|
"A": (round(overall_gap / 365 * 0.85, 0), round(overall_gap / 365 * 1.05 + 1, 0)), |
|
|
|
} |
|
|
|
|
|
|
|
for frequency, (min, max) in expected_data_points.items(): |
|
|
|
if min <= num_data_points <= max: |
|
|
|
expected_frequency = frequency |
|
|
|
break |
|
|
|
else: |
|
|
|
raise ValueError("Data does not match any known frequency. Perhaps you have too many missing data points.") |
|
|
|
|
|
|
|
expected_data_points = expected_data_points[expected_frequency] |
|
|
|
if provided_frequency is None: |
|
|
|
frequency_match = None |
|
|
|
elif provided_frequency.symbol == expected_frequency: |
|
|
|
frequency_match = True |
|
|
|
else: |
|
|
|
frequency_match = False |
|
|
|
|
|
|
|
return { |
|
|
|
"gap": overall_gap, |
|
|
|
"expected_data_points": expected_data_points, |
|
|
|
"actual_data_points": num_data_points, |
|
|
|
"expected_frequency": expected_frequency, |
|
|
|
"frequency_match": frequency_match, |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Mapping.register |
|
|
|
class TimeSeriesCore: |
|
|
|
"""Defines the core building blocks of a TimeSeries object""" |
|
|
@ -296,7 +351,8 @@ class TimeSeriesCore: |
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
ts_data: List[Iterable] | Mapping, |
|
|
|
frequency: Literal["D", "W", "M", "Q", "H", "Y"], |
|
|
|
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None, |
|
|
|
validate_frequency: bool = True, |
|
|
|
date_format: str = "%Y-%m-%d", |
|
|
|
): |
|
|
|
"""Instantiate a TimeSeriesCore object |
|
|
@ -308,9 +364,21 @@ class TimeSeriesCore: |
|
|
|
The first element of each tuple should be a date and second element should be a value. |
|
|
|
In case of dictionary, the key should be the date. |
|
|
|
|
|
|
|
frequency : str |
|
|
|
frequency : str, optional |
|
|
|
The frequency of the time series. |
|
|
|
Valid values are {D, W, M, Q, H, Y} |
|
|
|
If no frequency is provided, it will be inferred from the data. |
|
|
|
Frequency assignment uses approximation and hence the assignment may be incorrect if |
|
|
|
there are fewer than 12 data points. |
|
|
|
|
|
|
|
validate_frequency: boolean, default True |
|
|
|
Whether the provided frequency should be validated against the data. |
|
|
|
When set to True, if the expected number of data points are not withint the expected limits, |
|
|
|
it will raise an Exception and object creation will fail. |
|
|
|
Validation is performed only if data contains at least 12 data points, as a fewer number of |
|
|
|
data points are not sufficient to determine the frequency correctly. |
|
|
|
This parameter will be ignored if frequency is not provided. |
|
|
|
refer core._validate_frequency for more details. |
|
|
|
|
|
|
|
date_format : str, optional, default "%Y-%m-%d" |
|
|
|
Specify the format of the date |
|
|
@ -319,10 +387,24 @@ class TimeSeriesCore: |
|
|
|
|
|
|
|
ts_data = _preprocess_timeseries(ts_data, date_format=date_format) |
|
|
|
|
|
|
|
validation = _validate_frequency(data=ts_data, provided_frequency=frequency) |
|
|
|
if frequency is None: |
|
|
|
frequency = validation["expected_frequency"] |
|
|
|
|
|
|
|
self.frequency = getattr(AllFrequencies, frequency) |
|
|
|
|
|
|
|
if validate_frequency and len(ts_data) >= 12: |
|
|
|
if validation["frequency_match"] is not None and not validation["frequency_match"]: |
|
|
|
raise ValueError( |
|
|
|
f"Data appears to be of frquency {validation['expected_frequency']!r}, " |
|
|
|
f"but {frequency!r} was provided. Pass the correct frequency." |
|
|
|
"\nPass validate_frequency=False to disable this validation." |
|
|
|
) |
|
|
|
|
|
|
|
self.data = dict(ts_data) |
|
|
|
if len(self.data) != len(ts_data): |
|
|
|
warnings.warn("The input data contains duplicate dates which have been ignored.") |
|
|
|
self.frequency: Frequency = getattr(AllFrequencies, frequency) |
|
|
|
# self.frequency: Frequency = getattr(AllFrequencies, frequency) |
|
|
|
self.iter_num: int = -1 |
|
|
|
self._dates: list = None |
|
|
|
self._values: list = None |
|
|
|