From 56af7c33aa0f1b2a74ca088e33884ae629ff330b Mon Sep 17 00:00:00 2001 From: gouravkr Date: Sat, 19 Feb 2022 23:03:41 +0530 Subject: [PATCH] ffill now fills based on frequency create date series supports eomonth parameter --- fincal/fincal.py | 253 ++++++++++------------------------------------- 1 file changed, 50 insertions(+), 203 deletions(-) diff --git a/fincal/fincal.py b/fincal/fincal.py index d846612..b11b0cf 100644 --- a/fincal/fincal.py +++ b/fincal/fincal.py @@ -1,211 +1,37 @@ +from __future__ import annotations + import datetime -from dataclasses import dataclass -from typing import Dict, Iterable, List, Literal, Tuple, Union +from typing import List, Union from dateutil.relativedelta import relativedelta - -@dataclass -class Options: - date_format: str = '%Y-%m-%d' - closest: str = 'before' # after - - -@dataclass(frozen=True) -class Frequency: - name: str - freq_type: str - value: int - days: int - - -class AllFrequencies: - D = Frequency('daily', 'days', 1, 1) - W = Frequency('weekly', 'days', 7, 7) - M = Frequency('monthly', 'months', 1, 30) - Q = Frequency('quarterly', 'months', 3, 91) - H = Frequency('half-yearly', 'months', 6, 182) - Y = Frequency('annual', 'years', 1, 365) +from .core import AllFrequencies, Frequency, TimeSeriesCore, _preprocess_match_options def create_date_series( - start_date: datetime.datetime, - end_date: datetime.datetime, - frequency: Frequency + start_date: datetime.datetime, end_date: datetime.datetime, frequency: Frequency, eomonth: bool = False ) -> List[datetime.datetime]: """Creates a date series using a frequency""" - print(f"{start_date=}, {end_date=}") - datediff = (end_date - start_date).days/frequency.days+1 + if eomonth and frequency.days < AllFrequencies.M.days: + raise ValueError(f"eomonth cannot be set to True if frequency is higher than {AllFrequencies.M.name}") + + datediff = (end_date - start_date).days / frequency.days + 1 dates = [] for i in range(0, int(datediff)): - diff = {frequency.freq_type: frequency.value*i} - dates.append(start_date + relativedelta(**diff)) + diff = {frequency.freq_type: frequency.value * i} + date = start_date + relativedelta(**diff) + if eomonth: + if date.month == 12: + date = date.replace(day=31) + else: + date = date.replace(day=1).replace(month=date.month+1) - relativedelta(days=1) + dates.append(date) return dates -def _preprocess_timeseries( - data: Union[ - List[Iterable[Union[str, datetime.datetime, float]]], - List[Dict[str, Union[float, datetime.datetime]]], - List[Dict[Union[str, datetime.datetime], float]], - Dict[Union[str, datetime.datetime], float] - ], - date_format: str -) -> List[Tuple[datetime.datetime, float]]: - """Converts any type of list to the correct type""" - - if isinstance(data, list): - if isinstance(data[0], dict): - if len(data[0].keys()) == 2: - current_data = [tuple(i.values()) for i in data] - elif len(data[0].keys()) == 1: - current_data = [tuple(*i.items()) for i in data] - else: - raise TypeError("Could not parse the data") - current_data = _preprocess_timeseries(current_data, date_format) - - elif isinstance(data[0], Iterable): - if isinstance(data[0][0], str): - current_data = [] - for i in data: - row = datetime.datetime.strptime(i[0], date_format), i[1] - current_data.append(row) - elif isinstance(data[0][0], datetime.datetime): - current_data = [(i, j) for i, j in data] - else: - raise TypeError("Could not parse the data") - else: - raise TypeError("Could not parse the data") - - elif isinstance(data, dict): - current_data = [(k, v) for k, v in data.items()] - current_data = _preprocess_timeseries(current_data, date_format) - - else: - raise TypeError("Could not parse the data") - current_data.sort() - return current_data - - -def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: - """Checks the arguments and returns appropriate timedelta objects""" - - deltas = {'exact': 0, 'previous': -1, 'next': 1} - if closest not in deltas.keys(): - raise ValueError(f"Invalid closest argument: {closest}") - - as_on_match = closest if as_on_match == 'closest' else as_on_match - prior_match = closest if prior_match == 'closest' else prior_match - - if as_on_match in deltas.keys(): - as_on_delta = datetime.timedelta(days=deltas[as_on_match]) - else: - raise ValueError(f"Invalid as_on_match argument: {as_on_match}") - - if prior_match in deltas.keys(): - prior_delta = datetime.timedelta(days=deltas[prior_match]) - else: - raise ValueError(f"Invalid prior_match argument: {prior_match}") - - return as_on_delta, prior_delta - - -class TimeSeriesCore: - """Defines the core building blocks of a TimeSeries object""" - - def __init__( - self, - data: List[Iterable], - date_format: str = "%Y-%m-%d", - frequency=Literal['D', 'W', 'M', 'Q', 'H', 'Y'] - ): - """Instantiate a TimeSeries object - - Parameters - ---------- - data : List[tuple] - Time Series data in the form of list of tuples. - The first element of each tuple should be a date and second element should be a value. - - date_format : str, optional, default "%Y-%m-%d" - Specify the format of the date - Required only if the first argument of tuples is a string. Otherwise ignored. - - frequency : str, optional, default "infer" - The frequency of the time series. Default is infer. - The class will try to infer the frequency automatically and adjust to the closest member. - Note that inferring frequencies can fail if the data is too irregular. - Valid values are {D, W, M, Q, H, Y} - """ - - data = _preprocess_timeseries(data, date_format=date_format) - - self.time_series = dict(data) - self.dates = set(list(self.time_series)) - if len(self.dates) != len(data): - print("Warning: The input data contains duplicate dates which have been ignored.") - self.start_date = list(self.time_series)[0] - self.end_date = list(self.time_series)[-1] - self.frequency = getattr(AllFrequencies, frequency) - - def __repr__(self): - if len(self.time_series) > 6: - printable_data_1 = list(self.time_series)[:3] - printable_data_2 = list(self.time_series)[-3:] - printable_str = "TimeSeries([{}\n\t...\n\t{}])".format( - ',\n\t'.join([str((i, self.time_series[i])) for i in printable_data_1]), - ',\n\t'.join([str((i, self.time_series[i])) for i in printable_data_2]) - ) - else: - printable_data = self.time_series - printable_str = "TimeSeries([{}])".format(',\n\t'.join( - [str((i, self.time_series[i])) for i in printable_data])) - return printable_str - - def __str__(self): - if len(self.time_series) > 6: - printable_data_1 = list(self.time_series)[:3] - printable_data_2 = list(self.time_series)[-3:] - printable_str = "[{}\n ...\n {}]".format( - ',\n '.join([str((i, self.time_series[i])) for i in printable_data_1]), - ',\n '.join([str((i, self.time_series[i])) for i in printable_data_2]) - ) - else: - printable_data = self.time_series - printable_str = "[{}]".format(',\n '.join([str((i, self.time_series[i])) for i in printable_data])) - return printable_str - - def __getitem__(self, n): - all_keys = list(self.time_series.keys()) - if isinstance(n, int): - keys = [all_keys[n]] - else: - keys = all_keys[n] - item = [(key, self.time_series[key]) for key in keys] - if len(item) == 1: - return item[0] - - return item - - def __len__(self): - return len(self.time_series.keys()) - - def head(self, n: int = 6): - keys = list(self.time_series.keys()) - keys = keys[:n] - result = [(key, self.time_series[key]) for key in keys] - return result - - def tail(self, n: int = 6): - keys = list(self.time_series.keys()) - keys = keys[-n:] - result = [(key, self.time_series[key]) for key in keys] - return result - - class TimeSeries(TimeSeriesCore): """Container for TimeSeries objects""" @@ -216,12 +42,27 @@ class TimeSeries(TimeSeriesCore): res_string = "First date: {}\nLast date: {}\nNumber of rows: {}" return res_string.format(self.start_date, self.end_date, total_dates) - def ffill(self, inplace=False): - num_days = (self.end_date - self.start_date).days + 1 + def ffill(self, inplace: bool = False, limit: int = None) -> Union[TimeSeries, None]: + """Forward fill missing dates in the time series + + Parameters + ---------- + inplace : bool + Modify the time-series data in place and return None. + + limit : int, optional + Maximum number of periods to forward fill + + Returns + ------- + Returns a TimeSeries object if inplace is False, otherwise None + """ + + eomonth = True if self.frequency.days >= AllFrequencies.M.days else False + dates_to_fill = create_date_series(self.start_date, self.end_date, self.frequency, eomonth) new_ts = dict() - for i in range(num_days): - cur_date = self.start_date + datetime.timedelta(days=i) + for cur_date in dates_to_fill: try: cur_val = self.time_series[cur_date] except KeyError: @@ -255,11 +96,11 @@ class TimeSeries(TimeSeriesCore): def calculate_returns( self, as_on: datetime.datetime, - as_on_match: str = 'closest', - prior_match: str = 'closest', + as_on_match: str = "closest", + prior_match: str = "closest", closest: str = "previous", compounding: bool = True, - years: int = 1 + years: int = 1, ) -> float: """Method to calculate returns for a certain time-period as on a particular date @@ -328,8 +169,8 @@ class TimeSeries(TimeSeriesCore): from_date: datetime.date, to_date: datetime.date, frequency: str = "D", - as_on_match: str = 'closest', - prior_match: str = 'closest', + as_on_match: str = "closest", + prior_match: str = "closest", closest: str = "previous", compounding: bool = True, years: int = 1, @@ -343,14 +184,20 @@ class TimeSeries(TimeSeriesCore): rolling_returns = [] for i in dates: - returns = self.calculate_returns(as_on=i, compounding=compounding, years=years, as_on_match=as_on_match, - prior_match=prior_match, closest=closest) + returns = self.calculate_returns( + as_on=i, + compounding=compounding, + years=years, + as_on_match=as_on_match, + prior_match=prior_match, + closest=closest, + ) rolling_returns.append((i, returns)) self.rolling_returns = rolling_returns return self.rolling_returns -if __name__ == '__main__': +if __name__ == "__main__": date_series = [ datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2),