บทช่วยสอน: สร้างเครื่องมือค้นหาแบบกําหนดเองและระบบการตอบคําถาม
ในบทช่วยสอนนี้ เรียนรู้วิธีการทําดัชนีและคิวรีข้อมูลขนาดใหญ่ที่โหลดจากคลัสเตอร์ Spark คุณตั้งค่าสมุดบันทึก Jupyter ที่ทํางานต่อไปนี้:
- โหลดฟอร์มต่าง ๆ (ใบแจ้งหนี้) ลงในดาต้าเฟรมในเซสชัน Apache Spark
- วิเคราะห์ข้อมูลเพื่อกําหนดคุณลักษณะของฟีเจอร์
- รวมเอาต์พุตที่เป็นผลลัพธ์ไว้ในโครงสร้างข้อมูลแบบตาราง
- เขียนผลลัพธ์ไปยังดัชนีการค้นหาที่โฮสต์ใน Azure Cognitive Search
- สํารวจและคิวรีผ่านเนื้อหาที่คุณสร้างขึ้น
1 - ตั้งค่าการขึ้นต่อกัน
เราเริ่มต้นด้วยการนําเข้าแพคเกจและการเชื่อมต่อกับทรัพยากร Azure ที่ใช้ในเวิร์กโฟลว์นี้
import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"
translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"
search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"
openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"
2 - โหลดข้อมูลลงใน Spark
รหัสนี้จะโหลดไฟล์ภายนอกสองสามไฟล์จากบัญชีที่เก็บข้อมูล Azure ที่ใช้สําหรับวัตถุประสงค์ในการสาธิต ไฟล์เป็นใบแจ้งหนี้ต่าง ๆ และพวกเขากําลังอ่านลงในดาต้าเฟรม
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def blob_to_url(blob):
[prefix, postfix] = blob.split("@")
container = prefix.split("/")[-1]
split_postfix = postfix.split("/")
account = split_postfix[0]
filepath = "/".join(split_postfix[1:])
return "https://{}/{}/{}".format(account, container, filepath)
df2 = (
spark.read.format("binaryFile")
.load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
.select("path")
.limit(10)
.select(udf(blob_to_url, StringType())("path").alias("url"))
.cache()
)
display(df2)
3 - ใช้การจดจําฟอร์ม
รหัสนี้จะโหลดตัวแปลง AnalyzeInvoices และส่งผ่านการอ้างอิงไปยังเฟรมข้อมูลที่มีใบแจ้งหนี้ ซึ่งเรียกใช้แบบจําลองใบแจ้งหนี้ที่สร้างไว้ล่วงหน้าของ Azure Forms Analyzer
from synapse.ml.cognitive import AnalyzeInvoices
analyzed_df = (
AnalyzeInvoices()
.setSubscriptionKey(cognitive_key)
.setLocation(cognitive_location)
.setImageUrlCol("url")
.setOutputCol("invoices")
.setErrorCol("errors")
.setConcurrency(5)
.transform(df2)
.cache()
)
display(analyzed_df)
4 - ลดความซับซ้อนของผลลัพธ์การจดจําฟอร์ม
รหัสนี้ใช้ FormOntologyLearner ซึ่งเป็นตัวแปลงที่วิเคราะห์ผลลัพธ์ของตัวแปลงตัวรู้จําแบบฟอร์ม (สําหรับ Azure AI Document Intelligence) และอนุมานโครงสร้างข้อมูลแบบตาราง ผลลัพธ์ของ AnalyzeInvoices เป็นแบบไดนามิกและแตกต่างกันไปตามคุณลักษณะที่ตรวจพบในเนื้อหาของคุณ
FormOntologyLearner ขยายยูทิลิตี้ของตัวแปลง AnalyzeInvoices โดยการค้นหารูปแบบที่สามารถใช้เพื่อสร้างโครงสร้างข้อมูลแบบตาราง การจัดระเบียบเอาต์พุตลงในหลายคอลัมน์และแถวทําให้การวิเคราะห์ปลายทางง่ายขึ้น
from synapse.ml.cognitive import FormOntologyLearner
organized_df = (
FormOntologyLearner()
.setInputCol("invoices")
.setOutputCol("extracted")
.fit(analyzed_df)
.transform(analyzed_df)
.select("url", "extracted.*")
.cache()
)
display(organized_df)
ด้วยดาต้าเฟรมแบบตารางที่ดีของเรา เราสามารถลดรูปแบบตารางที่ซ้อนกันที่พบในแบบฟอร์มด้วย SparkSQL บางตัว
from pyspark.sql.functions import explode, col
itemized_df = (
organized_df.select("*", explode(col("Items")).alias("Item"))
.drop("Items")
.select("Item.*", "*")
.drop("Item")
)
display(itemized_df)
5 - เพิ่มการแปล
รหัสนี้จะโหลด Translate ซึ่งเป็นตัวแปลงที่เรียกใช้บริการ Azure AI ตัวแปลภาษา ในบริการ Azure AI ข้อความต้นฉบับซึ่งอยู่ในภาษาอังกฤษในคอลัมน์ "Description" จะถูกแปลด้วยเครื่องเป็นภาษาต่างๆ เอาต์พุตทั้งหมดรวมอยู่ในอาร์เรย์ "output.translations"
from synapse.ml.cognitive import Translate
translated_df = (
Translate()
.setSubscriptionKey(translator_key)
.setLocation(translator_location)
.setTextCol("Description")
.setErrorCol("TranslationError")
.setOutputCol("output")
.setToLanguage(["zh-Hans", "fr", "ru", "cy"])
.setConcurrency(5)
.transform(itemized_df)
.withColumn("Translations", col("output.translations")[0])
.drop("output", "TranslationError")
.cache()
)
display(translated_df)
6 - แปลผลิตภัณฑ์เป็นอีโมจิด้วย OpenAI 🤯
from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split
emoji_template = """
Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
Two Ducks: 🦆🦆,
Light Bulb: 💡,
Three Peaches: 🍑🍑🍑,
Two kitchen stoves: ♨️♨️,
A red car: 🚗,
A person and a cat: 🧍🐈,
A {Description}: """
prompter = (
OpenAIPrompt()
.setSubscriptionKey(openai_key)
.setDeploymentName(openai_deployment_name)
.setUrl(openai_url)
.setMaxTokens(5)
.setPromptTemplate(emoji_template)
.setErrorCol("error")
.setOutputCol("Emoji")
)
emoji_df = (
prompter.transform(translated_df)
.withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(emoji_df.select("Description", "Emoji"))
7 - อนุมานที่อยู่ผู้จัดจําหน่ายที่มี OpenAI
continent_template = """
Which continent does the following address belong to?
Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica.
Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.
Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""
continent_df = (
prompter.setOutputCol("Continent")
.setPromptTemplate(continent_template)
.transform(emoji_df)
.withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
.drop("error", "prompt")
.cache()
)
display(continent_df.select("VendorAddress", "Continent"))
8 - สร้างดัชนีการค้นหา Azure สําหรับฟอร์ม
from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit
(
continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
.withColumn("SearchAction", lit("upload"))
.writeToAzureSearch(
subscriptionKey=search_key,
actionCol="SearchAction",
serviceName=search_service,
indexName=search_index,
keyCol="DocID",
)
)
9 - ลองใช้คิวรีการค้นหา
import requests
search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
search_service, search_index
)
requests.post(
search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()
10 - สร้างแชทบอทที่สามารถใช้ Azure Search เป็นเครื่องมือ 🧠🔧
import json
import openai
openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"
chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:
{continent_df.columns}
If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""
def search_query_prompt(question):
return f"""
Given the search engine above, what would you search for to answer the following question?
Question: "{question}"
Please output a json in the form of {{"query": "example_query"}}
"""
def search_result_prompt(query):
search_results = requests.post(
search_url, json={"search": query}, headers={"api-key": search_key}
).json()
return f"""
You previously ran a search for "{query}" which returned the following results:
{search_results}
You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem.
"""
def prompt_gpt(messages):
response = openai.ChatCompletion.create(
engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
)
return response["choices"][0]["message"]["content"]
def custom_chatbot(question):
while True:
try:
query = json.loads(
prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "user", "content": search_query_prompt(question)},
]
)
)["query"]
return prompt_gpt(
[
{"role": "system", "content": chat_context_prompt},
{"role": "system", "content": search_result_prompt(query)},
{"role": "user", "content": question},
]
)
except Exception as e:
raise e
11 - ถามคําถามเกี่ยวกับแชทบอทของเรา
custom_chatbot("What did Luke Diaz buy?")
12 - ตรวจสอบอีกครั้งอย่างรวดเร็ว
display(
continent_df.where(col("CustomerName") == "Luke Diaz")
.select("Description")
.distinct()
)