步驟 6 (管線)。 實作數據管線修正

資料管線

請遵循下列步驟來修改您的資料管線,並將其執行至:

  1. 建立新的向量索引。
  2. 使用數據管線的元數據建立 MLflow 執行。

筆記本會 B_quality_iteration/02_evaluate_fixes 參考產生的 MLflow 執行。

有兩種方法可以修改數據管線:

  • 一次 實作單一修正 在此方法中,您會一次設定並執行單一數據管線。 如果您想要嘗試單一內嵌模型並測試單一新的剖析器,則此模式是最佳模式。 Databricks 建議從這裡開始熟悉這些筆記本。
  • 一次 實作多個修正 在這種方法中,也稱為掃掠,您可以平行執行多個具有不同組態的數據管線。 如果您想要跨許多不同的策略進行「掃掠」,例如,評估三個 PDF 剖析器或評估許多不同的區塊大小,則此模式是最佳模式。

如需本節中的範例程序代碼,請參閱 GitHub 存放庫

方法 1:一次實作單一修正

  1. 開啟 B_quality_iteration/data_pipeline_fixes/single_fix/00_config 筆記本
  2. 請遵循下列其中一項中的指示:
  3. 執行管線,方法是:
  4. 將產生的 MLflow Run 名稱新增至 DATA_PIPELINE_FIXES_RUN_NAMES B_quality_iteration/02_evaluate_fixes 筆記本中的變數

注意

數據準備管線會採用 Spark 結構化串流,以累加方式載入和處理檔案。 這需要已在檢查點中追蹤已載入和備妥的檔案,且不會重新處理。 只有新加入的檔案會載入、準備及附加至對應的數據表。

因此,如果您想要 從頭 重新執行整個管線,並重新處理所有檔,您需要刪除檢查點和數據表。 您可以使用 reset_tables_and_checkpoints Notebook 來完成這項作業

方法 2:一次實作多個修正

  1. 開啟 B_quality_iteration/data_pipeline_fixes/multiple_fixes/00_Run_Multiple_Pipelines 筆記本。
  2. 請遵循筆記本中的指示,新增兩個或多個要執行的數據管線組態。
  3. 執行筆記本以執行這些管線。
  4. 將產生的 MLflow 執行名稱新增至 DATA_PIPELINE_FIXES_RUN_NAMES B_quality_iteration/02_evaluate_fixes 筆記本中的變數。

附錄

注意

您可以根據您一次實作單一修正或多個修正,在 single_fixmultiple_fixes 目錄中找到下列參考的筆記本。

組態設定深入探討

下列列出數據管線的各種預先實作組態選項。 或者,您可以實 作自定義剖析器/區塊器

  • vectorsearch_config:指定 向量搜尋 端點(必須啟動並執行),以及要建立的索引名稱。 此外,定義 源數據表與索引之間的同步 處理類型(預設值為 TRIGGERED)。
  • embedding_config:指定要搭配Tokenizer使用的內嵌模型。 如需選項的完整清單,請參閱 supporting_configs/embedding_models 筆記本。 內嵌模型必須部署到執行中的模型服務端點。 根據區塊化策略,Tokenizer 也會在分割期間,以確保區塊不會超過內嵌模型的令牌限制。 這裡會使用Tokenizer來計算文字區塊中的令牌數目,以確保它們不會超過所選內嵌模型的內容長度上限。

下列顯示 HuggingFace 中的 Tokenizer:

    "embedding_tokenizer": {
        "tokenizer_model_name": "BAAI/bge-large-en-v1.5",
        "tokenizer_source": "hugging_face",
    }

下列顯示來自 TikToken 的 Tokenizer:

"embedding_tokenizer": {
        "tokenizer_model_name": "text-embedding-small",
        "tokenizer_source": "tiktoken",
    }
  • pipeline_config:定義來源欄位的檔案剖析器、區塊器和路徑。 剖析器和區塊器分別定義於和 chunker_library 筆記本中parser_library。 這些可以在single_fixmultiple_fixes目錄中找到。 如需選項的完整清單,請參閱 supporting_configs/parser_chunker_strategies 筆記本,該筆記本可在單一和多個修正目錄中再次使用。 不同的剖析器或區塊器可能需要不同的組態參數,其中 <param x> 代表特定區塊器所需的潛在參數。 剖析器也可以使用相同的格式傳遞組態值。
    "chunker": {
        "name": <chunker-name>,
        "config": {
            "<param 1>": "...",
            "<param 2>": "...",
            ...
        }
    }

實作自定義剖析器/區塊器

此專案是結構化的,可協助將自定義剖析器或區塊器新增至數據準備管線。

新增剖析器

假設您想要使用 PyMuPDF 連結庫 來合併新的剖析器,將剖析的文字轉換成 Markdown 格式。 執行下列步驟:

  1. 將下列程式代碼新增至 或 multiple_fix 目錄中的parser_librarysingle_fix筆記本,以安裝必要的相依性:

    # Dependencies for PyMuPdf
    %pip install pymupdf pymupdf4llm
    
  2. parser_library 或 目錄中的single_fix筆記本中,新增剖PyMuPdfMarkdown析器的新區段,並實作剖multiple_fix析函式。 請確定函式的輸出符合 ParserReturnValue 筆記本開頭所定義的類別。 這可確保與Spark UDF的相容性。 tryexcept 區塊可防止 Spark 在 或 multiple_fix 目錄中將剖析器套用為筆記本single_fix中的 02_parse_docs UDF 時,因為個別文件中發生錯誤,導致 Spark 無法完成整個剖析作業。 此筆記本會檢查是否有任何檔剖析失敗、隔離對應的數據列並引發警告。

    import fitz
    import pymupdf4llm
    
    def parse_bytes_pymupdfmarkdown(
        raw_doc_contents_bytes: bytes,
    ) -> ParserReturnValue:
        try:
            pdf_doc = fitz.Document(stream=raw_doc_contents_bytes, filetype="pdf")
            md_text = pymupdf4llm.to_markdown(pdf_doc)
    
            output = {
                "num_pages": str(pdf_doc.page_count),
                "parsed_content": md_text.strip(),
            }
    
            return {
                OUTPUT_FIELD_NAME: output,
                STATUS_FIELD_NAME: "SUCCESS",
            }
        except Exception as e:
            warnings.warn(f"Exception {e} has been thrown during parsing")
            return {
                OUTPUT_FIELD_NAME: {"num_pages": "", "parsed_content": ""},
                STATUS_FIELD_NAME: f"ERROR: {e}",
            }
    
  3. 將新的剖析函式新增至 parser_factorymultiple_fix 目錄中的 parser_librarysingle_fix使其可在筆記本的 00_configpipeline_config設定。

  4. 在筆記本中 02_parse_docs ,剖析器函式會轉換成 Spark Python UDF(針對 Databricks Runtime 14.0 或更新版本優化箭號優化 ),並套用至包含新二進位 PDF 檔案的數據框架。 若要進行測試和開發,請將簡單的測試函式新增至載入test-document.pdf檔案並判斷提示成功剖析parser_library筆記本

    with open("./test_data/test-document.pdf", "rb") as file:
        file_bytes = file.read()
        test_result_pymupdfmarkdown = parse_bytes_pymupdfmarkdown(file_bytes)
    
    assert test_result_pymupdfmarkdown[STATUS_FIELD_NAME] == "SUCCESS"
    

新增區塊器

新增區塊器的程式會遵循上述針對新剖析器所說明的步驟。

  1. 在chunker_library筆記本中新增必要的相依性。
  2. 為區塊器新增區段並實作函式,例如 chunk_parsed_content_newchunkername。 新區塊器函式的輸出必須是符合chunker_library筆記本開頭所定義的類別的 Python 字典ChunkerReturnValue。 函式應該至少接受要區塊化之剖析文字的字串。 如果您的區塊器需要其他參數,您可以將它們新增為函式參數。
  3. 將新的區塊器新增至chunker_factorychunker_library筆記本中定義的函式。 如果您的函式接受其他參數,請使用 functools 的部分 來預先設定它們。 這是必要的,因為 UDF 只接受一個輸入參數,這是我們案例中剖析的文字。 chunker_factory可讓您在pipeline_config設定不同的區塊器方法,並傳回Spark Python UDF(已針對 Databricks Runtime 14.0 和更新版本優化)。
  4. 為新的區塊化函式新增簡單的測試區段。 本節應該將預先定義的文字區塊化為字串。

效能微調

Spark 會利用數據分割來平行處理。 數據會分割成數據列的區塊,而且每個分割區預設都會由單一核心處理。 不過,當 Apache Spark 最初讀取數據時,它可能不會建立針對所需計算優化的分割區,特別是針對執行剖析和區塊化工作的 UDF。 建立足夠小的分割區,以便進行有效率的平行處理,而不需要那麼小,管理分割區的額外負荷就超過了優點,這一點非常重要。

您可以使用 來調整分割 df.repartitions(<number of partitions>)區數目。 套用 UDF 時,請針對背景工作節點上可用的多個核心。 例如,在02_parse_docs筆記本中,您可以包含df_raw_bronze = df_raw_bronze.repartition(2*sc.defaultParallelism)建立與可用背景工作核心數目一樣多的分割區。 一般而言,介於 1 到 3 之間的倍數應該會產生令人滿意的效能。

手動執行管線

或者,您可以逐步執行每個個別的 Notebook:

  1. 使用01_load_files筆記本載入源檔。 這會將每個檔二進位檔儲存為 中定義的青銅數據表 (raw_files_table_name) 中的 destination_tables_config一筆記錄。 檔案會以累加方式載入,只處理自上次執行以來的新檔。
  2. 使用02_parse_docs筆記本剖析檔。 此筆記本會 parser_library 執行筆記本(確保執行此動作作為重新啟動 Python 的第一個數據格),讓不同的剖析器和相關的公用程式可供使用。 然後,它會使用 中的 pipeline_config 指定剖析器,將每個檔剖析成純文本。 例如,擷取與剖析文字一起的原始 PDF 頁數等相關元數據。 成功剖析的文件會儲存在銀表中(parsed_docs_table_name),而任何未剖析的檔會隔離到對應的數據表中。
  3. 使用03_chunk_docs筆記本將剖析的檔區塊化。 類似於剖析,此筆記本會 chunker_library 執行筆記本(同樣地,執行為第一個數據格)。 它會使用 指定的區塊器 pipeline_config,將每個剖析的檔分割成較小的區塊。 每個區塊都會使用 MD5 哈希來指派唯一標識碼,這是與向量搜尋索引同步處理的必要專案。 最後的區塊會載入金表 (chunked_docs_table_name)。
  4. 使用 04_vector_index建立/同步向量搜尋索引。 此筆記本會驗證 中 vectorsearch_config指定向量搜尋端點的整備程度。 如果已設定的索引已經存在,它會起始與 gold 數據表的同步處理;否則,它會建立索引並觸發同步處理。 如果尚未建立向量搜尋端點和索引,這可能需要一些時間。

後續步驟

繼續進行 步驟 7。部署和監視