Straight-Through Processing
This page demonstrates a short program that uses the Quantemplate API to upload data to a preconfigured pipeline, run the pipeline, and download the results
Set up Straight-Through Processing directly in Quantemplate with new automation features
Straight-Through Processing can now be configured by business users within the Quantemplate UI, no need to use the APIs.
- Trigger a pipeline run when datasets or feeds update
- Auto-export pipeline outputs to multiple destinations when no validation failures are detected
- Chain multiple pipelines together to centralise processes
- Queue pipeline runs and prevent infinite loops
- Receive email notifications when pipelines are Auto Run or when validation issues block output.
See how on the Help Centre →
Prerequisites
Before trying this example, make sure you have followed the guide to Setting Up a Connection
Step 1: Configure Pipeline and permissions in-app
In the Quantemplate app, we need two pieces set up:
- A Dataset that's shared with the API User in 'Can edit' mode.
- A Pipeline that uses the Dataset 👆 as an input - also shared with the API User in 'Can edit' mode.
Make a note of the IDs in the page URLs. The important parts are the Organisation ID, the Dataset ID and the Pipeline ID. These are shown on the Data and Integrate tabs, respectively.
Pipeline Configuration
The operations performed on the input are up to you! You can test them out by running the pipeline in the app itself, to make sure they do what you want them to do.
For the purposes of this example, the pipeline will have just one input (the Dataset uploaded to the Data tab) and one output. It's possible to download multiple outputs, but for this simple demo we'll just deal with one.
Step 2: Call the API
With our Access Token retrieved from the authentication endpoint, we can then use it to make requests to the API.
In this example, we'll be calling the Upload Dataset endpoint, then Execute Pipeline. We'll check the status of the execution with List Executions every second until it finishes. Finally we'll use the Download Output endpoint to download the CSV output.
We also need to provide our access token, by setting the Authorization
header to Bearer <token>
Code (Python 3)
The code below demonstrates a small application using the endpoints mentioned above in order to upload data, run a pipeline and download the output. It requires the requests library.
#!/usr/bin/env python3
# a full end-to-end use case of the QT API: uploading a dataset, running a pipeline, and downloading the output
import requests
import time
# this class is a small example client for making multiple calls to the API
class ApiClient:
def __init__(self, user_id, secret):
self.user_id = user_id
self.secret = secret
self.base_endpoint = 'https://fabric.prod.quantemplate.com/external/v1'
def authenticate(self):
auth_result = requests.post('https://accounts.prod.quantemplate.com/auth/realms/qt/protocol/openid-connect/token', data = {
'grant_type': 'client_credentials',
'client_id': self.user_id,
'client_secret': self.secret
}).json()
self.access_token = auth_result['access_token']
def _headers(self):
return { 'Authorization': f'Bearer {self.access_token}' }
def download_dataset(self, org_id, dataset_id):
print(f'Downloading dataset {dataset_id}')
url = f'{self.base_endpoint}/organisations/{org_id}/datasets/{dataset_id}'
return requests.get(url, headers = self._headers()).content.decode('utf-8')
def download_execution_output(self, org_id, pipeline_id, execution_id, output_id):
print(f'Downloading output {output_id} for execution {execution_id}')
url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions/{execution_id}/outputs/{output_id}'
return requests.get(url, headers = self._headers()).content.decode('utf-8')
def upload_dataset(self, org_id, dataset_id, file_path):
print(f'Uploading {file_path} to dataset {dataset_id}')
url = f'{self.base_endpoint}/organisations/{org_id}/datasets/{dataset_id}'
with open(file_path, mode='rb') as f:
return requests.post(url, data = f, headers = self._headers())
def list_executions(self, org_id, pipeline_id):
print(f'Listing executions for pipeline {pipeline_id}')
url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions'
return requests.get(url, headers = self._headers()).json()
def execute_pipeline(self, org_id, pipeline_id):
print(f'Executing pipeline {pipeline_id}')
url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions'
response = requests.post(url, headers = self._headers()).json()
execution_id = response['id']
print(f'Executed pipeline {pipeline_id} with execution {execution_id}')
return execution_id
def get_execution(self, org_id, pipeline_id, execution_id):
executions = self.list_executions(org_id, pipeline_id)
return [e for e in executions if e['id'] == execution_id][0]
def wait_for_execution_to_finish(self, org_id, pipeline_id, execution_id):
while (True):
execution = self.get_execution(org_id, pipeline_id, execution_id)
if (execution['status'] == 'Started'):
time.sleep(1)
else:
return execution
# this method uses the API client to upload, run, and download
def run(user_id, client_secret, org_id, pipeline_input_id, pipeline_id, pipeline_input_file):
api = ApiClient(user_id, client_secret)
# authentication is necessary before interacting with the API
api.authenticate()
api.upload_dataset(org_id, pipeline_input_id, pipeline_input_file)
# our execute_pipeline client method returns us the execution id
execution_id = api.execute_pipeline(org_id, pipeline_id)
# this method polls the endpoint every second and returns the execution object
execution = api.wait_for_execution_to_finish(org_id, pipeline_id, execution_id)
status = execution['status']
if (status != 'Succeeded'):
print(f'Pipeline execution failed: {status}')
return
# our pipeline is configured to have exactly one output
output_id = execution['outputs'][0]['id']
# once we have the output id we can download it
output = api.download_execution_output(org_id, pipeline_id, execution_id, output_id)
# in this example app we simply print the output to the screen.
# we could just as easily write it to a file, or parse and upload to a SQL database
print(output)
run(user_id = '...', # insert your user ID here
client_secret = '...', # insert your client secret here
org_id = '...', # insert your target organisation ID here
pipeline_input_id = '...', # insert your input dataset ID here
pipeline_id = '...', # insert your pipeline ID here
pipeline_input_file = '...') # insert the path to your new CSV input here
When we substitute in our config values and run this program, we will see a series of logging statements and, if all goes well, the output of our pipeline in CSV format is printed to the terminal.
Updated about 1 month ago