End-to-end Pipeline Processing

This page demonstrates a short program that uses the Quantemplate API to upload data to a preconfigured pipeline, run the pipeline, and download the results.

📘 Prerequisites

Before trying this example, make sure you have followed the guide to Setting Up a Connection.

Step 1: Configure Pipeline and permissions in-app

In the Quantemplate app, we need two things set up:

  1. A Dataset that's shared with the API User in 'Can edit' mode.
  2. A Pipeline that uses the Dataset above as an input, also shared with the API User in 'Can edit' mode.

Make a note of the IDs in the page URLs. The important parts are the Organisation ID, the Dataset ID and the Pipeline ID. The Dataset and the Pipeline are found on the Data and Integrate tabs respectively.


Check the URL to get the relevant IDs, and share both the Dataset and the Pipeline with the whole organisation. 'Can edit' access is required.

Pipeline Configuration

The operations performed on the input are up to you! You can test them out by running the pipeline in the app itself to make sure they do what you want.

For the purposes of this example, the pipeline will have just one input (the Dataset uploaded to the Data tab) and one output. It's possible to download multiple outputs, but for this simple demo we'll just deal with one.


Step 2: Call the API

Once we have retrieved an access token from the authentication endpoint, we can use it to make requests to the API.

In this example, we'll be calling the Upload Dataset endpoint, then Execute Pipeline. We'll check the status of the execution with List Executions every second until it finishes. Finally we'll use the Download Output endpoint to download the CSV output.
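
The example code below reads only a few fields from each execution object returned by List Executions: its id, its status (Started while the pipeline is running, Succeeded on success), and, once finished, a list of outputs that each carry their own id. Illustratively, a finished execution parsed from the JSON response looks something like this sketch (placeholder values, any other fields omitted):

{
  'id': '...',            # matches the execution ID returned by Execute Pipeline
  'status': 'Succeeded',  # 'Started' while the pipeline is still running
  'outputs': [
    { 'id': '...' }       # one entry per pipeline output; used with Download Output
  ]
}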

We also need to provide our access token with each request by setting the Authorization header to Bearer <token>.
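
For illustration, here is a minimal sketch (using the requests library, with a placeholder token and placeholder organisation and pipeline IDs) of an authenticated call to the List Executions endpoint:

import requests

access_token = '...'  # retrieved from the authentication endpoint
headers = { 'Authorization': f'Bearer {access_token}' }

# placeholder organisation and pipeline IDs
url = 'https://fabric.prod.quantemplate.com/external/v1/organisations/<org_id>/pipelines/<pipeline_id>/executions'
print(requests.get(url, headers = headers).json())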

Code (Python 3)

The code below demonstrates a small application that uses the endpoints mentioned above to upload data, run a pipeline and download the output. It requires the requests library (pip install requests if it isn't already available).

#!/usr/bin/env python3

# a full end-to-end use case of the QT API: uploading a dataset, running a pipeline, and downloading the output

import requests
import time


# this class is a small example client for making multiple calls to the API

class ApiClient:
  def __init__(self, user_id, secret):
    self.user_id = user_id
    self.secret = secret
    self.base_endpoint = 'https://fabric.prod.quantemplate.com/external/v1'

    
  def authenticate(self):
    auth_result = requests.post('https://accounts.prod.quantemplate.com/auth/realms/qt/protocol/openid-connect/token', data = {
      'grant_type': 'client_credentials',
      'client_id': self.user_id,
      'client_secret': self.secret
    }).json()

    self.access_token = auth_result['access_token']

  def _headers(self):
    return { 'Authorization': f'Bearer {self.access_token}' }

  
  # downloads the contents of a dataset (not used in the run() flow below)
  def download_dataset(self, org_id, dataset_id):
    print(f'Downloading dataset {dataset_id}')
    url = f'{self.base_endpoint}/organisations/{org_id}/datasets/{dataset_id}'
    return requests.get(url, headers = self._headers()).content.decode('utf-8')

  def download_execution_output(self, org_id, pipeline_id, execution_id, output_id):
    print(f'Downloading output {output_id} for execution {execution_id}')
    url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions/{execution_id}/outputs/{output_id}'
    return requests.get(url, headers = self._headers()).content.decode('utf-8')

  
  def upload_dataset(self, org_id, dataset_id, file_path):
    print(f'Uploading {file_path} to dataset {dataset_id}')
    url = f'{self.base_endpoint}/organisations/{org_id}/datasets/{dataset_id}'
    with open(file_path, mode='rb') as f:
      return requests.post(url, data = f, headers = self._headers())

  def list_executions(self, org_id, pipeline_id):
    print(f'Listing executions for pipeline {pipeline_id}')
    url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions'
    return requests.get(url, headers = self._headers()).json()

  
  def execute_pipeline(self, org_id, pipeline_id):
    print(f'Executing pipeline {pipeline_id}')
    url = f'{self.base_endpoint}/organisations/{org_id}/pipelines/{pipeline_id}/executions'
    response = requests.post(url, headers = self._headers()).json()
    execution_id = response['id']
    print(f'Executed pipeline {pipeline_id} with execution {execution_id}')
    return execution_id

  
  # finds the execution with the given ID in the list returned by list_executions
  def get_execution(self, org_id, pipeline_id, execution_id):
    executions = self.list_executions(org_id, pipeline_id)
    return [e for e in executions if e['id'] == execution_id][0]

  
  def wait_for_execution_to_finish(self, org_id, pipeline_id, execution_id):
    while True:
      execution = self.get_execution(org_id, pipeline_id, execution_id)
      if execution['status'] == 'Started':
        time.sleep(1)
      else:
        return execution


# this method uses the API client to upload, run, and download      
      
def run(user_id, client_secret, org_id, pipeline_input_id, pipeline_id, pipeline_input_file):
  api = ApiClient(user_id, client_secret)

  # authentication is necessary before interacting with the API
  api.authenticate()

  api.upload_dataset(org_id, pipeline_input_id, pipeline_input_file)

  # our execute_pipeline client method returns us the execution id
  execution_id = api.execute_pipeline(org_id, pipeline_id)

  # this method polls the endpoint every second and returns the execution object
  execution = api.wait_for_execution_to_finish(org_id, pipeline_id, execution_id)
  status = execution['status']
  if status != 'Succeeded':
    print(f'Pipeline execution failed: {status}')
    return
  
  # our pipeline is configured to have exactly one output
  output_id = execution['outputs'][0]['id']

  # once we have the output id we can download it
  output = api.download_execution_output(org_id, pipeline_id, execution_id, output_id)
  
  # in this example app we simply print the output to the screen.
  # we could just as easily write it to a file, or parse and upload to a SQL database
  print(output)

run(user_id = '...', # insert your user ID here
    client_secret = '...', # insert your client secret here
    org_id = '...', # insert your target organisation ID here
    pipeline_input_id = '...', # insert your input dataset ID here
    pipeline_id = '...', # insert your pipeline ID here
    pipeline_input_file = '...') # insert the path to your new CSV input here

When we substitute in our config values and run this program, we will see a series of log messages and, if all goes well, the output of our pipeline printed to the terminal in CSV format.
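
As the comments in run suggest, the output could just as easily be written to a file. For example, replacing the final print(output) with something like the following (the output.csv filename is arbitrary) saves the downloaded CSV to disk instead:

  # write the downloaded CSV to a local file instead of printing it
  with open('output.csv', 'w') as f:
    f.write(output)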