"""Telegram inline bot that serves Indian mutual-fund NAVs and returns.

The bot answers inline queries with matching funds read from Postgres and
lets the user switch between the fund card, point-to-point returns and SIP
returns through inline keyboard buttons.
"""
import datetime
import os
import re
import time
from contextlib import closing
from typing import Union

import numpy as np
import psycopg2
from dotenv import load_dotenv
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, InlineQueryHandler, CallbackQueryHandler
from telegram import InlineQueryResultArticle, ParseMode, InputTextMessageContent
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.utils.helpers import escape_markdown

load_dotenv()

# Characters Telegram's MarkdownV2 requires to be backslash-escaped. '*' is
# deliberately left out because callers pass messages that already contain
# '*' as bold markers.
_MARKDOWN_V2_ESCAPES = str.maketrans(
    {char: f"\\{char}" for char in "\\_[]()~`>#+-=|{}.!"}
)

# A run of letters/digits; everything else is dropped from search terms.
_SEARCH_WORD = re.compile(r"[^\W_]+")

# Returned by xirr_np when the iteration does not converge.
_XIRR_FAILED = "XIRR not calculated"


def connect_db():
    """Connects to the Postgres Db

    Connection settings are read from the DB_NAME, DB_USER, DB_PWD, DB_HOST
    and DB_PORT environment variables. The caller owns the returned
    connection and is responsible for closing it.
    """
    return psycopg2.connect(
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PWD'),
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
    )


def slugify(message: str) -> str:
    """Escape a message for Telegram's MarkdownV2 parse mode.

    Every MarkdownV2 reserved character except '*' is prefixed with a
    backslash. '*' is kept as-is so that callers can pass a message that
    already contains bold markers.
    """
    return message.translate(_MARKDOWN_V2_ESCAPES)


def _build_tsquery(search_string: str) -> str:
    """Turn free-form user input into a to_tsquery expression.

    Each whitespace separated token becomes one or more prefix-matching
    terms ("word:*") joined with AND. A token that starts with '-' is
    negated. 'cap' is split off ("smallcap" -> "small", "cap") and the
    substring 'fund' is dropped. Punctuation is discarded, so the result
    never contains dangling operators.

    Returns an empty string when the input holds no usable term.
    """
    terms = []
    for token in search_string.lower().split():
        prefix = '!' if token.startswith('-') else ''
        cleaned = token.replace('cap', ' cap').replace('fund', ' ')
        for word in _SEARCH_WORD.findall(cleaned):
            terms.append(f"{prefix}{word}:*")  # ':*' enables partial match
    return ' & '.join(terms)


def fund_search(search_string: str) -> list:
    """Searches for a fund in the Postgres Db

    Returns a list of matches along with its latest NAV, category, and
    sub-category. Each row is every column of latest_nav followed by
    category and sub_category. An empty list is returned when the search
    string is shorter than 3 characters or contains no searchable term.
    """
    if len(search_string) < 3:
        return []

    ts_query = _build_tsquery(search_string)
    if not ts_query:
        return []

    sql_query = """select lnav.*, fm.category, fm.sub_category
                   from latest_nav lnav
                   join fund_master fm on lnav.amfi_code = fm.amfi_code
                   where lnav.fts_doc @@ to_tsquery(%s)
                   order by lnav.fund_name
                   limit 10
                """
    # closing() guarantees the connection is released even if the query fails.
    with closing(connect_db()) as connection:
        start = time.time()
        with connection.cursor() as cur:
            cur.execute(sql_query, (ts_query,))
            results = list(cur.fetchall())
    print(f'search took {time.time() - start} seconds')
    return results


def _action_button(label: str, action: str, amfi_code) -> InlineKeyboardButton:
    """Build a button whose callback data is the action letter + AMFI code."""
    return InlineKeyboardButton(label, callback_data=f'{action}{amfi_code}')


def mf_query(update, context) -> None:
    """Handles inline search query from the MF bot

    Creates a message containing the name of the fund along with its latest
    NAV. Also adds two keys to the message, one for returns and one for SIP
    returns. The callback data for the buttons contains a notation letter
    followed by the AMFI code of the fund.
    """
    query = update.inline_query.query
    results = []
    for fund in fund_search(query):
        # Assumes latest_nav starts with amfi_code, fund_name, date, nav.
        amfi_code, fund_name, nav_date, nav = fund[0], fund[1], fund[2], fund[3]
        # category and sub_category are appended after lnav.* by fund_search,
        # so they are always the last two columns.
        category, sub_category = fund[-2], fund[-1]

        reply_markup = InlineKeyboardMarkup([
            [
                _action_button("Returns", 'r', amfi_code),
                _action_button("SIP Returns", 's', amfi_code),
            ]
        ])
        message = slugify(
            f"*{fund_name}*\n"
            f"*Category:* {category}\n"
            f"*Sub-category:* {sub_category}\n"
            f"*Date:* {str(nav_date)}\n"
            f"*NAV:* {str(nav)}"
        )
        results.append(
            InlineQueryResultArticle(
                id=str(amfi_code),  # the Bot API defines the result id as a string
                title=fund_name,
                input_message_content=InputTextMessageContent(
                    message, parse_mode=ParseMode.MARKDOWN_V2),
                reply_markup=reply_markup,
            )
        )
    update.inline_query.answer(results)


def welcome(update, context):
    """Start message for the bot"""
    # Regular (non-raw) strings: '\\.' is an escaped dot for MarkdownV2 and
    # '\n' is a real line break.
    msg = (
        'Welcome to India MF Bot\\.\n'
        'To get started, type @india\\_mf\\_bot in the message box and search for any fund\\. '
        "You will get a list of funds\\. When you make your choice, you'll get buttons to get more info on the fund\\."
    )
    update.message.reply_text(msg, parse_mode=ParseMode.MARKDOWN_V2)


def button(update, context):
    """This function handles the response to the buttons in the main message.

    The callback data is a notation letter followed by the AMFI code:
    'b' restores the fund card, 'r' shows returns and any other letter shows
    SIP returns. The message the button belongs to is edited in place.
    """
    query = update.callback_query
    data = query.data
    action = data[:1]
    amfi_code = int(data[1:])

    nav_row = None
    with closing(connect_db()) as connection:
        with connection.cursor() as cur:
            cur.execute("select fund_name, category, sub_category from fund_master where amfi_code = %s", (amfi_code,))
            fund = cur.fetchone()
        if fund is not None and action == 'b':
            with connection.cursor() as cur:
                cur.execute("select date, nav from latest_nav where amfi_code = %s", (amfi_code,))
                nav_row = cur.fetchone()

    if fund is None:
        # Nothing to show; still answer so the client stops waiting.
        query.answer(text="Fund not found")
        return

    # CallbackQueries need to be answered, even if no notification to the user is needed
    # Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
    # Answered before the (slow) calculations so a failure there cannot
    # leave the query unanswered.
    query.answer()

    fund_name = slugify(fund[0])
    if action == 'b':  # Handles back button
        nav_date, nav = nav_row if nav_row is not None else ('Not available', 'Not available')
        msg = slugify(f'*Category:* {fund[1]}\n'
                      f'*Sub-category:* {fund[2]}\n'
                      f'*Date*: {str(nav_date)}\n'
                      f'*NAV*: {str(nav)}')
        returns = ''
        keyboard = [[
            _action_button("Returns", 'r', amfi_code),
            _action_button("SIP Returns", 's', amfi_code),
        ]]
    elif action == 'r':  # Handles returns
        msg = 'Returns:'
        returns = slugify(return_calc(amfi_code)) or 'Not available'
        keyboard = [[
            _action_button("<< Back", 'b', amfi_code),
            _action_button("SIP Returns", 's', amfi_code),
        ]]
    else:  # Handles SIP returns
        msg = 'SIP Returns:'
        returns = slugify(sip_returns(amfi_code)) or 'Not available'
        keyboard = [[
            _action_button("Returns", 'r', amfi_code),
            _action_button("<< Back", 'b', amfi_code),
        ]]

    query.edit_message_text(text=f"*{fund_name}*\n{msg}\n{str(returns)}",
                            reply_markup=InlineKeyboardMarkup(keyboard),
                            parse_mode=ParseMode.MARKDOWN_V2)


def _period_label(years: float) -> str:
    """Label a holding period, in months when it is shorter than a year."""
    if years < 1:
        return f"{round(years * 12)}-month"
    return f"{int(years)}-year"


def return_calc(amfi_code: int, return_type: str='m', return_string: bool=True) -> Union[list, str]:
    """Give returns numbers for a mutual fund

    Params:
    -------
    amfi_code: amfi_code of the fund for which returns need to be calculated
    return_type: short term, medium term, or long term return
        Use return type s for 1-3-6 months, m for 1-3-5 years, and l for
        5-7-10 years
    return_string: Whether to return the returns as Telegram compatible
        message string

    Returns:
    --------
    If return_string is true, then returns a newline separated string with
    one "<period>: <return>%" line per period
    If return_string is false, then returns a list of (years, return) tuples
    Returns are annualised. Periods for which no NAV is available (for
    example a fund younger than the period) are left out.

    Raises:
    -------
    KeyError if return_type is not one of 's', 'm' or 'l'
    """
    period_map = {
        "s": [1, 3, 6],
        "m": [12, 36, 60],
        "l": [60, 84, 120]
    }
    # The query builds a daily calendar, forward-fills the NAV over days
    # without one, and picks the NAV as of yesterday and as of each period
    # start. The period placeholders sit inside quoted interval literals, so
    # they must stay plain integers.
    returns_query = """
        select dates, %(amfi_code)s as amfi_code, ffill_nav
        from (
            select dates, amfi_code,
                   first_value(nav) over (partition by grp_close order by dates) as ffill_nav
            from (
                select dates, amfi_code, nav,
                       sum(case when nav is not null then 1 end) over (order by dates) as grp_close
                from (
                    SELECT generate_series(current_date - '1 month'::interval - '%(max_period)s month'::interval, current_date, interval '1 day')::date
                ) d(dates)
                left join nav_history nh on d.dates = nh.date and nh.amfi_code = %(amfi_code)s
            ) t
        )td
        where dates in (current_date - '%(max_period)s month'::interval - '1 day':: interval,
                        current_date - '%(med_period)s month'::interval - '1 day':: interval,
                        current_date - '%(min_period)s month'::interval - '1 day':: interval,
                        current_date - '1 day':: interval
                        )
        order by dates desc
    """
    start_time = time.time()
    min_period, med_period, max_period = period_map[return_type]
    params = {
        'amfi_code': amfi_code,
        'min_period': min_period,
        'med_period': med_period,
        'max_period': max_period
    }
    with closing(connect_db()) as connection:
        with connection.cursor() as cursor:
            cursor.execute(returns_query, params)
            result = cursor.fetchall()

    returns = []
    # Rows are newest first, so the first row carries the latest NAV.
    if result and result[0][2]:
        latest_date, latest_nav = result[0][0], float(result[0][2])
        for row in result[1:]:
            past_date, past_nav = row[0], row[2]
            if not past_nav:
                # No NAV that far back; skipping also avoids dividing by zero.
                continue
            years = (latest_date - past_date).days/365
            ret = (latest_nav/float(past_nav))**(1/years) - 1
            returns.append((years, ret))

    if return_string:
        lines = [f"{_period_label(years)}: {str(round(ret*100,2))+'%'}"
                 for years, ret in returns]
        print(f"It took {time.time() - start_time} to calculate returns")
        return '\n'.join(lines)
    return returns


def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) -> Union[float, str]:
    """Calculates XIRR from a series of cashflows.

    Requires NumPy and datetime libraries

    Params:
        dates: A list (or array) of dates on which cashflows occur
        amounts: The amount of cashflows corresponding to each date
        guess: A guess for XIRR. This is used as the starting XIRR for
            testing. The closer the guess, the faster will be the output
        step: Starting value at which the guess will be increased/decreased
            in each iteration

    Returns:
        Returns the XIRR as a float value, or the string
        "XIRR not calculated" when there are no cashflows or the search does
        not converge within 100 iterations.
    """
    dates = np.asarray(dates)
    amounts = np.asarray(amounts, dtype=float)
    if dates.size == 0:
        return _XIRR_FAILED

    # Time of each cashflow in years, measured from the first cashflow.
    years = np.array(dates - dates[0], dtype='timedelta64[D]')/np.timedelta64(365, 'D')
    epsilon = 0.1
    limit = 100
    residual = 1

    # Probe whether the net present value falls as the rate rises; this
    # decides in which direction the guess has to move.
    dex = np.sum(amounts/((1.05+guess)**years)) < np.sum(amounts/((1+guess)**years))
    mul = 1 if dex else -1

    # Calculate XIRR
    for _ in range(limit):
        prev_residual = residual
        residual = np.sum(amounts/((1+guess)**years))
        if abs(residual) <= epsilon:
            return guess
        if residual * prev_residual < 0:
            # The residual changed sign: the root was stepped over, so
            # halve the step.
            step /= 2
        guess = guess + step * mul * (-1 if residual < 0 else 1)
    return _XIRR_FAILED


def sip_returns(amfi_code: int) -> str:
    """Calculates the SIP returns for a fund

    Queries the Db and directly gets a list of relevant NAVs only. It also
    incorporates the unit calculation in the query itself, with an amount of
    Rs. 10,000. Do note that the investment amount itself does not matter,
    any value will give the same output. The result is then sliced for each
    of the periods and the returns are calculated using the XIRR function.

    Periods for which the fund does not have enough instalments, or for
    which the XIRR could not be calculated, are left out of the result.

    Params:
    -------
    amfi_code: amfi_code of the fund for which SIP returns need to be
        calculated

    Returns:
    --------
    Returns a newline separated string of SIP returns, one
    "<n>-year: <return>%" line per period (empty when nothing could be
    calculated)
    """
    # One instalment per month (first NAV date on or after the 25th), plus a
    # final zero-amount row from latest_nav that is used as the valuation row.
    sip_schedule_query = """
        with myvars(xamfi_code, xmonths) as (
            values(%s, %s)
        )
        select amfi_code, date, nav, 10000 as amount, round(10000/nav::numeric, 3) as units
        from(
            select *,
                   row_number() over (partition by xmonth order by date) as rn,
                   row_number() over (order by date) as rno
            from (
                select *, TO_CHAR(date, 'YYYY-MM') as xMonth
                from nav_history, myvars
                where amfi_code = xamfi_code
                  and extract(day from date) >= 25
                  and date between current_date - (xmonths || ' month')::interval and current_date
            ) t1
        ) t2
        where rn = 1 and rno <> 1
        union
        select amfi_code, date, nav, 0 as amount, 0 as units
        from latest_nav, myvars
        where amfi_code = xamfi_code
        order by date
    """
    months = [12, 36, 60, 84, 120]
    start = time.time()
    with closing(connect_db()) as connection:
        with connection.cursor() as cur:
            cur.execute(sip_schedule_query, (amfi_code, months[-1]+1))
            transactions = cur.fetchall()

    str_returns = []
    if transactions:
        rows = np.array(transactions, dtype=object)
        dates = rows[:, 1]
        navs = rows[:, 2].astype(float)
        amounts = rows[:, 3].astype(float)
        units = rows[:, 4].astype(float)

        for month in months:
            if len(rows) < month + 1:
                # Not enough history; a shorter window must not be reported
                # under this period's label.
                continue
            window = slice(-(month + 1), None)
            cashflows = amounts[window].copy()
            # The last row is the redemption: all units bought in the window
            # valued at the latest NAV, as an outflow of opposite sign.
            cashflows[-1] = -units[window][:-1].sum() * navs[-1]
            rate = xirr_np(dates[window], cashflows)
            if isinstance(rate, str):
                continue  # xirr_np did not converge for this period
            rate = round(rate, 6)
            str_returns.append(f"{month // 12}-year: {round(rate*100,2)}%")

    print(f"It took {time.time() - start} seconds to calcluate SIP returns")
    return '\n'.join(str_returns)


def main():
    """Starts the bot and keeps it running

    Registers the inline query, /start, /help and button handlers and polls
    Telegram until the process is interrupted. The bot token is read from
    the TELEGRAM_TOKEN environment variable.
    """
    updater = Updater(token=os.getenv('TELEGRAM_TOKEN'), use_context=True)
    dispatcher = updater.dispatcher
    dispatcher.add_handler(InlineQueryHandler(mf_query))
    dispatcher.add_handler(CommandHandler('start', welcome))
    dispatcher.add_handler(CommandHandler('help', welcome))
    dispatcher.add_handler(CallbackQueryHandler(button))
    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    print("MF Bot is running.")
    main()