# See LICENSE for licensing information. # #Copyright (c) 2016-2019 Regents of the University of California and The Board #of Regents for the Oklahoma Agricultural and Mechanical College #(acting for and on behalf of Oklahoma State University) #All rights reserved. # from itertools import tee import debug from heapq import heappush,heappop from copy import deepcopy from grid import grid from grid_path import grid_path from vector3d import vector3d class signal_grid(grid): """ Expand the two layer grid to include A* search functions for a source and target. """ def __init__(self, ll, ur, track_factor): """ Create a routing map of width x height cells and 2 in the z-axis. """ grid.__init__(self, ll, ur, track_factor) # priority queue for the maze routing self.q = [] def reinit(self): """ Reinitialize everything for a new route. """ # Reset all the cells in the map for p in self.map.values(): p.reset() # clear source and target pins self.source=[] self.target=[] # Clear the queue while len(self.q)>0: heappop(self.q) self.counter = 0 def init_queue(self): """ Populate the queue with all the source pins with cost to the target. Each item is a path of the grid cells. We will use an A* search, so this cost must be pessimistic. Cost so far will be the length of the path. """ #debug.info(3,"Initializing queue.") # Counter is used to not require data comparison in Python 3.x # Items will be returned in order they are added during cost ties self.counter = 0 for s in self.source: cost = self.cost_to_target(s) debug.info(3,"Init: cost=" + str(cost) + " " + str([s])) heappush(self.q,(cost,self.counter,grid_path([vector3d(s)]))) self.counter+=1 def route(self,detour_scale): """ This does the A* maze routing with preferred direction routing. This only works for 1 track wide routes! """ # We set a cost bound of the HPWL for run-time. This can be # over-ridden if the route fails due to pruning a feasible solution. any_source_element = next(iter(self.source)) cost_bound = detour_scale*self.cost_to_target(any_source_element)*grid.PREFERRED_COST # Check if something in the queue is already a source and a target! for s in self.source: if self.is_target(s): return((grid_path([vector3d(s)]),0)) # Make sure the queue is empty if we run another route while len(self.q)>0: heappop(self.q) # Put the source items into the queue self.init_queue() cheapest_path = None cheapest_cost = None # Keep expanding and adding to the priority queue until we are done while len(self.q)>0: # should we keep the path in the queue as well or just the final node? (cost,count,curpath) = heappop(self.q) debug.info(3,"Queue size: size=" + str(len(self.q)) + " " + str(cost)) debug.info(4,"Expanding: cost=" + str(cost) + " " + str(curpath)) # expand the last element neighbors = self.expand_dirs(curpath) debug.info(4,"Neighbors: " + str(neighbors)) for n in neighbors: # make a new copy of the path to not update the old ones newpath = deepcopy(curpath) # node is added to the map by the expand routine newpath.append(n) # check if we hit the target and are done if self.is_target(n[0]): # This uses the [0] item because we are assuming 1-track wide return (newpath,newpath.cost()) else: # current path cost + predicted cost current_cost = newpath.cost() target_cost = self.cost_to_target(n[0]) predicted_cost = current_cost + target_cost # only add the cost if it is less than our bound if (predicted_cost < cost_bound): if (self.map[n[0]].min_cost==-1 or predicted_cost