added read csv function

This commit is contained in:
Gourav Kumar 2022-04-02 07:45:59 +05:30
parent eb63766c1e
commit d88acc5888
2 changed files with 65 additions and 4 deletions

View File

@ -21,14 +21,14 @@ Fincal aims to simplify things by allowing you to:
### Core features
- [ ] Add __setitem__
- [ ] Create empty TimeSeries object
- [ ] Read from CSV
- [x] Read from CSV
- [ ] Write to CSV
- [ ] Convert to dict
- [ ] Convert to list of dicts
### Fincal features
- [ ] Sync two TimeSeries
- [ ] Average rolling return
- [x] Average rolling return
- [ ] Sharpe ratio
- [ ] Jensen's Alpha
- [ ] Beta
- [ ] Max drawdown
- [x] Max drawdown

View File

@ -1,9 +1,11 @@
from __future__ import annotations
import csv
import datetime
import math
import pathlib
import statistics
from typing import Iterable, List, Literal, Mapping, TypedDict, Union
from typing import Iterable, List, Literal, Mapping, Tuple, TypedDict, Union
from dateutil.relativedelta import relativedelta
@ -581,6 +583,65 @@ class TimeSeries(TimeSeriesCore):
return output_ts
def _preprocess_csv(file_path: str | pathlib.Path, delimiter: str = ",", encoding: str = "utf-8") -> List[list]:
    """Read a CSV file and return its non-blank rows.

    Parameters
    ----------
    file_path : str | pathlib.Path
        Location of the CSV file.
    delimiter : str, optional
        Field separator handed to ``csv.reader``. Defaults to ``","``.
    encoding : str, optional
        Text encoding used to open the file. Defaults to ``"utf-8"``.

    Returns
    -------
    List[list]
        One list of string fields per row, in file order. Rows that the
        csv reader parses as empty (blank lines) are dropped.

    Raises
    ------
    ValueError
        If the path does not exist, or if the file contains no non-blank rows.
    """

    # Normalise unconditionally so that str, Path and other path-likes all work.
    file_path = pathlib.Path(file_path)

    if not file_path.exists():
        raise ValueError("File not found. Check the file path")

    # newline="" disables universal-newline translation so the csv module can
    # do its own newline handling; otherwise line breaks embedded in quoted
    # fields are rewritten before the reader ever sees them.
    with open(file_path, "r", encoding=encoding, newline="") as file:
        # The reader is lazy, so it must be fully consumed while the file is open.
        csv_data = [row for row in csv.reader(file, delimiter=delimiter) if row]

    if not csv_data:
        raise ValueError("File is empty")

    return csv_data
def read_csv(
    csv_file_path: str | pathlib.Path,
    frequency: Literal["D", "W", "M", "Q", "Y"],
    date_format: str = None,
    col_names: Tuple[str, str] = None,
    col_index: Tuple[int, int] = (0, 1),
    has_header: bool = True,
    skip_rows: int = 0,
    nrows: int = -1,
    delimiter: str = ",",
    encoding: str = "utf-8",
) -> TimeSeriesCore:
    """Read time series data directly from a CSV file.

    Parameters
    ----------
    csv_file_path : str | pathlib.Path
        Location of the CSV file.
    frequency : {"D", "W", "M", "Q", "Y"}
        Passed through unchanged to the ``TimeSeries`` constructor.
    date_format : str, optional
        Passed through unchanged to the ``TimeSeries`` constructor.
    col_names : Tuple[str, str], optional
        Header names of the (date, value) columns. Takes precedence over
        ``col_index`` and requires ``has_header`` to be True.
    col_index : Tuple[int, int], optional
        Positions of the (date, value) columns, used when ``col_names`` is
        not given. Defaults to ``(0, 1)``.
    has_header : bool, optional
        Whether the first row inside the read window is a header row.
    skip_rows : int, optional
        Number of leading rows to skip. Blank rows have already been removed
        by the time rows are counted.
    nrows : int, optional
        Size of the row window to read, starting at ``skip_rows``. When
        ``has_header`` is True the header row counts towards this window.
        A negative value reads to the end of the file.
    delimiter : str, optional
        Field separator of the CSV file.
    encoding : str, optional
        Text encoding of the CSV file.

    Returns
    -------
    TimeSeries
        Built from the selected (date, value) string pairs.

    Raises
    ------
    ValueError
        If ``col_names`` is given while ``has_header`` is False, or if a
        requested column name is missing from the header. Errors raised while
        reading the file are propagated as well.
    """

    # Column names can only be resolved against a header row; fail early with a
    # clear message instead of hitting an unbound ``header`` further down.
    if col_names is not None and not has_header:
        raise ValueError("col_names can only be used when has_header is True")

    data = _preprocess_csv(csv_file_path, delimiter, encoding)

    read_start_row = skip_rows
    read_end_row = skip_rows + nrows if nrows >= 0 else None

    header = None
    if has_header:
        header = data[read_start_row]
        read_start_row += 1  # data starts on the row after the header

    # Apply the row window in both cases, so skip_rows/nrows are honoured even
    # when the file has no header.
    data = data[read_start_row:read_end_row]

    if col_names is not None:
        try:
            date_col = header.index(col_names[0])
            value_col = header.index(col_names[1])
        except ValueError as err:
            raise ValueError(f"Columns {tuple(col_names)} not found in CSV header {header}") from err
    else:
        date_col = col_index[0]
        value_col = col_index[1]

    ts_data = [(row[date_col], row[value_col]) for row in data if row]

    return TimeSeries(ts_data, frequency=frequency, date_format=date_format)
if __name__ == "__main__":
date_series = [
datetime.datetime(2020, 1, 11),