Compare commits

...

2 Commits

2 changed files with 72 additions and 21 deletions

View File

@ -4,7 +4,7 @@ from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
@dataclass
class Options:
class FincalOptions:
date_format: str = '%Y-%m-%d'
closest: str = 'before' # after
@ -94,6 +94,23 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
return as_on_delta, prior_delta
class IndexSlicer:
def __init__(self, parent_obj):
self.parent = parent_obj
def __getitem__(self, n):
all_keys = list(self.parent.time_series)
if isinstance(n, int):
keys = [all_keys[n]]
else:
keys = all_keys[n]
item = [(key, self.parent.time_series[key]) for key in keys]
if len(item) == 1:
return item[0]
return item
class TimeSeriesCore:
"""Defines the core building blocks of a TimeSeries object"""
@ -132,7 +149,7 @@ class TimeSeriesCore:
self.end_date = self.dates[-1]
self.frequency = getattr(AllFrequencies, frequency)
def _get_slice(self, n: int):
def _get_printable_slice(self, n: int):
"""Returns a slice of the dataframe from beginning and end"""
printable = {}
@ -149,7 +166,7 @@ class TimeSeriesCore:
def __repr__(self):
if len(self.time_series) > 6:
printable = self._get_slice(6)
printable = self._get_printable_slice(6)
printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format(
self.__class__.__name__,
',\n\t '.join(printable['start']),
@ -166,7 +183,7 @@ class TimeSeriesCore:
def __str__(self):
if len(self.time_series) > 6:
printable = self._get_slice(6)
printable = self._get_printable_slice(6)
printable_str = "[{}\n ...\n {}]".format(
',\n '.join(printable['start']),
',\n '.join(printable['end']),
@ -175,29 +192,47 @@ class TimeSeriesCore:
printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()]))
return printable_str
def __getitem__(self, n):
all_keys = list(self.time_series)
if isinstance(n, int):
keys = [all_keys[n]]
def __getitem__(self, key):
if isinstance(key, int):
raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]")
elif isinstance(key, datetime.datetime):
item = (key, self.time_series[key])
if isinstance(key, str):
try:
dt_key = datetime.datetime.strptime(key, FincalOptions.date_format)
item = (dt_key, self.time_series[dt_key])
except ValueError:
raise KeyError(f"{repr(key)}. If you passed a date as a string, "
"try setting the date format using Fincal.Options.date_format")
except KeyError:
raise KeyError(f"{repr(key)}. This date is not available.")
elif isinstance(key, Sequence):
item = [(k, self.time_series[k]) for k in key]
else:
keys = all_keys[n]
item = [(key, self.time_series[key]) for key in keys]
if len(item) == 1:
return item[0]
raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")
return item
def __len__(self):
return len(self.time_series)
def head(self, n: int = 6):
"""Returns the first n items of the TimeSeries object"""
keys = list(self.time_series.keys())
keys = keys[:n]
result = [(key, self.time_series[key]) for key in keys]
return result
def tail(self, n: int = 6):
"""Returns the last n items of the TimeSeries object"""
keys = list(self.time_series.keys())
keys = keys[-n:]
result = [(key, self.time_series[key]) for key in keys]
return result
@property
def iloc(self):
"""Returns an item or a set of items based on index"""
return IndexSlicer(self)

View File

@ -77,23 +77,39 @@ class TimeSeries(TimeSeriesCore):
return new_ts
def bfill(self, inplace=False):
num_days = (self.end_date - self.start_date).days + 1
def bfill(self, inplace: bool = False, limit: int = None) -> Union[TimeSeries, None]:
"""Backward fill missing dates in the time series
new_ts = dict()
for i in range(num_days):
cur_date = self.end_date - datetime.timedelta(days=i)
Parameters
----------
inplace : bool
Modify the time-series data in place and return None.
limit : int, optional
Maximum number of periods to back fill
Returns
-------
Returns a TimeSeries object if inplace is False, otherwise None
"""
eomonth = True if self.frequency.days >= AllFrequencies.M.days else False
dates_to_fill = create_date_series(self.start_date, self.end_date, self.frequency.symbol, eomonth)
dates_to_fill.append(self.end_date)
bfill_ts = dict()
for cur_date in reversed(dates_to_fill):
try:
cur_val = self.time_series[cur_date]
except KeyError:
pass
new_ts.update({cur_date: cur_val})
bfill_ts.update({cur_date: cur_val})
new_ts = {k: bfill_ts[k] for k in reversed(bfill_ts)}
if inplace:
self.time_series = new_ts
return None
return dict(reversed(new_ts.items()))
return new_ts
def calculate_returns(
self,