Compare commits

...

3 Commits

  1. 107
      fincal/core.py
  2. 54
      fincal/fincal.py
  3. 6
      tests/test_fincal.py

107
fincal/core.py

@ -29,6 +29,32 @@ class AllFrequencies:
Y = Frequency("annual", "years", 1, 365, "Y") Y = Frequency("annual", "years", 1, 365, "Y")
class DateNotFoundError(Exception):
"""Exception to be raised when date is not found"""
def __init__(self, message, date):
message = f"{message}: {date}"
super().__init__(message)
def _parse_date(date: str, date_format: str = None):
"""Parses date and handles errors"""
if isinstance(date, (datetime.datetime, datetime.date)):
return datetime.datetime.fromordinal(date.toordinal())
if date_format is None:
date_format = FincalOptions.date_format
try:
date = datetime.datetime.strptime(date, date_format)
except TypeError:
raise ValueError("Date does not seem to be valid date-like string")
except ValueError:
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?")
return date
def _preprocess_timeseries( def _preprocess_timeseries(
data: Union[ data: Union[
Sequence[Iterable[Union[str, datetime.datetime, float]]], Sequence[Iterable[Union[str, datetime.datetime, float]]],
@ -40,37 +66,26 @@ def _preprocess_timeseries(
) -> List[Tuple[datetime.datetime, float]]: ) -> List[Tuple[datetime.datetime, float]]:
"""Converts any type of list to the correct type""" """Converts any type of list to the correct type"""
if isinstance(data, Sequence): if isinstance(data, Mapping):
if isinstance(data[0], Mapping):
if len(data[0].keys()) == 2:
current_data = [tuple(i.values()) for i in data]
elif len(data[0].keys()) == 1:
current_data = [tuple(*i.items()) for i in data]
else:
raise TypeError("Could not parse the data")
current_data = _preprocess_timeseries(current_data, date_format)
elif isinstance(data[0], Sequence):
if isinstance(data[0][0], str):
current_data = []
for i in data:
row = datetime.datetime.strptime(i[0], date_format), i[1]
current_data.append(row)
elif isinstance(data[0][0], datetime.datetime):
current_data = [(i, j) for i, j in data]
else:
raise TypeError("Could not parse the data")
else:
raise TypeError("Could not parse the data")
elif isinstance(data, Mapping):
current_data = [(k, v) for k, v in data.items()] current_data = [(k, v) for k, v in data.items()]
current_data = _preprocess_timeseries(current_data, date_format) return _preprocess_timeseries(current_data, date_format)
if not isinstance(data, Sequence):
raise TypeError("Could not parse the data")
if isinstance(data[0], Sequence):
return sorted([(_parse_date(i, date_format), j) for i, j in data])
if not isinstance(data[0], Mapping):
raise TypeError("Could not parse the data")
if len(data[0]) == 1:
current_data = [tuple(*i.items()) for i in data]
elif len(data[0]) == 2:
current_data = [tuple(i.values()) for i in data]
else: else:
raise TypeError("Could not parse the data") raise TypeError("Could not parse the data")
current_data.sort() return _preprocess_timeseries(current_data, date_format)
return current_data
def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta: def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str) -> datetime.timedelta:
@ -78,7 +93,7 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
deltas = {"exact": 0, "previous": -1, "next": 1} deltas = {"exact": 0, "previous": -1, "next": 1}
if closest not in deltas.keys(): if closest not in deltas.keys():
raise ValueError(f"Invalid closest argument: {closest}") raise ValueError(f"Invalid argument for closest: {closest}")
as_on_match = closest if as_on_match == "closest" else as_on_match as_on_match = closest if as_on_match == "closest" else as_on_match
prior_match = closest if prior_match == "closest" else prior_match prior_match = closest if prior_match == "closest" else prior_match
@ -96,33 +111,29 @@ def _preprocess_match_options(as_on_match: str, prior_match: str, closest: str)
return as_on_delta, prior_delta return as_on_delta, prior_delta
def _parse_date(date: str, date_format: str = None): def _find_closest_date(data, date, delta, if_not_found):
"""Parses date and handles errors""" """Helper function to find data for the closest available date"""
if isinstance(date, (datetime.datetime, datetime.date)): row = data.get(date, None)
return datetime.datetime.fromordinal(date.toordinal()) if row is not None:
return date, row
if date_format is None: if delta:
date_format = FincalOptions.date_format return _find_closest_date(data, date + delta, delta, if_not_found)
try: if if_not_found == "fail":
date = datetime.datetime.strptime(date, date_format) raise DateNotFoundError("Data not found for date", date)
except TypeError: if if_not_found == "nan":
raise ValueError("Date does not seem to be valid date-like string") return date, float("NaN")
except ValueError:
raise ValueError("Date could not be parsed. Have you set the correct date format in FincalOptions.date_format?") raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
return date
def _interval_to_years(interval_type: Literal['years', 'months', 'day'], interval_value: int) -> int: def _interval_to_years(interval_type: Literal["years", "months", "day"], interval_value: int) -> int:
"""Converts any time period to years for use with compounding functions""" """Converts any time period to years for use with compounding functions"""
day_conversion_factor = { year_conversion_factor = {"years": 1, "months": 12, "days": 365}
'years': 1, years = interval_value / year_conversion_factor[interval_type]
'months': 12,
'days': 365
}
years = interval_value/day_conversion_factor[interval_type]
return years return years

54
fincal/fincal.py

@ -8,6 +8,7 @@ from dateutil.relativedelta import relativedelta
from .core import ( from .core import (
AllFrequencies, AllFrequencies,
TimeSeriesCore, TimeSeriesCore,
_find_closest_date,
_interval_to_years, _interval_to_years,
_parse_date, _parse_date,
_preprocess_match_options, _preprocess_match_options,
@ -189,40 +190,19 @@ class TimeSeries(TimeSeriesCore):
as_on = _parse_date(as_on, date_format) as_on = _parse_date(as_on, date_format)
as_on_delta, prior_delta = _preprocess_match_options(as_on_match, prior_match, closest) as_on_delta, prior_delta = _preprocess_match_options(as_on_match, prior_match, closest)
original_as_on = as_on
while True:
current = self.data.get(as_on, None)
if current is not None:
break
elif not as_on_delta:
if if_not_found == 'fail':
raise ValueError(f"As on date {original_as_on} not found")
elif if_not_found == 'nan':
return as_on, float("NaN")
else:
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
as_on += as_on_delta
prev_date = as_on - relativedelta(**{interval_type: interval_value}) prev_date = as_on - relativedelta(**{interval_type: interval_value})
while True: current = _find_closest_date(self.data, as_on, as_on_delta, if_not_found)
previous = self.data.get(prev_date, None) previous = _find_closest_date(self.data, prev_date, prior_delta, if_not_found)
if previous is not None:
break if current[1] == str('nan') or previous[1] == str('nan'):
elif not prior_delta: return as_on, float('NaN')
if if_not_found == 'fail':
raise ValueError(f"Previous date {previous} not found") returns = current[1] / previous[1]
elif if_not_found == 'nan':
return (as_on if return_actual_date else original_as_on), float("NaN")
else:
raise ValueError(f"Invalid argument for if_not_found: {if_not_found}")
prev_date += prior_delta
returns = current / previous
if compounding: if compounding:
years = _interval_to_years(interval_type, interval_value) years = _interval_to_years(interval_type, interval_value)
returns = returns ** (1 / years) returns = returns ** (1 / years)
return (as_on if return_actual_date else original_as_on), returns - 1 return (current[0] if return_actual_date else as_on), returns - 1
def calculate_rolling_returns( def calculate_rolling_returns(
self, self,
@ -274,13 +254,13 @@ class TimeSeries(TimeSeriesCore):
if __name__ == "__main__": if __name__ == "__main__":
date_series = [ date_series = [
datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 11),
datetime.datetime(2020, 1, 2),
datetime.datetime(2020, 1, 3),
datetime.datetime(2020, 1, 4),
datetime.datetime(2020, 1, 7),
datetime.datetime(2020, 1, 8),
datetime.datetime(2020, 1, 9),
datetime.datetime(2020, 1, 10),
datetime.datetime(2020, 1, 12), datetime.datetime(2020, 1, 12),
datetime.datetime(2020, 1, 13),
datetime.datetime(2020, 1, 14),
datetime.datetime(2020, 1, 17),
datetime.datetime(2020, 1, 18),
datetime.datetime(2020, 1, 19),
datetime.datetime(2020, 1, 20),
datetime.datetime(2020, 1, 22),
] ]

6
tests/test_fincal.py

@ -4,7 +4,7 @@ import random
from typing import Literal, Sequence from typing import Literal, Sequence
import pytest import pytest
from fincal.core import FincalOptions, Frequency, Series from fincal.core import DateNotFoundError, FincalOptions, Frequency, Series
from fincal.fincal import TimeSeries, create_date_series from fincal.fincal import TimeSeries, create_date_series
THIS_DIR = os.path.dirname(os.path.abspath(__file__)) THIS_DIR = os.path.dirname(os.path.abspath(__file__))
@ -209,8 +209,10 @@ class TestReturns:
assert round(returns[1], 4) == 5.727 assert round(returns[1], 4) == 5.727
returns = ts.calculate_returns("2020-04-10", compounding=True, interval_type='days', interval_value=90) returns = ts.calculate_returns("2020-04-10", compounding=True, interval_type='days', interval_value=90)
assert round(returns[1], 4) == 5.727 assert round(returns[1], 4) == 5.727
with pytest.raises(ValueError): with pytest.raises(DateNotFoundError):
ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, as_on_match='exact') ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, as_on_match='exact')
with pytest.raises(DateNotFoundError):
ts.calculate_returns("2020-04-10", interval_type='days', interval_value=90, prior_match='exact')
def test_date_formats(self): def test_date_formats(self):
ts = TimeSeries(self.data, frequency='M') ts = TimeSeries(self.data, frequency='M')

Loading…
Cancel
Save