Module scenariogeneration.xosc.xosc_reader
scenariogeneration https://github.com/pyoscx/scenariogeneration
This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at https://mozilla.org/MPL/2.0/.
Copyright (c) 2022 The scenariogeneration Authors.
Functions
def CatalogReader(catalog_reference, catalog_path)-
Expand source code
def CatalogReader(catalog_reference, catalog_path): """CatalogReader is a function that will read a openscenario catalog and return the corresponding scenariogeneration.xosc object. Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry NOTE: only Vehicle, and Pedestrian is implemented Parameters ---------- catalog_reference (CatalogReference): the catalog reference needed catalog_path (str): path to the catalog Returns ------- The catalog entry """ # TODO: add a raised error if the catalog doesn't contain the correct data loaded_catalog = catalog_reference.catalogname with open( os.path.join(catalog_path, catalog_reference.catalogname + ".xosc"), "r", ) as f: loaded_catalog = ET.parse(f) catalog = find_mandatory_field(loaded_catalog, "Catalog") for entry in catalog: if entry.tag == "Vehicle": if entry.attrib["name"] == catalog_reference.entryname: return Vehicle.parse(entry) elif entry.tag == "Pedestrian": if entry.attrib["name"] == catalog_reference.entryname: return Pedestrian.parse(entry) elif entry.tag == "Controller": if entry.attrib["name"] == catalog_reference.entryname: return Controller.parse(entry) elif entry.tag == "MiscObject": if entry.attrib["name"] == catalog_reference.entryname: return MiscObject.parse(entry) elif entry.tag == "Environment": if entry.attrib["name"] == catalog_reference.entryname: return Environment.parse(entry) elif entry.tag == "Maneuver": if entry.attrib["name"] == catalog_reference.entryname: return Maneuver.parse(entry) elif entry.tag == "Trajectory": if entry.attrib["name"] == catalog_reference.entryname: return Trajectory.parse(entry) elif entry.tag == "Route": if entry.attrib["name"] == catalog_reference.entryname: return Route.parse(entry) else: raise NotImplementedError( "This catalogtype is not supported yet." ) raise NoCatalogFoundError( "A catalog entry with the name " + catalog_reference.entryname + " could not be found in the given Catalog." )CatalogReader is a function that will read a openscenario catalog and return the corresponding scenariogeneration.xosc object.
Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry
NOTE: only Vehicle, and Pedestrian is implemented
Parameters
catalog_reference (CatalogReference): the catalog reference needed catalog_path (str): path to the catalogReturns
The catalog entry def ParameterDeclarationReader(file_path)-
Expand source code
def ParameterDeclarationReader(file_path): """ParameterDeclarationReader reads the parameter declaration of a xosc file and creates a ParameterDeclaration object from it. Parameters ---------- file_path (str): path to the xosc file wanted to be parsed """ param_decl = ParameterDeclarations() with open(file_path, "r") as f: loaded_xosc = ET.parse(f) paramdec = find_mandatory_field(loaded_xosc, "ParameterDeclarations") param_decl = ParameterDeclarations.parse(paramdec) return param_declParameterDeclarationReader reads the parameter declaration of a xosc file and creates a ParameterDeclaration object from it.
Parameters
file_path (str): path to the xosc file wanted to be parsed def ParseOpenScenario(file_path)-
Expand source code
def ParseOpenScenario(file_path): """ParseOpenScenario parses a openscenario file (of any type) and returns the python object. Parameters ---------- file_path (str): full path to the .xosc file Returns ------- xosc_object (Scenario, Catalog, or ParameterValueDistribution) """ with open(file_path, "r", encoding="utf-8") as f: loaded_xosc = ET.parse(f) if not validate_schema(loaded_xosc): warnings.warn( "The provided file is not valid according to the OpenSCENARIO schema." ) if loaded_xosc.find("ParameterValueDistribution") is not None: return ParameterValueDistribution.parse(loaded_xosc) elif loaded_xosc.find("Catalog") is not None: return Catalog.parse(loaded_xosc) elif loaded_xosc.find("Storyboard") is not None: return Scenario.parse(loaded_xosc) else: raise NotAValidElement( "The provided file is not on a OpenSCENARIO compatible format." )ParseOpenScenario parses a openscenario file (of any type) and returns the python object.
Parameters
file_path (str): full path to the .xosc fileReturns
xosc_object (Scenario, Catalog, or ParameterValueDistribution) def validate_schema(loaded_xosc: xml.etree.ElementTree.ElementTree) ‑> bool-
Expand source code
def validate_schema(loaded_xosc: ET.ElementTree) -> bool: """validate_schema checks if the loaded xosc file is valid according to the OpenSCENARIO schema. It returns True if valid, False otherwise. Parameters ---------- loaded_xosc (ElementTree): loaded xosc file Returns ------- matched (bool): True if valid, False otherwise """ minor_version = loaded_xosc.find("FileHeader").attrib["revMinor"] major_version = loaded_xosc.find("FileHeader").attrib["revMajor"] print(f"OpenSCENARIO version detected: {major_version}.{minor_version}") if int(minor_version) > 3 or int(major_version) != 1: raise ValueError( f"OpenSCENARIO version {major_version}.{minor_version} does not exist in the schema files provided." ) if minor_version == "3" and major_version == "1": minor_version = "3_1" xsd_path = os.path.join( Path(__file__).parent.parent.parent, "schemas", "OpenSCENARIO_" + major_version + "_" + minor_version + ".xsd", ) schema = xmlschema.XMLSchema(xsd_path) matched = schema.is_valid(loaded_xosc) return matchedvalidate_schema checks if the loaded xosc file is valid according to the OpenSCENARIO schema. It returns True if valid, False otherwise. Parameters
loaded_xosc (ElementTree): loaded xosc fileReturns
matched (bool): True if valid, False otherwise
Classes
class CatalogLoader-
Expand source code
class CatalogLoader: """CatalogLoader makes it possible to read certain elements from a catalog. Attributes ---------- all_catalogs (dict with all catalogs): all catalogs loaded Methods ------- load_catalog(catalog_reference,catalog_path) loads a catalog that can be parsed later on get_entry(catalog_reference) reads a loaded catalog and returns the object """ def __init__(self): """CatalogLoader makes it possible to read certain elements from a catalog. Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry """ self.all_catalogs = {} def load_catalog(self, catalog_reference, catalog_path): """CatalogLoader makes it possible to read certain elements from a catalog. Parameters ---------- catalog_reference (CatalogReference or str): name/reference to the catalog catalog_path (str): path to the catalog """ if isinstance(catalog_reference, CatalogReference): fullpath = os.path.join( catalog_path, catalog_reference.catalogname + ".xosc" ) name_ref = catalog_reference.catalogname else: fullpath = os.path.join(catalog_path, catalog_reference + ".xosc") name_ref = catalog_reference with open(fullpath, "r", encoding="utf-8") as f: catalog_element = find_mandatory_field(ET.parse(f), "Catalog") self.all_catalogs[name_ref] = catalog_element def parse(self, catalog_reference): """Parse reads reads a specific entry from a loaded catalog. Parameters ---------- catalog_reference (CatalogReference): reference to the catalog Returns ------- The catalog entry """ if not catalog_reference.catalogname in self.all_catalogs: raise NoCatalogFoundError( "Catalog " + catalog_reference.catalogname + " is not loaded yet." ) catalog = self.all_catalogs[catalog_reference.catalogname] for entry in catalog: if entry.tag == "Vehicle": if entry.attrib["name"] == catalog_reference.entryname: return Vehicle.parse(entry) elif entry.tag == "Pedestrian": if entry.attrib["name"] == catalog_reference.entryname: return Pedestrian.parse(entry) elif entry.tag == "Controller": if entry.attrib["name"] == catalog_reference.entryname: return Controller.parse(entry) elif entry.tag == "MiscObject": if entry.attrib["name"] == catalog_reference.entryname: return MiscObject.parse(entry) elif entry.tag == "Environment": if entry.attrib["name"] == catalog_reference.entryname: return Environment.parse(entry) elif entry.tag == "Maneuver": if entry.attrib["name"] == catalog_reference.entryname: return Maneuver.parse(entry) elif entry.tag == "Trajectory": if entry.attrib["name"] == catalog_reference.entryname: return Trajectory.parse(entry) elif entry.tag == "Route": if entry.attrib["name"] == catalog_reference.entryname: return Route.parse(entry) raise NotImplementedError("This catalogtype is not supported yet.") def read_entry(self, catalog_reference, catalog_path): """read_entry loads and reads a catalog directly (both load_catalog, and parse) The catalog will still be loaded and can be used with parse after this. Parameters ---------- catalog_reference (CatalogReference): reference to the catalog catalog_path (str): path to the catalog """ self.load_catalog(catalog_reference, catalog_path) return self.parse(catalog_reference)CatalogLoader makes it possible to read certain elements from a catalog.
Attributes
all_catalogs (dict with all catalogs): all catalogs loadedMethods
load_catalog(catalog_reference,catalog_path) loads a catalog that can be parsed later on get_entry(catalog_reference) reads a loaded catalog and returns the objectCatalogLoader makes it possible to read certain elements from a catalog.
Main use case for this is to be able to parametrize and write scenarios based on a catalog based entry
Methods
def load_catalog(self, catalog_reference, catalog_path)-
Expand source code
def load_catalog(self, catalog_reference, catalog_path): """CatalogLoader makes it possible to read certain elements from a catalog. Parameters ---------- catalog_reference (CatalogReference or str): name/reference to the catalog catalog_path (str): path to the catalog """ if isinstance(catalog_reference, CatalogReference): fullpath = os.path.join( catalog_path, catalog_reference.catalogname + ".xosc" ) name_ref = catalog_reference.catalogname else: fullpath = os.path.join(catalog_path, catalog_reference + ".xosc") name_ref = catalog_reference with open(fullpath, "r", encoding="utf-8") as f: catalog_element = find_mandatory_field(ET.parse(f), "Catalog") self.all_catalogs[name_ref] = catalog_elementCatalogLoader makes it possible to read certain elements from a catalog.
Parameters
catalog_reference (CatalogReference or str): name/reference to the catalog catalog_path (str): path to the catalog def parse(self, catalog_reference)-
Expand source code
def parse(self, catalog_reference): """Parse reads reads a specific entry from a loaded catalog. Parameters ---------- catalog_reference (CatalogReference): reference to the catalog Returns ------- The catalog entry """ if not catalog_reference.catalogname in self.all_catalogs: raise NoCatalogFoundError( "Catalog " + catalog_reference.catalogname + " is not loaded yet." ) catalog = self.all_catalogs[catalog_reference.catalogname] for entry in catalog: if entry.tag == "Vehicle": if entry.attrib["name"] == catalog_reference.entryname: return Vehicle.parse(entry) elif entry.tag == "Pedestrian": if entry.attrib["name"] == catalog_reference.entryname: return Pedestrian.parse(entry) elif entry.tag == "Controller": if entry.attrib["name"] == catalog_reference.entryname: return Controller.parse(entry) elif entry.tag == "MiscObject": if entry.attrib["name"] == catalog_reference.entryname: return MiscObject.parse(entry) elif entry.tag == "Environment": if entry.attrib["name"] == catalog_reference.entryname: return Environment.parse(entry) elif entry.tag == "Maneuver": if entry.attrib["name"] == catalog_reference.entryname: return Maneuver.parse(entry) elif entry.tag == "Trajectory": if entry.attrib["name"] == catalog_reference.entryname: return Trajectory.parse(entry) elif entry.tag == "Route": if entry.attrib["name"] == catalog_reference.entryname: return Route.parse(entry) raise NotImplementedError("This catalogtype is not supported yet.")Parse reads reads a specific entry from a loaded catalog.
Parameters
catalog_reference (CatalogReference): reference to the catalogReturns
The catalog entry def read_entry(self, catalog_reference, catalog_path)-
Expand source code
def read_entry(self, catalog_reference, catalog_path): """read_entry loads and reads a catalog directly (both load_catalog, and parse) The catalog will still be loaded and can be used with parse after this. Parameters ---------- catalog_reference (CatalogReference): reference to the catalog catalog_path (str): path to the catalog """ self.load_catalog(catalog_reference, catalog_path) return self.parse(catalog_reference)read_entry loads and reads a catalog directly (both load_catalog, and parse)
The catalog will still be loaded and can be used with parse after this.
Parameters
catalog_reference (CatalogReference): reference to the catalog catalog_path (str): path to the catalog