Module scenariogeneration.xodr.junction_creator
scenariogeneration https://github.com/pyoscx/scenariogeneration
This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at https://mozilla.org/MPL/2.0/.
Copyright (c) 2022 The scenariogeneration Authors.
Expand source code
"""
scenariogeneration
https://github.com/pyoscx/scenariogeneration
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at https://mozilla.org/MPL/2.0/.
Copyright (c) 2022 The scenariogeneration Authors.
"""
from .enumerations import JunctionType, ElementType, ContactPoint
from .geometry import Spiral, Line
from .generators import (
create_road,
_get_related_lanesection,
_create_junction_links,
LaneDef,
)
from .links import Junction, Connection
from .exceptions import (
NotEnoughInputArguments,
UndefinedRoadNetwork,
NotSameAmountOfLanesError,
MixingDrivingDirection,
)
import pyclothoids as pcloth
import numpy as np
STD_START_CLOTH = 1 / 1000000000
class CommonJunctionCreator:
"""CommonJunctionCreator is a helper class to create custom common junctions.
Parameters
----------
id (int): the id of the junction
name (str): name of the junction
startnum (int): the starting id of this junctions roads
Default: 100
Attributes
----------
id (int): the id of the junction
startnum (int): the starting id of this junctions roads
incoming_roads (list of Road): all incoming roads for the junction
junction_roads (list of Road): all generated connecting roads
junction (Junction): the junction xodr element for the junction
Methods
-------
add_incoming_road_circular_geometry(road, radius, angle, road_connection)
Adds a road on a circle defining the junction geometry
Note: cannot be used together with 'add_incoming_road_cartesian_geometry'
add_incoming_road_cartesian_geometry(road, x, y, heading, road_connection)
Adds a road on a generic x, y, heading geometry
Note: cannot be used together with 'add_incoming_road_circular_geometry'
add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id)
"""
def __init__(self, id, name, startnum=100):
self.id = id
self.incoming_roads = []
self._radie = []
self._angles = []
self._x = []
self._y = []
self._h = []
self.junction_roads = []
self._number_of_left_lanes = []
self._number_of_right_lanes = []
self.startnum = startnum
self.junction = Junction(name, id, junction_type=JunctionType.default)
self._circular_junction = False
self._generic_junction = False
def add_incoming_road_circular_geometry(
self, road, radius, angle, road_connection=None
):
"""add_incoming_road_circular_geometry adds an incoming road to a junction, assuming a cirular geometry of the junction,
Meaning all roads will be placed on a circle based on the radius and angle.
The radius can be different from different incoming roads, however the origin stays the same.
NOTE: Note, this method can not be used toghether with add_incoming_road_cartesian_geometry
Parameters
----------
road (Road): the incoming road
radius (float): the radius on where to put the road
angle (float): the angle on where to put the road
road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor'
Default: None
"""
self.incoming_roads.append(road)
self._handle_connection_input(road, road_connection)
self._radie.append(radius)
self._angles.append(angle)
self._circular_junction = True
def add_incoming_road_cartesian_geometry(
self, road, x, y, heading, road_connection=None
):
"""add_incoming_road_cartesian_geometry adds an incoming road to a junction, assuming a
local coordinate system for the junction.
NOTE: Note, this method can not be used toghether with add_incoming_road_circular_geometry
Parameters
----------
road (Road): the incoming road
x (float): the local x-position of the road
y (float): the local y-position of the road
heading (float): the local heading of the road (pointing in to the junction)
road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor'
Default: None
"""
self.incoming_roads.append(road)
self._handle_connection_input(road, road_connection)
self._x.append(x)
self._y.append(y)
self._h.append(heading)
self._generic_junction = True
def _get_minimum_lanes_to_connect(self, incoming_road, linked_road):
(
incoming_connection,
incoming_sign,
incoming_lane_section,
) = _get_related_lanesection(incoming_road, linked_road)
linked_connection, sign, linked_lane_section = _get_related_lanesection(
linked_road, incoming_road
)
incoming_left_lanes = len(
incoming_road.lanes.lanesections[incoming_lane_section].leftlanes
)
incoming_right_lanes = len(
incoming_road.lanes.lanesections[incoming_lane_section].rightlanes
)
linked_left_lanes = len(
linked_road.lanes.lanesections[linked_lane_section].leftlanes
)
linked_right_lanes = len(
linked_road.lanes.lanesections[linked_lane_section].rightlanes
)
_incoming_lane_ids = []
_linked_lane_ids = []
if sign > 0:
_incoming_lane_ids.extend(
[
x
for x in range(
1, min(incoming_left_lanes, linked_left_lanes) + 1, 1
)
]
)
_linked_lane_ids.extend(
[
x
for x in range(
1, min(incoming_left_lanes, linked_left_lanes) + 1, 1
)
]
)
_incoming_lane_ids.extend(
[
-x
for x in range(
1, min(incoming_right_lanes, linked_right_lanes) + 1, 1
)
]
)
_linked_lane_ids.extend(
[
-x
for x in range(
1, min(incoming_right_lanes, linked_right_lanes) + 1, 1
)
]
)
elif sign < 0:
_incoming_lane_ids.extend(
[
x
for x in range(
1, min(incoming_left_lanes, linked_right_lanes) + 1, 1
)
]
)
_linked_lane_ids.extend(
[
-x
for x in range(
1, min(incoming_left_lanes, linked_right_lanes) + 1, 1
)
]
)
_incoming_lane_ids.extend(
[
-x
for x in range(
1, min(incoming_right_lanes, linked_left_lanes) + 1, 1
)
]
)
_linked_lane_ids.extend(
[
x
for x in range(
1, min(incoming_right_lanes, linked_left_lanes) + 1, 1
)
]
)
return _incoming_lane_ids, _linked_lane_ids
def add_connection(
self, road_one_id, road_two_id, lane_one_id=None, lane_two_id=None
):
"""add_connection adds a connection between two roads by generating three clothoids to fit
their given positions in the local coordinate system (circular or cartesian)
All roads has to be added to the junction with either:
add_incoming_road_cartesian_geometry
add_incoming_road_circular_geometry
NOTE: if no lane ids are provided, add_connection will add as many connections as the two roads have in common
Parameters
----------
road_one_id (int): the id of the first road to connect
road_two_id (int): the id of the second road to connect
lane_one_id (int or list of int): the lane id(s) on the first road to connect
lane_two_id (int or list of int): the lane id(s) on the second road to connect
"""
if (lane_one_id == None) and (lane_two_id == None):
# check if the connection is symetric (same number of lanes on both sides)
if (
self._number_of_left_lanes[self._get_list_index(road_one_id)]
== self._number_of_right_lanes[self._get_list_index(road_one_id)]
and self._number_of_left_lanes[self._get_list_index(road_two_id)]
== self._number_of_right_lanes[self._get_list_index(road_two_id)]
and self._number_of_left_lanes[self._get_list_index(road_one_id)]
== self._number_of_right_lanes[self._get_list_index(road_two_id)]
):
self._create_connecting_roads_with_equal_lanes(road_one_id, road_two_id)
else:
self._create_connecting_roads_unequal_lanes(road_one_id, road_two_id)
elif lane_one_id is not None and lane_two_id is not None:
if isinstance(lane_one_id, list):
for i in range(len(lane_one_id)):
self._create_connecting_road_with_lane_input(
road_one_id, road_two_id, lane_one_id[i], lane_two_id[i]
)
else:
self._create_connecting_road_with_lane_input(
road_one_id, road_two_id, lane_one_id, lane_two_id
)
else:
raise NotEnoughInputArguments(
"if lane input is used, both has to be provided"
)
def _handle_connection_input(self, road, road_connection):
"""Checker to see if enough data is provided for an incoming road
Parameters
----------
road (Road): the incoming road
road_connection (str): the connection type (predecessor or successor)
"""
# if isinstance(road.planview, AdjustablePlanview):
if road_connection is not None:
if road_connection == "successor":
road.add_successor(ElementType.junction, self.id)
elif road_connection == "predecessor":
road.add_predecessor(ElementType.junction, self.id)
else:
raise ValueError(
"road_connection can only be of the values 'successor', or 'predecessor'. Not "
+ road_connection
)
else:
if not (
(road.successor and road.successor.element_id == self.id)
or (road.predecessor and road.predecessor.element_id == self.id)
):
raise UndefinedRoadNetwork(
"road : "
+ str(road.id)
+ " is not connected to junction: "
+ str(self.id)
)
if road.successor and road.successor.element_id == self.id:
self._number_of_left_lanes.append(
len(road.lanes.lanesections[-1].leftlanes)
)
self._number_of_right_lanes.append(
len(road.lanes.lanesections[-1].rightlanes)
)
elif road.predecessor and road.predecessor.element_id == self.id:
self._number_of_left_lanes.append(len(road.lanes.lanesections[0].leftlanes))
self._number_of_right_lanes.append(
len(road.lanes.lanesections[0].rightlanes)
)
def _get_list_index(self, id):
"""helping method to get the index of the road based on a road id
Parameters
----------
id (int): the road id
Returns
-------
index (int)
"""
for i in range(len(self.incoming_roads)):
if self.incoming_roads[i].id == id:
return i
def _set_offset_for_incoming_road(self, road_idx, connecting_road_id, offset):
"""_set_offset_for_incoming_road is a helper function to set the correct offsets for a incoming road
Parameters
----------
road_idx (int): the index of the road
connecting_road_id (int): the id of the connecting road
offset (int): the lane offset
"""
if self._get_connection_type(road_idx) == "successor":
self.incoming_roads[road_idx].lane_offset_suc[
str(connecting_road_id)
] = offset
else:
self.incoming_roads[road_idx].lane_offset_pred[
str(connecting_road_id)
] = offset
def _get_contact_point_connecting_road(self, road_id):
"""_get_contact_point_connecting_road is a helper method to get the ContactPoint for a connecting road
Parameters
----------
road_id (int): id of the incoming road
Returns
-------
contact_point (ContactPoint)
"""
incoming_road = self.incoming_roads[self._get_list_index(road_id)]
if incoming_road.successor and incoming_road.successor.element_id == self.id:
return ContactPoint.end
elif (
incoming_road.predecessor
and incoming_road.predecessor.element_id == self.id
):
return ContactPoint.start
else:
raise AttributeError("road is not connected to this junction")
def _get_connecting_lane_section(self, idx):
"""_get_connecting_lane_section is a helper method to get the connected
Parameters
----------
idx (int): the road index
"""
incoming_road = self.incoming_roads[idx]
if incoming_road.successor and incoming_road.successor.element_id == self.id:
return -1
elif (
incoming_road.predecessor
and incoming_road.predecessor.element_id == self.id
):
return 0
else:
raise AttributeError("road is not connected to this junction")
def _create_connecting_roads_unequal_lanes(self, road_one_id, road_two_id):
"""_create_connecting_roads_unequal_lanes is a helper method that connects two roads that have different number of lanes going in to the junciton, will only connect lanes that are common between the roads
Parameters
----------
road_one_id (int): id of the first road
road_two_id (int): id of the second road
"""
idx1 = self._get_list_index(road_one_id)
idx2 = self._get_list_index(road_two_id)
# check if the road has _angles/radius for these roads
if self._circular_junction:
roadgeoms = self._create_geometry_from_circular(idx1, idx2)
elif self._generic_junction:
roadgeoms = self._create_geometry_from_carthesian(idx1, idx2)
first_road_lane_ids, _ = self._get_minimum_lanes_to_connect(
self.incoming_roads[idx1], self.incoming_roads[idx2]
)
left_lane_defs, right_lane_defs = self._get_lane_defs(
idx1, idx2, sum([x.length for x in roadgeoms]), True
)
tmp_junc_road = create_road(
roadgeoms,
self.startnum,
left_lanes=left_lane_defs,
right_lanes=right_lane_defs,
lane_width=3,
road_type=self.id,
)
tmp_junc_road.add_predecessor(
ElementType.road,
road_one_id,
contact_point=self._get_contact_point_connecting_road(road_one_id),
)
tmp_junc_road.add_successor(
ElementType.road,
road_two_id,
contact_point=self._get_contact_point_connecting_road(road_two_id),
)
(
first_road_lane_ids,
connecting_road_lane_ids,
) = self._get_minimum_lanes_to_connect(self.incoming_roads[idx1], tmp_junc_road)
connection = Connection(road_one_id, tmp_junc_road.id, ContactPoint.start)
for i in range(len(first_road_lane_ids)):
connection.add_lanelink(first_road_lane_ids[i], connecting_road_lane_ids[i])
self.junction.add_connection(connection)
(
second_road_lane_ids,
connecting_road_lane_ids,
) = self._get_minimum_lanes_to_connect(self.incoming_roads[idx2], tmp_junc_road)
connection = Connection(
tmp_junc_road.successor.element_id, tmp_junc_road.id, ContactPoint.end
)
for i in range(len(second_road_lane_ids)):
connection.add_lanelink(
second_road_lane_ids[i], connecting_road_lane_ids[i]
)
self.junction.add_connection(connection)
self.junction_roads.append(tmp_junc_road)
self.startnum += 1
def _get_lane_defs(
self, idx1, idx2, connecting_road_length, allow_empty_lane=False
):
def _get_lane_widths(idx, l_or_r):
connected_lane_section = self._get_connecting_lane_section(idx)
lane_widths = []
if l_or_r == "left":
lanes = (
self.incoming_roads[idx]
.lanes.lanesections[connected_lane_section]
.leftlanes
)
elif l_or_r == "right":
lanes = (
self.incoming_roads[idx]
.lanes.lanesections[connected_lane_section]
.rightlanes
)
for ll in lanes:
if connected_lane_section == 0:
lane_widths.append(ll.get_width(0))
else:
lane_widths.append(
ll.get_width(
self.incoming_roads[idx].planview.get_total_length()
- self.incoming_roads[idx]
.lanes.lanesections[connected_lane_section]
.s
)
)
return lane_widths
incomming_connected_lane_section = self._get_connecting_lane_section(idx1)
outgoing_connected_lane_section = self._get_connecting_lane_section(idx2)
if incomming_connected_lane_section == -1:
n_l_lanes = len(
self.incoming_roads[idx1]
.lanes.lanesections[incomming_connected_lane_section]
.leftlanes
)
n_r_lanes = len(
self.incoming_roads[idx1]
.lanes.lanesections[incomming_connected_lane_section]
.rightlanes
)
left_start_widths = _get_lane_widths(idx1, "left")
right_start_widths = _get_lane_widths(idx1, "right")
elif incomming_connected_lane_section == 0:
n_r_lanes = len(
self.incoming_roads[idx1]
.lanes.lanesections[incomming_connected_lane_section]
.leftlanes
)
n_l_lanes = len(
self.incoming_roads[idx1]
.lanes.lanesections[incomming_connected_lane_section]
.rightlanes
)
left_start_widths = _get_lane_widths(idx1, "right")
right_start_widths = _get_lane_widths(idx1, "left")
if outgoing_connected_lane_section == -1:
left_end_widths = _get_lane_widths(idx2, "right")
right_end_widths = _get_lane_widths(idx2, "left")
elif outgoing_connected_lane_section == 0:
left_end_widths = _get_lane_widths(idx2, "left")
right_end_widths = _get_lane_widths(idx2, "right")
left_lanes = 0
if len(left_start_widths) == len(left_end_widths):
left_lanes = LaneDef(
0,
connecting_road_length,
n_l_lanes,
n_l_lanes,
None,
left_start_widths,
left_end_widths,
)
elif allow_empty_lane:
num_lanes_to_connect = min(len(left_start_widths), len(left_end_widths))
left_lanes = LaneDef(
0,
connecting_road_length,
num_lanes_to_connect,
num_lanes_to_connect,
None,
left_start_widths[0:num_lanes_to_connect],
left_end_widths[0:num_lanes_to_connect],
)
right_lanes = 0
if (len(right_start_widths)) == (len(right_end_widths)):
right_lanes = LaneDef(
0,
connecting_road_length,
n_r_lanes,
n_r_lanes,
None,
right_start_widths,
right_end_widths,
)
elif allow_empty_lane:
num_lanes_to_connect = min(len(right_start_widths), len(right_end_widths))
right_lanes = LaneDef(
0,
connecting_road_length,
num_lanes_to_connect,
num_lanes_to_connect,
None,
right_start_widths[0:num_lanes_to_connect],
right_end_widths[0:num_lanes_to_connect],
)
return left_lanes, right_lanes
def _create_connecting_roads_with_equal_lanes(self, road_one_id, road_two_id):
"""_create_connecting_roads_with_equal_lanes is a helper method that connects two roads that have the
same number of left and right lanes
Parameters
----------
road_one_id (int): id of the first road
road_two_id (int): id of the second road
"""
idx1 = self._get_list_index(road_one_id)
idx2 = self._get_list_index(road_two_id)
# check if the road has _angles/radius for these roads
if self._circular_junction:
roadgeoms = self._create_geometry_from_circular(idx1, idx2)
elif self._generic_junction:
roadgeoms = self._create_geometry_from_carthesian(idx1, idx2)
left_lane_defs, right_lane_defs = self._get_lane_defs(
idx1, idx2, sum([x.length for x in roadgeoms])
)
tmp_junc_road = create_road(
roadgeoms,
self.startnum,
left_lanes=left_lane_defs,
right_lanes=right_lane_defs,
lane_width=1,
road_type=self.id,
)
tmp_junc_road.add_predecessor(
ElementType.road,
road_one_id,
contact_point=self._get_contact_point_connecting_road(road_one_id),
)
tmp_junc_road.add_successor(
ElementType.road,
road_two_id,
contact_point=self._get_contact_point_connecting_road(road_two_id),
)
self._add_connection_full(tmp_junc_road)
self.junction_roads.append(tmp_junc_road)
self.startnum += 1
def _get_connection_type(self, road_idx):
if (
self.incoming_roads[road_idx].successor
and self.incoming_roads[road_idx].successor.element_id == self.id
):
return "successor"
else:
return "predecessor"
def _get_lane_width(self, lane_id, road_idx):
if np.sign(lane_id) == -1:
start_width = (
self.incoming_roads[road_idx]
.lanes.lanesections[self._get_connecting_lane_section(road_idx)]
.rightlanes[abs(lane_id) - 1]
.get_width(0)
)
else:
start_width = (
self.incoming_roads[road_idx]
.lanes.lanesections[self._get_connecting_lane_section(road_idx)]
.leftlanes[abs(lane_id) - 1]
.get_width(0)
)
return start_width
def _create_connecting_road_with_lane_input(
self, road_one_id, road_two_id, lane_one_id, lane_two_id
):
"""_create_connecting_road_with_lane_input is a helper method that connects two roads with one lane on each road.
It allows for connecting two roads that may have lanes of different widths and creates an appropriate geometry for the
generated connecting road based on its start lane width and end lane width as read from the two input roads being connected
Parameters
----------
road_one_id (int): the id of the first road to connect
road_two_id (int): the id of the second road to connect
lane_one_id (int): the lane id(s) on the first road to connect
lane_two_id (int): the lane id(s) on the second road to connect
"""
idx1 = self._get_list_index(road_one_id)
idx2 = self._get_list_index(road_two_id)
start_offset = 0.0
end_offset = 0.0
if (
self._get_connection_type(idx2) == self._get_connection_type(idx1)
and np.sign(lane_one_id) == np.sign(lane_two_id)
) or (
self._get_connection_type(idx2) != self._get_connection_type(idx1)
and np.sign(lane_one_id) != np.sign(lane_two_id)
):
raise MixingDrivingDirection(
"driving direction not consistent when trying to make connection between roads:"
+ str(road_one_id)
+ " and "
+ str(road_two_id)
)
if np.sign(lane_one_id) == -1:
for lane_iter in range((np.sign(lane_one_id) * lane_one_id) - 1):
start_offset += (
self.incoming_roads[idx1]
.lanes.lanesections[self._get_connecting_lane_section(idx1)]
.rightlanes[lane_iter]
.get_width(0)
)
else:
for lane_iter in range((np.sign(lane_one_id) * lane_one_id) - 1):
start_offset += (
self.incoming_roads[idx1]
.lanes.lanesections[self._get_connecting_lane_section(idx1)]
.leftlanes[lane_iter]
.get_width(0)
)
if np.sign(lane_two_id) == -1:
for lane_iter in range((np.sign(lane_two_id) * lane_two_id) - 1):
end_offset += (
self.incoming_roads[idx2]
.lanes.lanesections[self._get_connecting_lane_section(idx2)]
.rightlanes[lane_iter]
.get_width(0)
)
else:
for lane_iter in range((np.sign(lane_two_id) * lane_two_id) - 1):
end_offset += (
self.incoming_roads[idx2]
.lanes.lanesections[self._get_connecting_lane_section(idx2)]
.leftlanes[lane_iter]
.get_width(0)
)
start_width = self._get_lane_width(lane_one_id, idx1)
end_width = self._get_lane_width(lane_two_id, idx2)
if self._get_connection_type(idx1) == "successor":
angle_offset_start = np.sign(lane_one_id) * np.pi / 2
else:
angle_offset_start = -np.sign(lane_one_id) * np.pi / 2
if self._get_connection_type(idx2) == "successor":
angle_offset_end = np.sign(lane_two_id) * np.pi / 2
else:
angle_offset_end = -np.sign(lane_two_id) * np.pi / 2
if self._circular_junction:
an1 = self._angles[idx2] - self._angles[idx1] - np.pi
# adjust angle if multiple of pi
if an1 > np.pi:
an1 = -(2 * np.pi - an1)
start_x = -self._radie[idx1]
start_y = start_offset
start_h = 0
end_x = self._radie[idx2] * np.cos(an1) + end_offset * np.cos(
self._angles[idx2] + angle_offset_end
)
end_y = self._radie[idx2] * np.sin(an1) + end_offset * np.sin(
self._angles[idx2] + angle_offset_end
)
end_h = an1
elif self._generic_junction:
start_x = self._x[idx1] + start_offset * np.cos(
self._h[idx1] + angle_offset_start
)
start_y = self._y[idx1] + start_offset * np.sin(
self._h[idx1] + angle_offset_start
)
start_h = self._h[idx1]
end_x = self._x[idx2] + end_offset * np.cos(
self._h[idx2] + angle_offset_end
)
end_y = self._y[idx2] + end_offset * np.sin(
self._h[idx2] + angle_offset_end
)
end_h = self._h[idx2] - np.pi
clothoids = pcloth.SolveG2(
start_x,
start_y,
start_h,
STD_START_CLOTH,
end_x,
end_y,
end_h,
STD_START_CLOTH,
)
roadgeoms = [
Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids
]
if self._get_connection_type(idx1) == "successor":
if lane_one_id < 0:
num_left_lanes = 0
num_right_lanes = 1
else:
num_left_lanes = 1
num_right_lanes = 0
else:
if lane_one_id < 0:
num_left_lanes = 1
num_right_lanes = 0
else:
num_left_lanes = 0
num_right_lanes = 1
tmp_junc_road = create_road(
roadgeoms,
self.startnum,
left_lanes=num_left_lanes,
right_lanes=num_right_lanes,
lane_width=start_width,
road_type=self.id,
lane_width_end=end_width,
)
pred_lane_offset = np.sign(lane_one_id) * (abs(lane_one_id) - 1)
if self._get_connection_type(idx1) == "predecessor":
pred_lane_offset = -pred_lane_offset
succ_lane_offset = np.sign(lane_two_id) * (abs(lane_two_id) - 1)
if self._get_connection_type(idx2) == "predecessor":
succ_lane_offset = -succ_lane_offset
tmp_junc_road.add_predecessor(
ElementType.road,
road_one_id,
contact_point=self._get_contact_point_connecting_road(road_one_id),
lane_offset=pred_lane_offset,
)
tmp_junc_road.add_successor(
ElementType.road,
road_two_id,
contact_point=self._get_contact_point_connecting_road(road_two_id),
lane_offset=succ_lane_offset,
)
# add offsets to the incomming roads
self._set_offset_for_incoming_road(idx1, tmp_junc_road.id, -pred_lane_offset)
self._set_offset_for_incoming_road(idx2, tmp_junc_road.id, -succ_lane_offset)
self.junction_roads.append(tmp_junc_road)
connection = Connection(road_one_id, tmp_junc_road.id, ContactPoint.start)
if num_left_lanes:
connection.add_lanelink(lane_one_id, 1)
else:
connection.add_lanelink(lane_one_id, -1)
self.junction.add_connection(connection)
self.startnum += 1
def _add_connection_full(self, connecting_road):
"""_add_connection_full is a helper method creating connections for connections with equal amount of lanes
Parameters
----------
connecting_road (Road): the connecting road
"""
conne1 = Connection(
connecting_road.successor.element_id, connecting_road.id, ContactPoint.end
)
_, sign, _ = _get_related_lanesection(
connecting_road,
self.incoming_roads[
self._get_list_index(connecting_road.successor.element_id)
],
)
_create_junction_links(
conne1,
len(connecting_road.lanes.lanesections[-1].rightlanes),
-1,
sign,
to_offset=connecting_road.lane_offset_suc[
str(connecting_road.successor.element_id)
],
)
_create_junction_links(
conne1,
len(connecting_road.lanes.lanesections[-1].leftlanes),
1,
sign,
to_offset=connecting_road.lane_offset_suc[
str(connecting_road.successor.element_id)
],
)
self.junction.add_connection(conne1)
# handle predecessor lanes
conne2 = Connection(
connecting_road.predecessor.element_id,
connecting_road.id,
ContactPoint.start,
)
_, sign, _ = _get_related_lanesection(
connecting_road,
self.incoming_roads[
self._get_list_index(connecting_road.predecessor.element_id)
],
)
_create_junction_links(
conne2,
len(connecting_road.lanes.lanesections[0].rightlanes),
-1,
sign,
from_offset=connecting_road.lane_offset_pred[
str(connecting_road.predecessor.element_id)
],
)
_create_junction_links(
conne2,
len(connecting_road.lanes.lanesections[0].leftlanes),
1,
sign,
from_offset=connecting_road.lane_offset_pred[
str(connecting_road.predecessor.element_id)
],
)
self.junction.add_connection(conne2)
def _create_geometry_from_carthesian(self, idx1, idx2):
"""_create_geometry_from_carthesian creates a connecting road between two roads added with carthesian coordinates
Parameters
----------
idx1 (int): index of the first road
idx2 (int): index of the second road
Returns
-------
list of Geometries
"""
an1 = self._h[idx2] - self._h[idx1] - np.pi
# adjust angle if multiple of pi
if an1 > np.pi:
an1 = -(2 * np.pi - an1)
clothoids = pcloth.SolveG2(
self._x[idx1],
self._y[idx1],
self._h[idx1],
STD_START_CLOTH,
self._x[idx2],
self._y[idx2],
self._h[idx2] - np.pi,
STD_START_CLOTH,
)
roadgeoms = [
Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids
]
return roadgeoms
def _create_geometry_from_circular(self, idx1, idx2):
"""_create_geometry_from_circular creates a connecting road between two roads added with circular geometry
Parameters
----------
idx1 (int): index of the first road
idx2 (int): index of the second road
Returns
-------
list of Geometries
"""
an1 = self._angles[idx2] - self._angles[idx1] - np.pi
# adjust angle if multiple of pi
if an1 > np.pi:
an1 = -(2 * np.pi - an1)
if np.sign(an1) == 0:
roadgeoms = [Line(self._radie[idx1] + self._radie[idx2])]
else:
clothoids = pcloth.SolveG2(
-self._radie[idx1],
0,
0,
STD_START_CLOTH,
self._radie[idx2] * np.cos(an1),
self._radie[idx2] * np.sin(an1),
an1,
STD_START_CLOTH,
)
roadgeoms = [
Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids
]
return roadgeoms
def get_connecting_roads(self):
"""returns the connecting roads generated for the junction
Returns
-------
list of Road
"""
return self.junction_roads
class DirectJunctionCreator:
"""DirectJunctionCreator is a helper class to create custom direct junctions.
Parameters
----------
id (int): the id of the junction
name (str): name of the junction
Attributes
----------
id (int): the id of the junction
junction (Junction): the junction xodr element for the junction
Methods
-------
add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id)
"""
def __init__(self, id, name):
"""Initalize the DirectJunctionCreator
Parameters
----------
id (int): the id of the junction
name (str): name of the junction
"""
self.id = id
self.junction = Junction(name, id, JunctionType.direct)
self._incoming_lane_ids = []
self._linked_lane_ids = []
def _get_minimum_lanes_to_connect(self, incoming_road, linked_road):
incoming_connection, _, incoming_lane_section = _get_related_lanesection(
incoming_road, linked_road
)
linked_connection, sign, linked_lane_section = _get_related_lanesection(
linked_road, incoming_road
)
incoming_left_lanes = len(
incoming_road.lanes.lanesections[incoming_lane_section].leftlanes
)
incoming_right_lanes = len(
incoming_road.lanes.lanesections[incoming_lane_section].rightlanes
)
linked_left_lanes = len(
linked_road.lanes.lanesections[linked_lane_section].leftlanes
)
linked_right_lanes = len(
linked_road.lanes.lanesections[linked_lane_section].rightlanes
)
self._incoming_lane_ids = []
self._linked_lane_ids = []
# if incoming_connection == "successor" and linked_connection == "predecessor" or incoming_connection == "predecessor" and linked_connection == "successor":
if sign > 0:
self._incoming_lane_ids.extend(
[x for x in range(-min(incoming_right_lanes, linked_right_lanes), 0, 1)]
)
self._linked_lane_ids.extend(
[x for x in range(-min(incoming_right_lanes, linked_right_lanes), 0, 1)]
)
self._incoming_lane_ids.extend(
[
x
for x in range(
1, min(incoming_left_lanes, linked_left_lanes) + 1, 1
)
]
)
self._linked_lane_ids.extend(
[
x
for x in range(
1, min(incoming_left_lanes, linked_left_lanes) + 1, 1
)
]
)
elif (
sign < 0
): # incoming_connection == "successor" and linked_connection == "successor" or incoming_connection == "predecessor" and linked_connection == "predecessor":
self._incoming_lane_ids.extend(
[-x for x in range(-min(incoming_left_lanes, linked_right_lanes), 0, 1)]
)
self._linked_lane_ids.extend(
[x for x in range(-min(incoming_left_lanes, linked_right_lanes), 0, 1)]
)
self._incoming_lane_ids.extend(
[
-x
for x in range(
1, min(incoming_right_lanes, linked_left_lanes) + 1, 1
)
]
)
self._linked_lane_ids.extend(
[
x
for x in range(
1, min(incoming_right_lanes, linked_left_lanes) + 1, 1
)
]
)
def _get_contact_point_linked_road(self, incoming_road):
"""_get_contact_point_linked_road is a helper method to get the ContactPoint for a linked road
Parameters
----------
road_id (int): id of the incoming road
Returns
-------
contact_point (ContactPoint)
"""
if incoming_road.successor and incoming_road.successor.element_id == self.id:
return ContactPoint.end
elif (
incoming_road.predecessor
and incoming_road.predecessor.element_id == self.id
):
return ContactPoint.start
else:
raise AttributeError("road is not connected to this junction")
def add_connection(
self, incoming_road, linked_road, incoming_lane_ids=None, linked_lane_ids=None
):
"""add_connection adds a connection between an incoming_road and a linked_road.
Withouth any lane information, it will add connections to all lanes that the two roads have in common
Parameters
----------
incoming_road (Road): the incoming road
linked_road (Road): the linked road
incoming_lane_ids (int or list of ints): the incoming lane ids to connect
Default: None
linked_lane_ids (int or list of ints): the linked lane ids to connect
Default: None
"""
linked_lane_offset = 0
inc_lane_offset = 0
incoming_main_road = False
if incoming_lane_ids == None and linked_lane_ids == None:
self._get_minimum_lanes_to_connect(incoming_road, linked_road)
elif incoming_lane_ids is not None and linked_lane_ids is not None:
if not isinstance(incoming_lane_ids, list):
self._incoming_lane_ids = [incoming_lane_ids]
else:
self._incoming_lane_ids = incoming_lane_ids
if not isinstance(linked_lane_ids, list):
self._linked_lane_ids = [linked_lane_ids]
if abs(linked_lane_ids) == 1:
incoming_main_road = True
else:
self._linked_lane_ids = linked_lane_ids
if min([abs(x) for x in self._linked_lane_ids]) == 1:
incoming_main_road = True
# sanity check
for i in range(len(self._incoming_lane_ids)):
if self._get_contact_point_linked_road(
incoming_road
) == self._get_contact_point_linked_road(linked_road):
if np.sign(self._incoming_lane_ids[i]) == np.sign(
self._linked_lane_ids[i]
):
raise MixingDrivingDirection(
"driving direction not consistent when trying to make connection between roads:"
+ str(incoming_road.id)
+ " and "
+ str(linked_road.id)
)
else:
if np.sign(self._incoming_lane_ids[i]) != np.sign(
self._linked_lane_ids[i]
):
raise MixingDrivingDirection(
"driving direction not consistent when trying to make connection between roads:"
+ str(incoming_road.id)
+ " and "
+ str(linked_road.id)
)
if len(self._linked_lane_ids) != len(self._linked_lane_ids):
raise NotSameAmountOfLanesError(
"the incoming_lane_ids and linked_lane_ids are not the same length"
)
if abs(self._incoming_lane_ids[0]) != abs(self._linked_lane_ids[0]):
lane_offset = abs(
abs(self._incoming_lane_ids[0]) - abs(self._linked_lane_ids[0])
)
if incoming_main_road:
linked_lane_offset = np.sign(self._linked_lane_ids[0]) * lane_offset
inc_lane_offset = (
-1
* np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0])
* linked_lane_offset
)
else:
inc_lane_offset = np.sign(self._incoming_lane_ids[0]) * lane_offset
linked_lane_offset = (
-1
* np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0])
* inc_lane_offset
)
if (
incoming_road.predecessor
and incoming_road.predecessor.element_id == self.id
):
incoming_road.pred_direct_junction[linked_road.id] = inc_lane_offset
else:
incoming_road.succ_direct_junction[linked_road.id] = inc_lane_offset
if linked_road.predecessor and linked_road.predecessor.element_id == self.id:
linked_road.pred_direct_junction[incoming_road.id] = linked_lane_offset
else:
linked_road.succ_direct_junction[incoming_road.id] = linked_lane_offset
connection = Connection(
incoming_road.id,
linked_road.id,
self._get_contact_point_linked_road(linked_road),
)
for i in range(len(self._incoming_lane_ids)):
connection.add_lanelink(
self._incoming_lane_ids[i], self._linked_lane_ids[i]
)
self.junction.add_connection(connection)
Classes
class CommonJunctionCreator (id, name, startnum=100)
-
CommonJunctionCreator is a helper class to create custom common junctions.
Parameters
id (int): the id of the junction name (str): name of the junction startnum (int): the starting id of this junctions roads Default: 100
Attributes
id (int): the id of the junction startnum (int): the starting id of this junctions roads incoming_roads (list of Road): all incoming roads for the junction junction_roads (list of Road): all generated connecting roads junction (Junction): the junction xodr element for the junction
Methods
add_incoming_road_circular_geometry(road, radius, angle, road_connection) Adds a road on a circle defining the junction geometry Note: cannot be used together with 'add_incoming_road_cartesian_geometry' add_incoming_road_cartesian_geometry(road, x, y, heading, road_connection) Adds a road on a generic x, y, heading geometry Note: cannot be used together with 'add_incoming_road_circular_geometry' add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id)
Expand source code
class CommonJunctionCreator: """CommonJunctionCreator is a helper class to create custom common junctions. Parameters ---------- id (int): the id of the junction name (str): name of the junction startnum (int): the starting id of this junctions roads Default: 100 Attributes ---------- id (int): the id of the junction startnum (int): the starting id of this junctions roads incoming_roads (list of Road): all incoming roads for the junction junction_roads (list of Road): all generated connecting roads junction (Junction): the junction xodr element for the junction Methods ------- add_incoming_road_circular_geometry(road, radius, angle, road_connection) Adds a road on a circle defining the junction geometry Note: cannot be used together with 'add_incoming_road_cartesian_geometry' add_incoming_road_cartesian_geometry(road, x, y, heading, road_connection) Adds a road on a generic x, y, heading geometry Note: cannot be used together with 'add_incoming_road_circular_geometry' add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id) """ def __init__(self, id, name, startnum=100): self.id = id self.incoming_roads = [] self._radie = [] self._angles = [] self._x = [] self._y = [] self._h = [] self.junction_roads = [] self._number_of_left_lanes = [] self._number_of_right_lanes = [] self.startnum = startnum self.junction = Junction(name, id, junction_type=JunctionType.default) self._circular_junction = False self._generic_junction = False def add_incoming_road_circular_geometry( self, road, radius, angle, road_connection=None ): """add_incoming_road_circular_geometry adds an incoming road to a junction, assuming a cirular geometry of the junction, Meaning all roads will be placed on a circle based on the radius and angle. The radius can be different from different incoming roads, however the origin stays the same. NOTE: Note, this method can not be used toghether with add_incoming_road_cartesian_geometry Parameters ---------- road (Road): the incoming road radius (float): the radius on where to put the road angle (float): the angle on where to put the road road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None """ self.incoming_roads.append(road) self._handle_connection_input(road, road_connection) self._radie.append(radius) self._angles.append(angle) self._circular_junction = True def add_incoming_road_cartesian_geometry( self, road, x, y, heading, road_connection=None ): """add_incoming_road_cartesian_geometry adds an incoming road to a junction, assuming a local coordinate system for the junction. NOTE: Note, this method can not be used toghether with add_incoming_road_circular_geometry Parameters ---------- road (Road): the incoming road x (float): the local x-position of the road y (float): the local y-position of the road heading (float): the local heading of the road (pointing in to the junction) road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None """ self.incoming_roads.append(road) self._handle_connection_input(road, road_connection) self._x.append(x) self._y.append(y) self._h.append(heading) self._generic_junction = True def _get_minimum_lanes_to_connect(self, incoming_road, linked_road): ( incoming_connection, incoming_sign, incoming_lane_section, ) = _get_related_lanesection(incoming_road, linked_road) linked_connection, sign, linked_lane_section = _get_related_lanesection( linked_road, incoming_road ) incoming_left_lanes = len( incoming_road.lanes.lanesections[incoming_lane_section].leftlanes ) incoming_right_lanes = len( incoming_road.lanes.lanesections[incoming_lane_section].rightlanes ) linked_left_lanes = len( linked_road.lanes.lanesections[linked_lane_section].leftlanes ) linked_right_lanes = len( linked_road.lanes.lanesections[linked_lane_section].rightlanes ) _incoming_lane_ids = [] _linked_lane_ids = [] if sign > 0: _incoming_lane_ids.extend( [ x for x in range( 1, min(incoming_left_lanes, linked_left_lanes) + 1, 1 ) ] ) _linked_lane_ids.extend( [ x for x in range( 1, min(incoming_left_lanes, linked_left_lanes) + 1, 1 ) ] ) _incoming_lane_ids.extend( [ -x for x in range( 1, min(incoming_right_lanes, linked_right_lanes) + 1, 1 ) ] ) _linked_lane_ids.extend( [ -x for x in range( 1, min(incoming_right_lanes, linked_right_lanes) + 1, 1 ) ] ) elif sign < 0: _incoming_lane_ids.extend( [ x for x in range( 1, min(incoming_left_lanes, linked_right_lanes) + 1, 1 ) ] ) _linked_lane_ids.extend( [ -x for x in range( 1, min(incoming_left_lanes, linked_right_lanes) + 1, 1 ) ] ) _incoming_lane_ids.extend( [ -x for x in range( 1, min(incoming_right_lanes, linked_left_lanes) + 1, 1 ) ] ) _linked_lane_ids.extend( [ x for x in range( 1, min(incoming_right_lanes, linked_left_lanes) + 1, 1 ) ] ) return _incoming_lane_ids, _linked_lane_ids def add_connection( self, road_one_id, road_two_id, lane_one_id=None, lane_two_id=None ): """add_connection adds a connection between two roads by generating three clothoids to fit their given positions in the local coordinate system (circular or cartesian) All roads has to be added to the junction with either: add_incoming_road_cartesian_geometry add_incoming_road_circular_geometry NOTE: if no lane ids are provided, add_connection will add as many connections as the two roads have in common Parameters ---------- road_one_id (int): the id of the first road to connect road_two_id (int): the id of the second road to connect lane_one_id (int or list of int): the lane id(s) on the first road to connect lane_two_id (int or list of int): the lane id(s) on the second road to connect """ if (lane_one_id == None) and (lane_two_id == None): # check if the connection is symetric (same number of lanes on both sides) if ( self._number_of_left_lanes[self._get_list_index(road_one_id)] == self._number_of_right_lanes[self._get_list_index(road_one_id)] and self._number_of_left_lanes[self._get_list_index(road_two_id)] == self._number_of_right_lanes[self._get_list_index(road_two_id)] and self._number_of_left_lanes[self._get_list_index(road_one_id)] == self._number_of_right_lanes[self._get_list_index(road_two_id)] ): self._create_connecting_roads_with_equal_lanes(road_one_id, road_two_id) else: self._create_connecting_roads_unequal_lanes(road_one_id, road_two_id) elif lane_one_id is not None and lane_two_id is not None: if isinstance(lane_one_id, list): for i in range(len(lane_one_id)): self._create_connecting_road_with_lane_input( road_one_id, road_two_id, lane_one_id[i], lane_two_id[i] ) else: self._create_connecting_road_with_lane_input( road_one_id, road_two_id, lane_one_id, lane_two_id ) else: raise NotEnoughInputArguments( "if lane input is used, both has to be provided" ) def _handle_connection_input(self, road, road_connection): """Checker to see if enough data is provided for an incoming road Parameters ---------- road (Road): the incoming road road_connection (str): the connection type (predecessor or successor) """ # if isinstance(road.planview, AdjustablePlanview): if road_connection is not None: if road_connection == "successor": road.add_successor(ElementType.junction, self.id) elif road_connection == "predecessor": road.add_predecessor(ElementType.junction, self.id) else: raise ValueError( "road_connection can only be of the values 'successor', or 'predecessor'. Not " + road_connection ) else: if not ( (road.successor and road.successor.element_id == self.id) or (road.predecessor and road.predecessor.element_id == self.id) ): raise UndefinedRoadNetwork( "road : " + str(road.id) + " is not connected to junction: " + str(self.id) ) if road.successor and road.successor.element_id == self.id: self._number_of_left_lanes.append( len(road.lanes.lanesections[-1].leftlanes) ) self._number_of_right_lanes.append( len(road.lanes.lanesections[-1].rightlanes) ) elif road.predecessor and road.predecessor.element_id == self.id: self._number_of_left_lanes.append(len(road.lanes.lanesections[0].leftlanes)) self._number_of_right_lanes.append( len(road.lanes.lanesections[0].rightlanes) ) def _get_list_index(self, id): """helping method to get the index of the road based on a road id Parameters ---------- id (int): the road id Returns ------- index (int) """ for i in range(len(self.incoming_roads)): if self.incoming_roads[i].id == id: return i def _set_offset_for_incoming_road(self, road_idx, connecting_road_id, offset): """_set_offset_for_incoming_road is a helper function to set the correct offsets for a incoming road Parameters ---------- road_idx (int): the index of the road connecting_road_id (int): the id of the connecting road offset (int): the lane offset """ if self._get_connection_type(road_idx) == "successor": self.incoming_roads[road_idx].lane_offset_suc[ str(connecting_road_id) ] = offset else: self.incoming_roads[road_idx].lane_offset_pred[ str(connecting_road_id) ] = offset def _get_contact_point_connecting_road(self, road_id): """_get_contact_point_connecting_road is a helper method to get the ContactPoint for a connecting road Parameters ---------- road_id (int): id of the incoming road Returns ------- contact_point (ContactPoint) """ incoming_road = self.incoming_roads[self._get_list_index(road_id)] if incoming_road.successor and incoming_road.successor.element_id == self.id: return ContactPoint.end elif ( incoming_road.predecessor and incoming_road.predecessor.element_id == self.id ): return ContactPoint.start else: raise AttributeError("road is not connected to this junction") def _get_connecting_lane_section(self, idx): """_get_connecting_lane_section is a helper method to get the connected Parameters ---------- idx (int): the road index """ incoming_road = self.incoming_roads[idx] if incoming_road.successor and incoming_road.successor.element_id == self.id: return -1 elif ( incoming_road.predecessor and incoming_road.predecessor.element_id == self.id ): return 0 else: raise AttributeError("road is not connected to this junction") def _create_connecting_roads_unequal_lanes(self, road_one_id, road_two_id): """_create_connecting_roads_unequal_lanes is a helper method that connects two roads that have different number of lanes going in to the junciton, will only connect lanes that are common between the roads Parameters ---------- road_one_id (int): id of the first road road_two_id (int): id of the second road """ idx1 = self._get_list_index(road_one_id) idx2 = self._get_list_index(road_two_id) # check if the road has _angles/radius for these roads if self._circular_junction: roadgeoms = self._create_geometry_from_circular(idx1, idx2) elif self._generic_junction: roadgeoms = self._create_geometry_from_carthesian(idx1, idx2) first_road_lane_ids, _ = self._get_minimum_lanes_to_connect( self.incoming_roads[idx1], self.incoming_roads[idx2] ) left_lane_defs, right_lane_defs = self._get_lane_defs( idx1, idx2, sum([x.length for x in roadgeoms]), True ) tmp_junc_road = create_road( roadgeoms, self.startnum, left_lanes=left_lane_defs, right_lanes=right_lane_defs, lane_width=3, road_type=self.id, ) tmp_junc_road.add_predecessor( ElementType.road, road_one_id, contact_point=self._get_contact_point_connecting_road(road_one_id), ) tmp_junc_road.add_successor( ElementType.road, road_two_id, contact_point=self._get_contact_point_connecting_road(road_two_id), ) ( first_road_lane_ids, connecting_road_lane_ids, ) = self._get_minimum_lanes_to_connect(self.incoming_roads[idx1], tmp_junc_road) connection = Connection(road_one_id, tmp_junc_road.id, ContactPoint.start) for i in range(len(first_road_lane_ids)): connection.add_lanelink(first_road_lane_ids[i], connecting_road_lane_ids[i]) self.junction.add_connection(connection) ( second_road_lane_ids, connecting_road_lane_ids, ) = self._get_minimum_lanes_to_connect(self.incoming_roads[idx2], tmp_junc_road) connection = Connection( tmp_junc_road.successor.element_id, tmp_junc_road.id, ContactPoint.end ) for i in range(len(second_road_lane_ids)): connection.add_lanelink( second_road_lane_ids[i], connecting_road_lane_ids[i] ) self.junction.add_connection(connection) self.junction_roads.append(tmp_junc_road) self.startnum += 1 def _get_lane_defs( self, idx1, idx2, connecting_road_length, allow_empty_lane=False ): def _get_lane_widths(idx, l_or_r): connected_lane_section = self._get_connecting_lane_section(idx) lane_widths = [] if l_or_r == "left": lanes = ( self.incoming_roads[idx] .lanes.lanesections[connected_lane_section] .leftlanes ) elif l_or_r == "right": lanes = ( self.incoming_roads[idx] .lanes.lanesections[connected_lane_section] .rightlanes ) for ll in lanes: if connected_lane_section == 0: lane_widths.append(ll.get_width(0)) else: lane_widths.append( ll.get_width( self.incoming_roads[idx].planview.get_total_length() - self.incoming_roads[idx] .lanes.lanesections[connected_lane_section] .s ) ) return lane_widths incomming_connected_lane_section = self._get_connecting_lane_section(idx1) outgoing_connected_lane_section = self._get_connecting_lane_section(idx2) if incomming_connected_lane_section == -1: n_l_lanes = len( self.incoming_roads[idx1] .lanes.lanesections[incomming_connected_lane_section] .leftlanes ) n_r_lanes = len( self.incoming_roads[idx1] .lanes.lanesections[incomming_connected_lane_section] .rightlanes ) left_start_widths = _get_lane_widths(idx1, "left") right_start_widths = _get_lane_widths(idx1, "right") elif incomming_connected_lane_section == 0: n_r_lanes = len( self.incoming_roads[idx1] .lanes.lanesections[incomming_connected_lane_section] .leftlanes ) n_l_lanes = len( self.incoming_roads[idx1] .lanes.lanesections[incomming_connected_lane_section] .rightlanes ) left_start_widths = _get_lane_widths(idx1, "right") right_start_widths = _get_lane_widths(idx1, "left") if outgoing_connected_lane_section == -1: left_end_widths = _get_lane_widths(idx2, "right") right_end_widths = _get_lane_widths(idx2, "left") elif outgoing_connected_lane_section == 0: left_end_widths = _get_lane_widths(idx2, "left") right_end_widths = _get_lane_widths(idx2, "right") left_lanes = 0 if len(left_start_widths) == len(left_end_widths): left_lanes = LaneDef( 0, connecting_road_length, n_l_lanes, n_l_lanes, None, left_start_widths, left_end_widths, ) elif allow_empty_lane: num_lanes_to_connect = min(len(left_start_widths), len(left_end_widths)) left_lanes = LaneDef( 0, connecting_road_length, num_lanes_to_connect, num_lanes_to_connect, None, left_start_widths[0:num_lanes_to_connect], left_end_widths[0:num_lanes_to_connect], ) right_lanes = 0 if (len(right_start_widths)) == (len(right_end_widths)): right_lanes = LaneDef( 0, connecting_road_length, n_r_lanes, n_r_lanes, None, right_start_widths, right_end_widths, ) elif allow_empty_lane: num_lanes_to_connect = min(len(right_start_widths), len(right_end_widths)) right_lanes = LaneDef( 0, connecting_road_length, num_lanes_to_connect, num_lanes_to_connect, None, right_start_widths[0:num_lanes_to_connect], right_end_widths[0:num_lanes_to_connect], ) return left_lanes, right_lanes def _create_connecting_roads_with_equal_lanes(self, road_one_id, road_two_id): """_create_connecting_roads_with_equal_lanes is a helper method that connects two roads that have the same number of left and right lanes Parameters ---------- road_one_id (int): id of the first road road_two_id (int): id of the second road """ idx1 = self._get_list_index(road_one_id) idx2 = self._get_list_index(road_two_id) # check if the road has _angles/radius for these roads if self._circular_junction: roadgeoms = self._create_geometry_from_circular(idx1, idx2) elif self._generic_junction: roadgeoms = self._create_geometry_from_carthesian(idx1, idx2) left_lane_defs, right_lane_defs = self._get_lane_defs( idx1, idx2, sum([x.length for x in roadgeoms]) ) tmp_junc_road = create_road( roadgeoms, self.startnum, left_lanes=left_lane_defs, right_lanes=right_lane_defs, lane_width=1, road_type=self.id, ) tmp_junc_road.add_predecessor( ElementType.road, road_one_id, contact_point=self._get_contact_point_connecting_road(road_one_id), ) tmp_junc_road.add_successor( ElementType.road, road_two_id, contact_point=self._get_contact_point_connecting_road(road_two_id), ) self._add_connection_full(tmp_junc_road) self.junction_roads.append(tmp_junc_road) self.startnum += 1 def _get_connection_type(self, road_idx): if ( self.incoming_roads[road_idx].successor and self.incoming_roads[road_idx].successor.element_id == self.id ): return "successor" else: return "predecessor" def _get_lane_width(self, lane_id, road_idx): if np.sign(lane_id) == -1: start_width = ( self.incoming_roads[road_idx] .lanes.lanesections[self._get_connecting_lane_section(road_idx)] .rightlanes[abs(lane_id) - 1] .get_width(0) ) else: start_width = ( self.incoming_roads[road_idx] .lanes.lanesections[self._get_connecting_lane_section(road_idx)] .leftlanes[abs(lane_id) - 1] .get_width(0) ) return start_width def _create_connecting_road_with_lane_input( self, road_one_id, road_two_id, lane_one_id, lane_two_id ): """_create_connecting_road_with_lane_input is a helper method that connects two roads with one lane on each road. It allows for connecting two roads that may have lanes of different widths and creates an appropriate geometry for the generated connecting road based on its start lane width and end lane width as read from the two input roads being connected Parameters ---------- road_one_id (int): the id of the first road to connect road_two_id (int): the id of the second road to connect lane_one_id (int): the lane id(s) on the first road to connect lane_two_id (int): the lane id(s) on the second road to connect """ idx1 = self._get_list_index(road_one_id) idx2 = self._get_list_index(road_two_id) start_offset = 0.0 end_offset = 0.0 if ( self._get_connection_type(idx2) == self._get_connection_type(idx1) and np.sign(lane_one_id) == np.sign(lane_two_id) ) or ( self._get_connection_type(idx2) != self._get_connection_type(idx1) and np.sign(lane_one_id) != np.sign(lane_two_id) ): raise MixingDrivingDirection( "driving direction not consistent when trying to make connection between roads:" + str(road_one_id) + " and " + str(road_two_id) ) if np.sign(lane_one_id) == -1: for lane_iter in range((np.sign(lane_one_id) * lane_one_id) - 1): start_offset += ( self.incoming_roads[idx1] .lanes.lanesections[self._get_connecting_lane_section(idx1)] .rightlanes[lane_iter] .get_width(0) ) else: for lane_iter in range((np.sign(lane_one_id) * lane_one_id) - 1): start_offset += ( self.incoming_roads[idx1] .lanes.lanesections[self._get_connecting_lane_section(idx1)] .leftlanes[lane_iter] .get_width(0) ) if np.sign(lane_two_id) == -1: for lane_iter in range((np.sign(lane_two_id) * lane_two_id) - 1): end_offset += ( self.incoming_roads[idx2] .lanes.lanesections[self._get_connecting_lane_section(idx2)] .rightlanes[lane_iter] .get_width(0) ) else: for lane_iter in range((np.sign(lane_two_id) * lane_two_id) - 1): end_offset += ( self.incoming_roads[idx2] .lanes.lanesections[self._get_connecting_lane_section(idx2)] .leftlanes[lane_iter] .get_width(0) ) start_width = self._get_lane_width(lane_one_id, idx1) end_width = self._get_lane_width(lane_two_id, idx2) if self._get_connection_type(idx1) == "successor": angle_offset_start = np.sign(lane_one_id) * np.pi / 2 else: angle_offset_start = -np.sign(lane_one_id) * np.pi / 2 if self._get_connection_type(idx2) == "successor": angle_offset_end = np.sign(lane_two_id) * np.pi / 2 else: angle_offset_end = -np.sign(lane_two_id) * np.pi / 2 if self._circular_junction: an1 = self._angles[idx2] - self._angles[idx1] - np.pi # adjust angle if multiple of pi if an1 > np.pi: an1 = -(2 * np.pi - an1) start_x = -self._radie[idx1] start_y = start_offset start_h = 0 end_x = self._radie[idx2] * np.cos(an1) + end_offset * np.cos( self._angles[idx2] + angle_offset_end ) end_y = self._radie[idx2] * np.sin(an1) + end_offset * np.sin( self._angles[idx2] + angle_offset_end ) end_h = an1 elif self._generic_junction: start_x = self._x[idx1] + start_offset * np.cos( self._h[idx1] + angle_offset_start ) start_y = self._y[idx1] + start_offset * np.sin( self._h[idx1] + angle_offset_start ) start_h = self._h[idx1] end_x = self._x[idx2] + end_offset * np.cos( self._h[idx2] + angle_offset_end ) end_y = self._y[idx2] + end_offset * np.sin( self._h[idx2] + angle_offset_end ) end_h = self._h[idx2] - np.pi clothoids = pcloth.SolveG2( start_x, start_y, start_h, STD_START_CLOTH, end_x, end_y, end_h, STD_START_CLOTH, ) roadgeoms = [ Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids ] if self._get_connection_type(idx1) == "successor": if lane_one_id < 0: num_left_lanes = 0 num_right_lanes = 1 else: num_left_lanes = 1 num_right_lanes = 0 else: if lane_one_id < 0: num_left_lanes = 1 num_right_lanes = 0 else: num_left_lanes = 0 num_right_lanes = 1 tmp_junc_road = create_road( roadgeoms, self.startnum, left_lanes=num_left_lanes, right_lanes=num_right_lanes, lane_width=start_width, road_type=self.id, lane_width_end=end_width, ) pred_lane_offset = np.sign(lane_one_id) * (abs(lane_one_id) - 1) if self._get_connection_type(idx1) == "predecessor": pred_lane_offset = -pred_lane_offset succ_lane_offset = np.sign(lane_two_id) * (abs(lane_two_id) - 1) if self._get_connection_type(idx2) == "predecessor": succ_lane_offset = -succ_lane_offset tmp_junc_road.add_predecessor( ElementType.road, road_one_id, contact_point=self._get_contact_point_connecting_road(road_one_id), lane_offset=pred_lane_offset, ) tmp_junc_road.add_successor( ElementType.road, road_two_id, contact_point=self._get_contact_point_connecting_road(road_two_id), lane_offset=succ_lane_offset, ) # add offsets to the incomming roads self._set_offset_for_incoming_road(idx1, tmp_junc_road.id, -pred_lane_offset) self._set_offset_for_incoming_road(idx2, tmp_junc_road.id, -succ_lane_offset) self.junction_roads.append(tmp_junc_road) connection = Connection(road_one_id, tmp_junc_road.id, ContactPoint.start) if num_left_lanes: connection.add_lanelink(lane_one_id, 1) else: connection.add_lanelink(lane_one_id, -1) self.junction.add_connection(connection) self.startnum += 1 def _add_connection_full(self, connecting_road): """_add_connection_full is a helper method creating connections for connections with equal amount of lanes Parameters ---------- connecting_road (Road): the connecting road """ conne1 = Connection( connecting_road.successor.element_id, connecting_road.id, ContactPoint.end ) _, sign, _ = _get_related_lanesection( connecting_road, self.incoming_roads[ self._get_list_index(connecting_road.successor.element_id) ], ) _create_junction_links( conne1, len(connecting_road.lanes.lanesections[-1].rightlanes), -1, sign, to_offset=connecting_road.lane_offset_suc[ str(connecting_road.successor.element_id) ], ) _create_junction_links( conne1, len(connecting_road.lanes.lanesections[-1].leftlanes), 1, sign, to_offset=connecting_road.lane_offset_suc[ str(connecting_road.successor.element_id) ], ) self.junction.add_connection(conne1) # handle predecessor lanes conne2 = Connection( connecting_road.predecessor.element_id, connecting_road.id, ContactPoint.start, ) _, sign, _ = _get_related_lanesection( connecting_road, self.incoming_roads[ self._get_list_index(connecting_road.predecessor.element_id) ], ) _create_junction_links( conne2, len(connecting_road.lanes.lanesections[0].rightlanes), -1, sign, from_offset=connecting_road.lane_offset_pred[ str(connecting_road.predecessor.element_id) ], ) _create_junction_links( conne2, len(connecting_road.lanes.lanesections[0].leftlanes), 1, sign, from_offset=connecting_road.lane_offset_pred[ str(connecting_road.predecessor.element_id) ], ) self.junction.add_connection(conne2) def _create_geometry_from_carthesian(self, idx1, idx2): """_create_geometry_from_carthesian creates a connecting road between two roads added with carthesian coordinates Parameters ---------- idx1 (int): index of the first road idx2 (int): index of the second road Returns ------- list of Geometries """ an1 = self._h[idx2] - self._h[idx1] - np.pi # adjust angle if multiple of pi if an1 > np.pi: an1 = -(2 * np.pi - an1) clothoids = pcloth.SolveG2( self._x[idx1], self._y[idx1], self._h[idx1], STD_START_CLOTH, self._x[idx2], self._y[idx2], self._h[idx2] - np.pi, STD_START_CLOTH, ) roadgeoms = [ Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids ] return roadgeoms def _create_geometry_from_circular(self, idx1, idx2): """_create_geometry_from_circular creates a connecting road between two roads added with circular geometry Parameters ---------- idx1 (int): index of the first road idx2 (int): index of the second road Returns ------- list of Geometries """ an1 = self._angles[idx2] - self._angles[idx1] - np.pi # adjust angle if multiple of pi if an1 > np.pi: an1 = -(2 * np.pi - an1) if np.sign(an1) == 0: roadgeoms = [Line(self._radie[idx1] + self._radie[idx2])] else: clothoids = pcloth.SolveG2( -self._radie[idx1], 0, 0, STD_START_CLOTH, self._radie[idx2] * np.cos(an1), self._radie[idx2] * np.sin(an1), an1, STD_START_CLOTH, ) roadgeoms = [ Spiral(x.KappaStart, x.KappaEnd, length=x.length) for x in clothoids ] return roadgeoms def get_connecting_roads(self): """returns the connecting roads generated for the junction Returns ------- list of Road """ return self.junction_roads
Methods
def add_connection(self, road_one_id, road_two_id, lane_one_id=None, lane_two_id=None)
-
add_connection adds a connection between two roads by generating three clothoids to fit their given positions in the local coordinate system (circular or cartesian) All roads has to be added to the junction with either: add_incoming_road_cartesian_geometry add_incoming_road_circular_geometry
NOTE: if no lane ids are provided, add_connection will add as many connections as the two roads have in common
Parameters
road_one_id (int): the id of the first road to connect road_two_id (int): the id of the second road to connect lane_one_id (int or list of int): the lane id(s) on the first road to connect lane_two_id (int or list of int): the lane id(s) on the second road to connect
Expand source code
def add_connection( self, road_one_id, road_two_id, lane_one_id=None, lane_two_id=None ): """add_connection adds a connection between two roads by generating three clothoids to fit their given positions in the local coordinate system (circular or cartesian) All roads has to be added to the junction with either: add_incoming_road_cartesian_geometry add_incoming_road_circular_geometry NOTE: if no lane ids are provided, add_connection will add as many connections as the two roads have in common Parameters ---------- road_one_id (int): the id of the first road to connect road_two_id (int): the id of the second road to connect lane_one_id (int or list of int): the lane id(s) on the first road to connect lane_two_id (int or list of int): the lane id(s) on the second road to connect """ if (lane_one_id == None) and (lane_two_id == None): # check if the connection is symetric (same number of lanes on both sides) if ( self._number_of_left_lanes[self._get_list_index(road_one_id)] == self._number_of_right_lanes[self._get_list_index(road_one_id)] and self._number_of_left_lanes[self._get_list_index(road_two_id)] == self._number_of_right_lanes[self._get_list_index(road_two_id)] and self._number_of_left_lanes[self._get_list_index(road_one_id)] == self._number_of_right_lanes[self._get_list_index(road_two_id)] ): self._create_connecting_roads_with_equal_lanes(road_one_id, road_two_id) else: self._create_connecting_roads_unequal_lanes(road_one_id, road_two_id) elif lane_one_id is not None and lane_two_id is not None: if isinstance(lane_one_id, list): for i in range(len(lane_one_id)): self._create_connecting_road_with_lane_input( road_one_id, road_two_id, lane_one_id[i], lane_two_id[i] ) else: self._create_connecting_road_with_lane_input( road_one_id, road_two_id, lane_one_id, lane_two_id ) else: raise NotEnoughInputArguments( "if lane input is used, both has to be provided" )
def add_incoming_road_cartesian_geometry(self, road, x, y, heading, road_connection=None)
-
add_incoming_road_cartesian_geometry adds an incoming road to a junction, assuming a local coordinate system for the junction.
NOTE: Note, this method can not be used toghether with add_incoming_road_circular_geometry
Parameters
road (Road): the incoming road x (float): the local x-position of the road y (float): the local y-position of the road heading (float): the local heading of the road (pointing in to the junction) road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None
Expand source code
def add_incoming_road_cartesian_geometry( self, road, x, y, heading, road_connection=None ): """add_incoming_road_cartesian_geometry adds an incoming road to a junction, assuming a local coordinate system for the junction. NOTE: Note, this method can not be used toghether with add_incoming_road_circular_geometry Parameters ---------- road (Road): the incoming road x (float): the local x-position of the road y (float): the local y-position of the road heading (float): the local heading of the road (pointing in to the junction) road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None """ self.incoming_roads.append(road) self._handle_connection_input(road, road_connection) self._x.append(x) self._y.append(y) self._h.append(heading) self._generic_junction = True
def add_incoming_road_circular_geometry(self, road, radius, angle, road_connection=None)
-
add_incoming_road_circular_geometry adds an incoming road to a junction, assuming a cirular geometry of the junction, Meaning all roads will be placed on a circle based on the radius and angle. The radius can be different from different incoming roads, however the origin stays the same.
NOTE: Note, this method can not be used toghether with add_incoming_road_cartesian_geometry
Parameters
road (Road): the incoming road radius (float): the radius on where to put the road angle (float): the angle on where to put the road road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None
Expand source code
def add_incoming_road_circular_geometry( self, road, radius, angle, road_connection=None ): """add_incoming_road_circular_geometry adds an incoming road to a junction, assuming a cirular geometry of the junction, Meaning all roads will be placed on a circle based on the radius and angle. The radius can be different from different incoming roads, however the origin stays the same. NOTE: Note, this method can not be used toghether with add_incoming_road_cartesian_geometry Parameters ---------- road (Road): the incoming road radius (float): the radius on where to put the road angle (float): the angle on where to put the road road_connection (str): can be used to say how the incoming road connects to the junction 'predecessor', or 'successor' Default: None """ self.incoming_roads.append(road) self._handle_connection_input(road, road_connection) self._radie.append(radius) self._angles.append(angle) self._circular_junction = True
def get_connecting_roads(self)
-
returns the connecting roads generated for the junction
Returns
list of Road
Expand source code
def get_connecting_roads(self): """returns the connecting roads generated for the junction Returns ------- list of Road """ return self.junction_roads
class DirectJunctionCreator (id, name)
-
DirectJunctionCreator is a helper class to create custom direct junctions.
Parameters
id (int): the id of the junction name (str): name of the junction
Attributes
id (int): the id of the junction junction (Junction): the junction xodr element for the junction
Methods
add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id)
Initalize the DirectJunctionCreator
Parameters
id (int): the id of the junction name (str): name of the junction
Expand source code
class DirectJunctionCreator: """DirectJunctionCreator is a helper class to create custom direct junctions. Parameters ---------- id (int): the id of the junction name (str): name of the junction Attributes ---------- id (int): the id of the junction junction (Junction): the junction xodr element for the junction Methods ------- add_connection(first_road_id, second_road_id, first_lane_id, second_lane_id) """ def __init__(self, id, name): """Initalize the DirectJunctionCreator Parameters ---------- id (int): the id of the junction name (str): name of the junction """ self.id = id self.junction = Junction(name, id, JunctionType.direct) self._incoming_lane_ids = [] self._linked_lane_ids = [] def _get_minimum_lanes_to_connect(self, incoming_road, linked_road): incoming_connection, _, incoming_lane_section = _get_related_lanesection( incoming_road, linked_road ) linked_connection, sign, linked_lane_section = _get_related_lanesection( linked_road, incoming_road ) incoming_left_lanes = len( incoming_road.lanes.lanesections[incoming_lane_section].leftlanes ) incoming_right_lanes = len( incoming_road.lanes.lanesections[incoming_lane_section].rightlanes ) linked_left_lanes = len( linked_road.lanes.lanesections[linked_lane_section].leftlanes ) linked_right_lanes = len( linked_road.lanes.lanesections[linked_lane_section].rightlanes ) self._incoming_lane_ids = [] self._linked_lane_ids = [] # if incoming_connection == "successor" and linked_connection == "predecessor" or incoming_connection == "predecessor" and linked_connection == "successor": if sign > 0: self._incoming_lane_ids.extend( [x for x in range(-min(incoming_right_lanes, linked_right_lanes), 0, 1)] ) self._linked_lane_ids.extend( [x for x in range(-min(incoming_right_lanes, linked_right_lanes), 0, 1)] ) self._incoming_lane_ids.extend( [ x for x in range( 1, min(incoming_left_lanes, linked_left_lanes) + 1, 1 ) ] ) self._linked_lane_ids.extend( [ x for x in range( 1, min(incoming_left_lanes, linked_left_lanes) + 1, 1 ) ] ) elif ( sign < 0 ): # incoming_connection == "successor" and linked_connection == "successor" or incoming_connection == "predecessor" and linked_connection == "predecessor": self._incoming_lane_ids.extend( [-x for x in range(-min(incoming_left_lanes, linked_right_lanes), 0, 1)] ) self._linked_lane_ids.extend( [x for x in range(-min(incoming_left_lanes, linked_right_lanes), 0, 1)] ) self._incoming_lane_ids.extend( [ -x for x in range( 1, min(incoming_right_lanes, linked_left_lanes) + 1, 1 ) ] ) self._linked_lane_ids.extend( [ x for x in range( 1, min(incoming_right_lanes, linked_left_lanes) + 1, 1 ) ] ) def _get_contact_point_linked_road(self, incoming_road): """_get_contact_point_linked_road is a helper method to get the ContactPoint for a linked road Parameters ---------- road_id (int): id of the incoming road Returns ------- contact_point (ContactPoint) """ if incoming_road.successor and incoming_road.successor.element_id == self.id: return ContactPoint.end elif ( incoming_road.predecessor and incoming_road.predecessor.element_id == self.id ): return ContactPoint.start else: raise AttributeError("road is not connected to this junction") def add_connection( self, incoming_road, linked_road, incoming_lane_ids=None, linked_lane_ids=None ): """add_connection adds a connection between an incoming_road and a linked_road. Withouth any lane information, it will add connections to all lanes that the two roads have in common Parameters ---------- incoming_road (Road): the incoming road linked_road (Road): the linked road incoming_lane_ids (int or list of ints): the incoming lane ids to connect Default: None linked_lane_ids (int or list of ints): the linked lane ids to connect Default: None """ linked_lane_offset = 0 inc_lane_offset = 0 incoming_main_road = False if incoming_lane_ids == None and linked_lane_ids == None: self._get_minimum_lanes_to_connect(incoming_road, linked_road) elif incoming_lane_ids is not None and linked_lane_ids is not None: if not isinstance(incoming_lane_ids, list): self._incoming_lane_ids = [incoming_lane_ids] else: self._incoming_lane_ids = incoming_lane_ids if not isinstance(linked_lane_ids, list): self._linked_lane_ids = [linked_lane_ids] if abs(linked_lane_ids) == 1: incoming_main_road = True else: self._linked_lane_ids = linked_lane_ids if min([abs(x) for x in self._linked_lane_ids]) == 1: incoming_main_road = True # sanity check for i in range(len(self._incoming_lane_ids)): if self._get_contact_point_linked_road( incoming_road ) == self._get_contact_point_linked_road(linked_road): if np.sign(self._incoming_lane_ids[i]) == np.sign( self._linked_lane_ids[i] ): raise MixingDrivingDirection( "driving direction not consistent when trying to make connection between roads:" + str(incoming_road.id) + " and " + str(linked_road.id) ) else: if np.sign(self._incoming_lane_ids[i]) != np.sign( self._linked_lane_ids[i] ): raise MixingDrivingDirection( "driving direction not consistent when trying to make connection between roads:" + str(incoming_road.id) + " and " + str(linked_road.id) ) if len(self._linked_lane_ids) != len(self._linked_lane_ids): raise NotSameAmountOfLanesError( "the incoming_lane_ids and linked_lane_ids are not the same length" ) if abs(self._incoming_lane_ids[0]) != abs(self._linked_lane_ids[0]): lane_offset = abs( abs(self._incoming_lane_ids[0]) - abs(self._linked_lane_ids[0]) ) if incoming_main_road: linked_lane_offset = np.sign(self._linked_lane_ids[0]) * lane_offset inc_lane_offset = ( -1 * np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0]) * linked_lane_offset ) else: inc_lane_offset = np.sign(self._incoming_lane_ids[0]) * lane_offset linked_lane_offset = ( -1 * np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0]) * inc_lane_offset ) if ( incoming_road.predecessor and incoming_road.predecessor.element_id == self.id ): incoming_road.pred_direct_junction[linked_road.id] = inc_lane_offset else: incoming_road.succ_direct_junction[linked_road.id] = inc_lane_offset if linked_road.predecessor and linked_road.predecessor.element_id == self.id: linked_road.pred_direct_junction[incoming_road.id] = linked_lane_offset else: linked_road.succ_direct_junction[incoming_road.id] = linked_lane_offset connection = Connection( incoming_road.id, linked_road.id, self._get_contact_point_linked_road(linked_road), ) for i in range(len(self._incoming_lane_ids)): connection.add_lanelink( self._incoming_lane_ids[i], self._linked_lane_ids[i] ) self.junction.add_connection(connection)
Methods
def add_connection(self, incoming_road, linked_road, incoming_lane_ids=None, linked_lane_ids=None)
-
add_connection adds a connection between an incoming_road and a linked_road. Withouth any lane information, it will add connections to all lanes that the two roads have in common
Parameters
incoming_road (Road): the incoming road linked_road (Road): the linked road incoming_lane_ids (int or list of ints): the incoming lane ids to connect Default: None linked_lane_ids (int or list of ints): the linked lane ids to connect Default: None
Expand source code
def add_connection( self, incoming_road, linked_road, incoming_lane_ids=None, linked_lane_ids=None ): """add_connection adds a connection between an incoming_road and a linked_road. Withouth any lane information, it will add connections to all lanes that the two roads have in common Parameters ---------- incoming_road (Road): the incoming road linked_road (Road): the linked road incoming_lane_ids (int or list of ints): the incoming lane ids to connect Default: None linked_lane_ids (int or list of ints): the linked lane ids to connect Default: None """ linked_lane_offset = 0 inc_lane_offset = 0 incoming_main_road = False if incoming_lane_ids == None and linked_lane_ids == None: self._get_minimum_lanes_to_connect(incoming_road, linked_road) elif incoming_lane_ids is not None and linked_lane_ids is not None: if not isinstance(incoming_lane_ids, list): self._incoming_lane_ids = [incoming_lane_ids] else: self._incoming_lane_ids = incoming_lane_ids if not isinstance(linked_lane_ids, list): self._linked_lane_ids = [linked_lane_ids] if abs(linked_lane_ids) == 1: incoming_main_road = True else: self._linked_lane_ids = linked_lane_ids if min([abs(x) for x in self._linked_lane_ids]) == 1: incoming_main_road = True # sanity check for i in range(len(self._incoming_lane_ids)): if self._get_contact_point_linked_road( incoming_road ) == self._get_contact_point_linked_road(linked_road): if np.sign(self._incoming_lane_ids[i]) == np.sign( self._linked_lane_ids[i] ): raise MixingDrivingDirection( "driving direction not consistent when trying to make connection between roads:" + str(incoming_road.id) + " and " + str(linked_road.id) ) else: if np.sign(self._incoming_lane_ids[i]) != np.sign( self._linked_lane_ids[i] ): raise MixingDrivingDirection( "driving direction not consistent when trying to make connection between roads:" + str(incoming_road.id) + " and " + str(linked_road.id) ) if len(self._linked_lane_ids) != len(self._linked_lane_ids): raise NotSameAmountOfLanesError( "the incoming_lane_ids and linked_lane_ids are not the same length" ) if abs(self._incoming_lane_ids[0]) != abs(self._linked_lane_ids[0]): lane_offset = abs( abs(self._incoming_lane_ids[0]) - abs(self._linked_lane_ids[0]) ) if incoming_main_road: linked_lane_offset = np.sign(self._linked_lane_ids[0]) * lane_offset inc_lane_offset = ( -1 * np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0]) * linked_lane_offset ) else: inc_lane_offset = np.sign(self._incoming_lane_ids[0]) * lane_offset linked_lane_offset = ( -1 * np.sign(self._incoming_lane_ids[0] * self._linked_lane_ids[0]) * inc_lane_offset ) if ( incoming_road.predecessor and incoming_road.predecessor.element_id == self.id ): incoming_road.pred_direct_junction[linked_road.id] = inc_lane_offset else: incoming_road.succ_direct_junction[linked_road.id] = inc_lane_offset if linked_road.predecessor and linked_road.predecessor.element_id == self.id: linked_road.pred_direct_junction[incoming_road.id] = linked_lane_offset else: linked_road.succ_direct_junction[incoming_road.id] = linked_lane_offset connection = Connection( incoming_road.id, linked_road.id, self._get_contact_point_linked_road(linked_road), ) for i in range(len(self._incoming_lane_ids)): connection.add_lanelink( self._incoming_lane_ids[i], self._linked_lane_ids[i] ) self.junction.add_connection(connection)