implemented beta, yet to check edge cases

This commit is contained in:
Gourav Kumar 2022-05-29 17:56:00 +05:30
parent 922fe0f027
commit 177e3bc4c8
2 changed files with 53 additions and 1 deletions

View File

@ -7,7 +7,6 @@ from collections import UserList
from dataclasses import dataclass from dataclasses import dataclass
from numbers import Number from numbers import Number
from typing import Any, Callable, Iterable, List, Literal, Mapping, Sequence, Type from typing import Any, Callable, Iterable, List, Literal, Mapping, Sequence, Type
from unittest import skip
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta

View File

@ -1,9 +1,11 @@
import datetime import datetime
import statistics
from typing import Literal from typing import Literal
from fincal.core import date_parser from fincal.core import date_parser
from .fincal import TimeSeries from .fincal import TimeSeries
from .utils import _interval_to_years
@date_parser(3, 4) @date_parser(3, 4)
@ -21,6 +23,13 @@ def sharpe_ratio(
closest: Literal["previous", "next"] = "previous", closest: Literal["previous", "next"] = "previous",
date_format: str = None, date_format: str = None,
): ):
interval_days = int(_interval_to_years(return_period_unit, return_period_value) * 365 + 1)
if from_date is None:
from_date = time_series_data.start_date + datetime.timedelta(days=interval_days)
if to_date is None:
to_date = time_series_data.end_date
if risk_free_data is None and risk_free_rate is None: if risk_free_data is None and risk_free_rate is None:
raise ValueError("At least one of risk_free_data or risk_free rate is required") raise ValueError("At least one of risk_free_data or risk_free rate is required")
elif risk_free_data is not None: elif risk_free_data is not None:
@ -47,3 +56,47 @@ def sharpe_ratio(
sharpe_ratio_value = excess_returns / sd sharpe_ratio_value = excess_returns / sd
return sharpe_ratio_value return sharpe_ratio_value
@date_parser(2, 3)
def beta(
asset_data: TimeSeries,
market_data: TimeSeries,
from_date: str | datetime.datetime = None,
to_date: str | datetime.datetime = None,
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
return_period_unit: Literal["years", "months", "days"] = "years",
return_period_value: int = 1,
as_on_match: str = "closest",
prior_match: str = "closest",
closest: Literal["previous", "next"] = "previous",
date_format: str = None,
):
interval_days = int(_interval_to_years(return_period_unit, return_period_value) * 365 + 1)
if from_date is None:
from_date = asset_data.start_date + datetime.timedelta(days=interval_days)
if to_date is None:
to_date = asset_data.end_date
common_params = {
"from_date": from_date,
"to_date": to_date,
"frequency": frequency,
"return_period_unit": return_period_unit,
"return_period_value": return_period_value,
"as_on_match": as_on_match,
"prior_match": prior_match,
"closest": closest,
"date_format": date_format,
}
asset_rr = asset_data.calculate_rolling_returns(**common_params)
market_rr = market_data.calculate_rolling_returns(**common_params)
cov = statistics.covariance(asset_rr.values, market_rr.values)
market_var = statistics.variance(market_rr.values)
beta = cov / market_var
return beta