How to Use "Extract" Objects?

Overview

Extract objects allow you to bring data into your repository from external sources registered as data sources in ARPIA. This guide provides step-by-step instructions for each Extract & Load tool.

Note: Before using Extract objects, ensure you have a registered data source and repository access configured. See Managing Data Sources for setup instructions.


AP DataPipe Engine - MySQL

The AP DataPipe Engine - MySQL provides a GUI-based interface for extracting data from MySQL-compatible databases (including MariaDB, Amazon Aurora MySQL, and other compatible systems).

AP DataPipe Engine - MySQL Interface

Step 1: Select Source and Destination Tables

  1. Open your Reasoning Flows project.
  2. Create a new object and select AP DataPipe Engine - MySQL.
  3. In the Source section, select the registered data source and the table you want to extract from.
  4. In the Destination section, select the repository table where data will be loaded.

Source and Destination Selection

Step 2: Create Destination Table (If Needed)

If no destination table exists, you can create one directly:

  1. Click the Create Table in Repository button.
  2. Review the SQL statement that will create the table in your destination repository (an illustrative statement is shown at the end of this step).
  3. Important: You must define a primary key to create the table.
  4. Modify column names, types, or constraints as needed.
  5. Click Create Table.

Create Table Dialog

The new table will appear in your destination repository options.
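
The exact statement is generated by ARPIA from the source schema, so yours will differ. A minimal sketch of the kind of DDL the dialog shows, assuming a MySQL-compatible repository and a hypothetical clean_customers table keyed by customer_id:

-- Illustrative only: column names and types depend on your source table
CREATE TABLE clean_customers (
    customer_id INT NOT NULL,
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    created_date DATETIME,
    PRIMARY KEY (customer_id)
);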

Step 3: Configure Field Mapping

Once source and destination tables are selected:

  1. Click Get Fields to automatically map source fields to destination fields.
  2. Review the mapping and adjust as needed:
    • Remove fields you don't want to extract
    • Rename destination fields if required
    • Verify data type compatibility (see the query sketch after this step)

Field Mapping Interface
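
If you want to confirm source column types before adjusting the mapping, a standard MySQL metadata query can help. A minimal sketch, assuming a hypothetical customers table in a my_source_db schema:

-- Illustrative only: lists each column's type in the source table
SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'my_source_db'
  AND TABLE_NAME = 'customers';

If your repository is also MySQL-compatible, running the same query against the destination table lets you compare types side by side.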

Step 4: Configure DataPipe Settings

The DataPipe Settings control how the extraction behaves:

DataPipe Settings

  • Load Mode: Choose between TRUNCATE & INSERT (replace all data) or UPSERT (update existing, insert new)
  • Batch Size: Number of records processed per batch (affects performance)
  • Filter Condition: Optional WHERE clause to extract a subset of data
  • Pre-Execution SQL: SQL commands to run before extraction begins
  • Post-Execution SQL: SQL commands to run after extraction completes
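
The Filter Condition and the Pre- and Post-Execution SQL settings accept plain SQL. A minimal sketch of how they might be filled in, assuming a hypothetical active/updated_at filter and an illustrative extract_audit log table:

-- Filter Condition (WHERE clause body): only active rows changed in the last 7 days
active = 1 AND updated_at >= NOW() - INTERVAL 7 DAY

-- Pre-Execution SQL: mark the start of the run in an illustrative audit table
INSERT INTO extract_audit (step, ran_at) VALUES ('start', NOW());

-- Post-Execution SQL: mark completion of the run
INSERT INTO extract_audit (step, ran_at) VALUES ('end', NOW());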

Step 5: Save and Execute

  1. Click Save to store your configuration.
  2. To run manually, click Execute or Run.
  3. To schedule automated runs, configure the schedule in the project settings.
  4. Monitor execution status in the Execution Log.

AP DataPipe Engine - File

The AP DataPipe Engine - File provides a GUI-based interface for extracting data from file sources such as CSV, JSON, and other structured formats.

Step 1: Select Source File and Destination Table

  1. Open your Reasoning Flows project.
  2. Create a new object and select AP DataPipe Engine - File.
  3. In the Source section, select or upload the file to extract.
  4. Configure file parsing options (delimiter, encoding, header row, etc.).
  5. In the Destination section, select or create the destination repository table.

Step 2: Configure Field Mapping

  1. Click Get Fields to detect columns from the source file.
  2. Map source columns to destination table fields.
  3. Configure data type conversions if needed.

Step 3: Configure DataPipe Settings

Configure the load mode, batch size, and any pre- or post-execution SQL as needed; the options mirror the MySQL configuration described above.

Step 4: Save and Execute

  1. Click Save to store your configuration.
  2. Click Execute to run the extraction.
  3. Monitor status in the Execution Log.

Python 3.12 DataPipe Engine

The Python 3.12 DataPipe Engine combines the structure of the AP DataPipe with the flexibility of custom Python code, allowing you to add transformation logic during the extract and load process.

Step 1: Create the DataPipe Object

  1. Open your Reasoning Flows project.
  2. Create a new object and select Python 3.12 DataPipe Engine.
  3. Configure source and destination connections in the settings panel.

Step 2: Write Extraction Logic

Use the code editor to write your extraction and transformation logic:

# Example: Extract, transform, and load with custom logic
import pandas as pd
from arpia import datasource, repository

# Connect to source
source_conn = datasource.connect('my_mysql_source')
df = source_conn.query("SELECT * FROM customers WHERE active = 1")

# Transform data
df['full_name'] = df['first_name'] + ' ' + df['last_name']
df['created_date'] = pd.to_datetime(df['created_date'])

# Load to repository
repository.write('clean_customers', df, mode='upsert', key='customer_id')

Step 3: Configure Settings and Dependencies

  1. Add any required Python packages in the Dependencies section.
  2. Configure execution parameters (timeout, memory allocation).
  3. Set up any required environment variables or credentials.

Step 4: Test and Execute

  1. Use the Test function to validate your code with a sample dataset.
  2. Review output and logs for errors.
  3. Click Execute for full extraction.
  4. Schedule as needed for recurring runs.

Scheduling Extractions

All Extract & Load objects can be scheduled for automated execution:

  1. Open the object's Settings or Schedule tab.
  2. Configure the schedule using:
    • Frequency: Daily, weekly, monthly, or a custom cron expression (a standard expression such as 0 6 * * 1 would run every Monday at 06:00)
    • Time: Specific execution time
    • Timezone: Ensure correct timing for your region
  3. Enable the schedule and save.

Scheduled jobs appear in the project's execution calendar and can be monitored from the dashboard.


Troubleshooting

Common Issues

  • Connection failed. Possible cause: invalid credentials or network issue. Solution: verify data source configuration and credentials.
  • Table creation failed. Possible cause: missing primary key. Solution: ensure at least one column is designated as the primary key.
  • Field mapping errors. Possible cause: data type mismatch. Solution: check source and destination data types; add type conversions if needed.
  • Extraction timeout. Possible cause: large dataset or slow source. Solution: increase the timeout setting; add filters to reduce data volume; optimize the source query.
  • Duplicate key errors. Possible cause: UPSERT mode with an incorrect key. Solution: verify the primary key column is correctly configured.
  • Character encoding issues. Possible cause: source file uses non-UTF-8 encoding. Solution: specify the correct encoding in the file parsing options.
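
For the duplicate key and primary key issues above, it can help to inspect the key directly. A minimal sketch, assuming a MySQL-compatible repository and the hypothetical clean_customers table keyed by customer_id:

-- Show the keys defined on the destination table
SHOW KEYS FROM clean_customers;

-- Find key values that appear more than once in the source data
SELECT customer_id, COUNT(*) AS copies
FROM customers
GROUP BY customer_id
HAVING COUNT(*) > 1;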

Viewing Execution Logs

  1. Navigate to the object's Execution Log or History tab.
  2. Select the specific execution to view details.
  3. Review timestamps, record counts, and any error messages.
  4. For Python DataPipe, check the console output for debugging information.

Next Steps