Compare commits

...

2 Commits

2 changed files with 72 additions and 21 deletions

View File

@ -4,7 +4,7 @@ from typing import Iterable, List, Literal, Mapping, Sequence, Tuple, Union
@dataclass @dataclass
class Options: class FincalOptions:
date_format: str = '%Y-%m-%d' date_format: str = '%Y-%m-%d'
closest: str = 'before' # after closest: str = 'before' # after
@ -94,6 +94,23 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
return as_on_delta, prior_delta return as_on_delta, prior_delta
class IndexSlicer:
def __init__(self, parent_obj):
self.parent = parent_obj
def __getitem__(self, n):
all_keys = list(self.parent.time_series)
if isinstance(n, int):
keys = [all_keys[n]]
else:
keys = all_keys[n]
item = [(key, self.parent.time_series[key]) for key in keys]
if len(item) == 1:
return item[0]
return item
class TimeSeriesCore: class TimeSeriesCore:
"""Defines the core building blocks of a TimeSeries object""" """Defines the core building blocks of a TimeSeries object"""
@ -132,7 +149,7 @@ class TimeSeriesCore:
self.end_date = self.dates[-1] self.end_date = self.dates[-1]
self.frequency = getattr(AllFrequencies, frequency) self.frequency = getattr(AllFrequencies, frequency)
def _get_slice(self, n: int): def _get_printable_slice(self, n: int):
"""Returns a slice of the dataframe from beginning and end""" """Returns a slice of the dataframe from beginning and end"""
printable = {} printable = {}
@ -149,7 +166,7 @@ class TimeSeriesCore:
def __repr__(self): def __repr__(self):
if len(self.time_series) > 6: if len(self.time_series) > 6:
printable = self._get_slice(6) printable = self._get_printable_slice(6)
printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format( printable_str = "{}([{}\n\t ...\n\t {}], frequency={})".format(
self.__class__.__name__, self.__class__.__name__,
',\n\t '.join(printable['start']), ',\n\t '.join(printable['start']),
@ -166,7 +183,7 @@ class TimeSeriesCore:
def __str__(self): def __str__(self):
if len(self.time_series) > 6: if len(self.time_series) > 6:
printable = self._get_slice(6) printable = self._get_printable_slice(6)
printable_str = "[{}\n ...\n {}]".format( printable_str = "[{}\n ...\n {}]".format(
',\n '.join(printable['start']), ',\n '.join(printable['start']),
',\n '.join(printable['end']), ',\n '.join(printable['end']),
@ -175,29 +192,47 @@ class TimeSeriesCore:
printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()])) printable_str = "[{}]".format(',\n '.join([str(i) for i in self.time_series.items()]))
return printable_str return printable_str
def __getitem__(self, n): def __getitem__(self, key):
all_keys = list(self.time_series) if isinstance(key, int):
if isinstance(n, int): raise KeyError(f"{key}. For index based slicing, use .iloc[{key}]")
keys = [all_keys[n]] elif isinstance(key, datetime.datetime):
item = (key, self.time_series[key])
if isinstance(key, str):
try:
dt_key = datetime.datetime.strptime(key, FincalOptions.date_format)
item = (dt_key, self.time_series[dt_key])
except ValueError:
raise KeyError(f"{repr(key)}. If you passed a date as a string, "
"try setting the date format using Fincal.Options.date_format")
except KeyError:
raise KeyError(f"{repr(key)}. This date is not available.")
elif isinstance(key, Sequence):
item = [(k, self.time_series[k]) for k in key]
else: else:
keys = all_keys[n] raise TypeError(f"Invalid type {repr(type(key).__name__)} for slicing.")
item = [(key, self.time_series[key]) for key in keys]
if len(item) == 1:
return item[0]
return item return item
def __len__(self): def __len__(self):
return len(self.time_series) return len(self.time_series)
def head(self, n: int = 6): def head(self, n: int = 6):
"""Returns the first n items of the TimeSeries object"""
keys = list(self.time_series.keys()) keys = list(self.time_series.keys())
keys = keys[:n] keys = keys[:n]
result = [(key, self.time_series[key]) for key in keys] result = [(key, self.time_series[key]) for key in keys]
return result return result
def tail(self, n: int = 6): def tail(self, n: int = 6):
"""Returns the last n items of the TimeSeries object"""
keys = list(self.time_series.keys()) keys = list(self.time_series.keys())
keys = keys[-n:] keys = keys[-n:]
result = [(key, self.time_series[key]) for key in keys] result = [(key, self.time_series[key]) for key in keys]
return result return result
@property
def iloc(self):
"""Returns an item or a set of items based on index"""
return IndexSlicer(self)

View File

@ -77,23 +77,39 @@ class TimeSeries(TimeSeriesCore):
return new_ts return new_ts
def bfill(self, inplace=False): def bfill(self, inplace: bool = False, limit: int = None) -> Union[TimeSeries, None]:
num_days = (self.end_date - self.start_date).days + 1 """Backward fill missing dates in the time series
new_ts = dict() Parameters
for i in range(num_days): ----------
cur_date = self.end_date - datetime.timedelta(days=i) inplace : bool
Modify the time-series data in place and return None.
limit : int, optional
Maximum number of periods to back fill
Returns
-------
Returns a TimeSeries object if inplace is False, otherwise None
"""
eomonth = True if self.frequency.days >= AllFrequencies.M.days else False
dates_to_fill = create_date_series(self.start_date, self.end_date, self.frequency.symbol, eomonth)
dates_to_fill.append(self.end_date)
bfill_ts = dict()
for cur_date in reversed(dates_to_fill):
try: try:
cur_val = self.time_series[cur_date] cur_val = self.time_series[cur_date]
except KeyError: except KeyError:
pass pass
new_ts.update({cur_date: cur_val}) bfill_ts.update({cur_date: cur_val})
new_ts = {k: bfill_ts[k] for k in reversed(bfill_ts)}
if inplace: if inplace:
self.time_series = new_ts self.time_series = new_ts
return None return None
return dict(reversed(new_ts.items())) return new_ts
def calculate_returns( def calculate_returns(
self, self,