Compare commits
3 Commits
33feb62015
...
8e8146a07d
Author | SHA1 | Date | |
---|---|---|---|
8e8146a07d | |||
ce6326f4b5 | |||
e06626dbca |
@ -5,7 +5,7 @@ import inspect
|
||||
from collections import UserDict, UserList
|
||||
from dataclasses import dataclass
|
||||
from numbers import Number
|
||||
from typing import Iterable, List, Literal, Mapping, Sequence, Union
|
||||
from typing import Callable, Iterable, List, Literal, Mapping, Sequence, Type
|
||||
|
||||
from .utils import FincalOptions, _parse_date, _preprocess_timeseries
|
||||
|
||||
@ -42,15 +42,15 @@ def date_parser(*pos):
|
||||
|
||||
def parse_dates(func):
|
||||
def wrapper_func(*args, **kwargs):
|
||||
date_format = kwargs.get("date_format", None)
|
||||
args = list(args)
|
||||
sig = inspect.signature(func)
|
||||
params = [i[0] for i in sig.parameters.items()]
|
||||
date_format: str = kwargs.get("date_format", None)
|
||||
args: list = list(args)
|
||||
sig: inspect.Signature = inspect.signature(func)
|
||||
params: list = [i[0] for i in sig.parameters.items()]
|
||||
|
||||
for j in pos:
|
||||
kwarg = params[j]
|
||||
kwarg: str = params[j]
|
||||
date = kwargs.get(kwarg, None)
|
||||
in_args = False
|
||||
in_args: bool = False
|
||||
if date is None:
|
||||
try:
|
||||
date = args[j]
|
||||
@ -61,7 +61,7 @@ def date_parser(*pos):
|
||||
if date is None:
|
||||
continue
|
||||
|
||||
parsed_date = _parse_date(date, date_format)
|
||||
parsed_date: datetime.datetime = _parse_date(date, date_format)
|
||||
if not in_args:
|
||||
kwargs[kwarg] = parsed_date
|
||||
else:
|
||||
@ -90,9 +90,9 @@ class _IndexSlicer:
|
||||
|
||||
def __getitem__(self, n):
|
||||
if isinstance(n, int):
|
||||
keys = [self.parent.dates[n]]
|
||||
keys: list = [self.parent.dates[n]]
|
||||
else:
|
||||
keys = self.parent.dates[n]
|
||||
keys: list = self.parent.dates[n]
|
||||
item = [(key, self.parent.data[key]) for key in keys]
|
||||
if len(item) == 1:
|
||||
return item[0]
|
||||
@ -105,11 +105,11 @@ class Series(UserList):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data,
|
||||
data: Sequence,
|
||||
data_type: Literal["date", "number", "bool"],
|
||||
date_format: str = None,
|
||||
):
|
||||
types_dict = {
|
||||
types_dict: dict = {
|
||||
"date": datetime.datetime,
|
||||
"datetime": datetime.datetime,
|
||||
"datetime.datetime": datetime.datetime,
|
||||
@ -128,11 +128,11 @@ class Series(UserList):
|
||||
if data_type in ["date", "datetime", "datetime.datetime"]:
|
||||
data = [_parse_date(i, date_format) for i in data]
|
||||
else:
|
||||
func = types_dict[data_type]
|
||||
data = [func(i) for i in data]
|
||||
func: Callable = types_dict[data_type]
|
||||
data: list = [func(i) for i in data]
|
||||
|
||||
self.dtype = types_dict[data_type]
|
||||
self.data = data
|
||||
self.dtype: Type = types_dict[data_type]
|
||||
self.data: Sequence = data
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.__class__.__name__}({self.data}, data_type='{self.dtype.__name__}')"
|
||||
@ -212,30 +212,32 @@ class TimeSeriesCore(UserDict):
|
||||
"""Defines the core building blocks of a TimeSeries object"""
|
||||
|
||||
def __init__(
|
||||
self, data: List[Iterable], frequency: Literal["D", "W", "M", "Q", "H", "Y"], date_format: str = "%Y-%m-%d"
|
||||
self,
|
||||
data: List[Iterable] | Mapping,
|
||||
frequency: Literal["D", "W", "M", "Q", "H", "Y"],
|
||||
date_format: str = "%Y-%m-%d",
|
||||
):
|
||||
"""Instantiate a TimeSeriesCore object
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : List[tuple]
|
||||
Time Series data in the form of list of tuples.
|
||||
data : List[Iterable] | Mapping
|
||||
Time Series data in the form of list of tuples or dictionary.
|
||||
The first element of each tuple should be a date and second element should be a value.
|
||||
In case of dictionary, the key should be the date.
|
||||
|
||||
frequency : str
|
||||
The frequency of the time series.
|
||||
Valid values are {D, W, M, Q, H, Y}
|
||||
|
||||
date_format : str, optional, default "%Y-%m-%d"
|
||||
Specify the format of the date
|
||||
Required only if the first argument of tuples is a string. Otherwise ignored.
|
||||
|
||||
frequency : str, optional, default "infer"
|
||||
The frequency of the time series. Default is infer.
|
||||
The class will try to infer the frequency automatically and adjust to the closest member.
|
||||
Note that inferring frequencies can fail if the data is too irregular.
|
||||
Valid values are {D, W, M, Q, H, Y}
|
||||
"""
|
||||
|
||||
data = _preprocess_timeseries(data, date_format=date_format)
|
||||
|
||||
self.data = dict(data)
|
||||
super().__init__(dict(data))
|
||||
if len(self.data) != len(data):
|
||||
print("Warning: The input data contains duplicate dates which have been ignored.")
|
||||
self.frequency: Frequency = getattr(AllFrequencies, frequency)
|
||||
@ -322,10 +324,10 @@ class TimeSeriesCore(UserDict):
|
||||
return printable_str
|
||||
|
||||
@date_parser(1)
|
||||
def _get_item_from_date(self, date: Union[str, datetime.datetime]):
|
||||
def _get_item_from_date(self, date: str | datetime.datetime):
|
||||
return date, self.data[date]
|
||||
|
||||
def _get_item_from_key(self, key: Union[str, datetime.datetime]):
|
||||
def _get_item_from_key(self, key: str | datetime.datetime):
|
||||
if isinstance(key, int):
|
||||
raise KeyError(f"{key}. \nHint: use .iloc[{key}] for index based slicing.")
|
||||
|
||||
@ -334,7 +336,7 @@ class TimeSeriesCore(UserDict):
|
||||
|
||||
return self._get_item_from_date(key)
|
||||
|
||||
def _get_item_from_list(self, date_list: Sequence[Union[str, datetime.datetime]]):
|
||||
def _get_item_from_list(self, date_list: Sequence[str | datetime.datetime]):
|
||||
data_to_return = [self._get_item_from_key(key) for key in date_list]
|
||||
return self.__class__(data_to_return, frequency=self.frequency.symbol)
|
||||
|
||||
@ -362,6 +364,9 @@ class TimeSeriesCore(UserDict):
|
||||
|
||||
raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")
|
||||
|
||||
# def __setitem__(self, key, value):
|
||||
# pass
|
||||
|
||||
def __iter__(self):
|
||||
self.n = 0
|
||||
return self
|
||||
@ -379,7 +384,7 @@ class TimeSeriesCore(UserDict):
|
||||
return super().__contains__(key)
|
||||
|
||||
@date_parser(1)
|
||||
def get(self, date: Union[str, datetime.datetime], default=None, closest=None):
|
||||
def get(self, date: str | datetime.datetime, default=None, closest=None):
|
||||
|
||||
if closest is None:
|
||||
closest = FincalOptions.get_closest
|
||||
|
@ -5,7 +5,7 @@ import datetime
|
||||
import math
|
||||
import pathlib
|
||||
import statistics
|
||||
from typing import Iterable, List, Literal, Mapping, Tuple, TypedDict, Union
|
||||
from typing import Iterable, List, Literal, Mapping, Tuple, TypedDict
|
||||
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
@ -26,8 +26,8 @@ class MaxDrawdown(TypedDict):
|
||||
|
||||
@date_parser(0, 1)
|
||||
def create_date_series(
|
||||
start_date: Union[str, datetime.datetime],
|
||||
end_date: Union[str, datetime.datetime],
|
||||
start_date: str | datetime.datetime,
|
||||
end_date: str | datetime.datetime,
|
||||
frequency: Literal["D", "W", "M", "Q", "H", "Y"],
|
||||
eomonth: bool = False,
|
||||
skip_weekends: bool = False,
|
||||
@ -130,7 +130,7 @@ class TimeSeries(TimeSeriesCore):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: Union[List[Iterable], Mapping],
|
||||
data: List[Iterable] | Mapping,
|
||||
frequency: Literal["D", "W", "M", "Q", "H", "Y"],
|
||||
date_format: str = "%Y-%m-%d",
|
||||
):
|
||||
@ -145,7 +145,7 @@ class TimeSeries(TimeSeriesCore):
|
||||
res_string: str = "First date: {}\nLast date: {}\nNumber of rows: {}"
|
||||
return res_string.format(self.start_date, self.end_date, total_dates)
|
||||
|
||||
def ffill(self, inplace: bool = False, limit: int = None, skip_weekends: bool = False) -> Union[TimeSeries, None]:
|
||||
def ffill(self, inplace: bool = False, limit: int = None, skip_weekends: bool = False) -> TimeSeries | None:
|
||||
"""Forward fill missing dates in the time series
|
||||
|
||||
Parameters
|
||||
@ -183,7 +183,7 @@ class TimeSeries(TimeSeriesCore):
|
||||
|
||||
return self.__class__(new_ts, frequency=self.frequency.symbol)
|
||||
|
||||
def bfill(self, inplace: bool = False, limit: int = None, skip_weekends: bool = False) -> Union[TimeSeries, None]:
|
||||
def bfill(self, inplace: bool = False, limit: int = None, skip_weekends: bool = False) -> TimeSeries | None:
|
||||
"""Backward fill missing dates in the time series
|
||||
|
||||
Parameters
|
||||
@ -225,7 +225,7 @@ class TimeSeries(TimeSeriesCore):
|
||||
@date_parser(1)
|
||||
def calculate_returns(
|
||||
self,
|
||||
as_on: Union[str, datetime.datetime],
|
||||
as_on: str | datetime.datetime,
|
||||
return_actual_date: bool = True,
|
||||
as_on_match: str = "closest",
|
||||
prior_match: str = "closest",
|
||||
@ -318,8 +318,8 @@ class TimeSeries(TimeSeriesCore):
|
||||
@date_parser(1, 2)
|
||||
def calculate_rolling_returns(
|
||||
self,
|
||||
from_date: Union[datetime.date, str],
|
||||
to_date: Union[datetime.date, str],
|
||||
from_date: datetime.date | str,
|
||||
to_date: datetime.date | str,
|
||||
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
|
||||
as_on_match: str = "closest",
|
||||
prior_match: str = "closest",
|
||||
@ -427,8 +427,8 @@ class TimeSeries(TimeSeriesCore):
|
||||
@date_parser(1, 2)
|
||||
def volatility(
|
||||
self,
|
||||
from_date: Union[datetime.date, str] = None,
|
||||
to_date: Union[datetime.date, str] = None,
|
||||
from_date: datetime.date | str = None,
|
||||
to_date: datetime.date | str = None,
|
||||
annualize_volatility: bool = True,
|
||||
traded_days: int = None,
|
||||
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
|
||||
@ -600,12 +600,32 @@ class TimeSeries(TimeSeriesCore):
|
||||
ensure_coverage=True,
|
||||
)
|
||||
|
||||
closest = "previous" if method == "ffill" else "next"
|
||||
closest: str = "previous" if method == "ffill" else "next"
|
||||
new_ts: dict = {dt: self.get(dt, closest=closest)[1] for dt in new_dates}
|
||||
output_ts: TimeSeries = TimeSeries(new_ts, frequency=to_frequency.symbol)
|
||||
|
||||
return output_ts
|
||||
|
||||
def sync(self, other: TimeSeries, fill_method: Literal["ffill", "bfill"] = "ffill"):
|
||||
"""Synchronize two TimeSeries objects
|
||||
|
||||
This will ensure that both time series have the same frequency and same set of dates.
|
||||
The frequency will be set to the higher of the two objects.
|
||||
Dates will be taken from the class on which the method is called.
|
||||
"""
|
||||
|
||||
if not isinstance(other, TimeSeries):
|
||||
raise TypeError("Only objects of type TimeSeries can be passed for sync")
|
||||
|
||||
if self.frequency.days > other.frequency.days:
|
||||
other = other.expand(to_frequency=self.frequency.symbol, method=fill_method)
|
||||
if self.frequency.days < other.frequency.days:
|
||||
self = self.expand(to_frequency=other.frequency.symbol, method=fill_method)
|
||||
|
||||
for dt, val in self.data.items():
|
||||
if dt not in other:
|
||||
pass # Need to create setitem first before implementing this
|
||||
|
||||
|
||||
def _preprocess_csv(file_path: str | pathlib.Path, delimiter: str = ",", encoding: str = "utf-8") -> List[list]:
|
||||
"""Preprocess csv data"""
|
||||
@ -617,8 +637,8 @@ def _preprocess_csv(file_path: str | pathlib.Path, delimiter: str = ",", encodin
|
||||
raise ValueError("File not found. Check the file path")
|
||||
|
||||
with open(file_path, "r", encoding=encoding) as file:
|
||||
reader = csv.reader(file, delimiter=delimiter)
|
||||
csv_data = list(reader)
|
||||
reader: csv.reader = csv.reader(file, delimiter=delimiter)
|
||||
csv_data: list = list(reader)
|
||||
|
||||
csv_data = [i for i in csv_data if i] # remove blank rows
|
||||
if not csv_data:
|
||||
|
@ -1,6 +1,6 @@
|
||||
import datetime
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
|
||||
from typing import List, Literal, Mapping, Sequence, Tuple
|
||||
|
||||
from .exceptions import DateNotFoundError, DateOutOfRangeError
|
||||
|
||||
@ -32,18 +32,15 @@ def _parse_date(date: str, date_format: str = None):
|
||||
|
||||
|
||||
def _preprocess_timeseries(
|
||||
data: Union[
|
||||
Sequence[Iterable[Union[str, datetime.datetime, float]]],
|
||||
Sequence[Mapping[str, Union[float, datetime.datetime]]],
|
||||
Sequence[Mapping[Union[str, datetime.datetime], float]],
|
||||
Mapping[Union[str, datetime.datetime], float],
|
||||
],
|
||||
data: Sequence[Tuple[str | datetime.datetime, float]]
|
||||
| Sequence[Mapping[str | datetime.datetime, float]]
|
||||
| Mapping[str | datetime.datetime, float],
|
||||
date_format: str,
|
||||
) -> List[Tuple[datetime.datetime, float]]:
|
||||
"""Converts any type of list to the correct type"""
|
||||
|
||||
if isinstance(data, Mapping):
|
||||
current_data = [(k, v) for k, v in data.items()]
|
||||
current_data: List[tuple] = [(k, v) for k, v in data.items()]
|
||||
return _preprocess_timeseries(current_data, date_format)
|
||||
|
||||
if not isinstance(data, Sequence):
|
||||
@ -56,31 +53,31 @@ def _preprocess_timeseries(
|
||||
raise TypeError("Could not parse the data")
|
||||
|
||||
if len(data[0]) == 1:
|
||||
current_data = [tuple(*i.items()) for i in data]
|
||||
current_data: List[tuple] = [tuple(*i.items()) for i in data]
|
||||
elif len(data[0]) == 2:
|
||||
current_data = [tuple(i.values()) for i in data]
|
||||
current_data: List[tuple] = [tuple(i.values()) for i in data]
|
||||
else:
|
||||
raise TypeError("Could not parse the data")
|
||||
return _preprocess_timeseries(current_data, date_format)
|
||||
|
||||
|
||||
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
|
||||
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> Tuple[datetime.timedelta]:
|
||||
"""Checks the arguments and returns appropriate timedelta objects"""
|
||||
|
||||
deltas = {"exact": 0, "previous": -1, "next": 1}
|
||||
if closest not in deltas.keys():
|
||||
raise ValueError(f"Invalid argument for closest: {closest}")
|
||||
|
||||
as_on_match = closest if as_on_match == "closest" else as_on_match
|
||||
prior_match = closest if prior_match == "closest" else prior_match
|
||||
as_on_match: str = closest if as_on_match == "closest" else as_on_match
|
||||
prior_match: str = closest if prior_match == "closest" else prior_match
|
||||
|
||||
if as_on_match in deltas.keys():
|
||||
as_on_delta = datetime.timedelta(days=deltas[as_on_match])
|
||||
as_on_delta: datetime.timedelta = datetime.timedelta(days=deltas[as_on_match])
|
||||
else:
|
||||
raise ValueError(f"Invalid as_on_match argument: {as_on_match}")
|
||||
|
||||
if prior_match in deltas.keys():
|
||||
prior_delta = datetime.timedelta(days=deltas[prior_match])
|
||||
prior_delta: datetime.timedelta = datetime.timedelta(days=deltas[prior_match])
|
||||
else:
|
||||
raise ValueError(f"Invalid prior_match argument: {prior_match}")
|
||||
|
||||
@ -101,7 +98,7 @@ def _find_closest_date(
|
||||
if delta.days > 0 and date > max(data):
|
||||
raise DateOutOfRangeError(date, "max")
|
||||
|
||||
row = data.get(date, None)
|
||||
row: tuple = data.get(date, None)
|
||||
if row is not None:
|
||||
return date, row
|
||||
|
||||
@ -119,6 +116,6 @@ def _find_closest_date(
|
||||
def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> float:
|
||||
"""Converts any time period to years for use with compounding functions"""
|
||||
|
||||
year_conversion_factor = {"years": 1, "months": 12, "days": 365}
|
||||
years = interval_value / year_conversion_factor[interval_type]
|
||||
year_conversion_factor: dict = {"years": 1, "months": 12, "days": 365}
|
||||
years: float = interval_value / year_conversion_factor[interval_type]
|
||||
return years
|
||||
|
@ -229,6 +229,23 @@ class TestTimeSeriesBasics:
|
||||
bf = ts.bfill()
|
||||
assert bf["2021-01-03"][1] == 240
|
||||
|
||||
def test_fill_weekly(self):
|
||||
ts_data = create_test_data(frequency=AllFrequencies.W, num=10)
|
||||
ts_data.pop(2)
|
||||
ts_data.pop(6)
|
||||
ts = TimeSeries(ts_data, frequency="W")
|
||||
assert len(ts) == 8
|
||||
|
||||
ff = ts.ffill()
|
||||
assert len(ff) == 10
|
||||
assert "2017-01-15" in ff
|
||||
assert ff["2017-01-15"][1] == ff["2017-01-08"][1]
|
||||
|
||||
bf = ts.bfill()
|
||||
assert len(ff) == 10
|
||||
assert "2017-01-15" in bf
|
||||
assert bf["2017-01-15"][1] == bf["2017-01-22"][1]
|
||||
|
||||
|
||||
class TestReturns:
|
||||
def test_returns_calc(self):
|
||||
|
Loading…
Reference in New Issue
Block a user