A Python library for working with time series data. It comes with common financial functions built-in.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

163 lines
6.0 KiB

import datetime
from typing import Any, Dict, Iterable, List, Union
from dateutil.relativedelta import relativedelta
class TimeSeries:
    """Container for date-indexed observations with a few financial helpers.

    Attributes
    ----------
    time_series : dict
        Mapping of ``datetime.datetime`` to value, in ascending date order.
    dates : set
        The dates currently present in ``time_series``.
    start_date, end_date : datetime.datetime or None
        First and last date supplied at construction; ``None`` if no data
        was supplied.
    """

    def __init__(self, data: List[tuple], date_format: str = "%Y-%m-%d", frequency="infer"):
        """Instantiate a TimeSeries object

        Parameters
        ----------
        data : List[tuple]
            Time Series data in the form of list of tuples.
            The first element of each tuple should be a date and second element should be a value.
            Dates may be strings, ``datetime.date`` or ``datetime.datetime`` objects;
            all of them are stored as ``datetime.datetime``.

        date_format : str, optional, default "%Y-%m-%d"
            Specify the format of the date
            Required only if the first argument of tuples is a string. Otherwise ignored.

        frequency : str, optional, default "infer"
            Accepted for interface compatibility. Nothing in this class reads it yet,
            so no frequency inference or validation takes place.
        """
        time_series = [(self._as_datetime(row[0], date_format), row[1]) for row in data]
        # Plain tuple sort: rows sharing a date are ordered by value, so for a
        # duplicated date the largest value is the one dict() ends up keeping.
        time_series.sort()
        self.time_series = dict(time_series)
        self.dates = set(self.time_series)
        if len(self.dates) != len(time_series):
            print("Warning: The input data contains duplicate dates which have been ignored.")

        # An empty series has no boundaries; expose None instead of raising IndexError.
        ordered_dates = list(self.time_series)
        self.start_date = ordered_dates[0] if ordered_dates else None
        self.end_date = ordered_dates[-1] if ordered_dates else None

    @staticmethod
    def _as_datetime(value: Union[str, datetime.date], date_format: str = "%Y-%m-%d") -> datetime.datetime:
        """Coerce a date string, date or datetime into a ``datetime.datetime``."""
        # datetime is a subclass of date, so it has to be checked first.
        if isinstance(value, datetime.datetime):
            return value
        if isinstance(value, datetime.date):
            return datetime.datetime.combine(value, datetime.time.min)
        return datetime.datetime.strptime(value, date_format)

    def _render(self, indent: str) -> str:
        """Return the rows as text, eliding the middle when there are more than six."""
        keys = list(self.time_series)
        joiner = ",\n" + indent

        def rows(subset):
            return joiner.join(str({key: self.time_series[key]}) for key in subset)

        if len(keys) > 6:
            return "{head}\n{pad}...\n{pad}{tail}".format(head=rows(keys[:3]), tail=rows(keys[-3:]), pad=indent)
        return rows(keys)

    def _apply_fill(self, filled: Dict[datetime.datetime, Any], inplace: bool):
        """Either store ``filled`` on the instance (returning None) or hand it back."""
        if inplace:
            self.time_series = filled
            # Keep the date index in step with the data it describes.
            self.dates = set(filled)
            return None
        return filled

    def __repr__(self):
        return "TimeSeries([{}])".format(self._render("\t"))

    def __str__(self):
        return "[{}]".format(self._render(" "))

    def info(self):
        """Summary info about the TimeSeries object"""
        res_string = "First date: {}\nLast date: {}\nNumber of rows: {}"
        return res_string.format(self.start_date, self.end_date, len(self.time_series))

    def ffill(self, inplace=False):
        """Forward-fill missing days between ``start_date`` and ``end_date``.

        Walks forward one calendar day at a time from ``start_date``; a day
        without an observation receives the most recent earlier value.

        Parameters
        ----------
        inplace : bool, optional, default False
            If True, replace the stored series (and ``dates``) and return None.
            Otherwise return the filled data as a new dict.
        """
        filled = {}
        if self.time_series:
            num_days = (self.end_date - self.start_date).days + 1
            carried = None
            for offset in range(num_days):
                cur_date = self.start_date + datetime.timedelta(days=offset)
                if cur_date in self.time_series:
                    carried = self.time_series[cur_date]
                filled[cur_date] = carried
        return self._apply_fill(filled, inplace)

    def bfill(self, inplace=False):
        """Backward-fill missing days between ``start_date`` and ``end_date``.

        Walks backward one calendar day at a time from ``end_date``; a day
        without an observation receives the next later value. The result is
        in ascending date order whether or not ``inplace`` is set.

        Parameters
        ----------
        inplace : bool, optional, default False
            If True, replace the stored series (and ``dates``) and return None.
            Otherwise return the filled data as a new dict.
        """
        descending = []
        if self.time_series:
            num_days = (self.end_date - self.start_date).days + 1
            carried = None
            for offset in range(num_days):
                cur_date = self.end_date - datetime.timedelta(days=offset)
                if cur_date in self.time_series:
                    carried = self.time_series[cur_date]
                descending.append((cur_date, carried))
        # Collected newest-first; flip so the dict is chronological.
        filled = dict(reversed(descending))
        return self._apply_fill(filled, inplace)

    def calculate_returns(
        self, as_on: datetime.datetime, closest: str = "previous", compounding: bool = True, years: int = 1
    ) -> float:
        """Method to calculate returns for a certain time-period as on a particular date

        Parameters
        ----------
        as_on : datetime.datetime
            Date on which the return is measured; it must be present in the series.
            A ``datetime.date`` or a "%Y-%m-%d" string is converted to a datetime first.

        closest : str, optional, default "previous"
            Direction to search, one day at a time, when the look-back date
            (``as_on`` minus ``years``) has no observation: "previous" or "next".

        compounding : bool, optional, default True
            If True, annualise the return by raising it to ``1 / years``.

        years : int, optional, default 1
            Length of the look-back period in years.

        Returns
        -------
        float
            ``current / previous - 1``, annualised when ``compounding`` is True.

        Raises
        ------
        ValueError
            If ``as_on`` is not in the series, ``closest`` is invalid, or no
            observation exists in the search direction.
        ZeroDivisionError
            If the look-back value is 0, or ``years`` is 0 with compounding.

        >>> calculate_returns(datetime.date(2020, 1, 1), years=1)
        """
        as_on = self._as_datetime(as_on)
        if as_on not in self.time_series:
            raise ValueError("As on date not found")
        current = self.time_series[as_on]

        if closest == "previous":
            step = datetime.timedelta(days=-1)
        elif closest == "next":
            step = datetime.timedelta(days=1)
        else:
            raise ValueError(f"Invalid value for closest parameter: {closest}")

        target = as_on - relativedelta(years=years)
        prev_date = target
        while prev_date not in self.time_series:
            # Stop once the search has walked off the end of the data it is
            # heading towards; without this guard the loop never terminates.
            if closest == "previous":
                exhausted = prev_date < self.start_date
            else:
                exhausted = prev_date > self.end_date
            if exhausted:
                direction = "before" if closest == "previous" else "after"
                raise ValueError(f"No data found on or {direction} {target}")
            prev_date = prev_date + step
        previous = self.time_series[prev_date]

        returns = current / previous
        if compounding:
            returns = returns ** (1 / years)
        return returns - 1

    def calculate_rolling_returns(
        self,
        from_date: datetime.date,
        to_date: datetime.date,
        frequency: str = "d",
        closest: str = "previous",
        compounding: bool = True,
        years: int = 1,
    ) -> List[tuple]:
        """Calculates the rolling return

        Evaluates ``calculate_returns`` for every date in the series that falls
        on a whole-day step from ``from_date`` up to, but not including,
        ``to_date``. The result is also stored on ``self.rolling_returns``.

        Parameters
        ----------
        from_date, to_date : datetime.date
            Start (inclusive) and end (exclusive) of the window. Dates, datetimes
            and "%Y-%m-%d" strings are accepted.

        frequency : str, optional, default "d"
            Accepted for interface compatibility; not used by this method.

        closest, compounding, years
            Passed through to ``calculate_returns``.

        Returns
        -------
        List[tuple]
            ``(date, return)`` pairs in chronological order.

        Raises
        ------
        ValueError
            Propagated from ``calculate_returns``, e.g. when a date in the
            window has no look-back observation.
        """
        from_date = self._as_datetime(from_date)
        to_date = self._as_datetime(to_date)
        datediff = (to_date - from_date).days

        # Candidates are generated in ascending order, so the output is
        # chronological rather than in arbitrary set order.
        candidates = (from_date + datetime.timedelta(days=offset) for offset in range(datediff))
        rolling_returns = [
            (day, self.calculate_returns(as_on=day, compounding=compounding, years=years, closest=closest))
            for day in candidates
            if day in self.time_series
        ]
        self.rolling_returns = rolling_returns
        return self.rolling_returns