Uruchamianie własnego grafu SNAP przez API NSIS-Cloud
SNAP (Sentinel Application Platform) to otwartoźródłowa platforma i wspólna architektura opracowana przez Europejską Agencję Kosmiczną, przeznaczona do wykorzystania danych obserwacji Ziemi. SNAP umożliwia przetwarzanie i analizę danych teledetekcyjnych, oferując zarówno graficzny interfejs użytkownika (SNAP Desktop), jak i zestaw narzędzi do pracy w trybie wsadowym.
Graf przetwarzania można zbudować w narzędziu SNAP Desktop przy użyciu modułu Graph Builder. Utworzony w ten sposób graf zapisywany jest w formacie XML. Użytkownik powinien przygotować graf, w którym jako parametry zostaną zdefiniowane dane wejściowe i wyjściowe. Podczas uruchamiania przetwarzania konieczne będzie podanie wartości tych parametrów oraz lokalizacji grafu.
Poniższy przykład pokazuje, jak uruchomić przetwarzanie z wykorzystaniem SNAP i własnego grafu w infrastrukturze NSIS-Cloud. Kolejne kroki wyjaśniają, jak użyć API do wywołania przetwarzania w tym środowisku.
Zaimportuj niezbędne biblioteki
Na początku zaimportuj wymagane biblioteki Python:
import getpass
import json
import requests
from datetime import datetime
from IPython.display import JSON
Wygeneruj token użytkownika
Uwierzytelnij się przy użyciu nazwy użytkownika i hasła NSIS-Cloud, aby uzyskać token dostępu do API.
print('Podaj użytkownika i hasło dla użytkownika NSIS')
username = input('NSIS Username: ')
password = getpass.getpass(prompt='NSIS Password: ', stream=None)
def get_api_headers(username: str, password: str) -> dict:
auth_url = "https://identity.nsiscloud.polsa.gov.pl/auth/realms/NSIS-Cloud/protocol/openid-connect/token"
token_data = {
"username": username,
"password": password,
"client_id": "nsis-public",
"client_secret": "",
"grant_type": "password"
}
print(requests.post(auth_url, token_data))
access_token = requests.post(auth_url, token_data).json()["access_token"]
return {'Authorization': 'Bearer ' + access_token}
api_headers_ordering = get_api_headers(username, password)
Sprawdź WorkflowOptions potrzebne do stworzenia zamówienia
Zanim utworzysz zamówienie, sprawdź, które WorkflowOptions są wymagane przez workflow custom_snap.
ordering_url = "https://odp.nsiscloud.polsa.gov.pl/odata/v1"
url = f"{ordering_url}/Workflows?$expand=WorkflowOptions&$filter=(Name eq 'custom_snap')"
resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
print('regenerate token...')
api_headers_ordering = get_api_headers(username, password)
resp = requests.get(url, headers=api_headers_ordering)
JSON(resp.json())
Stwórz body dla SNAP processora z customowym graphem
Cały przepływ pracy polega na tym, że użytkownik podaje graf, który wcześniej wgrał do swojego prywatnego kubełka, oraz przygotowuje plik JSON z danymi do tego grafu.
Dane do grafu to produkty wejściowe, których może być jeden lub więcej. W strukturze JSON:
każdy klucz odpowiada parametrowi użytemu w grafie
każda wartość zawiera dwa pola:
source_type – określa, gdzie znajduje się produkt wejściowy
source – określa nazwę, link lub ścieżkę do produktu
Pole source_type może przyjmować jedną z następujących wartości:
s3 – produkt znajduje się w prywatnym kubełku użytkownika
Ten produkt musi znajdować się w tym samym kubełku, pod tym samym endpointem i z użyciem tych samych kluczy dostępowych, co miejsce, do którego użytkownik wgrał graf.
catalogue – produkt pochodzi z katalogu NSIS
W tym przypadku wystarczy podać nazwę produktu z katalogu.
temporary – produkt znajduje się w magazynie tymczasowym
W tym przypadku wartością jest link do tego zasobu.
Produkty dla jednego grafu mogą pochodzić z jednego lub z wielu źródeł. Zależy to od potrzeb użytkownika.
W poniższym przykładzie pokazano wszystkie trzy typy źródeł. Nazwy infile_a, infile_b oraz infile_c powinny być zdefiniowane w grafie, ale – jak wspomniano wcześniej – to użytkownik sam określa te klucze i podaje je zarówno w grafie, jak i później w zamówieniu.
Przykład definicji wejść
{
"infile_a": {
"source_type": "s3", # Jeśli produkt wrzucony jest na s3 w prywatnym kubełku, tym samym co graf
"source": "s3://my_super_bucket/input_products/S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
# WAŻNE: klucze dostępowe, endpoint i bucket muszą być takie same jak dla grafu
},
"infile_b": {
"source_type": "catalogue", # Jeśli produkt jest w katalogu NSIS
"source": "S2A_MSIL1C_20250410T004721_N0511_R102_T53HQC_20250410T021138.SAFE"
},
"infile_c": {
"source_type": "temporary", # Jeśli produkt jest w tymczasowym kubełku i dostępny po linku
"source": "https://s3.waw4-1.cloudferro.com/swift/v1/tmp-storage/20250317_24630858_ziY6t6o1.zip?temp_url_sig=f8e4d680x3fcccdfd76098b9b075c8fa6dfcdcd0&temp_url_expires=1743422771"
},
}
Zacznij od podania danych do grafu
Utwórz obiekt JSON opisujący produkt lub produkty wejściowe używane przez graf.
products_in_graph = {
"<klucz_input_produktu>": {
"source_type": "<s3/catalogue/temporary>",
"source": "<source>"
}
...
}
Stwórz definicję produktów wyjściowych
Trzeba również utworzyć obiekt JSON opisujący produkty wyjściowe.
Tutaj także kluczem jest nazwa zdefiniowana przez użytkownika w grafie, a wartością jest nazwa produktu wynikowego.
Przetwarzanie może zwrócić więcej niż jeden produkt wyjściowy. Jeśli graf użytkownika został przygotowany tak, aby wygenerować wiele wyników, należy podać wszystkie oczekiwane produkty wyjściowe.
Przykład definicji wyjść
{
"outfile_a": "result_file.tiff",
"outfile_b": "result_file.dim"
}
Szablon dla produktów wyjściowych
output_filenames = {
"<klucz produktu wyjściowego>": "<nazwa produktu wyjściowego>",
....
}
Zdefiniuj workflow options
Gdy obiekty JSON dla danych wejściowych i wyjściowych są już przygotowane, utwórz listę WorkflowOptions.
workflow_options = [
{"Name": "s3_endpoint_url", "Value": "<s3_endpoint>"},
{"Name": "s3_access_key", "Value": "<access key do s3>"},
{"Name": "s3_secret_key", "Value": "<secret key do s3>"},
{"Name": "graph_path", "Value": "<ścieżka do grafu>"},
{"Name": "products_in_graph", "Value": products_in_graph},
{"Name": "output_filenames", "Value": output_filenames},
{"Name": "output_storage", "Value": "<TEMPORARY lub PRIVATE>"},
{"Name": "output_dir", "Value": "<ścieżka do miejsca zapisu produktów wyjściowych>"},
]
Znaczenie poszczególnych parametrów:
s3_endpoint_url – adres endpointu S3, do którego został wgrany graf, na przykład:
https://s3.waw4-1.cloudferro.com
s3_access_key – klucz dostępowy do S3
s3_secret_key – tajny klucz do S3
graph_path – ścieżka do grafu w prywatnej przestrzeni, na przykład:
s3://my_super_bucket/graphs/first_graph
products_in_graph – wcześniej przygotowany JSON z produktami wejściowymi
output_filenames – wcześniej przygotowany JSON z produktami wyjściowymi
output_storage – miejsce, w którym mają zostać zapisane produkty wyjściowe
TEMPORARY oznacza zapis do magazynu tymczasowego, a w wyniku użytkownik otrzyma link do pobrania produktu.
PRIVATE oznacza zapis produktu wyjściowego w prywatnym kubełku użytkownika na S3.
Jeśli zostanie wybrana opcja PRIVATE, miejsce przechowywania wyników musi być dokładnie takie samo jak miejsce przechowywania grafu, czyli musi używać tego samego endpointu, kubełka i kluczy dostępowych.
W takim przypadku należy również podać parametr output_dir, który określa folder docelowy dla produktów wyjściowych.
Stwórz body zamówienia
Po przygotowaniu workflow options zbuduj pełne body żądania dla zamówienia.
order_name = f"Zamówienie z grafem użytkownika {datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
{
"WorkflowName": "custom_snap",
"Name": order_name,
"InputProductReference": {
"Reference": ""
},
"WorkflowOptions": workflow_options
}
print()
print("Order body:")
JSON(order_body)
Stwórz zamówienie
Wyślij zamówienie do API, aby rozpocząć przetwarzanie.
url = f"{ordering_url}/ProductionOrder/OData.CSC.Order"
resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
if resp.status_code == 403:
print('regenerate token...')
api_headers_ordering = get_api_headers(username, password)
resp = requests.post(url, json.dumps(order_body), headers=api_headers_ordering)
print(resp.status_code)
order_id = resp.json()['value']['Id']
print('Zamówienie', order_id, 'złożone')
Sprawdź status zamówienia
Po utworzeniu zamówienia sprawdzaj jego status do momentu zakończenia przetwarzania.
Jeśli status ma wartość done, oznacza to, że zamówienie zostało zakończone pomyślnie.
Pole StatusMessage zawiera informacje o potencjalnych błędach i może również zwracać logi z etapu przetwarzania, jeśli ten etap został osiągnięty.
url = f"{ordering_url}/BatchOrder({str(order_id)})"
resp = requests.get(url, headers=api_headers_ordering)
if resp.status_code == 403:
print('regenerate token...')
api_headers_ordering = get_api_headers(username, password)
resp = requests.get(url, headers=api_headers_ordering)
print("Zamówienie", order_id, "status:", resp.json()['value'])