update: look at todo

main
Thamognya Kodi 2023-01-14 21:20:23 +07:00
parent 5f0faa77a4
commit 5261d734de
2 changed files with 199 additions and 34 deletions

View File

@ -1,4 +1,13 @@
# type: ignore # type: ignore
"""
model naming convention
# Open-AI models:
include prefix openai-*
# HuggingFace
include prefix hf-*
"""
from typing import Any, List, Tuple from typing import Any, List, Tuple
import os import os
@ -31,11 +40,6 @@ def answer(
CHATGPT_CONVERSATION_ID: str = "", CHATGPT_CONVERSATION_ID: str = "",
CHATGPT_PARENT_ID: str = "", CHATGPT_PARENT_ID: str = "",
) -> tuple[Any, list[str]]: ) -> tuple[Any, list[str]]:
# if environment keys are not given, assume it is in env
if GOOGLE_SEARCH_API_KEY == "":
GOOGLE_SEARCH_API_KEY = str(os.environ.get("GOOGLE_SEARCH_API_KEY"))
if GOOGLE_SEARCH_ENGINE_ID == "":
GOOGLE_SEARCH_ENGINE_ID = str(os.environ.get("GOOGLE_SEARCH_ENGINE_ID"))
if OPENAI_API_KEY == "": if OPENAI_API_KEY == "":
OPENAI_API_KEY = str(os.environ.get("OPENAI_API_KEY")) OPENAI_API_KEY = str(os.environ.get("OPENAI_API_KEY"))
openai.api_key = OPENAI_API_KEY openai.api_key = OPENAI_API_KEY
@ -45,25 +49,20 @@ def answer(
CHATGPT_CONVERSATION_ID = str(os.environ.get("CHATGPT_CONVERSATION_ID")) CHATGPT_CONVERSATION_ID = str(os.environ.get("CHATGPT_CONVERSATION_ID"))
if CHATGPT_PARENT_ID == "": if CHATGPT_PARENT_ID == "":
CHATGPT_PARENT_ID = str(os.environ.get("CHATGPT_PARENT_ID")) CHATGPT_PARENT_ID = str(os.environ.get("CHATGPT_PARENT_ID"))
"""
model naming convention
# Open-AI models:
include prefix openai-*
# HuggingFace
include prefix hf-*
#
"""
if not (model.startswith("openai-") or model.startswith("hf-")): if not (model.startswith("openai-") or model.startswith("hf-")):
model = "openai-chatgpt" # Default model = "openai-chatgpt" # Default
if model.startswith("openai-"):
if model == "openai-chatgpt":
# ChatGPT
results: tuple[list[str], list[str]] = internet.Google( results: tuple[list[str], list[str]] = internet.Google(
query, GOOGLE_SEARCH_API_KEY, GOOGLE_SEARCH_ENGINE_ID query, GOOGLE_SEARCH_API_KEY, GOOGLE_SEARCH_ENGINE_ID
).google() ).google()
# print(' '.join(filter(lambda x: isinstance(x, str), results[0]))[:4000])
if model.startswith("openai-"):
if model == "openai-chatgpt":
# ChatGPT
prompt = f"Using the context: {' '.join(filter(lambda x: isinstance(x, str), results[0]))[:3000]} and answer the question with the context above and previous knowledge: \"{query}\". Also write long answers or essays if asked." prompt = f"Using the context: {' '.join(filter(lambda x: isinstance(x, str), results[0]))[:3000]} and answer the question with the context above and previous knowledge: \"{query}\". Also write long answers or essays if asked."
print(prompt) print(prompt)
exit(1)
chatbot = Chatbot( chatbot = Chatbot(
{"session_token": CHATGPT_SESSION_TOKEN}, {"session_token": CHATGPT_SESSION_TOKEN},
conversation_id=None, conversation_id=None,
@ -77,14 +76,11 @@ def answer(
return (response["message"], results[1]) return (response["message"], results[1])
else: else:
if model == "openai-text-davinci-003": if model == "openai-text-davinci-003":
results: tuple[list[str], list[str]] = internet.Google( # text-davinci-003
query, GOOGLE_SEARCH_API_KEY, GOOGLE_SEARCH_ENGINE_ID prompt = f"Using the context: {' '.join(filter(lambda x: isinstance(x, str), results[0]))[:3000]} and answer the question with the context above and previous knowledge: \"{query}\". Also write long answers or essays if asked."
).google()
context = " ".join(results[0])
context[: (4097 - len(query) - 10)]
response = openai.Completion.create( response = openai.Completion.create(
model="text-davinci-003", model="text-davinci-003",
prompt=f"{context} Q: {query}", prompt=prompt,
max_tokens=len(context), max_tokens=len(context),
n=1, n=1,
stop=None, stop=None,
@ -94,9 +90,6 @@ def answer(
# TODO: add suport later # TODO: add suport later
else: else:
model = model.replace("hf-", "", 1) model = model.replace("hf-", "", 1)
results: tuple[list[str], list[str]] = internet.Google(
query, GOOGLE_SEARCH_API_KEY, GOOGLE_SEARCH_ENGINE_ID
).google()
qa_model = pipeline("question-answering", model=model) qa_model = pipeline("question-answering", model=model)
response = qa_model(question=query, context=" ".join(results[0])) response = qa_model(question=query, context=" ".join(results[0]))
return (response["answer"], results[1]) return (response["answer"], results[1])

View File

@ -1,6 +1,142 @@
from typing import Any, List, Tuple # from typing import Any, List, Tuple
# import os
# import sys
# from importlib import reload
# from pathlib import Path
# import dotenv
# import requests
# HTTP_USERAGENT: dict[str, str] = {
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
# }
# sys.path.append(str(Path(__file__).parent.parent.parent.parent) + "/utils/NLP")
# sys.path.append(str(Path(__file__).parent.parent.parent.parent) + "/utils")
# sys.path.append(str(Path(__file__).parent.parent))
# import asyncio
# import itertools
# import re
# import aiohttp
# import config
# from bs4 import BeautifulSoup
# from normalize import normalizer
# # from relevancy import filter_irrelevant
# from sentencize import sentencizer
# from urlextract import URLExtract
# from adremover import AdRemover
# class Google:
# def __init__(
# self: "Google",
# query: str,
# GOOGLE_SEARCH_API_KEY: str,
# GOOGLE_SEARCH_ENGINE_ID: str,
# ) -> None:
# self.__GOOGLE_SEARCH_API_KEY: str = GOOGLE_SEARCH_API_KEY
# self.__GOOGLE_SEARCH_ENGINE_ID: str = GOOGLE_SEARCH_ENGINE_ID
# # if environment keys are not given, assume it is in env
# if GOOGLE_SEARCH_API_KEY == "":
# self.__GOOGLE_SEARCH_API_KEY = str(os.environ.get("GOOGLE_SEARCH_API_KEY"))
# if GOOGLE_SEARCH_ENGINE_ID == "":
# self.__GOOGLE_SEARCH_ENGINE_ID = str(os.environ.get("GOOGLE_SEARCH_ENGINE_ID"))
# self.__num_res: int = 10
# self.__query = query
# self.__URL_EXTRACTOR: URLExtract = URLExtract()
# self.__urls: list[str] = self.__URL_EXTRACTOR.find_urls(query)
# self.__query = str(
# re.sub(
# r"\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*",
# "",
# str(self.__query),
# )
# )
# def __get_urls(self: "Google") -> None:
# if self.__GOOGLE_SEARCH_API_KEY == "":
# exit("ERROR: Google API Key not found")
# if self.__GOOGLE_SEARCH_ENGINE_ID == "":
# exit("ERROR: Google Search Engine Id not found")
# response = requests.get(
# "https://www.googleapis.com/customsearch/v1",
# params={
# "key": self.__GOOGLE_SEARCH_API_KEY,
# "q": self.__query,
# "cx": self.__GOOGLE_SEARCH_ENGINE_ID,
# },
# )
# results = response.json()["items"]
# for result in results:
# self.__urls.append(result["link"])
# if len(self.__urls) == self.__num_res:
# break
# async def __fetch_url(self: "Google", session: Any, url: str) -> list[str]:
# try:
# async with session.get(url, headers=HTTP_USERAGENT) as response:
# html = await response.text()
# soup = BeautifulSoup(html, "html.parser")
# text = soup.get_text()
# normalized_text = normalizer(text)
# sentences: list[str] = sentencizer(normalized_text)
# return sentences
# except aiohttp.ClientConnectorError:
# return [""]
# except Exception:
# return [""]
# async def __fetch_urls(self: "Google", urls: list[str]) -> Any:
# async with aiohttp.ClientSession() as session:
# tasks = [
# asyncio.create_task(self.__fetch_url(session, url)) for url in urls
# ]
# results = await asyncio.gather(*tasks)
# return results
# def __flatten(self: Any, a: list[list[Any]]) -> list[Any]:
# return list(itertools.chain(*a))
# def __get_urls_contents(self: "Google") -> None:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# contents = loop.run_until_complete(self.__fetch_urls(self.__urls))
# loop.close()
# self.__content = self.__flatten(contents)
# def google(self: "Google") -> tuple[list[str], list[str]]:
# self.__get_urls()
# self.__get_urls_contents()
# return (self.__content, self.__urls)
# """
# Timing:
# import time
# start_time = time.time()
# google("Who is Elon Musk")
# print("--- %s seconds ---" % (time.time() - start_time))
# # Results:
# # --- 2.2230100631713867 seconds ---
# # ________________________________________________________
# # Executed in 4.73 secs fish external
# # usr time 3.35 secs 85.00 micros 3.35 secs
# # sys time 1.86 secs 956.00 micros 1.86 secs
# """
from typing import Any, Dict, List, Tuple
import os import os
import pickle
import sys import sys
from importlib import reload from importlib import reload
from pathlib import Path from pathlib import Path
@ -17,18 +153,22 @@ sys.path.append(str(Path(__file__).parent.parent.parent.parent) + "/utils")
sys.path.append(str(Path(__file__).parent.parent)) sys.path.append(str(Path(__file__).parent.parent))
import asyncio import asyncio
import concurrent.futures
import itertools import itertools
import re import re
import aiohttp import aiohttp
import config import config
from adremover import AdRemover
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from keywords import get_keywords
from normalize import normalizer from normalize import normalizer
from relevancy import filter_relevant
# from relevancy import filter_irrelevant
from sentencize import sentencizer from sentencize import sentencizer
from urlextract import URLExtract from urlextract import URLExtract
dotenv.load_dotenv()
class Google: class Google:
def __init__( def __init__(
@ -37,9 +177,18 @@ class Google:
GOOGLE_SEARCH_API_KEY: str, GOOGLE_SEARCH_API_KEY: str,
GOOGLE_SEARCH_ENGINE_ID: str, GOOGLE_SEARCH_ENGINE_ID: str,
) -> None: ) -> None:
self.__GOOGLE_SEARCH_API_KEY: str = GOOGLE_SEARCH_API_KEY # if environment keys are not given, assume it is in env
self.__GOOGLE_SEARCH_ENGINE_ID: str = GOOGLE_SEARCH_ENGINE_ID if GOOGLE_SEARCH_API_KEY == "":
self.__num_res: int = 10 self.__GOOGLE_SEARCH_API_KEY = str(os.environ.get("GOOGLE_SEARCH_API_KEY"))
if GOOGLE_SEARCH_ENGINE_ID == "":
self.__GOOGLE_SEARCH_ENGINE_ID = str(
os.environ.get("GOOGLE_SEARCH_ENGINE_ID")
)
self.__num_res: int = (
5
if config.NLP_CONF_MODE == "speed"
else (20 if config.NLP_CONF_MODE else 10)
)
self.__query = query self.__query = query
self.__URL_EXTRACTOR: URLExtract = URLExtract() self.__URL_EXTRACTOR: URLExtract = URLExtract()
self.__urls: list[str] = self.__URL_EXTRACTOR.find_urls(query) self.__urls: list[str] = self.__URL_EXTRACTOR.find_urls(query)
@ -50,8 +199,15 @@ class Google:
str(self.__query), str(self.__query),
) )
) )
self.__content: list[str] = []
ADBLOCK_RULES = [
"https://easylist-downloads.adblockplus.org/ruadlist+easylist.txt",
"https://filters.adtidy.org/extension/chromium/filters/1.txt",
]
self.__ad_remover = AdRemover(ADBLOCK_RULES)
def __get_urls(self: "Google") -> None: def __get_urls(self: "Google") -> None:
# Send the request to the Google Search API
if self.__GOOGLE_SEARCH_API_KEY == "": if self.__GOOGLE_SEARCH_API_KEY == "":
exit("ERROR: Google API Key not found") exit("ERROR: Google API Key not found")
if self.__GOOGLE_SEARCH_ENGINE_ID == "": if self.__GOOGLE_SEARCH_ENGINE_ID == "":
@ -74,6 +230,7 @@ class Google:
try: try:
async with session.get(url, headers=HTTP_USERAGENT) as response: async with session.get(url, headers=HTTP_USERAGENT) as response:
html = await response.text() html = await response.text()
html = self.__ad_remover.remove_ads(html)
soup = BeautifulSoup(html, "html.parser") soup = BeautifulSoup(html, "html.parser")
text = soup.get_text() text = soup.get_text()
normalized_text = normalizer(text) normalized_text = normalizer(text)
@ -101,11 +258,26 @@ class Google:
contents = loop.run_until_complete(self.__fetch_urls(self.__urls)) contents = loop.run_until_complete(self.__fetch_urls(self.__urls))
loop.close() loop.close()
self.__content = self.__flatten(contents) self.__content = self.__flatten(contents)
self.__content = [str(x) for x in self.__content]
def google(self: "Google") -> tuple[list[str], list[str]]: def __filter_irrelevant_processing(self: "Google") -> None:
with concurrent.futures.ThreadPoolExecutor(max_workers=500) as executor:
futures = [executor.submit(filter_relevant, self.__content, self.__query)]
concurrent.futures.wait(futures)
content: list[str] = []
for future in futures:
content.append(future.result())
self.__content = content
def google(
self: "Google", filter_irrelevant: bool = True
) -> tuple[list[str], list[str]]:
self.__get_urls() self.__get_urls()
self.__get_urls_contents() self.__get_urls_contents()
return (self.__content, self.__urls) if filter_irrelevant:
self.__filter_irrelevant_processing()
results: tuple[list[str], list[str]] = (self.__content, self.__urls)
return results
""" """