telegram-mf-bot/mf_bot.py

356 lines
14 KiB
Python

import datetime
import os
import time
from typing import Union
import numpy as np
import psycopg2
from dotenv import load_dotenv
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, InlineQueryHandler, CallbackQueryHandler
from telegram import InlineQueryResultArticle, ParseMode, InputTextMessageContent
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.utils.helpers import escape_markdown
load_dotenv()
def connect_db():
"""Connects to the Postgres Db"""
pgcon = psycopg2.connect(dbname=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PWD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT'))
return pgcon
def slugify(message: str) -> str:
"""This function ensures that messages are properly escaped as per Telegram's specs."""
message = message.replace("(", "\\(")\
.replace(")", "\\)")\
.replace(".", "\\.")\
.replace("-", "\\-")
return message
def fund_search(search_string: str) -> list:
"""Searches for a fund in the Postgres Db
Returns a list of matches along with its latest NAV, category, and sub-category
"""
if len(search_string) < 3:
return []
connection = connect_db()
fund_name = search_string.replace(" ", ":*&").replace("&-", " & !")
fund_name = fund_name.replace('cap', ' cap').replace('fund', '').replace(' ',' ')
fund_name = f"{fund_name}:*" # enables partial match in tsquery
sql_query = """select lnav.*, fm.category, fm.sub_category
from latest_nav lnav
join fund_master fm on lnav.amfi_code = fm.amfi_code
where lnav.fts_doc @@ to_tsquery(%s)
order by lnav.fund_name
limit 10
"""
cur = connection.cursor()
start = time.time()
cur.execute(sql_query, (fund_name,))
results = list(cur.fetchall())
cur.close()
print(f'search took {time.time() - start} seconds')
return results
def mf_query(update, context) -> None:
"""Handles inline search query from the MF bot
Creates a messaged containing the name of the fund along with its latest NAV.
Also adds two keys to the message, one for returns and one for SIP returns.
The callback data for the buttons contains a notation letter followed by the AMFI code of the fund.
"""
query = update.inline_query.query
matched_funds = fund_search(query)
results = []
for fund in matched_funds:
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{fund[0]}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{fund[0]}')
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
message = slugify(f"*{fund[1]}*\n*"\
f"Category:* {fund[7]}\n*"\
f"Sub-category:* {fund[8]}\n*"\
f"Date:* {str(fund[2])}\n*"\
f"NAV:* {str(fund[3])}")
line = InlineQueryResultArticle(id=fund[0], title=fund[1],
input_message_content=InputTextMessageContent(message, parse_mode=ParseMode.MARKDOWN_V2),
reply_markup=reply_markup)
results.append(line)
update.inline_query.answer(results)
def welcome(update, context):
"""Start message for the bot"""
msg = r'Welcome to India MF Bot\.\n'\
r'To get started, type @india\_mf\_bot in the message box and search for any fund\.'\
r"You will get a list of funds\. When you make your choice, you'll get buttons to get more info on the fund\."
update.message.reply_text(msg, parse_mode=ParseMode.MARKDOWN_V2)
def button(update, context):
"""This function handles the response to the buttons in the main message."""
query = update.callback_query
data = query.data
amfi_code = int(data[1:])
connection = connect_db()
cur = connection.cursor()
cur.execute("select fund_name, category, sub_category from fund_master where amfi_code = %s", (amfi_code,))
result = cur.fetchall()
fund_name = slugify(result[0][0])
if data[0] == 'b': # Handles back button
cur = connection.cursor()
cur.execute("select date, nav from latest_nav where amfi_code = %s", (amfi_code,))
nav_result = cur.fetchall()
msg = slugify(f'*Category:* {result[0][1]}\n'\
f'*Sub-category:* {result[0][2]}\n'\
f'*Date*: {str(nav_result[0][0])}\n'\
f'*NAV*: {str(nav_result[0][1])}')
returns = ''
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{amfi_code}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{amfi_code}')
]
]
elif data[0] == 'r': # Handles returns
msg = 'Returns:'
returns = slugify(return_calc(amfi_code))
keyboard = [
[
InlineKeyboardButton("<< Back", callback_data=f"b{amfi_code}"),
InlineKeyboardButton("SIP Returns", callback_data=f's{amfi_code}')
]
]
else:
msg = 'SIP Returns:' # Handles SIP returns
returns = slugify(sip_returns(amfi_code))
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{amfi_code}'),
InlineKeyboardButton("<< Back", callback_data=f"b{amfi_code}")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# CallbackQueries need to be answered, even if no notification to the user is needed
# Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
query.answer()
query.edit_message_text(text=f"*{fund_name}*\n{msg}\n{str(returns)}",
reply_markup=reply_markup,
parse_mode=ParseMode.MARKDOWN_V2)
def return_calc(amfi_code: int, return_type: str='m', return_string: bool=True) -> Union[list, str]:
"""Give returns numbers for a mutual fund
Params:
-------
amfi_code: amfi_code of the fund for which returns need to be calculated
return_type: short term, medium term, or long term return
Use return type s for 1-3-6 months, m for 1-3-5 years, and l for 5-7-10 years
return_string: Whether to return the returns as Telegram compatible message string
Returns:
--------
If return_string is true, then returns a Telegram compatible string
If return_string is false, then returns a list of dicts with returns
"""
period_map = {
"s": [1, 3, 6],
"m": [12, 36, 60],
"l": [60, 84, 120]
}
returns_query = """
select dates, %(amfi_code)s as amfi_code, ffill_nav from (
select dates, amfi_code, first_value(nav) over (partition by grp_close order by dates) as ffill_nav
from (
select dates, amfi_code, nav,
sum(case when nav is not null then 1 end) over (order by dates) as grp_close
from (
SELECT generate_series(current_date - '1 month'::interval - '%(max_period)s month'::interval, current_date, interval '1 day')::date
) d(dates)
left join nav_history nh on d.dates = nh.date and nh.amfi_code = %(amfi_code)s
) t
)td
where dates in (current_date - '%(max_period)s month'::interval - '1 day':: interval,
current_date - '%(med_period)s month'::interval - '1 day':: interval,
current_date - '%(min_period)s month'::interval - '1 day':: interval,
current_date - '1 day':: interval )
order by dates desc
"""
start_time = time.time()
connection = connect_db()
cursor = connection.cursor()
params = {
'amfi_code':amfi_code,
'min_period': period_map[return_type][0],
'med_period': period_map[return_type][1],
'max_period': period_map[return_type][2]
}
cursor.execute(returns_query, params)
result = cursor.fetchall()
#print(result)
returns = []
for i, j in enumerate(result):
if i > 0:
years = (result[0][0] - j[0]).days/365
ret = (result[0][2]/j[2])**(1/years) - 1
returns.append((years, ret))
else:
continue
if return_string:
format_returns = []
for i in returns:
format_returns.append((str(int(i[0]))+'-year', str(round(i[1]*100,2))+'%'))
print(f"It took {time.time() - start_time} to calculate returns")
return '\n'.join([f'{i[0]}: {i[1]}' for i in format_returns])
return returns
def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) -> float:
"""Calculates XIRR from a series of cashflows.
Requires NumPy and datetime libraries
Params:
dates: A list of dates on which cashflows occur
amounts: The amount of cashflows corresponding to each date
guess: A guess for XIRR.
This is used as the starting XIRR for testing. The closer the guess, the faster will be the output
step: Starting value at which the guess will be increased/decreased in each iteration
Returns:
Returns the XIRR as a float value
"""
years = np.array(dates - dates[0], dtype='timedelta64[D]')/np.timedelta64(365, 'D')
epsilon = 0.1
limit = 100
residual = 1
#test
dex = np.sum(amounts/((1.05+guess)**years)) < np.sum(amounts/((1+guess)**years))
mul = 1 if dex else -1
# Calculate XIRR
for _ in range(limit):
prev_residual = residual
residual = np.sum(amounts/((1+guess)**years))
if abs(residual) > epsilon:
if residual * prev_residual < 0:
step /= 2
guess = guess + step * mul * (-1 if residual < 0 else 1)
else:
return guess
return "XIRR not calculated"
def sip_returns(amfi_code: int) -> str:
"""Calculates the SIP returns for a fund
Queries the Db and directly gets a list of relevant NAVs only.
It also incorporates the unit calculation in the query itself, with an amount of Rs. 10,000
Do note that the investment amount itself does not matter, any value with give the same output.
The result is then sliced for each of the periods and the returns are calculated using the XIRR function.
Params:
-------
amfi_code: amfi_code of the fund for which SIP returns need to be calculated
Returns:
--------
Returns a Telegram compatible string of SIP returns
"""
sip_schedule_query = """
with myvars(xamfi_code, xmonths) as (
values(%s, %s)
)
select amfi_code, date, nav, 10000 as amount, round(10000/nav::numeric, 3) as units
from(
select *, row_number() over (partition by xmonth order by date) as rn,
row_number() over (order by date) as rno
from (
select *, TO_CHAR(date, 'YYYY-MM') as xMonth
from nav_history, myvars
where amfi_code = xamfi_code
and extract(day from date) >= 25
and date between current_date - (xmonths || ' month')::interval and current_date
) t1
) t2
where rn = 1 and rno <> 1
union
select amfi_code, date, nav, 0 as amount, 0 as units
from latest_nav, myvars
where amfi_code = xamfi_code
order by date
"""
months = [12, 36, 60, 84, 120]
xirrs = []
start = time.time()
connection = connect_db()
with connection.cursor() as cur:
cur.execute(sip_schedule_query, (amfi_code, months[-1]+1))
transactions = cur.fetchall()
transactions = np.array(transactions)
transactions[:,3] = transactions[:,3].astype(float)
transactions[:,4] = transactions[:,4].astype(float)
for month in months:
df_slice = transactions[-(month+1):,:]
sip_value = sum(df_slice[:-1,4])*df_slice[-1, 2]
df_slice[-1,3] = sip_value * -1
dates = df_slice[:, 1]
amounts = df_slice[:, 3]
xirrs.append({'years': month // 12, 'returns': round(xirr_np(dates, amounts), 6)})
str_returns = []
for i in xirrs:
xirr_value = f"{i['years']}-year: {round(i['returns']*100,2)}%"
str_returns.append(xirr_value)
print(f"It took {time.time() - start} seconds to calcluate SIP returns")
return '\n'.join(str_returns)
def main():
""" Starts the bot and keeps it running """
updater = Updater(token=os.getenv('TELEGRAM_TOKEN'), use_context=True)
dispatcher = updater.dispatcher
dispatcher.add_handler(InlineQueryHandler(mf_query))
dispatcher.add_handler(CommandHandler('start', welcome))
dispatcher.add_handler(CommandHandler('help', welcome))
dispatcher.add_handler(CallbackQueryHandler(button))
updater.start_polling()
updater.idle()
if __name__ == "__main__":
print("MF Bot is running.")
main()