diff --git a/README.md b/README.md index ae655aa..887c8af 100644 --- a/README.md +++ b/README.md @@ -21,14 +21,14 @@ Fincal aims to simplify things by allowing you to: ### Core features - [ ] Add __setitem__ - [ ] Create emtpy TimeSeries object -- [ ] Read from CSV +- [x] Read from CSV - [ ] Write to CSV - [ ] Convert to dict - [ ] Convert to list of dicts ### Fincal features - [ ] Sync two TimeSeries -- [ ] Average rolling return +- [x] Average rolling return - [ ] Sharpe ratio - [ ] Jensen's Alpha - [ ] Beta -- [ ] Max drawdown \ No newline at end of file +- [x] Max drawdown \ No newline at end of file diff --git a/fincal/fincal.py b/fincal/fincal.py index f57dbc6..fafdc28 100644 --- a/fincal/fincal.py +++ b/fincal/fincal.py @@ -1,9 +1,11 @@ from __future__ import annotations +import csv import datetime import math +import pathlib import statistics -from typing import Iterable, List, Literal, Mapping, TypedDict, Union +from typing import Iterable, List, Literal, Mapping, Tuple, TypedDict, Union from dateutil.relativedelta import relativedelta @@ -581,6 +583,65 @@ class TimeSeries(TimeSeriesCore): return output_ts +def _preprocess_csv(file_path: str | pathlib.Path, delimiter: str = ",", encoding: str = "utf-8") -> List[list]: + """Preprocess csv data""" + + if isinstance(file_path, str): + file_path = pathlib.Path(file_path) + + if not file_path.exists(): + raise ValueError("File not found. Check the file path") + + with open(file_path, "r", encoding=encoding) as file: + reader = csv.reader(file, delimiter=delimiter) + csv_data = list(reader) + + csv_data = [i for i in csv_data if i] # remove blank rows + if not csv_data: + raise ValueError("File is empty") + + return csv_data + + +def read_csv( + csv_file_path: str | pathlib.Path, + frequency: Literal["D", "W", "M", "Q", "Y"], + date_format: str = None, + col_names: Tuple[str, str] = None, + col_index: Tuple[int, int] = (0, 1), + has_header: bool = True, + skip_rows: int = 0, + nrows: int = -1, + delimiter: str = ",", + encoding: str = "utf-8", +) -> TimeSeriesCore: + """Reads Time Series data directly from a CSV file""" + + data = _preprocess_csv(csv_file_path, delimiter, encoding) + + read_start_row = skip_rows + read_end_row = skip_rows + nrows if nrows >= 0 else None + + if has_header: + header = data[read_start_row] + print(header) + # fmt: off + # Black and pylance disagree on the foratting of the following line, hence formatting is disabled + data = data[(read_start_row + 1):read_end_row] + # fmt: on + + if col_names is not None: + date_col = header.index(col_names[0]) + value_col = header.index(col_names[1]) + else: + date_col = col_index[0] + value_col = col_index[1] + + ts_data = [(i[date_col], i[value_col]) for i in data if i] + + return TimeSeries(ts_data, frequency=frequency, date_format=date_format) + + if __name__ == "__main__": date_series = [ datetime.datetime(2020, 1, 11),