Compare commits

..

No commits in common. "ef2973a1d1c3577ba5824255d21062819275c90c" and "77845ff501e5da9812694174b6a5360da9866949" have entirely different histories.

3 changed files with 109 additions and 102 deletions

View File

@ -29,12 +29,71 @@ class AllFrequencies:
Y = Frequency("annual", "years", 1, 365, "Y") Y = Frequency("annual", "years", 1, 365, "Y")
class DateNotFoundError(Exception): def _preprocess_timeseries(
"""Exception to be raised when date is not found""" data: Union[
Sequence[Iterable[Union[str, datetime.datetime, float]]],
Sequence[Mapping[str, Union[float, datetime.datetime]]],
Sequence[Mapping[Union[str, datetime.datetime], float]],
Mapping[Union[str, datetime.datetime], float],
],
date_format: str,
) -> List[Tuple[datetime.datetime, float]]:
"""Converts any type of list to the correct type"""
def __init__(self, message, date): if isinstance(data, Sequence):
message = f"{message}: {date}" if isinstance(data[0], Mapping):
super().__init__(message) if len(data[0].keys()) == 2:
current_data = [tuple(i.values()) for i in data]
elif len(data[0].keys()) == 1:
current_data = [tuple(*i.items()) for i in data]
else:
raise TypeError("Could not parse the data")
current_data = _preprocess_timeseries(current_data, date_format)
elif isinstance(data[0], Sequence):
if isinstance(data[0][0], str):
current_data = []
for i in data:
row = datetime.datetime.strptime(i[0], date_format), i[1]
current_data.append(row)
elif isinstance(data[0][0], datetime.datetime):
current_data = [(i, j) for i, j in data]
else:
raise TypeError("Could not parse the data")
else:
raise TypeError("Could not parse the data")
elif isinstance(data, Mapping):
current_data = [(k, v) for k, v in data.items()]
current_data = _preprocess_timeseries(current_data, date_format)
else:
raise TypeError("Could not parse the data")
current_data.sort()
return current_data
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
"""Checks the arguments and returns appropriate timedelta objects"""
deltas = {"exact": 0, "previous": -1, "next": 1}
if closest not in deltas.keys():
raise ValueError(f"Invalid closest argument: {closest}")
as_on_match = closest if as_on_match == "closest" else as_on_match
prior_match = closest if prior_match == "closest" else prior_match
if as_on_match in deltas.keys():
as_on_delta = datetime.timedelta(days=deltas[as_on_match])
else:
raise ValueError(f"Invalid as_on_match argument: {as_on_match}")
if prior_match in deltas.keys():
prior_delta = datetime.timedelta(days=deltas[prior_match])
else:
raise ValueError(f"Invalid prior_match argument: {prior_match}")
return as_on_delta, prior_delta
def _parse_date(date: str, date_format: str = None): def _parse_date(date: str, date_format: str = None):
@ -55,85 +114,15 @@ def _parse_date(date: str, date_format: str = None):
return date return date
def _preprocess_timeseries( def _interval_to_years(interval_type: Literal['years', 'months', 'day'], interval_value: int) -> int:
data: Union[
Sequence[Iterable[Union[str, datetime.datetime, float]]],
Sequence[Mapping[str, Union[float, datetime.datetime]]],
Sequence[Mapping[Union[str, datetime.datetime], float]],
Mapping[Union[str, datetime.datetime], float],
],
date_format: str,
) -> List[Tuple[datetime.datetime, float]]:
"""Converts any type of list to the correct type"""
if isinstance(data, Mapping):
current_data = [(k, v) for k, v in data.items()]
return _preprocess_timeseries(current_data, date_format)
if not isinstance(data, Sequence):
raise TypeError("Could not parse the data")
if isinstance(data[0], Sequence):
return sorted([(_parse_date(i, date_format), j) for i, j in data])
if not isinstance(data[0], Mapping):
raise TypeError("Could not parse the data")
if len(data[0]) == 1:
current_data = [tuple(*i.items()) for i in data]
elif len(data[0]) == 2:
current_data = [tuple(i.values()) for i in data]
else:
raise TypeError("Could not parse the data")
return _preprocess_timeseries(current_data, date_format)
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
"""Checks the arguments and returns appropriate timedelta objects"""
deltas = {"exact": 0, "previous": -1, "next": 1}
if closest not in deltas.keys():
raise ValueError(f"Invalid argument for closest: {closest}")
as_on_match = closest if as_on_match == "closest" else as_on_match
prior_match = closest if prior_match == "closest" else prior_match
if as_on_match in deltas.keys():
as_on_delta = datetime.timedelta(days=deltas[as_on_match])
else:
raise ValueError(f"Invalid as_on_match argument: {as_on_match}")
if prior_match in deltas.keys():
prior_delta = datetime.timedelta(days=deltas[prior_match])
else:
raise ValueError(f"Invalid prior_match argument: {prior_match}")
return as_on_delta, prior_delta
def _find_closest_date(data, date, delta, if_not_found):
"""Helper function to find data for the closest available date"""
row = data.get(date, None)
if row is not None:
return date, row
if delta:
return _find_closest_date(data, date + delta, delta, if_not_found)
if if_not_found == "fail":
raise DateNotFoundError("Data not found for date", date)
if if_not_found == "nan":
return date, float("NaN")
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> int:
"""Converts any time period to years for use with compounding functions""" """Converts any time period to years for use with compounding functions"""
year_conversion_factor = {"years": 1, "months": 12, "days": 365} day_conversion_factor = {
years = interval_value / year_conversion_factor[interval_type] 'years': 1,
'months': 12,
'days': 365
}
years = interval_value/day_conversion_factor[interval_type]
return years return years

View File

@ -8,7 +8,6 @@ from dateutil.relativedelta import relativedelta
from .core import ( from .core import (
AllFrequencies, AllFrequencies,
TimeSeriesCore, TimeSeriesCore,
_find_closest_date,
_interval_to_years, _interval_to_years,
_parse_date, _parse_date,
_preprocess_match_options, _preprocess_match_options,
@ -190,19 +189,40 @@ class TimeSeries(TimeSeriesCore):
as_on = _parse_date(as_on, date_format) as_on = _parse_date(as_on, date_format)
as_on_delta, prior_delta = _preprocess_match_options(as_on_match, prior_match, closest) as_on_delta, prior_delta = _preprocess_match_options(as_on_match, prior_match, closest)
original_as_on = as_on
while True:
current = self.data.get(as_on, None)
if current is not None:
break
elif not as_on_delta:
if if_not_found == 'fail':
raise ValueError(f"As on date {original_as_on} not found")
elif if_not_found == 'nan':
return as_on, float("NaN")
else:
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
as_on += as_on_delta
prev_date = as_on - relativedelta(**{interval_type: interval_value}) prev_date = as_on - relativedelta(**{interval_type: interval_value})
current = _find_closest_date(self.data, as_on, as_on_delta, if_not_found) while True:
previous = _find_closest_date(self.data, prev_date, prior_delta, if_not_found) previous = self.data.get(prev_date, None)
if previous is not None:
break
elif not prior_delta:
if if_not_found == 'fail':
raise ValueError(f"Previous date {previous} not found")
elif if_not_found == 'nan':
return (as_on if return_actual_date else original_as_on), float("NaN")
else:
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
prev_date += prior_delta
if current[1] == str('nan') or previous[1] == str('nan'): returns = current / previous
return as_on, float('NaN')
returns = current[1] / previous[1]
if compounding: if compounding:
years = _interval_to_years(interval_type, interval_value) years = _interval_to_years(interval_type, interval_value)
returns = returns ** (1 / years) returns = returns ** (1 / years)
return (current[0] if return_actual_date else as_on), returns - 1 return (as_on if return_actual_date else original_as_on), returns - 1
def calculate_rolling_returns( def calculate_rolling_returns(
self, self,
@ -254,13 +274,13 @@ class TimeSeries(TimeSeriesCore):
if __name__ == "__main__": if __name__ == "__main__":
date_series = [ date_series = [
datetime.datetime(2020, 1, 11), datetime.datetime(2020, 1, 1),
datetime.datetime(2020, 1, 2),
datetime.datetime(2020, 1, 3),
datetime.datetime(2020, 1, 4),
datetime.datetime(2020, 1, 7),
datetime.datetime(2020, 1, 8),
datetime.datetime(2020, 1, 9),
datetime.datetime(2020, 1, 10),
datetime.datetime(2020, 1, 12), datetime.datetime(2020, 1, 12),
datetime.datetime(2020, 1, 13),
datetime.datetime(2020, 1, 14),
datetime.datetime(2020, 1, 17),
datetime.datetime(2020, 1, 18),
datetime.datetime(2020, 1, 19),
datetime.datetime(2020, 1, 20),
datetime.datetime(2020, 1, 22),
] ]

View File

@ -4,7 +4,7 @@ import random
from typing import Literal, Sequence from typing import Literal, Sequence
import pytest import pytest
from fincal.core import DateNotFoundError, FincalOptions, Frequency, Series from fincal.core import FincalOptions, Frequency, Series
from fincal.fincal import TimeSeries, create_date_series from fincal.fincal import TimeSeries, create_date_series
THIS_DIR = os.path.dirname(os.path.abspath(__file__)) THIS_DIR = os.path.dirname(os.path.abspath(__file__))
@ -209,10 +209,8 @@ class TestReturns:
assert round(returns[1], 4) == 5.727 assert round(returns[1], 4) == 5.727
returns = ts.calculate_returns("2020-04-10", compounding=True, interval_type='days', interval_value=90) returns = ts.calculate_returns("2020-04-10", compounding=True, interval_type='days', interval_value=90)
assert round(returns[1], 4) == 5.727 assert round(returns[1], 4) == 5.727
with pytest.raises(DateNotFoundError): with pytest.raises(ValueError):
ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, as_on_match='exact') ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, as_on_match='exact')
with pytest.raises(DateNotFoundError):
ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, prior_match='exact')
def test_date_formats(self): def test_date_formats(self):
ts = TimeSeries(self.data, frequency='M') ts = TimeSeries(self.data, frequency='M')