Compare commits

...

5 Commits

8 changed files with 411 additions and 141 deletions

161
README.md
View File

@ -29,7 +29,7 @@ Example:
... ('2021-06-01', 20) ... ('2021-06-01', 20)
...] ...]
>>> ts = fc.TimeSeries(time_series_data) >>> ts = pft.TimeSeries(time_series_data)
``` ```
### Sample usage ### Sample usage
@ -46,12 +46,169 @@ With PyFacts, you never have to go into the hassle of creating datetime objects
``` ```
>>> import pyfacts as pft >>> import pyfacts as pft
>>> fc.PyfactsOptions.date_format = '%d-%m-%Y' >>> pft.PyfactsOptions.date_format = '%d-%m-%Y'
``` ```
Now the library will automatically parse all dates as DD-MM-YYYY Now the library will automatically parse all dates as DD-MM-YYYY
If you happen to have any one situation where you need to use a different format, all methods accept a date_format parameter to override the default. If you happen to have any one situation where you need to use a different format, all methods accept a date_format parameter to override the default.
### Working with multiple time series
While working with time series data, you will often need to perform calculations on the data. PyFacts supports all kinds of mathematical operations on time series.
Example:
```
>>> import pyfacts as pft
>>> time_series_data = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> ts = pft.TimeSeries(time_series_data)
>>> print(ts/100)
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 0.1),
(datetime.datetime(2022, 1, 2, 0, 0), 0.12),
(datetime.datetime(2022, 1, 3, 0, 0), 0.14),
(datetime.datetime(2022, 1, 4, 0, 0), 0.16),
(datetime.datetime(2022, 1, 6, 0, 0), 0.18),
(datetime.datetime(2022, 1, 7, 0, 0), 0.2)], frequency='M')
```
Mathematical operations can also be done between time series as long as they have the same dates.
Example:
```
>>> import pyfacts as pft
>>> time_series_data = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> ts = pft.TimeSeries(time_series_data)
>>> ts2 = pft.TimeSeries(time_series_data)
>>> print(ts/ts2)
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 1.0),
(datetime.datetime(2022, 1, 2, 0, 0), 1.0),
(datetime.datetime(2022, 1, 3, 0, 0), 1.0),
(datetime.datetime(2022, 1, 4, 0, 0), 1.0),
(datetime.datetime(2022, 1, 6, 0, 0), 1.0),
(datetime.datetime(2022, 1, 7, 0, 0), 1.0)], frequency='M')
```
However, if the dates are not in sync, PyFacts provides convenience methods for syncronising dates.
Example:
```
>>> import pyfacts as pft
>>> data1 = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> data2 = [
... ("2022-15-01", 20),
... ("2022-15-02", 22),
... ("2022-15-03", 24),
... ("2022-15-04", 26),
... ("2022-15-06", 28),
... ("2022-15-07", 30)
...]
>>> ts = pft.TimeSeries(data, frequency='M', date_format='%Y-%d-%m')
>>> ts2 = pft.TimeSeries(data2, frequency='M', date_format='%Y-%d-%m')
>>> ts.sync(ts2, fill_method='bfill') # Sync ts2 with ts1
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 20.0),
(datetime.datetime(2022, 2, 1, 0, 0), 22.0),
(datetime.datetime(2022, 3, 1, 0, 0), 24.0),
(datetime.datetime(2022, 4, 1, 0, 0), 26.0),
(datetime.datetime(2022, 6, 1, 0, 0), 28.0),
(datetime.datetime(2022, 7, 1, 0, 0), 30.0)], frequency='M')
```
Even if you need to perform calculations on data with different frequencies, PyFacts will let you easily handle this with the expand and shrink methods.
Example:
```
>>> data = [
... ("2022-01-01", 10),
... ("2022-02-01", 12),
... ("2022-03-01", 14),
... ("2022-04-01", 16),
... ("2022-05-01", 18),
... ("2022-06-01", 20)
...]
>>> ts = pft.TimeSeries(data, 'M')
>>> ts.expand(to_frequency='W', method='ffill')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 10.0),
(datetime.datetime(2022, 1, 8, 0, 0), 10.0),
(datetime.datetime(2022, 1, 15, 0, 0), 10.0)
...
(datetime.datetime(2022, 5, 14, 0, 0), 18.0),
(datetime.datetime(2022, 5, 21, 0, 0), 18.0),
(datetime.datetime(2022, 5, 28, 0, 0), 18.0)], frequency='W')
>>> ts.shrink(to_frequency='Q', method='ffill')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 10.0),
(datetime.datetime(2022, 4, 1, 0, 0), 16.0)], frequency='Q')
```
If you want to shorten the timeframe of the data with an aggregation function, the transform method will help you out. Currently it supports sum and mean.
Example:
```
>>> data = [
... ("2022-01-01", 10),
... ("2022-02-01", 12),
... ("2022-03-01", 14),
... ("2022-04-01", 16),
... ("2022-05-01", 18),
... ("2022-06-01", 20),
... ("2022-07-01", 22),
... ("2022-08-01", 24),
... ("2022-09-01", 26),
... ("2022-10-01", 28),
... ("2022-11-01", 30),
... ("2022-12-01", 32)
...]
>>> ts = pft.TimeSeries(data, 'M')
>>> ts.transform(to_frequency='Q', method='sum')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 36.0),
(datetime.datetime(2022, 4, 1, 0, 0), 54.0),
(datetime.datetime(2022, 7, 1, 0, 0), 72.0),
(datetime.datetime(2022, 10, 1, 0, 0), 90.0)], frequency='Q')
>>> ts.transform(to_frequency='Q', method='mean')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 12.0),
(datetime.datetime(2022, 4, 1, 0, 0), 18.0),
(datetime.datetime(2022, 7, 1, 0, 0), 24.0),
(datetime.datetime(2022, 10, 1, 0, 0), 30.0)], frequency='Q')
```
## To-do ## To-do
### Core features ### Core features

View File

@ -180,7 +180,7 @@ class Series(UserList):
if len(self) != len(other): if len(self) != len(other):
raise ValueError("Length of Series must be same for comparison") raise ValueError("Length of Series must be same for comparison")
elif (self.dtype != float and isinstance(other, Number)) or not isinstance(other, self.dtype): elif self.dtype != float and isinstance(other, Number):
raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}") raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")
return other return other
@ -300,14 +300,16 @@ class Series(UserList):
def _validate_frequency( def _validate_frequency(
data: List[Tuple[datetime.datetime, float]], provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None data: List[Tuple[datetime.datetime, float]],
provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
raise_error: bool = True,
): ):
"""Checks the data and returns the expected frequency.""" """Checks the data and returns the expected frequency."""
if provided_frequency is not None: if provided_frequency is not None:
provided_frequency = getattr(AllFrequencies, provided_frequency) provided_frequency = getattr(AllFrequencies, provided_frequency)
start_date = data[0][0] start_date = data[0][0]
end_date = data[-1][0] end_date = data[-1][0]
overall_gap = (end_date - start_date).days overall_gap = (end_date - start_date).days + 1
num_data_points = len(data) num_data_points = len(data)
# days_per_data = num_data_points / overall_gap # days_per_data = num_data_points / overall_gap
@ -325,7 +327,10 @@ def _validate_frequency(
expected_frequency = frequency expected_frequency = frequency
break break
else: else:
raise ValueError("Data does not match any known frequency. Perhaps you have too many missing data points.") if raise_error:
raise ValueError("Data does not match any known frequency. Perhaps you have too many missing data points.")
else:
expected_frequency = provided_frequency.symbol
expected_data_points = expected_data_points[expected_frequency] expected_data_points = expected_data_points[expected_frequency]
if provided_frequency is None: if provided_frequency is None:
@ -387,7 +392,7 @@ class TimeSeriesCore:
ts_data = _preprocess_timeseries(ts_data, date_format=date_format) ts_data = _preprocess_timeseries(ts_data, date_format=date_format)
validation = _validate_frequency(data=ts_data, provided_frequency=frequency) validation = _validate_frequency(data=ts_data, provided_frequency=frequency, raise_error=validate_frequency)
if frequency is None: if frequency is None:
frequency = validation["expected_frequency"] frequency = validation["expected_frequency"]
@ -508,7 +513,7 @@ class TimeSeriesCore:
"""Helper function to retrieve items using a list""" """Helper function to retrieve items using a list"""
data_to_return = [self._get_item_from_key(key) for key in date_list] data_to_return = [self._get_item_from_key(key) for key in date_list]
return self.__class__(data_to_return, frequency=self.frequency.symbol) return self.__class__(data_to_return, frequency=self.frequency.symbol, validate_frequency=False)
def _get_item_from_series(self, series: Series): def _get_item_from_series(self, series: Series):
"""Helper function to retrieve item using a Series object """Helper function to retrieve item using a Series object

View File

@ -344,8 +344,8 @@ class TimeSeries(TimeSeriesCore):
@date_parser(1, 2) @date_parser(1, 2)
def calculate_rolling_returns( def calculate_rolling_returns(
self, self,
from_date: datetime.date | str, from_date: datetime.date | str = None,
to_date: datetime.date | str, to_date: datetime.date | str = None,
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None, frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
as_on_match: str = "closest", as_on_match: str = "closest",
prior_match: str = "closest", prior_match: str = "closest",
@ -429,6 +429,13 @@ class TimeSeries(TimeSeriesCore):
frequency = getattr(AllFrequencies, frequency) frequency = getattr(AllFrequencies, frequency)
except AttributeError: except AttributeError:
raise ValueError(f"Invalid argument for frequency {frequency}") raise ValueError(f"Invalid argument for frequency {frequency}")
if from_date is None:
from_date = self.start_date + relativedelta(
days=int(_interval_to_years(return_period_unit, return_period_value) * 365 + 1)
)
if to_date is None:
to_date = self.end_date
dates = create_date_series(from_date, to_date, frequency.symbol) dates = create_date_series(from_date, to_date, frequency.symbol)
if frequency == AllFrequencies.D: if frequency == AllFrequencies.D:

View File

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import datetime import datetime
import math
import statistics import statistics
from typing import Literal from typing import Literal
@ -472,13 +473,14 @@ def sortino_ratio(
closest: Literal["previous", "next"] = "previous", closest: Literal["previous", "next"] = "previous",
date_format: str = None, date_format: str = None,
) -> float: ) -> float:
"""Calculate the Sharpe ratio of any time series """Calculate the Sortino ratio of any time series
Sharpe ratio is a measure of returns per unit of risk, Sortino ratio is a variation of the Sharpe ratio,
where risk is measured by the standard deviation of the returns. where risk is measured as standard deviation of negative returns only.
Since deviation on the positive side is not undesirable, hence sortino ratio excludes positive deviations.
The formula for Sharpe ratio is: The formula for Sortino ratio is:
(average asset return - risk free rate)/volatility of asset returns (average asset return - risk free rate)/volatility of negative asset returns
Parameters Parameters
---------- ----------
@ -528,7 +530,7 @@ def sortino_ratio(
Returns Returns
------- -------
Value of Sharpe ratio as a float. Value of Sortino ratio as a float.
Raises Raises
------ ------
@ -559,11 +561,13 @@ def sortino_ratio(
"closest": closest, "closest": closest,
"date_format": date_format, "date_format": date_format,
} }
average_rr_ts = time_series_data.calculate_rolling_returns(**common_params, annual_compounded_returns=True) average_rr_ts = time_series_data.calculate_rolling_returns(**common_params, annual_compounded_returns=False)
average_rr = statistics.mean(average_rr_ts.values) average_rr = statistics.mean(average_rr_ts.values)
annualized_average_rr = (1 + average_rr) ** (365 / interval_days) - 1
excess_returns = average_rr - risk_free_rate excess_returns = annualized_average_rr - risk_free_rate
sd = statistics.stdev([i for i in average_rr_ts.values if i < 0]) sd = statistics.stdev([i for i in average_rr_ts.values if i < 0])
sd *= math.sqrt(365 / interval_days)
sortino_ratio_value = excess_returns / sd sortino_ratio_value = excess_returns / sd
return sortino_ratio_value return sortino_ratio_value

View File

@ -1,6 +1,6 @@
import datetime import datetime
from fincal.core import Series from pyfacts.core import Series
s1 = Series([2.5, 6.2, 5.6, 8.4, 7.4, 1.5, 9.6, 5]) s1 = Series([2.5, 6.2, 5.6, 8.4, 7.4, 1.5, 9.6, 5])
@ -19,7 +19,7 @@ dt_lst = [
datetime.datetime(2020, 6, 19, 0, 0), datetime.datetime(2020, 6, 19, 0, 0),
datetime.datetime(2016, 3, 16, 0, 0), datetime.datetime(2016, 3, 16, 0, 0),
datetime.datetime(2017, 4, 25, 0, 0), datetime.datetime(2017, 4, 25, 0, 0),
datetime.datetime(2016, 7, 10, 0, 0) datetime.datetime(2016, 7, 10, 0, 0),
] ]
s2 = Series(dt_lst) s2 = Series(dt_lst)

View File

@ -62,6 +62,7 @@ def sample_data_generator(
mu: float = 0.1, mu: float = 0.1,
sigma: float = 0.05, sigma: float = 0.05,
eomonth: bool = False, eomonth: bool = False,
dates_as_string: bool = False,
) -> List[tuple]: ) -> List[tuple]:
"""Creates TimeSeries data """Creates TimeSeries data
@ -95,6 +96,8 @@ def sample_data_generator(
} }
end_date = start_date + relativedelta(**timedelta_dict) end_date = start_date + relativedelta(**timedelta_dict)
dates = pft.create_date_series(start_date, end_date, frequency.symbol, skip_weekends=skip_weekends, eomonth=eomonth) dates = pft.create_date_series(start_date, end_date, frequency.symbol, skip_weekends=skip_weekends, eomonth=eomonth)
if dates_as_string:
dates = [dt.strftime("%Y-%m-%d") for dt in dates]
values = create_prices(1000, mu, sigma, num) values = create_prices(1000, mu, sigma, num)
ts = list(zip(dates, values)) ts = list(zip(dates, values))
return ts return ts

View File

@ -1,16 +1,15 @@
import datetime import datetime
import random import random
from typing import Literal, Mapping, Sequence from typing import Mapping
import pyfacts as pft
import pytest import pytest
from pyfacts.core import AllFrequencies, Frequency, Series, TimeSeriesCore
from pyfacts.pyfacts import create_date_series
from pyfacts.utils import PyfactsOptions from pyfacts.utils import PyfactsOptions
class TestFrequency: class TestFrequency:
def test_creation(self): def test_creation(self):
D = Frequency("daily", "days", 1, 1, "D") D = pft.Frequency("daily", "days", 1, 1, "D")
assert D.days == 1 assert D.days == 1
assert D.symbol == "D" assert D.symbol == "D"
assert D.name == "daily" assert D.name == "daily"
@ -18,106 +17,103 @@ class TestFrequency:
assert D.freq_type == "days" assert D.freq_type == "days"
def create_test_data(
frequency: str,
eomonth: bool,
n: int,
gaps: float,
month_position: Literal["start", "middle", "end"],
date_as_str: bool,
as_outer_type: Literal["dict", "list"] = "list",
as_inner_type: Literal["dict", "list", "tuple"] = "tuple",
) -> Sequence[tuple]:
start_dates = {
"start": datetime.datetime(2016, 1, 1),
"middle": datetime.datetime(2016, 1, 15),
"end": datetime.datetime(2016, 1, 31),
}
end_date = datetime.datetime(2021, 12, 31)
dates = create_date_series(start_dates[month_position], end_date, frequency=frequency, eomonth=eomonth)
dates = dates[:n]
if gaps:
num_gaps = int(len(dates) * gaps)
to_remove = random.sample(dates, num_gaps)
for i in to_remove:
dates.remove(i)
if date_as_str:
dates = [i.strftime("%Y-%m-%d") for i in dates]
values = [random.randint(8000, 90000) / 100 for _ in dates]
data = list(zip(dates, values))
if as_outer_type == "list":
if as_inner_type == "list":
data = [list(i) for i in data]
elif as_inner_type == "dict[1]":
data = [dict((i,)) for i in data]
elif as_inner_type == "dict[2]":
data = [dict(date=i, value=j) for i, j in data]
elif as_outer_type == "dict":
data = dict(data)
return data
class TestAllFrequencies: class TestAllFrequencies:
def test_attributes(self): def test_attributes(self):
assert hasattr(AllFrequencies, "D") assert hasattr(pft.AllFrequencies, "D")
assert hasattr(AllFrequencies, "M") assert hasattr(pft.AllFrequencies, "M")
assert hasattr(AllFrequencies, "Q") assert hasattr(pft.AllFrequencies, "Q")
def test_days(self): def test_days(self):
assert AllFrequencies.D.days == 1 assert pft.AllFrequencies.D.days == 1
assert AllFrequencies.M.days == 30 assert pft.AllFrequencies.M.days == 30
assert AllFrequencies.Q.days == 91 assert pft.AllFrequencies.Q.days == 91
def test_symbol(self): def test_symbol(self):
assert AllFrequencies.H.symbol == "H" assert pft.AllFrequencies.H.symbol == "H"
assert AllFrequencies.W.symbol == "W" assert pft.AllFrequencies.W.symbol == "W"
def test_values(self): def test_values(self):
assert AllFrequencies.H.value == 6 assert pft.AllFrequencies.H.value == 6
assert AllFrequencies.Y.value == 1 assert pft.AllFrequencies.Y.value == 1
def test_type(self): def test_type(self):
assert AllFrequencies.Q.freq_type == "months" assert pft.AllFrequencies.Q.freq_type == "months"
assert AllFrequencies.W.freq_type == "days" assert pft.AllFrequencies.W.freq_type == "days"
class TestSeries: class TestSeries:
def test_creation(self): def test_creation(self):
series = Series([1, 2, 3, 4, 5, 6, 7], dtype="number") series = pft.Series([1, 2, 3, 4, 5, 6, 7], dtype="number")
assert series.dtype == float assert series.dtype == float
assert series[2] == 3 assert series[2] == 3
dates = create_date_series("2021-01-01", "2021-01-31", frequency="D") dates = pft.create_date_series("2021-01-01", "2021-01-31", frequency="D")
series = Series(dates, dtype="date") series = pft.Series(dates, dtype="date")
assert series.dtype == datetime.datetime assert series.dtype == datetime.datetime
class TestTimeSeriesCore: class TestTimeSeriesCore:
data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)] data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)]
def test_repr_str(self): def test_repr_str(self, create_test_data):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert str(ts) in repr(ts).replace("\t", " ") assert str(ts) in repr(ts).replace("\t", " ")
data = create_test_data(frequency="D", eomonth=False, n=50, gaps=0, month_position="start", date_as_str=True) data = create_test_data(frequency=pft.AllFrequencies.D, eomonth=False, num=50, dates_as_string=True)
ts = TimeSeriesCore(data, frequency="D") ts = pft.TimeSeriesCore(data, frequency="D")
assert "..." in str(ts) assert "..." in str(ts)
assert "..." in repr(ts) assert "..." in repr(ts)
def test_creation(self): def test_creation(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert isinstance(ts, TimeSeriesCore) assert isinstance(ts, pft.TimeSeriesCore)
assert isinstance(ts, Mapping) assert isinstance(ts, Mapping)
def test_creation_no_freq(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.D
data = create_test_data(num=300, frequency=pft.AllFrequencies.M)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.M
def test_creation_no_freq_missing_data(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
data = random.sample(data, 182)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.D
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
data = random.sample(data, 175)
with pytest.raises(ValueError):
ts = pft.TimeSeriesCore(data)
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
data = random.sample(data, 70)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.W
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
data = random.sample(data, 68)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data)
def test_creation_wrong_freq(self, create_test_data):
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data, frequency="D")
data = create_test_data(num=100, frequency=pft.AllFrequencies.D)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data, frequency="W")
class TestSlicing: class TestSlicing:
data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)] data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)]
def test_getitem(self): def test_getitem(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0) assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert ts.values[0] == 220 assert ts.values[0] == 220
assert ts["2021-01-01"][1] == 220 assert ts["2021-01-01"][1] == 220
@ -129,11 +125,11 @@ class TestSlicing:
ts["2021-02-03"] ts["2021-02-03"]
subset_ts = ts[["2021-01-01", "2021-03-01"]] subset_ts = ts[["2021-01-01", "2021-03-01"]]
assert len(subset_ts) == 2 assert len(subset_ts) == 2
assert isinstance(subset_ts, TimeSeriesCore) assert isinstance(subset_ts, pft.TimeSeriesCore)
assert subset_ts.iloc[1][1] == 240 assert subset_ts.iloc[1][1] == 240
def test_get(self): def test_get(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0) assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert ts.values[0] == 220 assert ts.values[0] == 220
assert ts.get("2021-01-01")[1] == 220 assert ts.get("2021-01-01")[1] == 220
@ -147,43 +143,63 @@ class TestSlicing:
assert ts.get("2021-02-10")[1] == 240 assert ts.get("2021-02-10")[1] == 240
def test_contains(self): def test_contains(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert datetime.datetime(2021, 1, 1) in ts assert datetime.datetime(2021, 1, 1) in ts
assert "2021-01-01" in ts assert "2021-01-01" in ts
assert "2021-01-14" not in ts assert "2021-01-14" not in ts
def test_items(self): def test_items(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
for i, j in ts.items(): for i, j in ts.items():
assert j == self.data[0][1] assert j == self.data[0][1]
break break
def test_special_keys(self): def test_special_keys(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
dates = ts["dates"] dates = ts["dates"]
values = ts["values"] values = ts["values"]
assert isinstance(dates, Series) assert isinstance(dates, pft.Series)
assert isinstance(values, Series) assert isinstance(values, pft.Series)
assert len(dates) == 3 assert len(dates) == 3
assert len(values) == 3 assert len(values) == 3
assert dates[0] == datetime.datetime(2021, 1, 1, 0, 0) assert dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert values[0] == 220 assert values[0] == 220
def test_iloc_slicing(self): def test_iloc_slicing(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.iloc[0] == (datetime.datetime(2021, 1, 1), 220) assert ts.iloc[0] == (datetime.datetime(2021, 1, 1), 220)
assert ts.iloc[-1] == (datetime.datetime(2021, 3, 1), 240) assert ts.iloc[-1] == (datetime.datetime(2021, 3, 1), 240)
ts_slice = ts.iloc[0:2] ts_slice = ts.iloc[0:2]
assert isinstance(ts_slice, TimeSeriesCore) assert isinstance(ts_slice, pft.TimeSeriesCore)
assert len(ts_slice) == 2 assert len(ts_slice) == 2
class TestComparativeSlicing:
def test_date_gt_daily(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
ts = pft.TimeSeries(data, "D")
ts_rr = ts.calculate_rolling_returns(return_period_unit="months")
assert len(ts_rr) == 269
subset = ts_rr[ts_rr.values < 0.1]
assert isinstance(subset, pft.TimeSeriesCore)
assert subset.frequency == pft.AllFrequencies.D
def test_date_gt_monthly(self, create_test_data):
data = create_test_data(num=60, frequency=pft.AllFrequencies.M)
ts = pft.TimeSeries(data, "M")
ts_rr = ts.calculate_rolling_returns(return_period_unit="months")
assert len(ts_rr) == 59
subset = ts_rr[ts_rr.values < 0.1]
assert isinstance(subset, pft.TimeSeriesCore)
assert subset.frequency == pft.AllFrequencies.M
class TestSetitem: class TestSetitem:
data = [("2021-01-01", 220), ("2021-01-04", 230), ("2021-03-07", 240)] data = [("2021-01-01", 220), ("2021-01-04", 230), ("2021-03-07", 240)]
def test_setitem(self): def test_setitem(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts) == 3 assert len(ts) == 3
ts["2021-01-02"] = 225 ts["2021-01-02"] = 225
@ -195,7 +211,7 @@ class TestSetitem:
assert ts["2021-01-02"][1] == 227.6 assert ts["2021-01-02"][1] == 227.6
def test_errors(self): def test_errors(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
with pytest.raises(TypeError): with pytest.raises(TypeError):
ts["2021-01-03"] = "abc" ts["2021-01-03"] = "abc"
@ -223,25 +239,25 @@ class TestTimeSeriesCoreHeadTail:
] ]
def test_head(self): def test_head(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts.head()) == 6 assert len(ts.head()) == 6
assert len(ts.head(3)) == 3 assert len(ts.head(3)) == 3
assert isinstance(ts.head(), TimeSeriesCore) assert isinstance(ts.head(), pft.TimeSeriesCore)
head_ts = ts.head(6) head_ts = ts.head(6)
assert head_ts.iloc[-1][1] == 270 assert head_ts.iloc[-1][1] == 270
def test_tail(self): def test_tail(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts.tail()) == 6 assert len(ts.tail()) == 6
assert len(ts.tail(8)) == 8 assert len(ts.tail(8)) == 8
assert isinstance(ts.tail(), TimeSeriesCore) assert isinstance(ts.tail(), pft.TimeSeriesCore)
tail_ts = ts.tail(6) tail_ts = ts.tail(6)
assert tail_ts.iloc[0][1] == 280 assert tail_ts.iloc[0][1] == 280
def test_head_tail(self): def test_head_tail(self):
ts = TimeSeriesCore(self.data, frequency="M") ts = pft.TimeSeriesCore(self.data, frequency="M")
head_tail_ts = ts.head(8).tail(2) head_tail_ts = ts.head(8).tail(2)
assert isinstance(head_tail_ts, TimeSeriesCore) assert isinstance(head_tail_ts, pft.TimeSeriesCore)
assert "2021-07-01" in head_tail_ts assert "2021-07-01" in head_tail_ts
assert head_tail_ts.iloc[1][1] == 290 assert head_tail_ts.iloc[1][1] == 290
@ -255,7 +271,7 @@ class TestDelitem:
] ]
def test_deletion(self): def test_deletion(self):
ts = TimeSeriesCore(self.data, "M") ts = pft.TimeSeriesCore(self.data, "M")
assert len(ts) == 4 assert len(ts) == 4
del ts["2021-03-01"] del ts["2021-03-01"]
assert len(ts) == 3 assert len(ts) == 3
@ -281,42 +297,42 @@ class TestTimeSeriesComparisons:
] ]
def test_number_comparison(self): def test_number_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M") ts1 = pft.TimeSeriesCore(self.data1, "M")
assert isinstance(ts1 > 23, TimeSeriesCore) assert isinstance(ts1 > 23, pft.TimeSeriesCore)
assert (ts1 > 230).values == Series([0.0, 0.0, 1.0, 1.0], "float") assert (ts1 > 230).values == pft.Series([0.0, 0.0, 1.0, 1.0], "float")
assert (ts1 >= 230).values == Series([0.0, 1.0, 1.0, 1.0], "float") assert (ts1 >= 230).values == pft.Series([0.0, 1.0, 1.0, 1.0], "float")
assert (ts1 < 240).values == Series([1.0, 1.0, 0.0, 0.0], "float") assert (ts1 < 240).values == pft.Series([1.0, 1.0, 0.0, 0.0], "float")
assert (ts1 <= 240).values == Series([1.0, 1.0, 1.0, 0.0], "float") assert (ts1 <= 240).values == pft.Series([1.0, 1.0, 1.0, 0.0], "float")
assert (ts1 == 240).values == Series([0.0, 0.0, 1.0, 0.0], "float") assert (ts1 == 240).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != 240).values == Series([1.0, 1.0, 0.0, 1.0], "float") assert (ts1 != 240).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_series_comparison(self): def test_series_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M") ts1 = pft.TimeSeriesCore(self.data1, "M")
ser = Series([240, 210, 240, 270], dtype="int") ser = pft.Series([240, 210, 240, 270], dtype="int")
assert (ts1 > ser).values == Series([0.0, 1.0, 0.0, 0.0], "float") assert (ts1 > ser).values == pft.Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ser).values == Series([0.0, 1.0, 1.0, 0.0], "float") assert (ts1 >= ser).values == pft.Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ser).values == Series([1.0, 0.0, 0.0, 1.0], "float") assert (ts1 < ser).values == pft.Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ser).values == Series([1.0, 0.0, 1.0, 1.0], "float") assert (ts1 <= ser).values == pft.Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ser).values == Series([0.0, 0.0, 1.0, 0.0], "float") assert (ts1 == ser).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ser).values == Series([1.0, 1.0, 0.0, 1.0], "float") assert (ts1 != ser).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_tsc_comparison(self): def test_tsc_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M") ts1 = pft.TimeSeriesCore(self.data1, "M")
ts2 = TimeSeriesCore(self.data2, "M") ts2 = pft.TimeSeriesCore(self.data2, "M")
assert (ts1 > ts2).values == Series([0.0, 1.0, 0.0, 0.0], "float") assert (ts1 > ts2).values == pft.Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ts2).values == Series([0.0, 1.0, 1.0, 0.0], "float") assert (ts1 >= ts2).values == pft.Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ts2).values == Series([1.0, 0.0, 0.0, 1.0], "float") assert (ts1 < ts2).values == pft.Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ts2).values == Series([1.0, 0.0, 1.0, 1.0], "float") assert (ts1 <= ts2).values == pft.Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ts2).values == Series([0.0, 0.0, 1.0, 0.0], "float") assert (ts1 == ts2).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ts2).values == Series([1.0, 1.0, 0.0, 1.0], "float") assert (ts1 != ts2).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_errors(self): def test_errors(self):
ts1 = TimeSeriesCore(self.data1, "M") ts1 = pft.TimeSeriesCore(self.data1, "M")
ts2 = TimeSeriesCore(self.data2, "M") ts2 = pft.TimeSeriesCore(self.data2, "M")
ser = Series([240, 210, 240], dtype="int") ser = pft.Series([240, 210, 240], dtype="int")
ser2 = Series(["2021-01-01", "2021-02-01", "2021-03-01", "2021-04-01"], dtype="date") ser2 = pft.Series(["2021-01-01", "2021-02-01", "2021-03-01", "2021-04-01"], dtype="date")
del ts2["2021-04-01"] del ts2["2021-04-01"]
@ -345,7 +361,7 @@ class TestTimeSeriesArithmatic:
] ]
def test_add(self): def test_add(self):
ts = TimeSeriesCore(self.data, "M") ts = pft.TimeSeriesCore(self.data, "M")
ser = ts.values ser = ts.values
num_add_ts = ts + 40 num_add_ts = ts + 40
@ -365,8 +381,8 @@ class TestTimeSeriesArithmatic:
assert ts_add_ts["2021-04-01"][1] == 540 assert ts_add_ts["2021-04-01"][1] == 540
def test_sub(self): def test_sub(self):
ts = TimeSeriesCore(self.data, "M") ts = pft.TimeSeriesCore(self.data, "M")
ser = Series([20, 30, 40, 50], "number") ser = pft.Series([20, 30, 40, 50], "number")
num_sub_ts = ts - 40 num_sub_ts = ts - 40
assert num_sub_ts["2021-01-01"][1] == 180 assert num_sub_ts["2021-01-01"][1] == 180
@ -385,8 +401,8 @@ class TestTimeSeriesArithmatic:
assert ts_sub_ts["2021-04-01"][1] == 40 assert ts_sub_ts["2021-04-01"][1] == 40
def test_truediv(self): def test_truediv(self):
ts = TimeSeriesCore(self.data, "M") ts = pft.TimeSeriesCore(self.data, "M")
ser = Series([22, 23, 24, 25], "number") ser = pft.Series([22, 23, 24, 25], "number")
num_div_ts = ts / 10 num_div_ts = ts / 10
assert num_div_ts["2021-01-01"][1] == 22 assert num_div_ts["2021-01-01"][1] == 22
@ -404,8 +420,8 @@ class TestTimeSeriesArithmatic:
assert ts_div_ts["2021-04-01"][1] == 10 assert ts_div_ts["2021-04-01"][1] == 10
def test_floordiv(self): def test_floordiv(self):
ts = TimeSeriesCore(self.data, "M") ts = pft.TimeSeriesCore(self.data, "M")
ser = Series([22, 23, 24, 25], "number") ser = pft.Series([22, 23, 24, 25], "number")
num_div_ts = ts // 11 num_div_ts = ts // 11
assert num_div_ts["2021-02-01"][1] == 20 assert num_div_ts["2021-02-01"][1] == 20

View File

@ -84,6 +84,84 @@ class TestSharpe:
assert round(sharpe_ratio, 4) == 0.3199 assert round(sharpe_ratio, 4) == 0.3199
class TestSortino:
def test_sortino_daily_freq(self, create_test_data):
data = create_test_data(num=1305, frequency=pft.AllFrequencies.D, skip_weekends=True)
ts = pft.TimeSeries(data, "D")
sortino_ratio = pft.sortino_ratio(
ts,
risk_free_rate=0.06,
from_date="2017-02-02",
to_date="2021-12-31",
return_period_unit="months",
return_period_value=1,
)
assert round(sortino_ratio, 4) == 2.5377
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2017-01-09",
# to_date="2021-12-31",
# return_period_unit="days",
# return_period_value=7,
# )
# assert round(sharpe_ratio, 4) == 1.0701
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2018-01-02",
# to_date="2021-12-31",
# return_period_unit="years",
# return_period_value=1,
# )
# assert round(sharpe_ratio, 4) == 1.4374
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2017-07-03",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=6,
# )
# assert round(sharpe_ratio, 4) == 0.8401
# def test_sharpe_weekly_freq(self, create_test_data):
# data = create_test_data(num=261, frequency=pft.AllFrequencies.W, mu=0.6, sigma=0.7)
# ts = pft.TimeSeries(data, "W")
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2017-01-08",
# to_date="2021-12-31",
# return_period_unit="days",
# return_period_value=7,
# )
# assert round(sharpe_ratio, 4) == 0.4533
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2017-02-05",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=1,
# )
# assert round(sharpe_ratio, 4) == 0.4898
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2018-01-01",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=12,
# )
# assert round(sharpe_ratio, 4) == 0.3199
class TestBeta: class TestBeta:
def test_beta_daily_freq(self, create_test_data): def test_beta_daily_freq(self, create_test_data):
market_data = create_test_data(num=3600, frequency=pft.AllFrequencies.D) market_data = create_test_data(num=3600, frequency=pft.AllFrequencies.D)