Skip to main content

Uruchamianie własnego grafu SNAP przez API NSIS-Cloud

SNAP (Sentinel Application Platform) to otwartoźródłowa platforma i wspólna architektura opracowana przez Europejską Agencję Kosmiczną, przeznaczona do wykorzystania danych obserwacji Ziemi. SNAP umożliwia przetwarzanie i analizę danych teledetekcyjnych, oferując zarówno graficzny interfejs użytkownika (SNAP Desktop), jak i zestaw narzędzi do pracy w trybie wsadowym.

Graf przetwarzania można zbudować w narzędziu SNAP Desktop przy użyciu modułu Graph Builder. Utworzony w ten sposób graf zapisywany jest w formacie XML. Użytkownik powinien przygotować graf, w którym jako parametry zostaną zdefiniowane dane wejściowe i wyjściowe. Podczas uruchamiania przetwarzania konieczne będzie podanie wartości tych parametrów oraz lokalizacji grafu.

Poniższy przykład pokazuje, jak uruchomić przetwarzanie z wykorzystaniem SNAP i własnego grafu w infrastrukturze NSIS-Cloud. Kolejne kroki wyjaśniają, jak użyć API do wywołania przetwarzania w tym środowisku.

Zaimportuj niezbędne biblioteki

Na początku zaimportuj wymagane biblioteki Python:

import getpass
import json
import requests

from datetime import datetime
from IPython.display import JSON

Wygeneruj token użytkownika

Uwierzytelnij się przy użyciu nazwy użytkownika i hasła NSIS-Cloud, aby uzyskać token dostępu do API.

print('Podaj użytkownika i hasło dla użytkownika NSIS')
username = input('NSIS Username: ')
password = getpass.getpass(prompt='NSIS Password: ', stream=None)

def get_api_headers(username: str, password: str) -> dict:
    auth_url = "https://identity.nsiscloud.polsa.gov.pl/auth/realms/NSIS-Cloud/protocol/openid-connect/token"
    token_data = {
        "username": username,
        "password": password,
        "client_id": "nsis-public",
        "client_secret": "",
        "grant_type": "password"
    }
    print(requests.post(auth_url, token_data))
    access_token = requests.post(auth_url, token_data).json()["access_token"]
    return {'Authorization': 'Bearer ' + access_token}

api_headers_ordering = get_api_headers(username, password)

Sprawdź WorkflowOptions potrzebne do stworzenia zamówienia

Zanim utworzysz zamówienie, sprawdź, które WorkflowOptions są wymagane przez workflow custom_snap.

ordering_url = "https://odp.nsiscloud.polsa.gov.pl/odata/v1"
url = f"{ordering_url}/Workflows?$expand=WorkflowOptions&$filter=(Name eq 'custom_snap')"

resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.get(url, headers=api_headers_ordering)
JSON(resp.json())

Stwórz body dla SNAP processora z customowym graphem

Cały przepływ pracy polega na tym, że użytkownik podaje graf, który wcześniej wgrał do swojego prywatnego kubełka, oraz przygotowuje plik JSON z danymi do tego grafu.

Dane do grafu to produkty wejściowe, których może być jeden lub więcej. W strukturze JSON:

  • każdy klucz odpowiada parametrowi użytemu w grafie

  • każda wartość zawiera dwa pola:

    • source_type – określa, gdzie znajduje się produkt wejściowy

    • source – określa nazwę, link lub ścieżkę do produktu

Pole source_type może przyjmować jedną z następujących wartości:

  • s3 – produkt znajduje się w prywatnym kubełku użytkownika

    • Ten produkt musi znajdować się w tym samym kubełku, pod tym samym endpointem i z użyciem tych samych kluczy dostępowych, co miejsce, do którego użytkownik wgrał graf.

  • catalogue – produkt pochodzi z katalogu NSIS

    • W tym przypadku wystarczy podać nazwę produktu z katalogu.

  • temporary – produkt znajduje się w magazynie tymczasowym

    • W tym przypadku wartością jest link do tego zasobu.

Produkty dla jednego grafu mogą pochodzić z jednego lub z wielu źródeł. Zależy to od potrzeb użytkownika.

W poniższym przykładzie pokazano wszystkie trzy typy źródeł. Nazwy infile_a, infile_b oraz infile_c powinny być zdefiniowane w grafie, ale – jak wspomniano wcześniej – to użytkownik sam określa te klucze i podaje je zarówno w grafie, jak i później w zamówieniu.

Przykład definicji wejść

{
    "infile_a": {
        "source_type": "s3",  # Jeśli produkt wrzucony jest na s3 w prywatnym kubełku, tym samym co graf
        "source": "s3://my_super_bucket/input_products/S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
        # WAŻNE: klucze dostępowe, endpoint i bucket muszą być takie same jak dla grafu
    },
    "infile_b": {
        "source_type": "catalogue",  # Jeśli produkt jest w katalogu NSIS
        "source": "S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
    },
    "infile_c": {
        "source_type": "temporary",  # Jeśli produkt jest w tymczasowym kubełku i dostępny po linku
        "source": "https://s3.waw4-1.cloudferro.com/swift/v1/tmp-storage/20250317_24630858_ziY6t6o1.zip?temp_url_sig=f8e4d680x3fcccdfd76098b9b075c8fa6dfcdcd0&temp_url_expires=1743422771"
    },
}

Zacznij od podania danych do grafu

Utwórz obiekt JSON opisujący produkt lub produkty wejściowe używane przez graf.

products_in_graph = {
    "<klucz_input_produktu>": {
        "source_type": "<s3/catalogue/temporary>",
        "source": "<source>"
    }
    ...
}

Stwórz definicję produktów wyjściowych

Trzeba również utworzyć obiekt JSON opisujący produkty wyjściowe.

Tutaj także kluczem jest nazwa zdefiniowana przez użytkownika w grafie, a wartością jest nazwa produktu wynikowego.

Przetwarzanie może zwrócić więcej niż jeden produkt wyjściowy. Jeśli graf użytkownika został przygotowany tak, aby wygenerować wiele wyników, należy podać wszystkie oczekiwane produkty wyjściowe.

Przykład definicji wyjść

{
    "outfile_a": "result_file.tiff",
    "outfile_b": "result_file.dim"
}

Szablon dla produktów wyjściowych

output_filenames = {
    "<klucz produktu wyjściowego>": "<nazwa produktu wyjściowego>",
    ....
}

Zdefiniuj workflow options

Gdy obiekty JSON dla danych wejściowych i wyjściowych są już przygotowane, utwórz listę WorkflowOptions.

workflow_options = [
    {"Name": "s3_endpoint_url", "Value": "<s3_endpoint>"},
    {"Name": "s3_access_key", "Value": "<access key do s3>"},
    {"Name": "s3_secret_key", "Value": "<secret key do s3>"},
    {"Name": "graph_path", "Value": "<ścieżka do grafu>"},
    {"Name": "products_in_graph", "Value": products_in_graph},
    {"Name": "output_filenames", "Value": output_filenames},
    {"Name": "output_storage", "Value": "<TEMPORARY lub PRIVATE>"},
    {"Name": "output_dir", "Value": "<ścieżka do miejsca zapisu produktów wyjściowych>"},
]

Znaczenie poszczególnych parametrów:

  • s3_endpoint_url – adres endpointu S3, do którego został wgrany graf, na przykład:

    https://s3.waw4-1.cloudferro.com
    
  • s3_access_key – klucz dostępowy do S3

  • s3_secret_key – tajny klucz do S3

  • graph_path – ścieżka do grafu w prywatnej przestrzeni, na przykład:

    s3://my_super_bucket/graphs/first_graph
    
  • products_in_graph – wcześniej przygotowany JSON z produktami wejściowymi

  • output_filenames – wcześniej przygotowany JSON z produktami wyjściowymi

  • output_storage – miejsce, w którym mają zostać zapisane produkty wyjściowe

    • TEMPORARY oznacza zapis do magazynu tymczasowego, a w wyniku użytkownik otrzyma link do pobrania produktu.

    • PRIVATE oznacza zapis produktu wyjściowego w prywatnym kubełku użytkownika na S3.

Jeśli zostanie wybrana opcja PRIVATE, miejsce przechowywania wyników musi być dokładnie takie samo jak miejsce przechowywania grafu, czyli musi używać tego samego endpointu, kubełka i kluczy dostępowych.

W takim przypadku należy również podać parametr output_dir, który określa folder docelowy dla produktów wyjściowych.

Stwórz body zamówienia

Po przygotowaniu workflow options zbuduj pełne body żądania dla zamówienia.

order_name = f"Zamówienie z grafem użytkownika {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
{
    "WorkflowName": "custom_snap",
    "Name": order_name,
    "InputProductReference": {
        "Reference": ""
    },
    "WorkflowOptions": workflow_options
}
print()
print("Order body:")
JSON(order_body)

Stwórz zamówienie

Wyślij zamówienie do API, aby rozpocząć przetwarzanie.

url = f"{ordering_url}/ProductionOrder/OData.CSC.Order"

resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
print(resp.status_code)
order_id = resp.json()['value']['Id']
print('Zamówienie', order_id, 'złożone')

Sprawdź status zamówienia

Po utworzeniu zamówienia sprawdzaj jego status do momentu zakończenia przetwarzania.

Jeśli status ma wartość done, oznacza to, że zamówienie zostało zakończone pomyślnie.

Pole StatusMessage zawiera informacje o potencjalnych błędach i może również zwracać logi z etapu przetwarzania, jeśli ten etap został osiągnięty.

url = f"{ordering_url}/BatchOrder({str(order_id)})"
resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
    print('regenerate token...')
    api_headers_ordering = get_api_headers(username, password)
    resp = requests.get(url, headers=api_headers_ordering)
print("Zamówienie", order_id, "status:", resp.json()['value'])