telegram-mf-bot/mf_bot2.py
2022-01-11 10:51:32 +05:30

280 lines
11 KiB
Python

import datetime
import numpy as np
import os
import psycopg2
import time
from dotenv import load_dotenv
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, InlineQueryHandler, CallbackQueryHandler
from telegram import InlineQueryResultArticle, ParseMode, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.utils.helpers import escape_markdown
load_dotenv()
def connect_db():
pgcon = psycopg2.connect(dbname=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PWD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT'))
return pgcon
def slugify(message: str) -> str:
message = message.replace("(", "\\(")\
.replace(")", "\\)")\
.replace(".", "\\.")\
.replace("-", "\\-")
return message
def fund_search(search_string: str) -> list:
"""Searches for a fund in the Postgres Db"""
if len(search_string) < 3:
return []
connection = connect_db()
fund_name = search_string.replace(" ", ":*&").replace("&-", " & !")
fund_name = fund_name.replace('cap', ' cap').replace('fund', '').replace(' ',' ')
fund_name = f"{fund_name}:*" # enables partial match in tsquery
sql_query = """select lnav.*, fm.category, fm.sub_category
from latest_nav lnav
join fund_master fm on lnav.amfi_code = fm.amfi_code
where lnav.fts_doc @@ to_tsquery(%s)
order by lnav.fund_name
limit 10
"""
cur = connection.cursor()
start = time.time()
cur.execute(sql_query, (fund_name,))
results = list(cur.fetchall())
cur.close()
print(f'search took {time.time() - start} seconds')
return results
def mf_query(update, context):
query = update.inline_query.query
mf_list = fund_search(query)
results = []
for i, j in enumerate(mf_list):
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{j[0]}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{j[0]}')
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
message = slugify(f'*{j[1]}*\n*Category:* {j[7]}\n*Sub-category:* {j[8]}\n*Date:* {str(j[2])}\n*NAV:* {str(j[3])}')
line = InlineQueryResultArticle(id=j[0], title=j[1],
input_message_content=InputTextMessageContent(message, parse_mode=ParseMode.MARKDOWN_V2),
reply_markup=reply_markup)
results.append(line)
update.inline_query.answer(results)
def start(update, context):
msg = 'Welcome to India MF Bot\.\nTo get started, type @india\_mf\_bot in the message box and search for any fund\. '\
"You will get a list of funds\. When you make your choice, you'll get inline buttons to get more info on the fund\."
update.message.reply_text(msg, parse_mode=ParseMode.MARKDOWN_V2)
def button(update, context):
query = update.callback_query
data = query.data
amfi_code = int(data[1:])
connection = connect_db()
cur = connection.cursor()
cur.execute("select fund_name, category, sub_category from fund_master where amfi_code = %s", (amfi_code,))
result = cur.fetchall()
fund_name = slugify(result[0][0])
if data[0] == 'b':
cur = connection.cursor()
cur.execute("select date, nav from latest_nav where amfi_code = %s", (amfi_code,))
nav_result = cur.fetchall()
msg = slugify(f'*Category: *{result[0][1]}\n*Sub-category:* {result[0][2]}\n*Date*: {str(nav_result[0][0])}\n*NAV*: {str(nav_result[0][1])}')
returns = ''
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'r{amfi_code}'),
InlineKeyboardButton("SIP Returns", callback_data=f's{amfi_code}')
]
]
elif data[0] == 'r':
msg = 'Returns:'
returns = slugify(return_calc(amfi_code))
keyboard = [
[
InlineKeyboardButton("<< Back", callback_data=f"b{amfi_code}"),
InlineKeyboardButton("SIP Returns", callback_data=f's{amfi_code}')
]
]
else:
msg = 'SIP Returns:'
returns = slugify(sip_returns(amfi_code))
keyboard = [
[
InlineKeyboardButton("Returns", callback_data=f'{amfi_code}'),
InlineKeyboardButton("<< Back", callback_data=f"b{amfi_code}")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# CallbackQueries need to be answered, even if no notification to the user is needed
# Some clients may have trouble otherwise. See https://core.telegram.org/bots/api#callbackquery
query.answer()
query.edit_message_text(text="*{}*\n{}\n{}".format(fund_name, msg, str(returns)), reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN_V2)
def return_calc(amfi_code: int, return_type: str='m', raw: bool=False) -> str:
"""Give returns numbers for a mutual fund
Use return type s for 1-3-6 months, m for 1-3-5 years, and l for 5-7-10 years
"""
returns_query = """
select dates, %(amfi_code)s as amfi_code, ffill_nav from (
select dates, amfi_code, first_value(nav) over (partition by grp_close order by dates) as ffill_nav
from (
select dates, amfi_code, nav,
sum(case when nav is not null then 1 end) over (order by dates) as grp_close
from (
SELECT generate_series(current_date - '61 month'::interval, current_date, interval '1 day')::date
) d(dates)
left join nav_history nh on d.dates = nh.date and nh.amfi_code = %(amfi_code)s
) t
)td
where dates in (current_date - '60 month'::interval - '1 day':: interval,
current_date - '36 month'::interval - '1 day':: interval,
current_date - '12 month'::interval - '1 day':: interval,
current_date - '1 day':: interval )
order by dates desc
"""
start_time = time.time()
connection = connect_db()
cursor = connection.cursor()
cursor.execute(returns_query, {'amfi_code':amfi_code})
result = cursor.fetchall()
#print(result)
returns = []
for i, j in enumerate(result):
if i == 0:
continue
else:
years = (result[0][0] - j[0]).days/365
ret = (result[0][2]/j[2])**(1/years) - 1
returns.append((years, ret))
if raw:
return returns
format_returns = []
for i in returns:
format_returns.append((str(int(i[0]))+'-year', str(round(i[1]*100,2))+'%'))
print(time.time() - start_time)
return '\n'.join([f'{i[0]}: {i[1]}' for i in format_returns])
def xirr_np(dates: list, amounts: list, guess: float=0.05, step: float=0.05) -> float:
"""Calculates XIRR from a series of cashflows.
Requires NumPy and datetime libraries
Params:
dates: A list of dates on which cashflows occur
amounts: The amount of cashflows corresponding to each date
guess: A guess for XIRR.
This is used as the starting XIRR for testing. The closer the guess, the faster will be the output
step: Starting value at which the guess will be increased/decreased in each iteration
Returns:
Returns the XIRR as a float value
"""
years = np.array(dates - dates[0], dtype='timedelta64[D]')/np.timedelta64(365, 'D')
epsilon = 0.1
limit = 100
residual = 1
#test
dex = np.sum(amounts/((1.05+guess)**years)) < np.sum(amounts/((1+guess)**years))
mul = 1 if dex else -1
# Calculate XIRR
for _ in range(limit):
prev_residual = residual
residual = np.sum(amounts/((1+guess)**years))
if abs(residual) > epsilon:
if residual * prev_residual < 0:
step /= 2
guess = guess + step * mul * (-1 if residual < 0 else 1)
else:
return guess
return "XIRR not calculated"
def sip_returns(amfi_code: int) -> str:
sip_schedule_query = """
with myvars(xamfi_code, xmonths) as (
values(%s, %s)
)
select amfi_code, date, nav, 10000 as amount, round(10000/nav::numeric, 3) as units
from(
select *, row_number() over (partition by xmonth order by date) as rn,
row_number() over (order by date) as rno
from (
select *, TO_CHAR(date, 'YYYY-MM') as xMonth
from nav_history, myvars
where amfi_code = xamfi_code
and extract(day from date) >= 25
and date between current_date - (xmonths || ' month')::interval and current_date
) t1
) t2
where rn = 1 and rno <> 1
union
select amfi_code, date, nav, 0 as amount, 0 as units
from latest_nav, myvars
where amfi_code = xamfi_code
order by date
"""
months = [12, 36, 60, 84, 120]
xirrs = []
connection = connect_db()
with connection.cursor() as cur:
cur.execute(sip_schedule_query, (amfi_code, months[-1]+1))
transactions = cur.fetchall()
transactions = np.array(transactions)
transactions[:,3] = transactions[:,3].astype(float)
transactions[:,4] = transactions[:,4].astype(float)
for m in months:
df_slice = transactions[-(m+1):,:]
sip_value = sum(df_slice[:-1,4])*df_slice[-1, 2]
df_slice[-1,3] = sip_value * -1
dates = df_slice[:, 1]
amounts = df_slice[:, 3]
xirrs.append({'years': m // 12, 'returns': round(xirr_np(dates, amounts), 6)})
str_returns = []
for i in xirrs:
x = f"{i['years']}-year: {round(i['returns']*100,2)}%"
str_returns.append(x)
return '\n'.join(str_returns)
def main():
updater = Updater(token=os.getenv('TELEGRAM_TOKEN'), use_context=True)
dispatcher = updater.dispatcher
dispatcher.add_handler(InlineQueryHandler(mf_query))
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(CommandHandler('help', start))
dispatcher.add_handler(CallbackQueryHandler(button))
updater.start_polling()
updater.idle()
if __name__ == "__main__":
print("MF Bot is running.")
main()