Knowledge Nodes (Knowledge Ontology Execution Nodes)

Build Intelligent Data Integrations with Enterprise-Grade Governance

Introduction

Welcome to the KOEL (Knowledge Ontology Execution Layer) Reasoning Flow Developer Guide. This guide shows you how to build, inside ARPIA Workshop, the custom data integrations and processing logic that power Knowledge Ontology nodes.

The Power of KOEL Reasoning Flows

KOEL Reasoning Flows represent a paradigm shift in enterprise data integration. Instead of building separate APIs, microservices, or ETL pipelines for each data source, you can now create intelligent, schema-driven integrations that automatically benefit from:

  • 🔐 Enterprise Security: Built-in bearer token authentication, ISO 42001 compliance, and complete audit trails
  • 🎯 Dynamic Query Execution: Filter, sort, paginate, and aggregate data on-the-fly without hardcoding logic
  • 🔄 Universal Data Access: Seamlessly integrate MySQL, PostgreSQL, REST APIs, GraphQL, cloud services, and legacy systems
  • 📊 Dual Report Modes: Generate both detailed row-level data (DETAIL) and aggregated analytics (GROUP) from a single codebase
  • 🚀 Zero Infrastructure: Write Python in the IDE, deploy instantly—no servers, containers, or DevOps overhead
  • 📝 Automatic Documentation: Your column schema becomes self-documenting API contracts
  • 🔍 Full Traceability: Every execution is logged with payload, response, and timing for compliance and debugging
  • 🧩 Composable Architecture: Chain multiple reasoning flows, cache results, and build complex data pipelines
  • 🤖 AI-Powered Reasoning: Full integration with ARPIA's AI governance framework for intelligent decision-making and autonomous agents

The result? What traditionally takes weeks of backend development, API design, authentication setup, and deployment pipelines can now be accomplished in hours. You write focused Python logic that solves your specific integration challenge, and KOEL handles everything else—validation, transformation, pagination, security, and governance.

Native Integration with ARPIA AI Platform

Your KOEL nodes are first-class citizens in the ARPIA ecosystem. Once deployed, they automatically become available across the entire platform:

  • 🎨 App Studio: Use your nodes as data sources for visual applications, dashboards, and user interfaces—no additional configuration needed
  • 🔌 MCP (Model Context Protocol): Expose your nodes as MCP tools that AI agents (Claude, GPT-4, etc.) can discover and invoke autonomously
  • 🌐 API Collections: Your nodes are instantly available as REST APIs with auto-generated OpenAPI specs for external integrations
  • 🧠 AI Reasoning Governance: Every node execution participates in ARPIA's AI governance framework with:
    • Reasoning Audit Trails: Track which AI agents accessed what data and why
    • Policy Enforcement: Apply data access policies, rate limits, and compliance rules automatically
    • Context Management: AI agents receive proper context about data schemas, business rules, and constraints
    • Decision Lineage: Trace AI decisions back through data sources and reasoning steps

This means your reasoning flow becomes:

  • A visual component in App Studio (drag-and-drop data widgets)
  • An AI agent tool via MCP (Claude can query your CRM data)
  • A REST API endpoint (external systems can consume)
  • A governed data node (full compliance and audit)

All from a single Python file in Workshop. No duplicate code, no synchronization issues, no deployment complexity.

Real-World Impact

Whether you're connecting to external CRM systems, processing real-time IoT data, aggregating financial reports, or building ML-powered predictions, KOEL Reasoning Flows give you enterprise power with startup speed.

Imagine:

  • An AI assistant that autonomously queries your Salesforce data via MCP
  • A dashboard that visualizes that same data in App Studio
  • An external partner consuming it via REST API
  • All governed by the same security, validation, and audit framework

That's the power of KOEL: write once, deploy everywhere, govern consistently.


What is a Reasoning Flow?

A Reasoning Flow is a Python-based Workshop Project that acts as the "brain" behind a Knowledge Ontology node. When users query a KOEL node through the API, your reasoning flow:

  1. Receives a structured payload with filters, sorting, and pagination parameters
  2. Processes the request by querying databases, calling APIs, or computing results
  3. Returns a properly formatted dataset that matches your defined schema

Think of it as a serverless function that bridges external data sources with ARPIA's Knowledge Ontology system.

Your Role as a Developer

You write Python code in the Workshop IDE (index.py) that:

  • ✅ Reads the incoming request payload
  • ✅ Applies filters, sorting, and pagination to your queries
  • ✅ Integrates with external systems (MySQL, REST APIs, etc.)
  • ✅ Returns data using column names defined in your schema

What This Guide Covers

This comprehensive guide includes:

  • Quick Start: Get running in 5 minutes with a minimal example
  • Column Schema: How to define and use your data structure
  • Payload Structure: Understanding filters, sorting, pagination, and report types
  • Response Format: How to structure your output correctly
  • Report Types: DETAIL (raw rows) vs GROUP (aggregations) with examples
  • Filters & Sorting: Complete guide to handling all filter types and sorting
  • Integration Examples: Real-world code for MySQL, REST APIs, Slack, and more
  • Advanced Patterns: Production-ready patterns for error handling, caching, and performance
  • Troubleshooting: Common errors and how to fix them

Before You Start

Prerequisites:

  • Basic Python knowledge
  • Understanding of SQL (for database integrations)
  • Familiarity with REST APIs (for web service integrations)
  • Access to ARPIA Workshop IDE

Tools Available:

  • Repository database connection via dkconnect.connect()
  • Variable storage/retrieval via ap_var(dbl, 'var_name', value)
  • Custom Python packages via requirements.txt
  • Full Python 3.x standard library
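
For example, the same ap_var helper works in both directions: pass a value to store a variable, omit it to read one back (a minimal sketch; the variable name is illustrative):

from dkconnect import connect

dbl = connect()

# Store a custom variable (three arguments: connection, name, value)
ap_var(dbl, 'last_sync_note', 'initial load complete')

# Read it back (two arguments: connection, name)
note = ap_var(dbl, 'last_sync_note')
print(note)

dbl.close()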

Quick Start

Minimal Example

# index.py
from dkconnect import connect

# Connect to repository database
dbl = connect()

# 1. Read payload
payload = ap_var(dbl, 'payload')

# 2. Process data (example: simple query)
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers LIMIT 10")
rows = cursor.fetchall()

# 3. Return response using Column Ids from schema
response = [
    {
        "customer_id": row["customer_id"],
        "customer_name": row["customer_name"]
    }
    for row in rows
]

# 4. Save response
ap_var(dbl, 'response', response)

# Clean up
cursor.close()
dbl.close()

That's it! KOEL handles column transformation and pagination metadata in the API response.


Column Schema Configuration

Defining Columns in Workshop UI

Navigate to Column Schema tab and define your columns:

Name             | Column Id         | Orientation | Type
-----------------|-------------------|-------------|-----------
Customer Id      | customer_id       | Left        | Normal
Customer Name    | customer_name     | Left        | Normal
Creation Date    | creation_date     | Left        | Date
Total Sales      | total_sales       | Right       | Currency
Churn Prediction | churn_prediction  | Right       | Percentage

✅ Critical Rule: Use Column Id as Response Keys

Your Python code MUST return dictionaries using Column Id as keys:

# ✅ CORRECT - using Column Id
response = [
    {
        "customer_id": "12345",
        "customer_name": "Acme Corp",
        "creation_date": "2024-01-15",
        "total_sales": 50000.00,
        "churn_prediction": 0.75
    }
]

# ❌ WRONG - using Name instead of Column Id
response = [
    {
        "Customer Id": "12345",  # Wrong!
        "Customer Name": "Acme Corp"  # Wrong!
    }
]

Return ALL Columns in Schema

Always return all columns defined in your schema. KOEL will filter columns based on Detail_Columns in the payload.

# Even if payload requests only 2 columns, return all defined columns
response = [
    {
        "customer_id": "12345",
        "customer_name": "Acme Corp",
        "creation_date": "2024-01-15",
        "total_sales": 50000.00,
        "churn_prediction": 0.75
    }
]
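
If you want a guard against accidentally dropping a schema column, a small check before saving the response can help. This is a sketch; SCHEMA_COLUMNS is a hypothetical list you keep in sync with the Column Ids in your Column Schema tab.

# Hypothetical list mirroring the Column Ids defined in the Column Schema tab
SCHEMA_COLUMNS = ["customer_id", "customer_name", "creation_date", "total_sales", "churn_prediction"]

# Ensure every row carries every schema column, filling gaps with None
for row in response:
    for col in SCHEMA_COLUMNS:
        row.setdefault(col, None)

ap_var(dbl, 'response', response)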

Reading the Payload

Payload Structure

The payload is automatically saved by KOEL before your code runs. Access it with:

payload = ap_var(dbl, 'payload')

Payload Contents

{
    "reportType": "DETAIL",  # or "GROUP"
    "Column_Format": "col_name",
    "page": 1,
    "page_limit_rows": 100,
    "Detail_Columns": ["customer_id", "customer_name"],  # Column Ids requested
    "GroupByCol": ["customer_name"],  # For GROUP reports
    "GroupByValues": ["total_sales"],  # Aggregation columns
    "GroupFilter": {
        "filter_group_1": {
            "filter_1": {
                "Filter_Union": "AND",
                "Filter_Plugin": "M",
                "Filter_Column": "customer_id",
                "Filter_Type": "IN",
                "Filter_Value": ["12345", "12346"]
            }
        }
    },
    "order_by": [
        {"field": "customer_name", "direction": "ASC"}
    ],
    "debug": false
}

Extracting Values

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Get report type
report_type = payload.get('reportType', 'DETAIL')  # 'DETAIL' or 'GROUP'

# Get filters
filters = payload.get('GroupFilter', {})

# Get sorting
order_by = payload.get('order_by', [])

# For GROUP reports
group_by_cols = payload.get('GroupByCol', [])
group_by_values = payload.get('GroupByValues', [])

# Debug mode
debug_mode = payload.get('debug', False)
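
The debug flag is useful for checking what your node actually received. One option is to print the payload when it is set, so it shows up in the execution log (a minimal sketch):

import json

if debug_mode:
    # Dump the incoming payload for inspection
    print("🔍 Payload received:")
    print(json.dumps(payload, indent=2, default=str))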

Response Format

Basic Response Structure

# List of dictionaries
response = [
    {
        "column_id_1": value1,
        "column_id_2": value2,
        "column_id_3": value3
    },
    {
        "column_id_1": value4,
        "column_id_2": value5,
        "column_id_3": value6
    }
]

# Save response
ap_var(dbl, 'response', response)

Data Type Handling

Ensure correct Python types for each column type:

response = [
    {
        "customer_id": "12345",              # String
        "customer_name": "Acme Corp",        # String
        "creation_date": "2024-01-15",       # String (YYYY-MM-DD)
        "total_sales": 50000.00,             # Float (Currency)
        "order_count": 25,                   # Integer
        "churn_prediction": 0.75,            # Float (0-1 for percentage)
        "is_active": True,                   # Boolean
        "metadata": {"key": "value"}         # Dict/Object
    }
]

Handling Null Values

response = [
    {
        "customer_id": row.get("id") or "",
        "customer_name": row.get("name") or "Unknown",
        "total_sales": float(row.get("sales") or 0.0),
        "last_order_date": row.get("last_order") if row.get("last_order") else None
    }
    for row in rows
]

Report Types

DETAIL Report

Returns raw rows with no aggregation. Your code should:

  1. Apply filters from GroupFilter
  2. Apply sorting from order_by
  3. Apply pagination using page and page_limit_rows
  4. Return individual rows

Don't worry about: column selection (KOEL filters the response down to the requested Detail_Columns)

Example: DETAIL Report with Pagination

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Get pagination parameters
page = payload.get('page', 1)
page_size = payload.get('page_limit_rows', 100)
offset = (page - 1) * page_size

# Build WHERE clause from filters
where_clauses = []
params = []

filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        
        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == '=':
            where_clauses.append(f"{col} = %s")
            params.append(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")

# Build ORDER BY clause
order_clauses = []
for order in payload.get('order_by', []):
    field = order['field']
    direction = order['direction']
    order_clauses.append(f"{field} {direction}")

# Construct query with pagination
query = "SELECT customer_id, customer_name, creation_date, total_sales FROM customers"

if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)

if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)

# Add LIMIT and OFFSET
query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])

# Execute
cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()

# Return all columns
response = [
    {
        "customer_id": row["customer_id"],
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)

cursor.close()
dbl.close()

print(f"✅ Page {page}: Returned {len(response)}/{page_size} rows")

GROUP Report

Returns aggregated data grouped by dimensions. Your code should:

  1. Use GroupByCol for the GROUP BY dimensions
  2. Use GroupByValues for the aggregation columns (SUM, AVG, COUNT)
  3. Apply filters and sorting
  4. Apply pagination using page and grp_page_limit_rows

Example: GROUP Report with Pagination

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Get pagination parameters for GROUP report
page = payload.get('page', 1)
page_size = payload.get('grp_page_limit_rows', 50)  # Note: grp_page_limit_rows
offset = (page - 1) * page_size

# Get grouping configuration
group_by_cols = payload.get('GroupByCol', [])  # e.g., ["customer_name"]
group_by_values = payload.get('GroupByValues', [])  # e.g., ["total_sales"]

# Build GROUP BY query
group_cols_sql = ', '.join(group_by_cols)

# Build aggregations
agg_sql = []
for val_col in group_by_values:
    agg_sql.append(f"SUM({val_col}) as {val_col}")

select_parts = [group_cols_sql] + agg_sql

# Build filters (same as DETAIL)
where_clauses = []
params = []

filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        
        if op == '>=':
            where_clauses.append(f"{col} >= %s")
            params.append(val)

# Construct query
query = f"""
    SELECT {', '.join(select_parts)}
    FROM orders
"""

if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)

query += f" GROUP BY {group_cols_sql}"

# Add sorting
order_clauses = []
for order in payload.get('order_by', []):
    order_clauses.append(f"{order['field']} {order['direction']}")

if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)

# Add LIMIT and OFFSET for pagination
query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])

# Execute
cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()

# Return grouped data
response = [
    {
        "customer_name": row["customer_name"],
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)

cursor.close()
dbl.close()

print(f"✅ Page {page}: Returned {len(response)}/{page_size} grouped rows")

Filters & Sorting

Understanding GroupFilter Structure

Filters are nested: GroupFilter → filter_group → filter → conditions

{
    "GroupFilter": {
        "filter_group_1": {
            "filter_1": {
                "Filter_Union": "AND",      # AND or OR
                "Filter_Plugin": "M",       # M=Manual, N=Normal
                "Filter_Column": "customer_id",
                "Filter_Type": "IN",
                "Filter_Value": ["12345", "12346"]
            },
            "filter_2": {
                "Filter_Union": "OR",
                "Filter_Plugin": "M",
                "Filter_Column": "status",
                "Filter_Type": "=",
                "Filter_Value": "ACTIVE"
            }
        }
    }
}

Filter Types Reference

Filter_Type | SQL Equivalent          | Value Type        | Example
------------|-------------------------|-------------------|-----------------
=           | column = value          | Single            | "ACTIVE"
!=          | column != value         | Single            | "INACTIVE"
>           | column > value          | Single            | 1000
<           | column < value          | Single            | 500
>=          | column >= value         | Single            | 100
<=          | column <= value         | Single            | 5000
IN          | column IN (...)         | Array             | ["A", "B", "C"]
NOT IN      | column NOT IN (...)     | Array             | ["X", "Y"]
LIKE        | column LIKE %value%     | String            | "Acme"
BETWEEN     | column BETWEEN x AND y  | Array [min, max]  | [100, 500]

Filter Handler Function

def build_where_clause(filters):
    """
    Convert GroupFilter structure to SQL WHERE clause
    
    Returns: (where_sql, params)
    """
    where_clauses = []
    params = []
    
    for group_key, group_filters in filters.items():
        group_conditions = []
        
        for filter_key, filter_obj in group_filters.items():
            col = filter_obj['Filter_Column']
            op = filter_obj['Filter_Type']
            val = filter_obj['Filter_Value']
            union = filter_obj.get('Filter_Union', 'AND')  # not applied here: this simplified handler joins conditions with AND
            
            if op == '=':
                group_conditions.append(f"{col} = %s")
                params.append(val)
            
            elif op == '!=':
                group_conditions.append(f"{col} != %s")
                params.append(val)
            
            elif op in ['>', '<', '>=', '<=']:
                group_conditions.append(f"{col} {op} %s")
                params.append(val)
            
            elif op == 'IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} IN ({placeholders})")
                params.extend(val)
            
            elif op == 'NOT IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} NOT IN ({placeholders})")
                params.extend(val)
            
            elif op == 'LIKE':
                group_conditions.append(f"{col} LIKE %s")
                params.append(f"%{val}%")
            
            elif op == 'BETWEEN':
                group_conditions.append(f"{col} BETWEEN %s AND %s")
                params.extend(val)
        
        # Combine conditions within group
        if group_conditions:
            where_clauses.append(f"({' AND '.join(group_conditions)})")
    
    # Combine groups with OR (typically)
    where_sql = ' OR '.join(where_clauses) if where_clauses else ""
    
    return where_sql, params


# Usage
filters = payload.get('GroupFilter', {})
where_sql, params = build_where_clause(filters)

if where_sql:
    query = f"SELECT * FROM customers WHERE {where_sql}"
    cursor.execute(query, params)
else:
    query = "SELECT * FROM customers"
    cursor.execute(query)

Sorting Handler

def build_order_clause(order_by):
    """
    Convert order_by array to SQL ORDER BY clause
    
    Returns: order_sql
    """
    if not order_by:
        return ""
    
    order_parts = []
    for order in order_by:
        field = order['field']
        direction = order['direction']  # 'ASC' or 'DESC'
        order_parts.append(f"{field} {direction}")
    
    return "ORDER BY " + ", ".join(order_parts)


# Usage
order_by = payload.get('order_by', [])
order_sql = build_order_clause(order_by)

query = f"SELECT * FROM customers WHERE status = 'ACTIVE' {order_sql}"

Integration Examples

Example 1: Local MySQL Query

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Simple query from repository database
cursor = dbl.cursor(dictionary=True)
cursor.execute("""
    SELECT 
        customer_id,
        customer_name,
        DATE_FORMAT(creation_date, '%Y-%m-%d') as creation_date,
        total_sales
    FROM customers
    WHERE status = 'ACTIVE'
    ORDER BY total_sales DESC
    LIMIT 1000
""")

rows = cursor.fetchall()

# Return response
response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"],
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)

cursor.close()
dbl.close()

Example 2: External MySQL Database

import mysql.connector
from dkconnect import connect

# Repository connection for variables
dbl = connect()
payload = ap_var(dbl, 'payload')

# Connect to external database
external_db = mysql.connector.connect(
    host="external-db.example.com",
    port=3306,
    user="readonly_user",
    password="secure_password",
    database="crm_prod"
)

try:
    cursor = external_db.cursor(dictionary=True)
    
    # Query external database
    cursor.execute("""
        SELECT 
            cust_id,
            cust_name,
            DATE(created_at) as creation_date,
            revenue_total
        FROM external_customers
        WHERE status = 'ACTIVE'
        ORDER BY revenue_total DESC
    """)
    
    rows = cursor.fetchall()
    
    # Map external columns to your schema Column Ids
    response = [
        {
            "customer_id": str(row["cust_id"]),
            "customer_name": row["cust_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["revenue_total"])
        }
        for row in rows
    ]
    
    ap_var(dbl, 'response', response)
    
    cursor.close()
    
finally:
    external_db.close()
    dbl.close()

print(f"✅ Fetched {len(response)} records from external database")

requirements.txt:

mysql-connector-python==8.2.0
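
Hardcoding credentials as in the example above is only for illustration. One option, assuming the values were stored ahead of time as custom variables, is to read them at runtime instead (the variable name here is hypothetical):

# Hypothetical: the password was previously stored with ap_var(dbl, 'crm_db_password', '...')
db_password = ap_var(dbl, 'crm_db_password')

external_db = mysql.connector.connect(
    host="external-db.example.com",
    port=3306,
    user="readonly_user",
    password=db_password,
    database="crm_prod"
)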

Example 3: REST API Integration

import requests
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Call external API
api_response = requests.get(
    "https://api.example.com/v1/customers",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    },
    params={
        "status": "active",
        "limit": 1000
    },
    timeout=30
)

api_response.raise_for_status()
api_data = api_response.json()

# Transform API response to match your Column Schema
response = [
    {
        "customer_id": item["id"],
        "customer_name": item["name"],
        "creation_date": item["created_at"][:10],  # Extract date part
        "total_sales": float(item["total_purchases"])
    }
    for item in api_data.get("customers", [])
]

ap_var(dbl, 'response', response)
dbl.close()

print(f"✅ Fetched {len(response)} records from API")

requirements.txt:

requests==2.31.0

Example 4: Slack Messages Integration

import requests
from datetime import datetime
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Slack API configuration
SLACK_TOKEN = "xoxb-your-slack-bot-token"
CHANNEL_ID = "C01234567"

# Fetch messages
response_api = requests.get(
    "https://slack.com/api/conversations.history",
    headers={"Authorization": f"Bearer {SLACK_TOKEN}"},
    params={
        "channel": CHANNEL_ID,
        "limit": 100
    }
)

slack_data = response_api.json()

if not slack_data.get("ok"):
    raise Exception(f"Slack API error: {slack_data.get('error')}")

# Transform to match Column Schema
# Schema: message_id, user_id, message_text, timestamp, reactions_count
response = [
    {
        "message_id": msg.get("ts", ""),
        "user_id": msg.get("user", ""),
        "message_text": msg.get("text", ""),
        "timestamp": datetime.fromtimestamp(
            float(msg.get("ts", 0))
        ).strftime("%Y-%m-%d %H:%M:%S"),
        "reactions_count": len(msg.get("reactions", []))
    }
    for msg in slack_data.get("messages", [])
]

ap_var(dbl, 'response', response)
dbl.close()

print(f"✅ Fetched {len(response)} Slack messages")

requirements.txt:

requests==2.31.0
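
conversations.history returns at most one page of messages per call. If you need more than the requested limit, Slack's cursor-based pagination (the response_metadata.next_cursor field) can be followed in a loop; this is a sketch of that pattern.

all_messages = []
cursor_token = None

while True:
    params = {"channel": CHANNEL_ID, "limit": 200}
    if cursor_token:
        params["cursor"] = cursor_token

    resp = requests.get(
        "https://slack.com/api/conversations.history",
        headers={"Authorization": f"Bearer {SLACK_TOKEN}"},
        params=params,
        timeout=30
    )
    data = resp.json()

    if not data.get("ok"):
        raise Exception(f"Slack API error: {data.get('error')}")

    all_messages.extend(data.get("messages", []))

    # Stop when Slack reports there are no further pages
    cursor_token = data.get("response_metadata", {}).get("next_cursor")
    if not data.get("has_more") or not cursor_token:
        break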

Example 5: Combined Data Sources

import mysql.connector
import requests
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# 1. Get customer list from local database
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers WHERE status = 'ACTIVE'")
local_customers = {row['customer_id']: row for row in cursor.fetchall()}
cursor.close()

# 2. Get sales data from external API
api_response = requests.get(
    "https://api.analytics.example.com/sales",
    headers={"Authorization": "Bearer API_KEY"},
    timeout=30
)
sales_data = api_response.json()

# 3. Merge data
response = []
for sale in sales_data.get("sales", []):
    cust_id = sale["customer_id"]
    
    if cust_id in local_customers:
        response.append({
            "customer_id": cust_id,
            "customer_name": local_customers[cust_id]["customer_name"],
            "creation_date": sale["order_date"],
            "total_sales": float(sale["amount"])
        })

ap_var(dbl, 'response', response)
dbl.close()

print(f"✅ Merged data for {len(response)} customers")

Advanced Patterns

Pattern 1: Dynamic Query Building

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Start with base query
query_parts = ["SELECT customer_id, customer_name, creation_date, total_sales FROM customers"]
params = []

# Add WHERE clause from filters
filters = payload.get('GroupFilter', {})
where_clauses = []

for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        
        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")

if where_clauses:
    query_parts.append("WHERE " + " AND ".join(where_clauses))

# Add ORDER BY
order_by = payload.get('order_by', [])
if order_by:
    order_clauses = [f"{o['field']} {o['direction']}" for o in order_by]
    query_parts.append("ORDER BY " + ", ".join(order_clauses))

# Execute
final_query = " ".join(query_parts)
print(f"🔍 Query: {final_query}")
print(f"🔍 Params: {params}")

cursor = dbl.cursor(dictionary=True)
cursor.execute(final_query, params)
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Pattern 2: Error Handling

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

try:
    # Your data fetching code
    cursor = dbl.cursor(dictionary=True)
    cursor.execute("SELECT * FROM customers")
    rows = cursor.fetchall()
    
    response = [
        {
            "customer_id": str(row["customer_id"]),
            "customer_name": row["customer_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["total_sales"])
        }
        for row in rows
    ]
    
    ap_var(dbl, 'response', response)
    cursor.close()
    
except Exception as e:
    # Log error
    print(f"❌ Error: {str(e)}")
    
    # Return empty response or error indicator
    response = []
    ap_var(dbl, 'response', response)
    
finally:
    dbl.close()

Pattern 3: Handling Pagination

IMPORTANT: You MUST implement pagination in your queries using page and page_limit_rows/grp_page_limit_rows from the payload.

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Get pagination parameters
report_type = payload.get('reportType', 'DETAIL')
page = payload.get('page', 1)

# Different limits for DETAIL vs GROUP
if report_type == 'DETAIL':
    page_size = payload.get('page_limit_rows', 100)
else:  # GROUP
    page_size = payload.get('grp_page_limit_rows', 50)

# Calculate OFFSET
offset = (page - 1) * page_size

# Apply LIMIT and OFFSET in query
cursor = dbl.cursor(dictionary=True)
query = """
    SELECT customer_id, customer_name, creation_date, total_sales
    FROM customers
    WHERE status = 'ACTIVE'
    ORDER BY customer_id
    LIMIT %s OFFSET %s
"""

cursor.execute(query, (page_size, offset))
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)

cursor.close()
dbl.close()

print(f"✅ Page {page}: Returned {len(response)} rows (max {page_size})")

Pagination Best Practices

# Always extract pagination parameters
page = payload.get('page', 1)
report_type = payload.get('reportType', 'DETAIL')

# Use correct limit based on report type
if report_type == 'DETAIL':
    limit = payload.get('page_limit_rows', 100)
else:
    limit = payload.get('grp_page_limit_rows', 50)

# Calculate offset
offset = (page - 1) * limit

# Apply to query
query = f"""
    SELECT ... FROM table
    WHERE ...
    ORDER BY ...
    LIMIT {limit} OFFSET {offset}
"""

Pattern 4: Using Pandas for Data Processing

import pandas as pd
from dkconnect import connect, connect_engine

dbl = connect()
payload = ap_var(dbl, 'payload')

# Use SQLAlchemy engine for pandas
engine = connect_engine()

# Read data into DataFrame
df = pd.read_sql("""
    SELECT customer_id, customer_name, creation_date, order_amount
    FROM orders
    WHERE status = 'COMPLETED'
""", engine)

# Process with pandas
df_grouped = df.groupby('customer_id').agg({
    'customer_name': 'first',
    'creation_date': 'min',
    'order_amount': 'sum'
}).reset_index()

# Convert to response format
response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d"),
        "total_sales": float(row["order_amount"])
    }
    for _, row in df_grouped.iterrows()
]

ap_var(dbl, 'response', response)
dbl.close()

print(f"✅ Processed {len(response)} aggregated records")

requirements.txt:

pandas==2.0.0
sqlalchemy==2.0.0

Pattern 5: Caching with Custom Variables

from dkconnect import connect
from datetime import datetime

dbl = connect()
payload = ap_var(dbl, 'payload')

# Check if we have cached data (less than 1 hour old)
try:
    cache_data = ap_var(dbl, 'cached_customers')
    cache_time = ap_var(dbl, 'cache_timestamp')
    
    # Parse cache time
    cache_dt = datetime.fromisoformat(cache_time)
    age_minutes = (datetime.utcnow() - cache_dt).total_seconds() / 60
    
    if age_minutes < 60:
        print(f"✅ Using cached data ({age_minutes:.1f} minutes old)")
        ap_var(dbl, 'response', cache_data)
        dbl.close()
        exit()
except Exception:
    pass  # No cache yet or cache is invalid; fall through and fetch fresh data

# Fetch fresh data
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name, creation_date, total_sales FROM customers")
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

# Cache for next execution
ap_var(dbl, 'cached_customers', response)
ap_var(dbl, 'cache_timestamp', datetime.utcnow().isoformat())

ap_var(dbl, 'response', response)

cursor.close()
dbl.close()

print(f"✅ Fetched and cached {len(response)} records")

Troubleshooting

Error: "No 'response' variable found"

Cause: You didn't call ap_var(dbl, 'response', dataset)

Solution:

# Always save response before script ends
ap_var(dbl, 'response', response)

Error: "Column 'xyz' not in schema whitelist"

Cause: Your response includes columns not defined in Column Schema

Solution: Only return columns defined in Workshop UI Column Schema

# Check your Column Schema tab
# Only return Column Ids defined there
response = [
    {
        "customer_id": row["id"],      # Defined in schema ✅
        "customer_name": row["name"],  # Defined in schema ✅
        # "internal_field": row["x"]   # NOT in schema ❌
    }
    for row in rows
]

Error: "Response validation failed"

Cause: Using wrong keys (Name instead of Column Id)

Solution:

# ❌ WRONG - using Name from UI
response = [{"Customer Id": "123"}]

# ✅ CORRECT - using Column Id from UI
response = [{"customer_id": "123"}]

Empty Response

Cause: Query returned no rows

Solution: Always return a list (even if empty)

cursor.execute(query)
rows = cursor.fetchall()

if not rows:
    response = []  # Empty list is valid
else:
    response = [process(row) for row in rows]

ap_var(dbl, 'response', response)

Script Crashes

Cause: Unhandled exception

Solution: Add try/except

try:
    # Your code
    response = fetch_data()
    ap_var(dbl, 'response', response)
except Exception as e:
    print(f"❌ Error: {str(e)}")
    ap_var(dbl, 'response', [])  # Return empty
finally:
    dbl.close()

Date Format Issues

Cause: Wrong date format

Solution: Use ISO format (YYYY-MM-DD)

from datetime import datetime

# Convert MySQL date to string
creation_date = row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None

# Convert string to date
date_obj = datetime.strptime("2024-01-15", "%Y-%m-%d")

Quick Reference

Minimal Template

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

# Your code here
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT * FROM table")
rows = cursor.fetchall()

response = [
    {
        "column_id_1": row["field1"],
        "column_id_2": row["field2"]
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Key Functions

# Read payload
payload = ap_var(dbl, 'payload')

# Save response
ap_var(dbl, 'response', dataset)

# Save custom variable
ap_var(dbl, 'custom_var', value)

# Read custom variable
value = ap_var(dbl, 'custom_var')

# Database connection
dbl = connect()  # Repository DB
engine = connect_engine()  # SQLAlchemy engine

Column Schema Mapping

UI Field  | Use In Python  | Example
----------|----------------|------------------------
Name      | Display only   | "Customer Id"
Column Id | Response key   | "customer_id"
Type      | Data type hint | Normal, Date, Currency