"""Time series container with date-series and returns helpers."""

import datetime
from dataclasses import dataclass
from typing import List

from dateutil.relativedelta import relativedelta


@dataclass
class Options:
    """Default option values (date format and closest-date direction)."""

    date_format: str = '%Y-%m-%d'
    closest: str = 'before'  # after


class Frequency:
    """Description of a sampling frequency.

    Attributes
    ----------
    name : human readable name, e.g. ``'monthly'``
    type : ``relativedelta`` keyword used to step dates (``'days'``,
        ``'months'`` or ``'years'``)
    value : number of ``type`` units between two consecutive dates
    days : approximate length of one step expressed in days
    """

    def __init__(self, name, interval_type, interval_value, interval_days_value):
        self.name = name
        self.type = interval_type
        self.value = interval_value
        self.days = interval_days_value

    def __repr__(self):
        return f"Frequency({self.name}, {self.type}, {self.value}, {self.days})"


class AllFrequencies:
    """Namespace of the supported frequencies, keyed by one-letter symbol."""

    D = Frequency('daily', 'days', 1, 1)
    W = Frequency('weekly', 'days', 7, 7)
    M = Frequency('monthly', 'months', 1, 30)
    Q = Frequency('quarterly', 'months', 3, 91)
    H = Frequency('half-yearly', 'months', 6, 182)
    Y = Frequency('annual', 'years', 1, 365)


def _as_datetime(value):
    """Promote a plain ``datetime.date`` to a midnight ``datetime.datetime``.

    The series is keyed by ``datetime.datetime``; a ``datetime.date`` never
    compares equal to a datetime, so it would silently miss every lookup.
    Any other value is returned unchanged.
    """
    is_plain_date = (
        isinstance(value, datetime.date)
        and not isinstance(value, datetime.datetime)
    )
    if is_plain_date:
        return datetime.datetime.combine(value, datetime.time())
    return value


def create_date_series(
    start_date: datetime.datetime, end_date: datetime.datetime, frequency: Frequency
) -> List[datetime.datetime]:
    """Create the dates from ``start_date`` up to ``end_date`` (inclusive).

    Parameters
    ----------
    start_date : first date of the series
    end_date : upper bound; no generated date is later than this
    frequency : ``Frequency`` describing the step between two dates

    Returns
    -------
    List of dates in ascending order; empty when ``start_date`` is after
    ``end_date``.

    Raises
    ------
    ValueError
        If ``frequency.value`` is not positive (the series would never
        advance).
    """
    if frequency.value <= 0:
        raise ValueError(f"Frequency value must be positive: {frequency!r}")

    dates = []
    step = 0
    while True:
        # Always offset from start_date (not from the previous date) so that
        # month-end clipping does not accumulate, e.g. Jan 31 -> Feb 29 -> Mar 31.
        current = start_date + relativedelta(**{frequency.type: frequency.value * step})
        if current > end_date:
            break
        dates.append(current)
        step += 1
    return dates


class TimeSeries:
    """Container for TimeSeries objects"""

    def __init__(self, data: List[tuple], date_format: str = "%Y-%m-%d", frequency="D"):
        """Instantiate a TimeSeries object

        Parameters
        ----------
        data : List[tuple]
            Time Series data in the form of list of tuples.
            The first element of each tuple should be a date and second element should be a value.

        date_format : str, optional, default "%Y-%m-%d"
            Specify the format of the date
            Required only if the first argument of tuples is a string. Otherwise ignored.

        frequency : str, optional, default "D"
            The frequency of the time series.
            Valid values are {D, W, M, Q, H, Y}
        """
        time_series = []
        for row in data:
            date, value = row[0], row[1]
            if isinstance(date, str):
                date = datetime.datetime.strptime(date, date_format)
            else:
                date = _as_datetime(date)
            time_series.append((date, value))

        # Sort on the date only: values need not be comparable with each other.
        # The sort is stable, so for duplicate dates the last input row wins.
        time_series.sort(key=lambda pair: pair[0])

        self.time_series = dict(time_series)
        self.dates = set(self.time_series)
        if len(self.dates) != len(time_series):
            print("Warning: The input data contains duplicate dates which have been ignored.")
        self.start_date = list(self.time_series)[0]
        self.end_date = list(self.time_series)[-1]
        self.frequency = getattr(AllFrequencies, frequency)

    def _render(self, opening: str, indent: str, closing: str) -> str:
        """Format the series, eliding the middle when it has more than 6 rows."""
        keys = list(self.time_series)
        separator = f",\n{indent}"

        def rows(selected):
            return separator.join(str({key: self.time_series[key]}) for key in selected)

        if len(keys) > 6:
            body = f"{rows(keys[:3])}\n{indent}...\n{indent}{rows(keys[-3:])}"
        else:
            body = rows(keys)
        return f"{opening}{body}{closing}"

    def __repr__(self):
        return self._render("TimeSeries([", "\t", "])")

    def __str__(self):
        return self._render("[", " ", "]")

    def info(self):
        """Summary info about the TimeSeries object"""
        total_dates = len(self.time_series)
        res_string = "First date: {}\nLast date: {}\nNumber of rows: {}"
        return res_string.format(self.start_date, self.end_date, total_dates)

    def ffill(self, inplace=False):
        """Forward fill every missing day between the start and end dates.

        Each missing day takes the value of the closest earlier date.

        Parameters
        ----------
        inplace : bool, optional, default False
            When True the object is updated and None is returned,
            otherwise the filled data is returned as a new dict.
        """
        num_days = (self.end_date - self.start_date).days + 1

        new_ts = {}
        cur_val = None
        for i in range(num_days):
            cur_date = self.start_date + datetime.timedelta(days=i)
            # Keep the last seen value when the day has no observation.
            cur_val = self.time_series.get(cur_date, cur_val)
            new_ts[cur_date] = cur_val

        if inplace:
            self.time_series = new_ts
            # Keep the date index in sync with the filled data.
            self.dates = set(new_ts)
            return None

        return new_ts

    def bfill(self, inplace=False):
        """Backward fill every missing day between the start and end dates.

        Each missing day takes the value of the closest later date.

        Parameters
        ----------
        inplace : bool, optional, default False
            When True the object is updated and None is returned,
            otherwise the filled data is returned as a new dict.
            In both cases the data is in chronological order.
        """
        num_days = (self.end_date - self.start_date).days + 1

        filled = []
        cur_val = None
        for i in range(num_days):
            cur_date = self.end_date - datetime.timedelta(days=i)
            cur_val = self.time_series.get(cur_date, cur_val)
            filled.append((cur_date, cur_val))

        # The walk goes from the end date backwards; restore ascending order.
        filled.reverse()
        new_ts = dict(filled)

        if inplace:
            self.time_series = new_ts
            self.dates = set(new_ts)
            return None

        return new_ts

    def calculate_returns(
        self, as_on: datetime.datetime, closest: str = "previous", compounding: bool = True, years: int = 1
    ) -> float:
        """Method to calculate returns for a certain time-period as on a particular date

        Parameters
        ----------
        as_on : date for which the return is calculated; must exist in the series
        closest : "previous" or "next"
            Direction in which to look for data when the date ``years``
            before ``as_on`` is not available.
        compounding : when True the return is annualised
        years : length of the return period in years

        Returns
        -------
        The return as a fraction (0.1 means 10%).

        Raises
        ------
        ValueError
            If ``as_on`` is not in the series, ``closest`` is invalid, or no
            data is available in the requested direction.

        >>> ts.calculate_returns(datetime.datetime(2020, 1, 1), years=1)
        """
        as_on = _as_datetime(as_on)

        try:
            current = self.time_series[as_on]
        except KeyError:
            raise ValueError("As on date not found") from None

        if closest == "previous":
            delta = -1
        elif closest == "next":
            delta = 1
        else:
            raise ValueError(f"Invalid value for closest parameter: {closest}")

        target_date = as_on - relativedelta(years=years)
        prev_date = target_date
        one_step = datetime.timedelta(days=delta)
        while prev_date not in self.time_series:
            prev_date += one_step
            # Stop once the search leaves the series in the search direction;
            # without this bound the loop would never find a match.
            out_of_range = (
                prev_date < self.start_date if delta < 0 else prev_date > self.end_date
            )
            if out_of_range:
                direction = "before" if delta < 0 else "after"
                raise ValueError(f"No data available on or {direction} {target_date}")
        previous = self.time_series[prev_date]

        returns = current / previous
        if compounding:
            returns = returns ** (1 / years)
        return returns - 1

    def calculate_rolling_returns(
        self,
        from_date: datetime.date,
        to_date: datetime.date,
        frequency: str = "d",
        closest: str = "previous",
        compounding: bool = True,
        years: int = 1,
    ) -> List[tuple]:
        """Calculates the rolling return

        Returns are calculated for every date of the series between
        ``from_date`` and ``to_date`` (both inclusive).

        Parameters
        ----------
        from_date : first date of the window
        to_date : last date of the window
        frequency : currently unused
        closest, compounding, years : passed on to ``calculate_returns``

        Returns
        -------
        List of ``(date, return)`` tuples in chronological order. The list is
        also stored in ``self.rolling_returns``.
        """
        first_day = _as_datetime(from_date)
        last_day = _as_datetime(to_date)
        num_days = (last_day - first_day).days + 1

        rolling_returns = []
        for i in range(num_days):
            cur_date = first_day + datetime.timedelta(days=i)
            if cur_date not in self.time_series:
                continue
            returns = self.calculate_returns(
                as_on=cur_date, compounding=compounding, years=years, closest=closest
            )
            rolling_returns.append((cur_date, returns))

        self.rolling_returns = rolling_returns
        return self.rolling_returns


if __name__ == '__main__':
    date_series = [
        datetime.datetime(2020, 1, 1),
        datetime.datetime(2020, 1, 2),
        datetime.datetime(2020, 1, 3),
        datetime.datetime(2020, 1, 4),
        datetime.datetime(2020, 1, 7),
        datetime.datetime(2020, 1, 8),
        datetime.datetime(2020, 1, 9),
        datetime.datetime(2020, 1, 10),
        datetime.datetime(2020, 1, 12),
    ]