Compare commits

...

5 Commits

8 changed files with 411 additions and 141 deletions

161
README.md
View File

@ -29,7 +29,7 @@ Example:
... ('2021-06-01', 20)
...]
>>> ts = fc.TimeSeries(time_series_data)
>>> ts = pft.TimeSeries(time_series_data)
```
### Sample usage
@ -46,12 +46,169 @@ With PyFacts, you never have to go into the hassle of creating datetime objects
```
>>> import pyfacts as pft
>>> fc.PyfactsOptions.date_format = '%d-%m-%Y'
>>> pft.PyfactsOptions.date_format = '%d-%m-%Y'
```
Now the library will automatically parse all dates as DD-MM-YYYY
If you happen to have any one situation where you need to use a different format, all methods accept a date_format parameter to override the default.
### Working with multiple time series
While working with time series data, you will often need to perform calculations on the data. PyFacts supports all kinds of mathematical operations on time series.
Example:
```
>>> import pyfacts as pft
>>> time_series_data = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> ts = pft.TimeSeries(time_series_data)
>>> print(ts/100)
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 0.1),
(datetime.datetime(2022, 1, 2, 0, 0), 0.12),
(datetime.datetime(2022, 1, 3, 0, 0), 0.14),
(datetime.datetime(2022, 1, 4, 0, 0), 0.16),
(datetime.datetime(2022, 1, 6, 0, 0), 0.18),
(datetime.datetime(2022, 1, 7, 0, 0), 0.2)], frequency='M')
```
Mathematical operations can also be done between time series as long as they have the same dates.
Example:
```
>>> import pyfacts as pft
>>> time_series_data = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> ts = pft.TimeSeries(time_series_data)
>>> ts2 = pft.TimeSeries(time_series_data)
>>> print(ts/ts2)
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 1.0),
(datetime.datetime(2022, 1, 2, 0, 0), 1.0),
(datetime.datetime(2022, 1, 3, 0, 0), 1.0),
(datetime.datetime(2022, 1, 4, 0, 0), 1.0),
(datetime.datetime(2022, 1, 6, 0, 0), 1.0),
(datetime.datetime(2022, 1, 7, 0, 0), 1.0)], frequency='M')
```
However, if the dates are not in sync, PyFacts provides convenience methods for syncronising dates.
Example:
```
>>> import pyfacts as pft
>>> data1 = [
... ('2021-01-01', 10),
... ('2021-02-01', 12),
... ('2021-03-01', 14),
... ('2021-04-01', 16),
... ('2021-05-01', 18),
... ('2021-06-01', 20)
...]
>>> data2 = [
... ("2022-15-01", 20),
... ("2022-15-02", 22),
... ("2022-15-03", 24),
... ("2022-15-04", 26),
... ("2022-15-06", 28),
... ("2022-15-07", 30)
...]
>>> ts = pft.TimeSeries(data, frequency='M', date_format='%Y-%d-%m')
>>> ts2 = pft.TimeSeries(data2, frequency='M', date_format='%Y-%d-%m')
>>> ts.sync(ts2, fill_method='bfill') # Sync ts2 with ts1
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 20.0),
(datetime.datetime(2022, 2, 1, 0, 0), 22.0),
(datetime.datetime(2022, 3, 1, 0, 0), 24.0),
(datetime.datetime(2022, 4, 1, 0, 0), 26.0),
(datetime.datetime(2022, 6, 1, 0, 0), 28.0),
(datetime.datetime(2022, 7, 1, 0, 0), 30.0)], frequency='M')
```
Even if you need to perform calculations on data with different frequencies, PyFacts will let you easily handle this with the expand and shrink methods.
Example:
```
>>> data = [
... ("2022-01-01", 10),
... ("2022-02-01", 12),
... ("2022-03-01", 14),
... ("2022-04-01", 16),
... ("2022-05-01", 18),
... ("2022-06-01", 20)
...]
>>> ts = pft.TimeSeries(data, 'M')
>>> ts.expand(to_frequency='W', method='ffill')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 10.0),
(datetime.datetime(2022, 1, 8, 0, 0), 10.0),
(datetime.datetime(2022, 1, 15, 0, 0), 10.0)
...
(datetime.datetime(2022, 5, 14, 0, 0), 18.0),
(datetime.datetime(2022, 5, 21, 0, 0), 18.0),
(datetime.datetime(2022, 5, 28, 0, 0), 18.0)], frequency='W')
>>> ts.shrink(to_frequency='Q', method='ffill')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 10.0),
(datetime.datetime(2022, 4, 1, 0, 0), 16.0)], frequency='Q')
```
If you want to shorten the timeframe of the data with an aggregation function, the transform method will help you out. Currently it supports sum and mean.
Example:
```
>>> data = [
... ("2022-01-01", 10),
... ("2022-02-01", 12),
... ("2022-03-01", 14),
... ("2022-04-01", 16),
... ("2022-05-01", 18),
... ("2022-06-01", 20),
... ("2022-07-01", 22),
... ("2022-08-01", 24),
... ("2022-09-01", 26),
... ("2022-10-01", 28),
... ("2022-11-01", 30),
... ("2022-12-01", 32)
...]
>>> ts = pft.TimeSeries(data, 'M')
>>> ts.transform(to_frequency='Q', method='sum')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 36.0),
(datetime.datetime(2022, 4, 1, 0, 0), 54.0),
(datetime.datetime(2022, 7, 1, 0, 0), 72.0),
(datetime.datetime(2022, 10, 1, 0, 0), 90.0)], frequency='Q')
>>> ts.transform(to_frequency='Q', method='mean')
TimeSeries([(datetime.datetime(2022, 1, 1, 0, 0), 12.0),
(datetime.datetime(2022, 4, 1, 0, 0), 18.0),
(datetime.datetime(2022, 7, 1, 0, 0), 24.0),
(datetime.datetime(2022, 10, 1, 0, 0), 30.0)], frequency='Q')
```
## To-do
### Core features

View File

@ -180,7 +180,7 @@ class Series(UserList):
if len(self) != len(other):
raise ValueError("Length of Series must be same for comparison")
elif (self.dtype != float and isinstance(other, Number)) or not isinstance(other, self.dtype):
elif self.dtype != float and isinstance(other, Number):
raise Exception(f"Cannot compare type {self.dtype.__name__} to {type(other).__name__}")
return other
@ -300,14 +300,16 @@ class Series(UserList):
def _validate_frequency(
data: List[Tuple[datetime.datetime, float]], provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None
data: List[Tuple[datetime.datetime, float]],
provided_frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
raise_error: bool = True,
):
"""Checks the data and returns the expected frequency."""
if provided_frequency is not None:
provided_frequency = getattr(AllFrequencies, provided_frequency)
start_date = data[0][0]
end_date = data[-1][0]
overall_gap = (end_date - start_date).days
overall_gap = (end_date - start_date).days + 1
num_data_points = len(data)
# days_per_data = num_data_points / overall_gap
@ -325,7 +327,10 @@ def _validate_frequency(
expected_frequency = frequency
break
else:
if raise_error:
raise ValueError("Data does not match any known frequency. Perhaps you have too many missing data points.")
else:
expected_frequency = provided_frequency.symbol
expected_data_points = expected_data_points[expected_frequency]
if provided_frequency is None:
@ -387,7 +392,7 @@ class TimeSeriesCore:
ts_data = _preprocess_timeseries(ts_data, date_format=date_format)
validation = _validate_frequency(data=ts_data, provided_frequency=frequency)
validation = _validate_frequency(data=ts_data, provided_frequency=frequency, raise_error=validate_frequency)
if frequency is None:
frequency = validation["expected_frequency"]
@ -508,7 +513,7 @@ class TimeSeriesCore:
"""Helper function to retrieve items using a list"""
data_to_return = [self._get_item_from_key(key) for key in date_list]
return self.__class__(data_to_return, frequency=self.frequency.symbol)
return self.__class__(data_to_return, frequency=self.frequency.symbol, validate_frequency=False)
def _get_item_from_series(self, series: Series):
"""Helper function to retrieve item using a Series object

View File

@ -344,8 +344,8 @@ class TimeSeries(TimeSeriesCore):
@date_parser(1, 2)
def calculate_rolling_returns(
self,
from_date: datetime.date | str,
to_date: datetime.date | str,
from_date: datetime.date | str = None,
to_date: datetime.date | str = None,
frequency: Literal["D", "W", "M", "Q", "H", "Y"] = None,
as_on_match: str = "closest",
prior_match: str = "closest",
@ -429,6 +429,13 @@ class TimeSeries(TimeSeriesCore):
frequency = getattr(AllFrequencies, frequency)
except AttributeError:
raise ValueError(f"Invalid argument for frequency {frequency}")
if from_date is None:
from_date = self.start_date + relativedelta(
days=int(_interval_to_years(return_period_unit, return_period_value) * 365 + 1)
)
if to_date is None:
to_date = self.end_date
dates = create_date_series(from_date, to_date, frequency.symbol)
if frequency == AllFrequencies.D:

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import datetime
import math
import statistics
from typing import Literal
@ -472,13 +473,14 @@ def sortino_ratio(
closest: Literal["previous", "next"] = "previous",
date_format: str = None,
) -> float:
"""Calculate the Sharpe ratio of any time series
"""Calculate the Sortino ratio of any time series
Sharpe ratio is a measure of returns per unit of risk,
where risk is measured by the standard deviation of the returns.
Sortino ratio is a variation of the Sharpe ratio,
where risk is measured as standard deviation of negative returns only.
Since deviation on the positive side is not undesirable, hence sortino ratio excludes positive deviations.
The formula for Sharpe ratio is:
(average asset return - risk free rate)/volatility of asset returns
The formula for Sortino ratio is:
(average asset return - risk free rate)/volatility of negative asset returns
Parameters
----------
@ -528,7 +530,7 @@ def sortino_ratio(
Returns
-------
Value of Sharpe ratio as a float.
Value of Sortino ratio as a float.
Raises
------
@ -559,11 +561,13 @@ def sortino_ratio(
"closest": closest,
"date_format": date_format,
}
average_rr_ts = time_series_data.calculate_rolling_returns(**common_params, annual_compounded_returns=True)
average_rr_ts = time_series_data.calculate_rolling_returns(**common_params, annual_compounded_returns=False)
average_rr = statistics.mean(average_rr_ts.values)
annualized_average_rr = (1 + average_rr) ** (365 / interval_days) - 1
excess_returns = average_rr - risk_free_rate
excess_returns = annualized_average_rr - risk_free_rate
sd = statistics.stdev([i for i in average_rr_ts.values if i < 0])
sd *= math.sqrt(365 / interval_days)
sortino_ratio_value = excess_returns / sd
return sortino_ratio_value

View File

@ -1,6 +1,6 @@
import datetime
from fincal.core import Series
from pyfacts.core import Series
s1 = Series([2.5, 6.2, 5.6, 8.4, 7.4, 1.5, 9.6, 5])
@ -19,7 +19,7 @@ dt_lst = [
datetime.datetime(2020, 6, 19, 0, 0),
datetime.datetime(2016, 3, 16, 0, 0),
datetime.datetime(2017, 4, 25, 0, 0),
datetime.datetime(2016, 7, 10, 0, 0)
datetime.datetime(2016, 7, 10, 0, 0),
]
s2 = Series(dt_lst)

View File

@ -62,6 +62,7 @@ def sample_data_generator(
mu: float = 0.1,
sigma: float = 0.05,
eomonth: bool = False,
dates_as_string: bool = False,
) -> List[tuple]:
"""Creates TimeSeries data
@ -95,6 +96,8 @@ def sample_data_generator(
}
end_date = start_date + relativedelta(**timedelta_dict)
dates = pft.create_date_series(start_date, end_date, frequency.symbol, skip_weekends=skip_weekends, eomonth=eomonth)
if dates_as_string:
dates = [dt.strftime("%Y-%m-%d") for dt in dates]
values = create_prices(1000, mu, sigma, num)
ts = list(zip(dates, values))
return ts

View File

@ -1,16 +1,15 @@
import datetime
import random
from typing import Literal, Mapping, Sequence
from typing import Mapping
import pyfacts as pft
import pytest
from pyfacts.core import AllFrequencies, Frequency, Series, TimeSeriesCore
from pyfacts.pyfacts import create_date_series
from pyfacts.utils import PyfactsOptions
class TestFrequency:
def test_creation(self):
D = Frequency("daily", "days", 1, 1, "D")
D = pft.Frequency("daily", "days", 1, 1, "D")
assert D.days == 1
assert D.symbol == "D"
assert D.name == "daily"
@ -18,106 +17,103 @@ class TestFrequency:
assert D.freq_type == "days"
def create_test_data(
frequency: str,
eomonth: bool,
n: int,
gaps: float,
month_position: Literal["start", "middle", "end"],
date_as_str: bool,
as_outer_type: Literal["dict", "list"] = "list",
as_inner_type: Literal["dict", "list", "tuple"] = "tuple",
) -> Sequence[tuple]:
start_dates = {
"start": datetime.datetime(2016, 1, 1),
"middle": datetime.datetime(2016, 1, 15),
"end": datetime.datetime(2016, 1, 31),
}
end_date = datetime.datetime(2021, 12, 31)
dates = create_date_series(start_dates[month_position], end_date, frequency=frequency, eomonth=eomonth)
dates = dates[:n]
if gaps:
num_gaps = int(len(dates) * gaps)
to_remove = random.sample(dates, num_gaps)
for i in to_remove:
dates.remove(i)
if date_as_str:
dates = [i.strftime("%Y-%m-%d") for i in dates]
values = [random.randint(8000, 90000) / 100 for _ in dates]
data = list(zip(dates, values))
if as_outer_type == "list":
if as_inner_type == "list":
data = [list(i) for i in data]
elif as_inner_type == "dict[1]":
data = [dict((i,)) for i in data]
elif as_inner_type == "dict[2]":
data = [dict(date=i, value=j) for i, j in data]
elif as_outer_type == "dict":
data = dict(data)
return data
class TestAllFrequencies:
def test_attributes(self):
assert hasattr(AllFrequencies, "D")
assert hasattr(AllFrequencies, "M")
assert hasattr(AllFrequencies, "Q")
assert hasattr(pft.AllFrequencies, "D")
assert hasattr(pft.AllFrequencies, "M")
assert hasattr(pft.AllFrequencies, "Q")
def test_days(self):
assert AllFrequencies.D.days == 1
assert AllFrequencies.M.days == 30
assert AllFrequencies.Q.days == 91
assert pft.AllFrequencies.D.days == 1
assert pft.AllFrequencies.M.days == 30
assert pft.AllFrequencies.Q.days == 91
def test_symbol(self):
assert AllFrequencies.H.symbol == "H"
assert AllFrequencies.W.symbol == "W"
assert pft.AllFrequencies.H.symbol == "H"
assert pft.AllFrequencies.W.symbol == "W"
def test_values(self):
assert AllFrequencies.H.value == 6
assert AllFrequencies.Y.value == 1
assert pft.AllFrequencies.H.value == 6
assert pft.AllFrequencies.Y.value == 1
def test_type(self):
assert AllFrequencies.Q.freq_type == "months"
assert AllFrequencies.W.freq_type == "days"
assert pft.AllFrequencies.Q.freq_type == "months"
assert pft.AllFrequencies.W.freq_type == "days"
class TestSeries:
def test_creation(self):
series = Series([1, 2, 3, 4, 5, 6, 7], dtype="number")
series = pft.Series([1, 2, 3, 4, 5, 6, 7], dtype="number")
assert series.dtype == float
assert series[2] == 3
dates = create_date_series("2021-01-01", "2021-01-31", frequency="D")
series = Series(dates, dtype="date")
dates = pft.create_date_series("2021-01-01", "2021-01-31", frequency="D")
series = pft.Series(dates, dtype="date")
assert series.dtype == datetime.datetime
class TestTimeSeriesCore:
data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)]
def test_repr_str(self):
ts = TimeSeriesCore(self.data, frequency="M")
def test_repr_str(self, create_test_data):
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert str(ts) in repr(ts).replace("\t", " ")
data = create_test_data(frequency="D", eomonth=False, n=50, gaps=0, month_position="start", date_as_str=True)
ts = TimeSeriesCore(data, frequency="D")
data = create_test_data(frequency=pft.AllFrequencies.D, eomonth=False, num=50, dates_as_string=True)
ts = pft.TimeSeriesCore(data, frequency="D")
assert "..." in str(ts)
assert "..." in repr(ts)
def test_creation(self):
ts = TimeSeriesCore(self.data, frequency="M")
assert isinstance(ts, TimeSeriesCore)
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert isinstance(ts, pft.TimeSeriesCore)
assert isinstance(ts, Mapping)
def test_creation_no_freq(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.D
data = create_test_data(num=300, frequency=pft.AllFrequencies.M)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.M
def test_creation_no_freq_missing_data(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
data = random.sample(data, 182)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.D
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
data = random.sample(data, 175)
with pytest.raises(ValueError):
ts = pft.TimeSeriesCore(data)
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
data = random.sample(data, 70)
ts = pft.TimeSeriesCore(data)
assert ts.frequency == pft.AllFrequencies.W
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
data = random.sample(data, 68)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data)
def test_creation_wrong_freq(self, create_test_data):
data = create_test_data(num=100, frequency=pft.AllFrequencies.W)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data, frequency="D")
data = create_test_data(num=100, frequency=pft.AllFrequencies.D)
with pytest.raises(ValueError):
pft.TimeSeriesCore(data, frequency="W")
class TestSlicing:
data = [("2021-01-01", 220), ("2021-02-01", 230), ("2021-03-01", 240)]
def test_getitem(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert ts.values[0] == 220
assert ts["2021-01-01"][1] == 220
@ -129,11 +125,11 @@ class TestSlicing:
ts["2021-02-03"]
subset_ts = ts[["2021-01-01", "2021-03-01"]]
assert len(subset_ts) == 2
assert isinstance(subset_ts, TimeSeriesCore)
assert isinstance(subset_ts, pft.TimeSeriesCore)
assert subset_ts.iloc[1][1] == 240
def test_get(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert ts.values[0] == 220
assert ts.get("2021-01-01")[1] == 220
@ -147,43 +143,63 @@ class TestSlicing:
assert ts.get("2021-02-10")[1] == 240
def test_contains(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert datetime.datetime(2021, 1, 1) in ts
assert "2021-01-01" in ts
assert "2021-01-14" not in ts
def test_items(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
for i, j in ts.items():
assert j == self.data[0][1]
break
def test_special_keys(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
dates = ts["dates"]
values = ts["values"]
assert isinstance(dates, Series)
assert isinstance(values, Series)
assert isinstance(dates, pft.Series)
assert isinstance(values, pft.Series)
assert len(dates) == 3
assert len(values) == 3
assert dates[0] == datetime.datetime(2021, 1, 1, 0, 0)
assert values[0] == 220
def test_iloc_slicing(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert ts.iloc[0] == (datetime.datetime(2021, 1, 1), 220)
assert ts.iloc[-1] == (datetime.datetime(2021, 3, 1), 240)
ts_slice = ts.iloc[0:2]
assert isinstance(ts_slice, TimeSeriesCore)
assert isinstance(ts_slice, pft.TimeSeriesCore)
assert len(ts_slice) == 2
class TestComparativeSlicing:
def test_date_gt_daily(self, create_test_data):
data = create_test_data(num=300, frequency=pft.AllFrequencies.D)
ts = pft.TimeSeries(data, "D")
ts_rr = ts.calculate_rolling_returns(return_period_unit="months")
assert len(ts_rr) == 269
subset = ts_rr[ts_rr.values < 0.1]
assert isinstance(subset, pft.TimeSeriesCore)
assert subset.frequency == pft.AllFrequencies.D
def test_date_gt_monthly(self, create_test_data):
data = create_test_data(num=60, frequency=pft.AllFrequencies.M)
ts = pft.TimeSeries(data, "M")
ts_rr = ts.calculate_rolling_returns(return_period_unit="months")
assert len(ts_rr) == 59
subset = ts_rr[ts_rr.values < 0.1]
assert isinstance(subset, pft.TimeSeriesCore)
assert subset.frequency == pft.AllFrequencies.M
class TestSetitem:
data = [("2021-01-01", 220), ("2021-01-04", 230), ("2021-03-07", 240)]
def test_setitem(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts) == 3
ts["2021-01-02"] = 225
@ -195,7 +211,7 @@ class TestSetitem:
assert ts["2021-01-02"][1] == 227.6
def test_errors(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
with pytest.raises(TypeError):
ts["2021-01-03"] = "abc"
@ -223,25 +239,25 @@ class TestTimeSeriesCoreHeadTail:
]
def test_head(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts.head()) == 6
assert len(ts.head(3)) == 3
assert isinstance(ts.head(), TimeSeriesCore)
assert isinstance(ts.head(), pft.TimeSeriesCore)
head_ts = ts.head(6)
assert head_ts.iloc[-1][1] == 270
def test_tail(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
assert len(ts.tail()) == 6
assert len(ts.tail(8)) == 8
assert isinstance(ts.tail(), TimeSeriesCore)
assert isinstance(ts.tail(), pft.TimeSeriesCore)
tail_ts = ts.tail(6)
assert tail_ts.iloc[0][1] == 280
def test_head_tail(self):
ts = TimeSeriesCore(self.data, frequency="M")
ts = pft.TimeSeriesCore(self.data, frequency="M")
head_tail_ts = ts.head(8).tail(2)
assert isinstance(head_tail_ts, TimeSeriesCore)
assert isinstance(head_tail_ts, pft.TimeSeriesCore)
assert "2021-07-01" in head_tail_ts
assert head_tail_ts.iloc[1][1] == 290
@ -255,7 +271,7 @@ class TestDelitem:
]
def test_deletion(self):
ts = TimeSeriesCore(self.data, "M")
ts = pft.TimeSeriesCore(self.data, "M")
assert len(ts) == 4
del ts["2021-03-01"]
assert len(ts) == 3
@ -281,42 +297,42 @@ class TestTimeSeriesComparisons:
]
def test_number_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M")
assert isinstance(ts1 > 23, TimeSeriesCore)
assert (ts1 > 230).values == Series([0.0, 0.0, 1.0, 1.0], "float")
assert (ts1 >= 230).values == Series([0.0, 1.0, 1.0, 1.0], "float")
assert (ts1 < 240).values == Series([1.0, 1.0, 0.0, 0.0], "float")
assert (ts1 <= 240).values == Series([1.0, 1.0, 1.0, 0.0], "float")
assert (ts1 == 240).values == Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != 240).values == Series([1.0, 1.0, 0.0, 1.0], "float")
ts1 = pft.TimeSeriesCore(self.data1, "M")
assert isinstance(ts1 > 23, pft.TimeSeriesCore)
assert (ts1 > 230).values == pft.Series([0.0, 0.0, 1.0, 1.0], "float")
assert (ts1 >= 230).values == pft.Series([0.0, 1.0, 1.0, 1.0], "float")
assert (ts1 < 240).values == pft.Series([1.0, 1.0, 0.0, 0.0], "float")
assert (ts1 <= 240).values == pft.Series([1.0, 1.0, 1.0, 0.0], "float")
assert (ts1 == 240).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != 240).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_series_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M")
ser = Series([240, 210, 240, 270], dtype="int")
ts1 = pft.TimeSeriesCore(self.data1, "M")
ser = pft.Series([240, 210, 240, 270], dtype="int")
assert (ts1 > ser).values == Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ser).values == Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ser).values == Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ser).values == Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ser).values == Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ser).values == Series([1.0, 1.0, 0.0, 1.0], "float")
assert (ts1 > ser).values == pft.Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ser).values == pft.Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ser).values == pft.Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ser).values == pft.Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ser).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ser).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_tsc_comparison(self):
ts1 = TimeSeriesCore(self.data1, "M")
ts2 = TimeSeriesCore(self.data2, "M")
ts1 = pft.TimeSeriesCore(self.data1, "M")
ts2 = pft.TimeSeriesCore(self.data2, "M")
assert (ts1 > ts2).values == Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ts2).values == Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ts2).values == Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ts2).values == Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ts2).values == Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ts2).values == Series([1.0, 1.0, 0.0, 1.0], "float")
assert (ts1 > ts2).values == pft.Series([0.0, 1.0, 0.0, 0.0], "float")
assert (ts1 >= ts2).values == pft.Series([0.0, 1.0, 1.0, 0.0], "float")
assert (ts1 < ts2).values == pft.Series([1.0, 0.0, 0.0, 1.0], "float")
assert (ts1 <= ts2).values == pft.Series([1.0, 0.0, 1.0, 1.0], "float")
assert (ts1 == ts2).values == pft.Series([0.0, 0.0, 1.0, 0.0], "float")
assert (ts1 != ts2).values == pft.Series([1.0, 1.0, 0.0, 1.0], "float")
def test_errors(self):
ts1 = TimeSeriesCore(self.data1, "M")
ts2 = TimeSeriesCore(self.data2, "M")
ser = Series([240, 210, 240], dtype="int")
ser2 = Series(["2021-01-01", "2021-02-01", "2021-03-01", "2021-04-01"], dtype="date")
ts1 = pft.TimeSeriesCore(self.data1, "M")
ts2 = pft.TimeSeriesCore(self.data2, "M")
ser = pft.Series([240, 210, 240], dtype="int")
ser2 = pft.Series(["2021-01-01", "2021-02-01", "2021-03-01", "2021-04-01"], dtype="date")
del ts2["2021-04-01"]
@ -345,7 +361,7 @@ class TestTimeSeriesArithmatic:
]
def test_add(self):
ts = TimeSeriesCore(self.data, "M")
ts = pft.TimeSeriesCore(self.data, "M")
ser = ts.values
num_add_ts = ts + 40
@ -365,8 +381,8 @@ class TestTimeSeriesArithmatic:
assert ts_add_ts["2021-04-01"][1] == 540
def test_sub(self):
ts = TimeSeriesCore(self.data, "M")
ser = Series([20, 30, 40, 50], "number")
ts = pft.TimeSeriesCore(self.data, "M")
ser = pft.Series([20, 30, 40, 50], "number")
num_sub_ts = ts - 40
assert num_sub_ts["2021-01-01"][1] == 180
@ -385,8 +401,8 @@ class TestTimeSeriesArithmatic:
assert ts_sub_ts["2021-04-01"][1] == 40
def test_truediv(self):
ts = TimeSeriesCore(self.data, "M")
ser = Series([22, 23, 24, 25], "number")
ts = pft.TimeSeriesCore(self.data, "M")
ser = pft.Series([22, 23, 24, 25], "number")
num_div_ts = ts / 10
assert num_div_ts["2021-01-01"][1] == 22
@ -404,8 +420,8 @@ class TestTimeSeriesArithmatic:
assert ts_div_ts["2021-04-01"][1] == 10
def test_floordiv(self):
ts = TimeSeriesCore(self.data, "M")
ser = Series([22, 23, 24, 25], "number")
ts = pft.TimeSeriesCore(self.data, "M")
ser = pft.Series([22, 23, 24, 25], "number")
num_div_ts = ts // 11
assert num_div_ts["2021-02-01"][1] == 20

View File

@ -84,6 +84,84 @@ class TestSharpe:
assert round(sharpe_ratio, 4) == 0.3199
class TestSortino:
def test_sortino_daily_freq(self, create_test_data):
data = create_test_data(num=1305, frequency=pft.AllFrequencies.D, skip_weekends=True)
ts = pft.TimeSeries(data, "D")
sortino_ratio = pft.sortino_ratio(
ts,
risk_free_rate=0.06,
from_date="2017-02-02",
to_date="2021-12-31",
return_period_unit="months",
return_period_value=1,
)
assert round(sortino_ratio, 4) == 2.5377
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2017-01-09",
# to_date="2021-12-31",
# return_period_unit="days",
# return_period_value=7,
# )
# assert round(sharpe_ratio, 4) == 1.0701
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2018-01-02",
# to_date="2021-12-31",
# return_period_unit="years",
# return_period_value=1,
# )
# assert round(sharpe_ratio, 4) == 1.4374
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.06,
# from_date="2017-07-03",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=6,
# )
# assert round(sharpe_ratio, 4) == 0.8401
# def test_sharpe_weekly_freq(self, create_test_data):
# data = create_test_data(num=261, frequency=pft.AllFrequencies.W, mu=0.6, sigma=0.7)
# ts = pft.TimeSeries(data, "W")
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2017-01-08",
# to_date="2021-12-31",
# return_period_unit="days",
# return_period_value=7,
# )
# assert round(sharpe_ratio, 4) == 0.4533
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2017-02-05",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=1,
# )
# assert round(sharpe_ratio, 4) == 0.4898
# sharpe_ratio = pft.sharpe_ratio(
# ts,
# risk_free_rate=0.052,
# from_date="2018-01-01",
# to_date="2021-12-31",
# return_period_unit="months",
# return_period_value=12,
# )
# assert round(sharpe_ratio, 4) == 0.3199
class TestBeta:
def test_beta_daily_freq(self, create_test_data):
market_data = create_test_data(num=3600, frequency=pft.AllFrequencies.D)