Added docstrings, made changes for pylint

This commit is contained in:
Gourav Kumar 2022-01-11 21:03:37 +05:30
parent 560d9893d6
commit 40c95b7c6b
2 changed files with 136 additions and 58 deletions

2
.pylintrc Normal file
View File

@ -0,0 +1,2 @@
[FORMAT]
max-line-length=120

View File

@ -1,27 +1,35 @@
import datetime
import numpy as np
import os
import psycopg2
import time
from typing import Union
import numpy as np
import psycopg2
from dotenv import load_dotenv
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, InlineQueryHandler, CallbackQueryHandler
from telegram import InlineQueryResultArticle, ParseMode, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup
from telegram import InlineQueryResultArticle, ParseMode, InputTextMessageContent
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.utils.helpers import escape_markdown
load_dotenv()
def connect_db():
"""Connects to the Postgres Db"""
pgcon = psycopg2.connect(dbname=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PWD'),
password=os.getenv('DB_PWD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT'))
return pgcon
def slugify(message: str) -> str:
"""This function ensures that messages are properly escaped as per Telegram's specs."""
message = message.replace("(", "\\(")\
.replace(")", "\\)")\
.replace(".", "\\.")\
@ -30,15 +38,19 @@ def slugify(message: str) -> str:
def fund_search(search_string: str) -> list:
"""Searches for a fund in the Postgres Db"""
"""Searches for a fund in the Postgres Db
Returns a list of matches along with its latest NAV, category, and sub-category
"""
if len(search_string) < 3:
return []
connection = connect_db()
fund_name = search_string.replace(" ", ":*&").replace("&-", " & !")
fund_name = fund_name.replace('cap', ' cap').replace('fund', '').replace(' ',' ')
fund_name = f"{fund_name}:*" # enables partial match in tsquery
sql_query = """select lnav.*, fm.category, fm.sub_category
sql_query = """select lnav.*, fm.category, fm.sub_category
from latest_nav lnav
join fund_master fm on lnav.amfi_code = fm.amfi_code
where lnav.fts_doc @@ to_tsquery(%s)
@ -54,34 +66,50 @@ def fund_search(search_string: str) -> list:
return results
def mf_query(update, context):
def mf_query(update, context) -> None:
"""Handles inline search query from the MF bot
Creates a messaged containing the name of the fund along with its latest NAV.
Also adds two keys to the message, one for returns and one for SIP returns.
The callback data for the buttons contains a notation letter followed by the AMFI code of the fund.
"""
query = update.inline_query.query
mf_list = fund_search(query)
matched_funds = fund_search(query)
results = []
for i, j in enumerate(mf_list):
for fund in matched_funds:
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{j[0]}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{j[0]}')
InlineKeyboardButton("Returns", callback_data=f'r{fund[0]}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{fund[0]}')
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
message = slugify(f'*{j[1]}*\n*Category:* {j[7]}\n*Sub-category:* {j[8]}\n*Date:* {str(j[2])}\n*NAV:* {str(j[3])}')
line = InlineQueryResultArticle(id=j[0], title=j[1],
input_message_content=InputTextMessageContent(message, parse_mode=ParseMode.MARKDOWN_V2),
reply_markup=reply_markup)
message = slugify(f"*{fund[1]}*\n*"\
f"Category:* {fund[7]}\n*"\
f"Sub-category:* {fund[8]}\n*"\
f"Date:* {str(fund[2])}\n*"\
f"NAV:* {str(fund[3])}")
line = InlineQueryResultArticle(id=fund[0], title=fund[1],
input_message_content=InputTextMessageContent(message, parse_mode=ParseMode.MARKDOWN_V2),
reply_markup=reply_markup)
results.append(line)
update.inline_query.answer(results)
def start(update, context):
msg = 'Welcome to India MF Bot\.\nTo get started, type @india\_mf\_bot in the message box and search for any fund\. '\
"You will get a list of funds\. When you make your choice, you'll get inline buttons to get more info on the fund\."
def welcome(update, context):
"""Start message for the bot"""
msg = r'Welcome to India MF Bot\.\n'\
r'To get started, type @india\_mf\_bot in the message box and search for any fund\.'\
r"You will get a list of funds\. When you make your choice, you'll get buttons to get more info on the fund\."
update.message.reply_text(msg, parse_mode=ParseMode.MARKDOWN_V2)
def button(update, context):
"""This function handles the response to the buttons in the main message."""
query = update.callback_query
data = query.data
amfi_code = int(data[1:])
@ -90,12 +118,15 @@ def button(update, context):
cur.execute("select fund_name, category, sub_category from fund_master where amfi_code = %s", (amfi_code,))
result = cur.fetchall()
fund_name = slugify(result[0][0])
if data[0] == 'b':
if data[0] == 'b': # Handles back button
cur = connection.cursor()
cur.execute("select date, nav from latest_nav where amfi_code = %s", (amfi_code,))
nav_result = cur.fetchall()
msg = slugify(f'*Category: *{result[0][1]}\n*Sub-category:* {result[0][2]}\n*Date*: {str(nav_result[0][0])}\n*NAV*: {str(nav_result[0][1])}')
msg = slugify(f'*Category:* {result[0][1]}\n'\
f'*Sub-category:* {result[0][2]}\n'\
f'*Date*: {str(nav_result[0][0])}\n'\
f'*NAV*: {str(nav_result[0][1])}')
returns = ''
keyboard = [
[
@ -103,7 +134,7 @@ def button(update, context):
InlineKeyboardButton("SIP Returns", callback_data=f's{amfi_code}')
]
]
elif data[0] == 'r':
elif data[0] == 'r': # Handles returns
msg = 'Returns:'
returns = slugify(return_calc(amfi_code))
keyboard = [
@ -113,11 +144,11 @@ def button(update, context):
]
]
else:
msg = 'SIP Returns:'
msg = 'SIP Returns:' # Handles SIP returns
returns = slugify(sip_returns(amfi_code))
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'{amfi_code}'),
InlineKeyboardButton("Returns", callback_data=f'r{amfi_code}'),
InlineKeyboardButton("<< Back", callback_data=f"b{amfi_code}")
]
]
@ -125,14 +156,32 @@ def button(update, context):
# CallbackQueries need to be answered, even if no notification to the user is needed
# Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
query.answer()
query.edit_message_text(text="*{}*\n{}\n{}".format(fund_name, msg, str(returns)), reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN_V2)
query.edit_message_text(text=f"*{fund_name}*\n{msg}\n{str(returns)}",
reply_markup=reply_markup,
parse_mode=ParseMode.MARKDOWN_V2)
def return_calc(amfi_code: int, return_type: str='m', raw: bool=False) -> str:
def return_calc(amfi_code: int, return_type: str='m', return_string: bool=True) -> Union[list, str]:
"""Give returns numbers for a mutual fund
Use return type s for 1-3-6 months, m for 1-3-5 years, and l for 5-7-10 years
Params:
-------
amfi_code: amfi_code of the fund for which returns need to be calculated
return_type: short term, medium term, or long term return
Use return type s for 1-3-6 months, m for 1-3-5 years, and l for 5-7-10 years
return_string: Whether to return the returns as Telegram compatible message string
Returns:
--------
If return_string is true, then returns a Telegram compatible string
If return_string is false, then returns a list of dicts with returns
"""
period_map = {
"s": [1, 3, 6],
"m": [12, 36, 60],
"l": [60, 84, 120]
}
returns_query = """
select dates, %(amfi_code)s as amfi_code, ffill_nav from (
select dates, amfi_code, first_value(nav) over (partition by grp_close order by dates) as ffill_nav
@ -140,49 +189,56 @@ def return_calc(amfi_code: int, return_type: str='m', raw: bool=False) -> str:
select dates, amfi_code, nav,
sum(case when nav is not null then 1 end) over (order by dates) as grp_close
from (
SELECT generate_series(current_date - '61 month'::interval, current_date, interval '1 day')::date
SELECT generate_series(current_date - '1 month'::interval - '%(max_period)s month'::interval, current_date, interval '1 day')::date
) d(dates)
left join nav_history nh on d.dates = nh.date and nh.amfi_code = %(amfi_code)s
) t
)td
where dates in (current_date - '60 month'::interval - '1 day':: interval,
current_date - '36 month'::interval - '1 day':: interval,
current_date - '12 month'::interval - '1 day':: interval,
where dates in (current_date - '%(max_period)s month'::interval - '1 day':: interval,
current_date - '%(med_period)s month'::interval - '1 day':: interval,
current_date - '%(min_period)s month'::interval - '1 day':: interval,
current_date - '1 day':: interval )
order by dates desc
"""
start_time = time.time()
connection = connect_db()
cursor = connection.cursor()
cursor.execute(returns_query, {'amfi_code':amfi_code})
params = {
'amfi_code':amfi_code,
'min_period': period_map[return_type][0],
'med_period': period_map[return_type][1],
'max_period': period_map[return_type][2]
}
cursor.execute(returns_query, params)
result = cursor.fetchall()
#print(result)
returns = []
for i, j in enumerate(result):
if i == 0:
continue
else:
if i > 0:
years = (result[0][0] - j[0]).days/365
ret = (result[0][2]/j[2])**(1/years) - 1
returns.append((years, ret))
if raw:
return returns
format_returns = []
for i in returns:
format_returns.append((str(int(i[0]))+'-year', str(round(i[1]*100,2))+'%'))
print(time.time() - start_time)
return '\n'.join([f'{i[0]}: {i[1]}' for i in format_returns])
else:
continue
if return_string:
format_returns = []
for i in returns:
format_returns.append((str(int(i[0]))+'-year', str(round(i[1]*100,2))+'%'))
print(f"It took {time.time() - start_time} to calculate returns")
return '\n'.join([f'{i[0]}: {i[1]}' for i in format_returns])
return returns
def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) -> float:
"""Calculates XIRR from a series of cashflows.
"""Calculates XIRR from a series of cashflows.
Requires NumPy and datetime libraries
Params:
dates: A list of dates on which cashflows occur
amounts: The amount of cashflows corresponding to each date
guess: A guess for XIRR.
guess: A guess for XIRR.
This is used as the starting XIRR for testing. The closer the guess, the faster will be the output
step: Starting value at which the guess will be increased/decreased in each iteration
@ -196,7 +252,7 @@ def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) ->
limit = 100
residual = 1
#test
#test
dex = np.sum(amounts/((1.05+guess)**years)) < np.sum(amounts/((1+guess)**years))
mul = 1 if dex else -1
@ -214,6 +270,21 @@ def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) ->
def sip_returns(amfi_code: int) -> str:
"""Calculates the SIP returns for a fund
Queries the Db and directly gets a list of relevant NAVs only.
It also incorporates the unit calculation in the query itself, with an amount of Rs. 10,000
Do note that the investment amount itself does not matter, any value with give the same output.
The result is then sliced for each of the periods and the returns are calculated using the XIRR function.
Params:
-------
amfi_code: amfi_code of the fund for which SIP returns need to be calculated
Returns:
--------
Returns a Telegram compatible string of SIP returns
"""
sip_schedule_query = """
with myvars(xamfi_code, xmonths) as (
values(%s, %s)
@ -237,8 +308,10 @@ def sip_returns(amfi_code: int) -> str:
where amfi_code = xamfi_code
order by date
"""
months = [12, 36, 60, 84, 120]
xirrs = []
start = time.time()
connection = connect_db()
with connection.cursor() as cur:
cur.execute(sip_schedule_query, (amfi_code, months[-1]+1))
@ -246,34 +319,37 @@ def sip_returns(amfi_code: int) -> str:
transactions = np.array(transactions)
transactions[:,3] = transactions[:,3].astype(float)
transactions[:,4] = transactions[:,4].astype(float)
for m in months:
df_slice = transactions[-(m+1):,:]
for month in months:
df_slice = transactions[-(month+1):,:]
sip_value = sum(df_slice[:-1,4])*df_slice[-1, 2]
df_slice[-1,3] = sip_value * -1
dates = df_slice[:, 1]
amounts = df_slice[:, 3]
xirrs.append({'years': m // 12, 'returns': round(xirr_np(dates, amounts), 6)})
xirrs.append({'years': month // 12, 'returns': round(xirr_np(dates, amounts), 6)})
str_returns = []
for i in xirrs:
x = f"{i['years']}-year: {round(i['returns']*100,2)}%"
str_returns.append(x)
xirr_value = f"{i['years']}-year: {round(i['returns']*100,2)}%"
str_returns.append(xirr_value)
print(f"It took {time.time() - start} seconds to calcluate SIP returns")
return '\n'.join(str_returns)
def main():
""" Starts the bot and keeps it running """
updater = Updater(token=os.getenv('TELEGRAM_TOKEN'), use_context=True)
dispatcher = updater.dispatcher
dispatcher.add_handler(InlineQueryHandler(mf_query))
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(CommandHandler('help', start))
dispatcher.add_handler(CommandHandler('start', welcome))
dispatcher.add_handler(CommandHandler('help', welcome))
dispatcher.add_handler(CallbackQueryHandler(button))
updater.start_polling()
updater.idle()
if __name__ == "__main__":
print("MF Bot is running.")
main()