Run a custom SNAP graph through the NSIS-Cloud API
SNAP (Sentinel Application Platform) is an open-source platform and common architecture developed by the European Space Agency for the exploitation of Earth observation data. SNAP enables the processing and analysis of remote sensing data, offering both a graphical user interface (SNAP Desktop) and a set of tools for batch processing.
A processing graph can be built in SNAP Desktop using the Graph Builder module. The graph created in this way is saved in XML format. Users should prepare a graph in which the input and output datasets are defined as parameters. When launching processing, it is necessary to provide values for those parameters as well as the location of the graph.
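A useful sanity check before ordering is to confirm which `${...}` placeholders your graph actually declares, since the input and output keys used later must match them. The sketch below uses a minimal, hypothetical two-node graph (Read/Write) in the XML form produced by the Graph Builder; SNAP's gpt substitutes `${name}` variables at run time.

```python
import re

# A minimal, illustrative SNAP graph with two parameters: infile_a and outfile_a
GRAPH_XML = """<graph id="ExampleGraph">
  <version>1.0</version>
  <node id="Read">
    <operator>Read</operator>
    <parameters><file>${infile_a}</file></parameters>
  </node>
  <node id="Write">
    <operator>Write</operator>
    <sources><sourceProduct refid="Read"/></sources>
    <parameters><file>${outfile_a}</file></parameters>
  </node>
</graph>"""

def graph_parameters(xml_text: str) -> set:
    """Return the names of all ${...} placeholders found in a SNAP graph."""
    return set(re.findall(r"\$\{(\w+)\}", xml_text))
```

Calling `graph_parameters(GRAPH_XML)` here returns `{"infile_a", "outfile_a"}`; those are the keys the API payload will have to provide values for.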
The example below shows how to execute processing using SNAP and a custom graph on the NSIS-Cloud infrastructure. The steps describe how to use the API to invoke processing within that environment.
Import the necessary libraries
Start by importing the required Python libraries:
import getpass
import json
import requests
from datetime import datetime
from IPython.display import JSON
Generate a user token
Authenticate with your NSIS-Cloud username and password to obtain an access token for the API.
print('Enter username and password for NSIS user')
username = input('NSIS Username: ')
password = getpass.getpass(prompt='NSIS Password: ', stream=None)
def get_api_headers(username: str, password: str) -> dict:
    auth_url = "https://identity.nsiscloud.polsa.gov.pl/auth/realms/NSIS-Cloud/protocol/openid-connect/token"
    token_data = {
        "username": username,
        "password": password,
        "client_id": "nsis-public",
        "client_secret": "",
        "grant_type": "password"
    }
    resp = requests.post(auth_url, data=token_data)
    resp.raise_for_status()
    access_token = resp.json()["access_token"]
    return {'Authorization': 'Bearer ' + access_token}
api_headers_ordering = get_api_headers(username, password)
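The "retry once on HTTP 403" pattern appears several times below. It can be factored into a small helper; this is an illustrative local refactoring, not part of the NSIS-Cloud API:

```python
def with_token_refresh(send, headers, refresh):
    """Run `send(headers)`; if the token has expired (HTTP 403),
    fetch fresh headers via `refresh()` and retry once.
    Returns the final response together with the (possibly new) headers."""
    resp = send(headers)
    if resp.status_code == 403:
        print('regenerate token...')
        headers = refresh()
        resp = send(headers)
    return resp, headers
```

For example: `resp, api_headers_ordering = with_token_refresh(lambda h: requests.get(url, headers=h), api_headers_ordering, lambda: get_api_headers(username, password))`.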
Check the workflow options required to create the order
Before creating an order, verify which WorkflowOptions are required by the custom_snap workflow.
ordering_url = "https://odp.nsiscloud.polsa.gov.pl/odata/v1"
url = f"{ordering_url}/Workflows?$expand=WorkflowOptions&$filter=(Name eq 'custom_snap')"
resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.get(url, headers=api_headers_ordering)
JSON(resp.json())
Create a body for the SNAP processor with a custom graph
The workflow assumes that the user has already uploaded a graph to private storage and supplies, alongside it, a JSON object containing the data for that graph.
The graph data consists of input products, which can be one or more. In the JSON structure:
each key corresponds to a parameter used in the graph
each value contains two fields:
source_type – indicates where the input product is located
source – the name, link, or path to the product
The source_type can be one of the following:
s3 – the product is stored in the user’s private storage
This product must be in the same bucket, use the same endpoint, and use the same access keys as the location where the graph was uploaded.
catalogue – the product comes from the NSIS catalogue
In this case, the value only needs to contain the catalogue product name.
temporary – the product is stored in temporary storage
In this case, the value is simply a temporary link.
Products for one graph can come from one or many different sources. This depends entirely on the user’s workflow.
In the example below, all three source types are used. The names infile_a, infile_b, and infile_c must match the parameter names defined in the graph itself.
Example input definition
{
    "infile_a": {
        "source_type": "s3",  # the product is uploaded to S3 in a private bucket, the same one as the graph
        "source": "s3://my_super_bucket/input_products/S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
        # IMPORTANT: the access keys, endpoint, and bucket must be exactly the same as those used for the graph
    },
    "infile_b": {
        "source_type": "catalogue",  # the product is in the NSIS catalogue
        "source": "S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
    },
    "infile_c": {
        "source_type": "temporary",  # the product is in temporary storage and accessible via a link
        "source": "https://s3.waw4-1.cloudferro.com/swift/v1/tmp-storage/20250317_24630858_ziY6t6o1.zip?temp_url_sig=f8e4d680x3fcccdfd76098b9b075c8fa6dfcdcd0&temp_url_expires=1743422771"
    }
}
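A malformed entry only surfaces once processing has already started, so it can be worth validating the structure locally before ordering. A sketch of such a check; this runs purely client-side and is not part of the API:

```python
VALID_SOURCE_TYPES = {"s3", "catalogue", "temporary"}

def check_graph_inputs(products_in_graph: dict) -> None:
    """Raise ValueError for entries that do not match the expected shape."""
    for key, entry in products_in_graph.items():
        if set(entry) != {"source_type", "source"}:
            raise ValueError(f"{key}: each entry needs exactly 'source_type' and 'source'")
        if entry["source_type"] not in VALID_SOURCE_TYPES:
            raise ValueError(f"{key}: unknown source_type {entry['source_type']!r}")
        if not entry["source"]:
            raise ValueError(f"{key}: 'source' must not be empty")
```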
Provide the input data for the graph
Create a JSON object describing the input product or products used by the graph.
products_in_graph = {
    "<input_product_key>": {
        "source_type": "<s3/catalogue/temporary>",
        "source": "<source>"
    },
    ...
}
Create the output product definition
You must also create a JSON object describing the output products.
Here again, the key is the name defined by the user in the graph, and the value is the name of the resulting product.
A workflow may return more than one output product. If the graph is designed to produce multiple outputs, all expected output products should be provided.
Example output definition
{
    "outfile_a": "result_file.tiff",
    "outfile_b": "result_file.dim"
}
Template for output products
output_filenames = {
    "<output_product_key>": "<output_product_name>",
    ...
}
Define the workflow options
Once the input and output JSON objects are prepared, create the WorkflowOptions list.
workflow_options = [
    {"Name": "s3_endpoint_url", "Value": "<s3_endpoint>"},
    {"Name": "s3_access_key", "Value": "<s3 access key>"},
    {"Name": "s3_secret_key", "Value": "<s3 secret key>"},
    {"Name": "graph_path", "Value": "<path to the graph>"},
    {"Name": "products_in_graph", "Value": products_in_graph},
    {"Name": "output_filenames", "Value": output_filenames},
    {"Name": "output_storage", "Value": "<TEMPORARY or PRIVATE>"},
    {"Name": "output_dir", "Value": "<path to save output data>"},
]
Explanation of workflow options:
s3_endpoint_url – the S3 endpoint where the graph was uploaded, for example:
https://s3.waw4-1.cloudferro.com
s3_access_key – the S3 access key
s3_secret_key – the S3 secret key
graph_path – the path to the graph in private storage, for example:
s3://my_super_bucket/graphs/first_graph
products_in_graph – the JSON created earlier for input products
output_filenames – the JSON created earlier for output products
output_storage – the location where output products will be saved
TEMPORARY means the result will be stored in temporary storage and a download link will be returned.
PRIVATE means the result will be saved in the user’s private S3 bucket.
If PRIVATE is selected, the output must be stored in exactly the same location as the graph, meaning the same endpoint, bucket, and access keys must be used.
In that case, the output_dir parameter must also be provided to define the target folder for the output products.
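Because PRIVATE output must land in the same bucket as the graph, a quick client-side consistency check can catch a mismatch before the order is submitted. An illustrative helper, not part of the API:

```python
def s3_bucket(uri: str) -> str:
    """Extract the bucket name from an s3:// URI."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an s3 URI: {uri!r}")
    return uri[len("s3://"):].split("/", 1)[0]

def same_bucket(graph_path: str, output_dir: str) -> bool:
    """True when both URIs point into the same bucket."""
    return s3_bucket(graph_path) == s3_bucket(output_dir)
```

For example, `same_bucket("s3://my_super_bucket/graphs/first_graph", "s3://my_super_bucket/results")` is True, so this pair would be acceptable for PRIVATE output.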
Create the order body
After preparing the workflow options, build the full request body for the order.
order_name = f"Order with custom graph {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
order_body = {
    "WorkflowName": "custom_snap",
    "Name": order_name,
    "InputProductReference": {
        "Reference": ""
    },
    "WorkflowOptions": workflow_options
}

print("Order body:")
JSON(order_body)
Create the order
Send the order to the API to start processing.
url = f"{ordering_url}/ProductionOrder/OData.CSC.Order"
resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
print(resp.status_code)
order_id = resp.json()['value']['Id']
print('Order', order_id, 'created')
Check the status of the order
After the order is created, check its status regularly until processing finishes.
If the status is done, the order has completed successfully.
The StatusMessage field provides information about possible errors and may also return logs from the processing stage, if processing was reached.
url = f"{ordering_url}/BatchOrder({str(order_id)})"
resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.get(url, headers=api_headers_ordering)
print("Order", order_id, "status:", resp.json()['value'])
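Rather than re-running the status cell by hand, the check can be automated with a polling loop. A sketch; note that only done is confirmed by the documentation above, so the other terminal states here ("failed", "cancelled") are assumptions to verify against the actual status values the API returns:

```python
import time

def poll_order(fetch_status, interval_s=30, timeout_s=3600,
               clock=time.monotonic, sleep=time.sleep):
    """Call `fetch_status()` until a terminal status arrives or the timeout expires."""
    terminal = {"done", "failed", "cancelled"}  # assumed set; verify against the API
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        if status.lower() in terminal:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"order still '{status}' after {timeout_s}s")
        sleep(interval_s)
```

Here `fetch_status` would be a small function that GETs the BatchOrder URL above and extracts the status field from the JSON response.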