How To Dev - Example: Using Advanced Smart City API in Python

×

Warning message

You can't delete this newsletter because it has not been sent to all its subscribers.

 

See example of the usage of Python for developing MicroService and make it accessible from Proc.Logic /IoT app: https://www.snap4city.org/645

See also Python usage of API: https://www.snap4city.org/829


get token in Python


import requests

# this is only an example do not use in production,

# consider that each call it will create a new session on keycloak for each request,

# the token should be reused until it is expired and when expired it can be 'refreshed' using the refresh token

 

def getTokenViaUserCredentials(username,password):

    payload = {

        'f': 'json',

        'client_id': xxxxx-tool',

        'grant_type': 'password',

        'username': username,

        'password': password

    }

 

    header = {

        'Content-Type': 'application/x-www-form-urlencoded'

    }

 

    urlToken = "https://www.snap4city.org/auth/realms/master/protocol/openid-connect/token"

    response = requests.request("POST", urlToken, data=payload, headers=header)

    token = response.json()

    return token

 

def getTokenViaRefreshToken(old_token):

    if not 'refresh_token' in old_token:

        return None

   

    payload = {

                'f': 'json',

                'client_id': xxxxx-tool',

                'grant_type': 'refresh_token',

              'refresh_token': old_token['refresh_token']

               }

 

    header = {

        'Content-Type': 'application/x-www-form-urlencoded'

    }

 

    urlToken = "https://www.snap4city.org/auth/realms/master/protocol/openid-connect/token"

    response = requests.request("POST", urlToken, data=payload, headers=header)

    token = response.json()

    return token


Access data via Smart City API: get historical and last data via ServiceURI, in Python


#get the token of the user

token = getTokenViaUserCredentials ('…user…','…password…')

print('token', token)

if 'access_token' in token :

    head = {

                "Content-Type": "application/json",

                "Authorization": f"Bearer {token['access_token']}"

            }

    api_url ="https://<xxxxx>/ServiceMap/api/v1/"

    service_uri = "http://www.disit.org/km4city/resource/iot/<xxxxx>/<xxxxx>/ccccc_Device_01"

    response = requests.request("GET", api_url + "?serviceUri=" + service_uri +"&fromTime=60-day", headers=head)

    print(response.json())

else:

    print('failed getting access token', token)


Send data on platform, via Broker: send/update values of a device, in Python


if 'access_token' in token :

    head = {

                "Content-Type": "application/json",

                "Authorization": f"Bearer {token['access_token']}"

            }

    api_url = "https://iot-app.snap4city.org/orion-broker-<xxxxx>/v2/"

    device_id = "ccccc_Device_01"

    payload = {

        "dateObserved": {

            "value": "2023-01-27T01:01:02.000Z",

            "type": "string"

          },

          "FatigueValue": {

            "value": 13.1,

            "type": "number"

      }   

    }

    response = requests.request("PATCH", api_url + "entities/" + device_id +"/attrs?type=Sensor&elementid=" + device_id, json=payload, headers=head)

    if response.status_code!=204 :

        print(response.json())

    else:

        print(device_id+" updated!")

else:

    print('failed getting access token', token)