Module scenariogeneration.xosc.xosc_reader

scenariogeneration https://github.com/pyoscx/scenariogeneration

This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at https://mozilla.org/MPL/2.0/.

Copyright (c) 2022 The scenariogeneration Authors.

Expand source code
"""
  scenariogeneration
  https://github.com/pyoscx/scenariogeneration
 
  This Source Code Form is subject to the terms of the Mozilla Public
  License, v. 2.0. If a copy of the MPL was not distributed with this
  file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
  Copyright (c) 2022 The scenariogeneration Authors.

"""
import xml.etree.ElementTree as ET
import os


from .parameters import ParameterValueDistribution
from .scenario import Scenario, Catalog
from .exceptions import NoCatalogFoundError, NotAValidElement
from .entities import Vehicle, Pedestrian, MiscObject
from .utils import ParameterDeclarations, Controller, Environment, CatalogReference
from .storyboard import Maneuver
from .position import Trajectory, Route


class CatalogLoader:
    """CatalogLoader makes it possible to read certain elements from a catalog


    Attributes
    ----------

        all_catalogs (dict with all catalogs): all catalogs loaded

    Methods
    -------
        load_catalog(catalog_reference,catalog_path)
            loads a catalog that can be parsed later on

        get_entry(catalog_reference)
            reads a loaded catalog and returns the object
    """

    def __init__(self):
        """CatalogLoader makes it possible to read certain elements from a catalog

        Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

        """
        self.all_catalogs = {}

    def load_catalog(self, catalog_reference, catalog_path):
        """CatalogLoader makes it possible to read certain elements from a catalog

        Parameters
        ----------
            catalog_reference (CatalogReference or str): name/reference to the catalog

            catalog_path (str): path to the catalog
        """
        if isinstance(catalog_reference, CatalogReference):
            fullpath = os.path.join(
                catalog_path, catalog_reference.catalogname + ".xosc"
            )
            name_ref = catalog_reference.catalogname
        else:
            fullpath = os.path.join(catalog_path, catalog_reference + ".xosc")
            name_ref = catalog_reference

        with open(fullpath, "r") as f:
            catalog_element = ET.parse(f).find("Catalog")
            self.all_catalogs[name_ref] = catalog_element

    def parse(self, catalog_reference):
        """parse reads reads a specific entry from a loaded catalog

        Parameters
        ----------
            catalog_reference (CatalogReference): reference to the catalog

        Returns
        -------
            The catalog entry

        """
        if not catalog_reference.catalogname in self.all_catalogs:
            raise NoCatalogFoundError(
                "Catalog " + catalog_reference.catalogname + " is not loaded yet."
            )
        catalog = self.all_catalogs[catalog_reference.catalogname]
        for entry in catalog:
            if entry.tag == "Vehicle":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Vehicle.parse(entry)
            elif entry.tag == "Pedestrian":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Pedestrian.parse(entry)
            elif entry.tag == "Controller":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Controller.parse(entry)
            elif entry.tag == "MiscObject":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return MiscObject.parse(entry)
            elif entry.tag == "Environment":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Environment.parse(entry)
            elif entry.tag == "Maneuver":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Maneuver.parse(entry)
            elif entry.tag == "Trajectory":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Trajectory.parse(entry)
            elif entry.tag == "Route":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Route.parse(entry)
            else:
                raise NotImplementedError("This catalogtype is not supported yet.")

    def read_entry(self, catalog_reference, catalog_path):
        """read_entry loads and reads a catalog directly (both load_catalog, and parse)

        The catalog will still be loaded and can be used with parse after this.

        Parameters
        ----------
            catalog_reference (CatalogReference): reference to the catalog

            catalog_path (str): path to the catalog
        """
        self.load_catalog(catalog_reference, catalog_path)
        return self.parse(catalog_reference)


def CatalogReader(catalog_reference, catalog_path):
    """CatalogReader is a function that will read a openscenario catalog and return the corresponding scenariogeneration.xosc object

    Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

    NOTE: only Vehicle, and Pedestrian is implemented

    Parameters
    ----------
        catalog_reference (CatalogReference): the catalog reference needed

        catalog_path (str): path to the catalog

    Returns
    -------
        The catalog entry
    """

    # TODO: add a raised error if the catalog doesn't contain the correct data
    loaded_catalog = catalog_reference.catalogname

    with open(
        os.path.join(catalog_path, catalog_reference.catalogname + ".xosc"), "r"
    ) as f:
        loaded_catalog = ET.parse(f)

        catalog = loaded_catalog.find("Catalog")

        for entry in catalog:
            if entry.tag == "Vehicle":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Vehicle.parse(entry)
            elif entry.tag == "Pedestrian":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Pedestrian.parse(entry)
            elif entry.tag == "Controller":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Controller.parse(entry)
            elif entry.tag == "MiscObject":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return MiscObject.parse(entry)
            elif entry.tag == "Environment":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Environment.parse(entry)
            elif entry.tag == "Maneuver":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Maneuver.parse(entry)
            elif entry.tag == "Trajectory":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Trajectory.parse(entry)
            elif entry.tag == "Route":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Route.parse(entry)
            else:
                raise NotImplementedError("This catalogtype is not supported yet.")

        raise NoCatalogFoundError(
            "A catalog entry with the name "
            + catalog_reference.entryname
            + " could not be found in the given Catalog."
        )


def ParameterDeclarationReader(file_path):
    """ParameterDeclarationReader reads the parameter declaration of a xosc file and creates a ParameterDeclaration object from it

    Parameters
    ----------
        file_path (str): path to the xosc file wanted to be parsed

    """
    param_decl = ParameterDeclarations()
    with open(file_path, "r") as f:
        loaded_xosc = ET.parse(f)
        paramdec = loaded_xosc.find("ParameterDeclarations")
        param_decl = ParameterDeclarations.parse(paramdec)

    return param_decl


def ParseOpenScenario(file_path):
    """ParseOpenScenario parses a openscenario file (of any type) and returns the python object

    Parameters
    ----------
        file_path (str): full path to the .xosc file

    Returns
    -------
        xosc_object (Scenario, Catalog, or ParameterValueDistribution)
    """
    with open(file_path, "r") as f:
        loaded_xosc = ET.parse(f)
        if loaded_xosc.find("ParameterValueDistribution") is not None:
            return ParameterValueDistribution.parse(loaded_xosc)
        elif loaded_xosc.find("Catalog") is not None:
            return Catalog.parse(loaded_xosc)
        elif loaded_xosc.find("Storyboard") is not None:
            return Scenario.parse(loaded_xosc)
        else:
            raise NotAValidElement(
                "The provided file is not on a OpenSCENARIO compatible format."
            )

Functions

def CatalogReader(catalog_reference, catalog_path)

CatalogReader is a function that will read a openscenario catalog and return the corresponding scenariogeneration.xosc object

Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

NOTE: only Vehicle, and Pedestrian is implemented

Parameters

catalog_reference (CatalogReference): the catalog reference needed

catalog_path (str): path to the catalog

Returns

The catalog entry
Expand source code
def CatalogReader(catalog_reference, catalog_path):
    """CatalogReader is a function that will read a openscenario catalog and return the corresponding scenariogeneration.xosc object

    Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

    NOTE: only Vehicle, and Pedestrian is implemented

    Parameters
    ----------
        catalog_reference (CatalogReference): the catalog reference needed

        catalog_path (str): path to the catalog

    Returns
    -------
        The catalog entry
    """

    # TODO: add a raised error if the catalog doesn't contain the correct data
    loaded_catalog = catalog_reference.catalogname

    with open(
        os.path.join(catalog_path, catalog_reference.catalogname + ".xosc"), "r"
    ) as f:
        loaded_catalog = ET.parse(f)

        catalog = loaded_catalog.find("Catalog")

        for entry in catalog:
            if entry.tag == "Vehicle":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Vehicle.parse(entry)
            elif entry.tag == "Pedestrian":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Pedestrian.parse(entry)
            elif entry.tag == "Controller":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Controller.parse(entry)
            elif entry.tag == "MiscObject":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return MiscObject.parse(entry)
            elif entry.tag == "Environment":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Environment.parse(entry)
            elif entry.tag == "Maneuver":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Maneuver.parse(entry)
            elif entry.tag == "Trajectory":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Trajectory.parse(entry)
            elif entry.tag == "Route":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Route.parse(entry)
            else:
                raise NotImplementedError("This catalogtype is not supported yet.")

        raise NoCatalogFoundError(
            "A catalog entry with the name "
            + catalog_reference.entryname
            + " could not be found in the given Catalog."
        )
def ParameterDeclarationReader(file_path)

ParameterDeclarationReader reads the parameter declaration of a xosc file and creates a ParameterDeclaration object from it

Parameters

file_path (str): path to the xosc file wanted to be parsed
Expand source code
def ParameterDeclarationReader(file_path):
    """ParameterDeclarationReader reads the parameter declaration of a xosc file and creates a ParameterDeclaration object from it

    Parameters
    ----------
        file_path (str): path to the xosc file wanted to be parsed

    """
    param_decl = ParameterDeclarations()
    with open(file_path, "r") as f:
        loaded_xosc = ET.parse(f)
        paramdec = loaded_xosc.find("ParameterDeclarations")
        param_decl = ParameterDeclarations.parse(paramdec)

    return param_decl
def ParseOpenScenario(file_path)

ParseOpenScenario parses a openscenario file (of any type) and returns the python object

Parameters

file_path (str): full path to the .xosc file

Returns

xosc_object (Scenario, Catalog, or ParameterValueDistribution)
Expand source code
def ParseOpenScenario(file_path):
    """ParseOpenScenario parses a openscenario file (of any type) and returns the python object

    Parameters
    ----------
        file_path (str): full path to the .xosc file

    Returns
    -------
        xosc_object (Scenario, Catalog, or ParameterValueDistribution)
    """
    with open(file_path, "r") as f:
        loaded_xosc = ET.parse(f)
        if loaded_xosc.find("ParameterValueDistribution") is not None:
            return ParameterValueDistribution.parse(loaded_xosc)
        elif loaded_xosc.find("Catalog") is not None:
            return Catalog.parse(loaded_xosc)
        elif loaded_xosc.find("Storyboard") is not None:
            return Scenario.parse(loaded_xosc)
        else:
            raise NotAValidElement(
                "The provided file is not on a OpenSCENARIO compatible format."
            )

Classes

class CatalogLoader

CatalogLoader makes it possible to read certain elements from a catalog

Attributes

all_catalogs (dict with all catalogs): all catalogs loaded

Methods

load_catalog(catalog_reference,catalog_path)
    loads a catalog that can be parsed later on

get_entry(catalog_reference)
    reads a loaded catalog and returns the object

CatalogLoader makes it possible to read certain elements from a catalog

Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

Expand source code
class CatalogLoader:
    """CatalogLoader makes it possible to read certain elements from a catalog


    Attributes
    ----------

        all_catalogs (dict with all catalogs): all catalogs loaded

    Methods
    -------
        load_catalog(catalog_reference,catalog_path)
            loads a catalog that can be parsed later on

        get_entry(catalog_reference)
            reads a loaded catalog and returns the object
    """

    def __init__(self):
        """CatalogLoader makes it possible to read certain elements from a catalog

        Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry

        """
        self.all_catalogs = {}

    def load_catalog(self, catalog_reference, catalog_path):
        """CatalogLoader makes it possible to read certain elements from a catalog

        Parameters
        ----------
            catalog_reference (CatalogReference or str): name/reference to the catalog

            catalog_path (str): path to the catalog
        """
        if isinstance(catalog_reference, CatalogReference):
            fullpath = os.path.join(
                catalog_path, catalog_reference.catalogname + ".xosc"
            )
            name_ref = catalog_reference.catalogname
        else:
            fullpath = os.path.join(catalog_path, catalog_reference + ".xosc")
            name_ref = catalog_reference

        with open(fullpath, "r") as f:
            catalog_element = ET.parse(f).find("Catalog")
            self.all_catalogs[name_ref] = catalog_element

    def parse(self, catalog_reference):
        """parse reads reads a specific entry from a loaded catalog

        Parameters
        ----------
            catalog_reference (CatalogReference): reference to the catalog

        Returns
        -------
            The catalog entry

        """
        if not catalog_reference.catalogname in self.all_catalogs:
            raise NoCatalogFoundError(
                "Catalog " + catalog_reference.catalogname + " is not loaded yet."
            )
        catalog = self.all_catalogs[catalog_reference.catalogname]
        for entry in catalog:
            if entry.tag == "Vehicle":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Vehicle.parse(entry)
            elif entry.tag == "Pedestrian":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Pedestrian.parse(entry)
            elif entry.tag == "Controller":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Controller.parse(entry)
            elif entry.tag == "MiscObject":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return MiscObject.parse(entry)
            elif entry.tag == "Environment":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Environment.parse(entry)
            elif entry.tag == "Maneuver":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Maneuver.parse(entry)
            elif entry.tag == "Trajectory":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Trajectory.parse(entry)
            elif entry.tag == "Route":
                if entry.attrib["name"] == catalog_reference.entryname:
                    return Route.parse(entry)
            else:
                raise NotImplementedError("This catalogtype is not supported yet.")

    def read_entry(self, catalog_reference, catalog_path):
        """read_entry loads and reads a catalog directly (both load_catalog, and parse)

        The catalog will still be loaded and can be used with parse after this.

        Parameters
        ----------
            catalog_reference (CatalogReference): reference to the catalog

            catalog_path (str): path to the catalog
        """
        self.load_catalog(catalog_reference, catalog_path)
        return self.parse(catalog_reference)

Methods

def load_catalog(self, catalog_reference, catalog_path)

CatalogLoader makes it possible to read certain elements from a catalog

Parameters

catalog_reference (CatalogReference or str): name/reference to the catalog

catalog_path (str): path to the catalog
Expand source code
def load_catalog(self, catalog_reference, catalog_path):
    """CatalogLoader makes it possible to read certain elements from a catalog

    Parameters
    ----------
        catalog_reference (CatalogReference or str): name/reference to the catalog

        catalog_path (str): path to the catalog
    """
    if isinstance(catalog_reference, CatalogReference):
        fullpath = os.path.join(
            catalog_path, catalog_reference.catalogname + ".xosc"
        )
        name_ref = catalog_reference.catalogname
    else:
        fullpath = os.path.join(catalog_path, catalog_reference + ".xosc")
        name_ref = catalog_reference

    with open(fullpath, "r") as f:
        catalog_element = ET.parse(f).find("Catalog")
        self.all_catalogs[name_ref] = catalog_element
def parse(self, catalog_reference)

parse reads reads a specific entry from a loaded catalog

Parameters

catalog_reference (CatalogReference): reference to the catalog

Returns

The catalog entry
Expand source code
def parse(self, catalog_reference):
    """parse reads reads a specific entry from a loaded catalog

    Parameters
    ----------
        catalog_reference (CatalogReference): reference to the catalog

    Returns
    -------
        The catalog entry

    """
    if not catalog_reference.catalogname in self.all_catalogs:
        raise NoCatalogFoundError(
            "Catalog " + catalog_reference.catalogname + " is not loaded yet."
        )
    catalog = self.all_catalogs[catalog_reference.catalogname]
    for entry in catalog:
        if entry.tag == "Vehicle":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Vehicle.parse(entry)
        elif entry.tag == "Pedestrian":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Pedestrian.parse(entry)
        elif entry.tag == "Controller":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Controller.parse(entry)
        elif entry.tag == "MiscObject":
            if entry.attrib["name"] == catalog_reference.entryname:
                return MiscObject.parse(entry)
        elif entry.tag == "Environment":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Environment.parse(entry)
        elif entry.tag == "Maneuver":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Maneuver.parse(entry)
        elif entry.tag == "Trajectory":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Trajectory.parse(entry)
        elif entry.tag == "Route":
            if entry.attrib["name"] == catalog_reference.entryname:
                return Route.parse(entry)
        else:
            raise NotImplementedError("This catalogtype is not supported yet.")
def read_entry(self, catalog_reference, catalog_path)

read_entry loads and reads a catalog directly (both load_catalog, and parse)

The catalog will still be loaded and can be used with parse after this.

Parameters

catalog_reference (CatalogReference): reference to the catalog

catalog_path (str): path to the catalog
Expand source code
def read_entry(self, catalog_reference, catalog_path):
    """read_entry loads and reads a catalog directly (both load_catalog, and parse)

    The catalog will still be loaded and can be used with parse after this.

    Parameters
    ----------
        catalog_reference (CatalogReference): reference to the catalog

        catalog_path (str): path to the catalog
    """
    self.load_catalog(catalog_reference, catalog_path)
    return self.parse(catalog_reference)