Knowledge Nodes (Knowledge Ontology Execution Nodes)
Build Intelligent Data Integrations with Enterprise-Grade Governance
Introduction
Welcome to the KOEL (Knowledge Ontology Execution Layer) Reasoning Flow Developer Guide. This documentation will teach you how to build custom data integrations and processing logic inside ARPIA Workshop that power Knowledge Ontology nodes.
The Power of KOEL Reasoning Flows
KOEL Reasoning Flows represent a paradigm shift in enterprise data integration. Instead of building separate APIs, microservices, or ETL pipelines for each data source, you can now create intelligent, schema-driven integrations that automatically benefit from:
- 🔐 Enterprise Security: Built-in bearer token authentication, ISO 42001 compliance, and complete audit trails
- 🎯 Dynamic Query Execution: Filter, sort, paginate, and aggregate data on-the-fly without hardcoding logic
- 🔄 Universal Data Access: Seamlessly integrate MySQL, PostgreSQL, REST APIs, GraphQL, cloud services, and legacy systems
- 📊 Dual Report Modes: Generate both detailed row-level data (DETAIL) and aggregated analytics (GROUP) from a single codebase
- 🚀 Zero Infrastructure: Write Python in the IDE, deploy instantly—no servers, containers, or DevOps overhead
- 📝 Automatic Documentation: Your column schema becomes self-documenting API contracts
- 🔍 Full Traceability: Every execution is logged with payload, response, and timing for compliance and debugging
- 🧩 Composable Architecture: Chain multiple reasoning flows, cache results, and build complex data pipelines
- 🤖 AI-Powered Reasoning: Full integration with ARPIA's AI governance framework for intelligent decision-making and autonomous agents
The result? What traditionally takes weeks of backend development, API design, authentication setup, and deployment pipelines can now be accomplished in hours. You write focused Python logic that solves your specific integration challenge, and KOEL handles everything else—validation, transformation, pagination, security, and governance.
Native Integration with ARPIA AI Platform
Your KOEL nodes are first-class citizens in the ARPIA ecosystem. Once deployed, they automatically become available across the entire platform:
- 🎨 App Studio: Use your nodes as data sources for visual applications, dashboards, and user interfaces—no additional configuration needed
- 🔌 MCP (Model Context Protocol): Expose your nodes as MCP tools that AI agents (Claude, GPT-4, etc.) can discover and invoke autonomously
- 🌐 API Collections: Your nodes are instantly available as REST APIs with auto-generated OpenAPI specs for external integrations
- 🧠 AI Reasoning Governance: Every node execution participates in ARPIA's AI governance framework with:
  - Reasoning Audit Trails: Track which AI agents accessed what data and why
  - Policy Enforcement: Apply data access policies, rate limits, and compliance rules automatically
  - Context Management: AI agents receive proper context about data schemas, business rules, and constraints
  - Decision Lineage: Trace AI decisions back through data sources and reasoning steps
This means your reasoning flow becomes:
- A visual component in App Studio (drag-and-drop data widgets)
- An AI agent tool via MCP (Claude can query your CRM data)
- A REST API endpoint (external systems can consume)
- A governed data node (full compliance and audit)
All from a single Python file in Workshop. No duplicate code, no synchronization issues, no deployment complexity.
Real-World Impact
Whether you're connecting to external CRM systems, processing real-time IoT data, aggregating financial reports, or building ML-powered predictions, KOEL Reasoning Flows give you enterprise power with startup speed.
Imagine:
- An AI assistant that autonomously queries your Salesforce data via MCP
- A dashboard that visualizes that same data in App Studio
- An external partner consuming it via REST API
- All governed by the same security, validation, and audit framework
That's the power of KOEL: write once, deploy everywhere, govern consistently.
What is a Reasoning Flow?
A Reasoning Flow is a Python-based Workshop Project that acts as the "brain" behind a Knowledge Ontology node. When users query a KOEL node through the API, your reasoning flow:
- Receives a structured payload with filters, sorting, and pagination parameters
- Processes the request by querying databases, calling APIs, or computing results
- Returns a properly formatted dataset that matches your defined schema
Think of it as a serverless function that bridges external data sources with ARPIA's Knowledge Ontology system.
Your Role as a Developer
You write Python code in the Workshop IDE (index.py) that:
- ✅ Reads the incoming request payload
- ✅ Applies filters, sorting, and pagination to your queries
- ✅ Integrates with external systems (MySQL, REST APIs, etc.)
- ✅ Returns data using column names defined in your schema
What This Guide Covers
This comprehensive guide includes:
- Quick Start: Get running in 5 minutes with a minimal example
- Column Schema: How to define and use your data structure
- Payload Structure: Understanding filters, sorting, pagination, and report types
- Response Format: How to structure your output correctly
- Report Types: DETAIL (raw rows) vs GROUP (aggregations) with examples
- Filters & Sorting: Complete guide to handling all filter types and sorting
- Integration Examples: Real-world code for MySQL, REST APIs, Slack, and more
- Advanced Patterns: Production-ready patterns for error handling, caching, and performance
- Troubleshooting: Common errors and how to fix them
Before You Start
Prerequisites:
- Basic Python knowledge
- Understanding of SQL (for database integrations)
- Familiarity with REST APIs (for web service integrations)
- Access to ARPIA Workshop IDE
Tools Available:
- Repository database connection via dkconnect.connect()
- Variable storage/retrieval via ap_var(dbl, 'var_name', value)
- Custom Python packages via requirements.txt
- Full Python 3.x standard library
Creating Your Reasoning Flow
Before writing any code, create your Flow in Workshop:
- In Reasoning Flows, press New Flow.
- Enter a Name.
- Set Project Type. For most KOEL integrations use Batch Process. Use API Calls if your node needs to run on-demand via REST.
- Set Project Category to KOEL.
- Add an optional Description.
- Press Create.
Your flow's index.py opens in the Workshop IDE — ready for development.
Project Category is organizational only — it does not change how the flow executes. It groups your KOEL flows in the workspace for navigation and filtering. For the full reference of all Project Types and Categories, see Reasoning Flows.
Image placeholder — New Flow dialog showing Name, Project Type, and Project Category = KOEL.
Quick Start
Minimal Example
# index.py
from dkconnect import connect
# Connect to repository database
dbl = connect()
# 1. Read payload
payload = ap_var(dbl, 'payload')
# 2. Process data (example: simple query)
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers LIMIT 10")
rows = cursor.fetchall()
# 3. Return response using Column Ids from schema
response = [
{
"customer_id": row["customer_id"],
"customer_name": row["customer_name"]
}
for row in rows
]
# 4. Save response
ap_var(dbl, 'response', response)
# Clean up
cursor.close()
dbl.close()
That's it! KOEL handles column transformation and pagination metadata in the API response.
Column Schema Configuration
Defining Columns in Workshop UI
Navigate to Column Schema tab and define your columns:
| Name | Column Id | Orientation | Type |
|---|---|---|---|
| Customer Id | customer_id | Left | Normal |
| Customer Name | customer_name | Left | Normal |
| Creation Date | creation_date | Left | Date |
| Total Sales | total_sales | Right | Currency |
| Churn Prediction | churn_prediction | Right | Percentage |
✅ Critical Rule: Use Column Id as Response Keys
Your Python code MUST return dictionaries using Column Id as keys:
# ✅ CORRECT - using Column Id
response = [
{
"customer_id": "12345",
"customer_name": "Acme Corp",
"creation_date": "2024-01-15",
"total_sales": 50000.00,
"churn_prediction": 0.75
}
]
# ❌ WRONG - using Name instead of Column Id
response = [
{
"Customer Id": "12345", # Wrong!
"Customer Name": "Acme Corp" # Wrong!
}
]
Return ALL Columns in Schema
Always return all columns defined in your schema. KOEL will filter columns based on Detail_Columns in the payload.
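One way to make sure every row carries every schema column is to normalise rows just before saving the response. The sketch below is illustrative only: `SCHEMA_COLUMN_IDS` and `normalize_rows` are hypothetical names, not part of KOEL or dkconnect, and the column list is assumed to be a hand-maintained copy of the Column Schema tab.

```python
# Hypothetical helper: not part of KOEL or dkconnect.
# SCHEMA_COLUMN_IDS is assumed to mirror the Column Ids defined
# in the Column Schema tab and must be kept in sync by hand.
SCHEMA_COLUMN_IDS = [
    "customer_id",
    "customer_name",
    "creation_date",
    "total_sales",
    "churn_prediction",
]


def normalize_rows(rows, column_ids=SCHEMA_COLUMN_IDS):
    """Return rows that contain every schema column and nothing else.

    Missing columns are filled with None; keys that are not in the
    schema are dropped.
    """
    return [{col: row.get(col) for col in column_ids} for row in rows]


rows = [{"customer_id": "12345", "customer_name": "Acme Corp", "internal_note": "x"}]
response = normalize_rows(rows)
```

Here `response[0]` contains all five Column Ids, with `None` for the three that the source row did not provide, and the extra `internal_note` key is gone.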
Reading the Payload
Payload Structure
The payload is automatically saved by KOEL before your code runs. Access it with:
payload = ap_var(dbl, 'payload')
Payload Contents
{
"reportType": "DETAIL", # or "GROUP"
"Column_Format": "col_name",
"page": 1,
"page_limit_rows": 100,
"Detail_Columns": ["customer_id", "customer_name"],
"GroupByCol": ["customer_name"],
"GroupByValues": ["total_sales"],
"GroupFilter": {
"filter_group_1": {
"filter_1": {
"Filter_Union": "AND",
"Filter_Plugin": "M",
"Filter_Column": "customer_id",
"Filter_Type": "IN",
"Filter_Value": ["12345", "12346"]
}
}
},
"order_by": [
{"field": "customer_name", "direction": "ASC"}
],
"debug": false
}
Extracting Values
report_type = payload.get('reportType', 'DETAIL')
filters = payload.get('GroupFilter', {})
order_by = payload.get('order_by', [])
group_by_cols = payload.get('GroupByCol', [])
group_by_values = payload.get('GroupByValues', [])
debug_mode = payload.get('debug', False)
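Because GroupFilter is nested two levels deep (groups, then filters), it can help to flatten it once and iterate over the result. A minimal sketch, assuming the payload shape shown under Payload Contents; `iter_filters` is a hypothetical helper name, not a KOEL function.

```python
def iter_filters(payload):
    """Yield (group_key, filter_key, filter_obj) for every filter in GroupFilter.

    Hypothetical helper; tolerates a missing or empty GroupFilter.
    """
    for group_key, group_filters in (payload.get("GroupFilter") or {}).items():
        for filter_key, filter_obj in group_filters.items():
            yield group_key, filter_key, filter_obj


# Sample payload trimmed from the Payload Contents example.
sample_payload = {
    "reportType": "DETAIL",
    "GroupFilter": {
        "filter_group_1": {
            "filter_1": {
                "Filter_Union": "AND",
                "Filter_Plugin": "M",
                "Filter_Column": "customer_id",
                "Filter_Type": "IN",
                "Filter_Value": ["12345", "12346"],
            }
        }
    },
}

flat = [
    (group, key, obj["Filter_Column"], obj["Filter_Type"])
    for group, key, obj in iter_filters(sample_payload)
]
```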
Response Format
Basic Response Structure
response = [
{
"column_id_1": value1,
"column_id_2": value2,
}
]
ap_var(dbl, 'response', response)
Data Type Handling
response = [
{
"customer_id": "12345", # String
"customer_name": "Acme Corp", # String
"creation_date": "2024-01-15", # String (YYYY-MM-DD)
"total_sales": 50000.00, # Float (Currency)
"order_count": 25, # Integer
"churn_prediction": 0.75, # Float (0-1 for percentage)
"is_active": True, # Boolean
"metadata": {"key": "value"} # Dict/Object
}
]
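Database drivers often return `Decimal`, `date`, and `datetime` objects rather than the plain values listed above. The following sketch shows one possible conversion step; `to_response_value` is a hypothetical helper, and the datetime format is an assumption borrowed from the Slack example in this guide.

```python
from datetime import date, datetime
from decimal import Decimal


def to_response_value(value):
    """Convert common database types into plain JSON-friendly values.

    Hypothetical helper. datetime is checked before date because
    datetime is a subclass of date.
    """
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(value, date):
        return value.strftime("%Y-%m-%d")
    return value


row = {
    "customer_id": "12345",
    "total_sales": Decimal("50000.00"),
    "creation_date": date(2024, 1, 15),
    "is_active": True,
}
clean = {key: to_response_value(value) for key, value in row.items()}
```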
Handling Null Values
response = [
{
"customer_id": row.get("id") or "",
"customer_name": row.get("name") or "Unknown",
"total_sales": float(row.get("sales") or 0.0),
"last_order_date": row.get("last_order") if row.get("last_order") else None
}
for row in rows
]
Report Types
DETAIL Report
Returns raw rows — no aggregation. Apply filters, apply sorting, return individual rows.
Don't worry about: pagination (KOEL handles it), column selection (KOEL filters it).
Example: DETAIL Report with Pagination
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
page = payload.get('page', 1)
page_size = payload.get('page_limit_rows', 100)
offset = (page - 1) * page_size
where_clauses = []
params = []
filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == '=':
            where_clauses.append(f"{col} = %s")
            params.append(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")
order_clauses = [f"{o['field']} {o['direction']}" for o in payload.get('order_by', [])]
query = "SELECT customer_id, customer_name, creation_date, total_sales FROM customers"
if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)
if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)
query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])
cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()
response = [
{
"customer_id": row["customer_id"],
"customer_name": row["customer_name"],
"creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
GROUP Report
Returns aggregated data. Use GroupByCol for dimensions, GroupByValues for aggregations.
Example: GROUP Report with Pagination
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
page = payload.get('page', 1)
page_size = payload.get('grp_page_limit_rows', 50)
offset = (page - 1) * page_size
group_by_cols = payload.get('GroupByCol', [])
group_by_values = payload.get('GroupByValues', [])
group_cols_sql = ', '.join(group_by_cols)
agg_sql = [f"SUM({col}) as {col}" for col in group_by_values]
where_clauses = []
params = []
filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        if op == '>=':
            where_clauses.append(f"{col} >= %s")
            params.append(val)
query = f"SELECT {', '.join([group_cols_sql] + agg_sql)} FROM orders"
if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)
query += f" GROUP BY {group_cols_sql}"
order_clauses = [f"{o['field']} {o['direction']}" for o in payload.get('order_by', [])]
if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)
query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])
cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()
response = [
{
"customer_name": row["customer_name"],
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
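When the rows come from a source that cannot aggregate for you (a REST API, for example), the same GROUP logic can be applied in memory. The sketch below is an assumption-laden illustration: `group_and_sum` is a hypothetical helper, and it uses SUM only because the SQL example above does.

```python
from collections import defaultdict


def group_and_sum(rows, group_by_cols, group_by_values):
    """Sum each column in group_by_values per combination of group_by_cols.

    Hypothetical helper. Groups are returned in first-seen order;
    missing or None values count as 0.
    """
    totals = defaultdict(lambda: defaultdict(float))
    for row in rows:
        bucket = totals[tuple(row[col] for col in group_by_cols)]
        for value_col in group_by_values:
            bucket[value_col] += float(row.get(value_col) or 0)

    result = []
    for key, sums in totals.items():
        out = dict(zip(group_by_cols, key))  # dimension columns
        out.update(sums)                     # aggregated columns
        result.append(out)
    return result


rows = [
    {"customer_name": "Acme", "total_sales": 100.0},
    {"customer_name": "Beta", "total_sales": 50},
    {"customer_name": "Acme", "total_sales": 25.5},
]
grouped = group_and_sum(rows, ["customer_name"], ["total_sales"])
```

With the sample rows, `grouped` holds one entry per customer, with Acme's two sales combined.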
Filters & Sorting
Filter Types Reference
| Filter_Type | SQL Equivalent | Value Type | Example |
|---|---|---|---|
| = | column = value | Single | "ACTIVE" |
| != | column != value | Single | "INACTIVE" |
| > | column > value | Single | 1000 |
| < | column < value | Single | 500 |
| >= | column >= value | Single | 100 |
| <= | column <= value | Single | 5000 |
| IN | column IN (...) | Array | ["A", "B", "C"] |
| NOT IN | column NOT IN (...) | Array | ["X", "Y"] |
| LIKE | column LIKE %value% | String | "Acme" |
| BETWEEN | column BETWEEN x AND y | Array [min, max] | [100, 500] |
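For sources that are not SQL databases, the filter types in the table above can also be evaluated against rows in memory. This is a rough sketch, not KOEL behaviour: `matches` is a hypothetical helper, LIKE is treated as a case-insensitive substring match (an assumption that mirrors common MySQL collations), and rows with `None` in an ordered comparison are not handled.

```python
import operator

# Map the simple comparison Filter_Type values to Python operators.
_COMPARATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
}


def matches(row, filter_obj):
    """Return True if a row satisfies one filter object (hypothetical helper)."""
    op = filter_obj["Filter_Type"]
    expected = filter_obj["Filter_Value"]
    actual = row.get(filter_obj["Filter_Column"])

    if op in _COMPARATORS:
        return _COMPARATORS[op](actual, expected)
    if op == "IN":
        return actual in expected
    if op == "NOT IN":
        return actual not in expected
    if op == "LIKE":
        # Assumption: substring match, ignoring case.
        return str(expected).lower() in str(actual).lower()
    if op == "BETWEEN":
        low, high = expected
        return low <= actual <= high
    raise ValueError(f"Unsupported Filter_Type: {op}")


def make_filter(column, filter_type, value):
    """Build a filter object in the payload's shape, for the examples below."""
    return {"Filter_Column": column, "Filter_Type": filter_type, "Filter_Value": value}


row = {"status": "ACTIVE", "total_sales": 300, "customer_name": "Acme Corp"}
is_active = matches(row, make_filter("status", "=", "ACTIVE"))
in_range = matches(row, make_filter("total_sales", "BETWEEN", [100, 500]))
```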
Filter Handler Function
def build_where_clause(filters):
    where_clauses = []
    params = []
    for group_key, group_filters in filters.items():
        group_conditions = []
        for filter_key, filter_obj in group_filters.items():
            col = filter_obj['Filter_Column']
            op = filter_obj['Filter_Type']
            val = filter_obj['Filter_Value']
            if op in ['=', '!=', '>', '<', '>=', '<=']:
                group_conditions.append(f"{col} {op} %s")
                params.append(val)
            elif op == 'IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} IN ({placeholders})")
                params.extend(val)
            elif op == 'NOT IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} NOT IN ({placeholders})")
                params.extend(val)
            elif op == 'LIKE':
                group_conditions.append(f"{col} LIKE %s")
                params.append(f"%{val}%")
            elif op == 'BETWEEN':
                group_conditions.append(f"{col} BETWEEN %s AND %s")
                params.extend(val)
        if group_conditions:
            where_clauses.append(f"({' AND '.join(group_conditions)})")
    where_sql = ' OR '.join(where_clauses) if where_clauses else ""
    return where_sql, params
Sorting Handler
def build_order_clause(order_by):
    if not order_by:
        return ""
    order_parts = [f"{o['field']} {o['direction']}" for o in order_by]
    return "ORDER BY " + ", ".join(order_parts)
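Note that build_where_clause and build_order_clause parameterise values but place column names and sort directions directly into the SQL string, and placeholders cannot protect identifiers. One defensive option is to check those names against a known list first. The sketch below is an illustration under assumptions: `ALLOWED_COLUMNS`, `safe_column`, and `safe_order_clause` are hypothetical names, and the column set is assumed to match your Column Schema.

```python
# Hypothetical whitelist: assumed to mirror the Column Ids in your schema.
ALLOWED_COLUMNS = {"customer_id", "customer_name", "creation_date", "total_sales"}
ALLOWED_DIRECTIONS = {"ASC", "DESC"}


def safe_column(name):
    """Return the column name unchanged if it is whitelisted, else raise."""
    if name not in ALLOWED_COLUMNS:
        raise ValueError(f"Unknown column: {name!r}")
    return name


def safe_order_clause(order_by):
    """Build an ORDER BY clause from validated fields and directions."""
    parts = []
    for item in order_by or []:
        direction = str(item.get("direction", "ASC")).upper()
        if direction not in ALLOWED_DIRECTIONS:
            raise ValueError(f"Unknown sort direction: {direction!r}")
        parts.append(f"{safe_column(item['field'])} {direction}")
    return "ORDER BY " + ", ".join(parts) if parts else ""


clause = safe_order_clause([{"field": "customer_name", "direction": "asc"}])
```

The same `safe_column` check can be applied to each `Filter_Column` before it is interpolated into a WHERE condition.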
Integration Examples
Example 1: Local MySQL Query
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
cursor = dbl.cursor(dictionary=True)
cursor.execute("""
SELECT
customer_id,
customer_name,
DATE_FORMAT(creation_date, '%Y-%m-%d') as creation_date,
total_sales
FROM customers
WHERE status = 'ACTIVE'
ORDER BY total_sales DESC
LIMIT 1000
""")
rows = cursor.fetchall()
response = [
{
"customer_id": str(row["customer_id"]),
"customer_name": row["customer_name"],
"creation_date": row["creation_date"],
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
Example 2: External MySQL Database
import mysql.connector
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
external_db = mysql.connector.connect(
host="external-db.example.com",
port=3306,
user="readonly_user",
password="secure_password",
database="crm_prod"
)
try:
    cursor = external_db.cursor(dictionary=True)
    cursor.execute("""
        SELECT cust_id, cust_name, DATE(created_at) as creation_date, revenue_total
        FROM external_customers
        WHERE status = 'ACTIVE'
        ORDER BY revenue_total DESC
    """)
    rows = cursor.fetchall()
    response = [
        {
            "customer_id": str(row["cust_id"]),
            "customer_name": row["cust_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["revenue_total"])
        }
        for row in rows
    ]
    ap_var(dbl, 'response', response)
    cursor.close()
finally:
    external_db.close()
    dbl.close()
requirements.txt:
mysql-connector-python==8.2.0
Example 3: REST API Integration
import requests
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
api_response = requests.get(
"https://api.example.com/v1/customers",
headers={"Authorization": "Bearer YOUR_API_KEY"},
params={"status": "active", "limit": 1000},
timeout=30
)
api_response.raise_for_status()
api_data = api_response.json()
response = [
{
"customer_id": item["id"],
"customer_name": item["name"],
"creation_date": item["created_at"][:10],
"total_sales": float(item["total_purchases"])
}
for item in api_data.get("customers", [])
]
ap_var(dbl, 'response', response)
dbl.close()
requirements.txt:
requests==2.31.0
Example 4: Slack Messages Integration
import requests
from datetime import datetime
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
SLACK_TOKEN = "xoxb-your-slack-bot-token"
CHANNEL_ID = "C01234567"
response_api = requests.get(
    "https://slack.com/api/conversations.history",
    headers={"Authorization": f"Bearer {SLACK_TOKEN}"},
    params={"channel": CHANNEL_ID, "limit": 100},
    timeout=30  # avoid hanging indefinitely if Slack does not respond
)
slack_data = response_api.json()
if not slack_data.get("ok"):
    raise Exception(f"Slack API error: {slack_data.get('error')}")
response = [
{
"message_id": msg.get("ts", ""),
"user_id": msg.get("user", ""),
"message_text": msg.get("text", ""),
"timestamp": datetime.fromtimestamp(float(msg.get("ts", 0))).strftime("%Y-%m-%d %H:%M:%S"),
"reactions_count": len(msg.get("reactions", []))
}
for msg in slack_data.get("messages", [])
]
ap_var(dbl, 'response', response)
dbl.close()
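The example above formats `ts` with `datetime.fromtimestamp`, which uses the local timezone of the machine running the flow. If you prefer timestamps that do not depend on the host, a UTC conversion is one option. This is a small sketch; `slack_ts_to_utc` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def slack_ts_to_utc(ts):
    """Convert a Slack 'ts' string such as '1700000000.000100' to a UTC string.

    Hypothetical helper; the fractional part of ts is dropped by the format.
    """
    moment = datetime.fromtimestamp(float(ts), tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


formatted = slack_ts_to_utc("1700000000.000100")  # "2023-11-14 22:13:20"
```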
Example 5: Combined Data Sources
import requests
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers WHERE status = 'ACTIVE'")
local_customers = {row['customer_id']: row for row in cursor.fetchall()}
cursor.close()
api_response = requests.get(
"https://api.analytics.example.com/sales",
headers={"Authorization": "Bearer API_KEY"},
timeout=30
)
api_response.raise_for_status()  # fail early on HTTP errors, as in Example 3
sales_data = api_response.json()
response = []
for sale in sales_data.get("sales", []):
    cust_id = sale["customer_id"]
    if cust_id in local_customers:
        response.append({
            "customer_id": cust_id,
            "customer_name": local_customers[cust_id]["customer_name"],
            "creation_date": sale["order_date"],
            "total_sales": float(sale["amount"])
        })
ap_var(dbl, 'response', response)
dbl.close()
Advanced Patterns
Pattern 1: Dynamic Query Building
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
query_parts = ["SELECT customer_id, customer_name, creation_date, total_sales FROM customers"]
params = []
filters = payload.get('GroupFilter', {})
where_clauses = []
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")
if where_clauses:
    query_parts.append("WHERE " + " AND ".join(where_clauses))
order_by = payload.get('order_by', [])
if order_by:
    order_clauses = [f"{o['field']} {o['direction']}" for o in order_by]
    query_parts.append("ORDER BY " + ", ".join(order_clauses))
final_query = " ".join(query_parts)
cursor = dbl.cursor(dictionary=True)
cursor.execute(final_query, params)
rows = cursor.fetchall()
response = [
{
"customer_id": str(row["customer_id"]),
"customer_name": row["customer_name"],
"creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
Pattern 2: Error Handling
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
try:
    cursor = dbl.cursor(dictionary=True)
    cursor.execute("SELECT * FROM customers")
    rows = cursor.fetchall()
    response = [
        {
            "customer_id": str(row["customer_id"]),
            "customer_name": row["customer_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["total_sales"])
        }
        for row in rows
    ]
    ap_var(dbl, 'response', response)
    cursor.close()
except Exception as e:
    print(f"❌ Error: {str(e)}")
    ap_var(dbl, 'response', [])
finally:
    dbl.close()
Pattern 3: Handling Pagination
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
report_type = payload.get('reportType', 'DETAIL')
page = payload.get('page', 1)
page_size = payload.get('page_limit_rows', 100) if report_type == 'DETAIL' else payload.get('grp_page_limit_rows', 50)
offset = (page - 1) * page_size
cursor = dbl.cursor(dictionary=True)
cursor.execute("""
SELECT customer_id, customer_name, creation_date, total_sales
FROM customers
WHERE status = 'ACTIVE'
ORDER BY customer_id
LIMIT %s OFFSET %s
""", (page_size, offset))
rows = cursor.fetchall()
response = [
{
"customer_id": str(row["customer_id"]),
"customer_name": row["customer_name"],
"creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
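The page and offset arithmetic in this pattern can be pulled into a small function so that odd inputs (a page of 0, a missing limit) do not produce a negative OFFSET. A minimal sketch: `page_window` is a hypothetical helper, and the defaults of 100 and 50 are simply the ones used in the examples in this guide.

```python
def page_window(payload):
    """Return (page_size, offset) for the current payload.

    Hypothetical helper. Uses page_limit_rows for DETAIL reports and
    grp_page_limit_rows otherwise, and clamps both values to at least 1.
    """
    is_detail = payload.get("reportType", "DETAIL") == "DETAIL"
    limit_key = "page_limit_rows" if is_detail else "grp_page_limit_rows"
    default_size = 100 if is_detail else 50

    page = max(int(payload.get("page") or 1), 1)
    page_size = max(int(payload.get(limit_key) or default_size), 1)
    return page_size, (page - 1) * page_size


page_size, offset = page_window({"reportType": "DETAIL", "page": 3, "page_limit_rows": 100})
```

For page 3 with 100 rows per page this gives a page size of 100 and an offset of 200, which can be passed straight to `LIMIT %s OFFSET %s`.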
Pattern 4: Using Pandas for Data Processing
import pandas as pd
from dkconnect import connect, connect_engine
dbl = connect()
payload = ap_var(dbl, 'payload')
engine = connect_engine()
df = pd.read_sql("""
SELECT customer_id, customer_name, creation_date, order_amount
FROM orders
WHERE status = 'COMPLETED'
""", engine)
df_grouped = df.groupby('customer_id').agg({
'customer_name': 'first',
'creation_date': 'min',
'order_amount': 'sum'
}).reset_index()
response = [
{
"customer_id": str(row["customer_id"]),
"customer_name": row["customer_name"],
"creation_date": row["creation_date"].strftime("%Y-%m-%d"),
"total_sales": float(row["order_amount"])
}
for _, row in df_grouped.iterrows()
]
ap_var(dbl, 'response', response)
dbl.close()
requirements.txt:
pandas==2.0.0
sqlalchemy==2.0.0
Pattern 5: Caching with Custom Variables
from dkconnect import connect
from datetime import datetime
dbl = connect()
payload = ap_var(dbl, 'payload')
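try:
    cache_data = ap_var(dbl, 'cached_customers')
    cache_time = ap_var(dbl, 'cache_timestamp')
    cache_dt = datetime.fromisoformat(cache_time)
    age_minutes = (datetime.utcnow() - cache_dt).total_seconds() / 60
    if age_minutes < 60:
        print(f"✅ Using cached data ({age_minutes:.1f} minutes old)")
        ap_var(dbl, 'response', cache_data)
        dbl.close()
        exit()
except Exception:
    # Cache missing or unreadable: fall through and rebuild it.
    # A bare "except:" would also catch the SystemExit raised by exit(),
    # so the script would keep running on a closed connection.
    pass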
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name, creation_date, total_sales FROM customers")
rows = cursor.fetchall()
response = [
{
"customer_id": str(row["customer_id"]),
"customer_name": row["customer_name"],
"creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
"total_sales": float(row["total_sales"])
}
for row in rows
]
ap_var(dbl, 'cached_customers', response)
ap_var(dbl, 'cache_timestamp', datetime.utcnow().isoformat())
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
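The freshness check in this pattern can also be isolated into a function that never raises, which keeps the cache logic out of a broad try/except. The sketch below is one possible shape: `is_cache_fresh` is a hypothetical helper, and it assumes the timestamp was stored as a naive UTC ISO string, as in the pattern above.

```python
from datetime import datetime, timedelta, timezone


def is_cache_fresh(cache_timestamp, max_age_minutes=60, now=None):
    """Return True if cache_timestamp is a valid ISO string newer than max_age_minutes.

    Hypothetical helper. 'now' can be injected for testing; by default
    it is the current UTC time as a naive datetime.
    """
    if not cache_timestamp:
        return False
    try:
        cached_at = datetime.fromisoformat(cache_timestamp)
    except (TypeError, ValueError):
        return False  # missing or malformed timestamp: treat as stale
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (now - cached_at) < timedelta(minutes=max_age_minutes)


reference = datetime(2024, 1, 15, 12, 0, 0)
fresh = is_cache_fresh("2024-01-15T11:30:00", now=reference)
stale = is_cache_fresh("2024-01-15T10:59:00", now=reference)
```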
Troubleshooting
Error: "No 'response' variable found"
Always call ap_var(dbl, 'response', dataset) before the script ends.
Error: "Column 'xyz' not in schema whitelist"
Only return Column Ids defined in your Column Schema tab.
Error: "Response validation failed"
Use Column Id (e.g. customer_id), not the display Name (e.g. Customer Id).
Empty Response
Always return a list — even if empty:
ap_var(dbl, 'response', [])
Script Crashes
Wrap in try/except and always return a response:
try:
    response = fetch_data()
    ap_var(dbl, 'response', response)
except Exception as e:
    print(f"❌ Error: {str(e)}")
    ap_var(dbl, 'response', [])
finally:
    dbl.close()
Date Format Issues
Use ISO format (YYYY-MM-DD):
creation_date = row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None
Quick Reference
Minimal Template
from dkconnect import connect
dbl = connect()
payload = ap_var(dbl, 'payload')
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT * FROM table")
rows = cursor.fetchall()
response = [
{
"column_id_1": row["field1"],
"column_id_2": row["field2"]
}
for row in rows
]
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
Key Functions
payload = ap_var(dbl, 'payload') # Read payload
ap_var(dbl, 'response', dataset) # Save response
ap_var(dbl, 'custom_var', value) # Save custom variable
value = ap_var(dbl, 'custom_var') # Read custom variable
dbl = connect() # Repository DB connection
engine = connect_engine() # SQLAlchemy engine
Column Schema Mapping
| UI Field | Use In Python | Example |
|---|---|---|
| Name | Display only | "Customer Id" |
| Column Id | Response key | "customer_id" |
| Type | Data type hint | Normal, Date, Currency |
