Many beginners are not sure how to implement road vehicle counting with OpenCV. To help with that, this article walks through the approach in detail; if you need this kind of functionality, read on and hopefully you will get something useful out of it.
The full script looks like this:
import os
import logging
import logging.handlers
import random

import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt

import utils

# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)

# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280)  # HxW
# ============================================================================


def train_bg_subtractor(inst, cap, num=500):
    '''
    The BG subtractor needs to process some number of frames
    before it starts giving results.
    '''
    print('Training BG Subtractor...')
    i = 0
    for frame in cap:
        inst.apply(frame, None, 0.001)
        i += 1
        if i >= num:
            return cap


def main():
    log = logging.getLogger("main")

    # creating MOG2 bg subtractor with 500 frames in cache
    # and shadow detection enabled
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500, detectShadows=True)

    # Set up the image source.
    # cv2.VideoCapture can also be used; for some reason it did not work here.
    cap = skvideo.io.vreader(VIDEO_SOURCE)

    # skipping 500 frames to train the bg subtractor
    train_bg_subtractor(bg_subtractor, cap, num=500)

    frame_number = -1
    for frame in cap:
        if not frame.any():
            log.error("Frame capture failed, stopping...")
            break

        frame_number += 1
        utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)

        fg_mask = bg_subtractor.apply(frame, None, 0.001)
        utils.save_frame(fg_mask, "./out/fg_mask_%04d.png" % frame_number)

# ============================================================================


if __name__ == "__main__":
    log = utils.init_logging()

    if not os.path.exists(IMAGE_DIR):
        log.debug("Creating image directory `%s`...", IMAGE_DIR)
        os.makedirs(IMAGE_DIR)

    main()
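The script relies on a small utils helper module (utils.save_frame and utils.init_logging here, utils.get_centroid and utils.distance later) that the article never shows. The following is only a plausible sketch of the two helpers used so far, under the assumption that save_frame writes an image to disk and init_logging configures a debug-level logger:

# utils.py - a plausible sketch; the real helper module is not shown in the article
import logging
import sys

import cv2


def init_logging():
    # simple stdout logger at DEBUG level
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.DEBUG,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s")
    return logging.getLogger("main")


def save_frame(frame, file_name, flip=True):
    # skvideo.io yields RGB frames while cv2.imwrite expects BGR,
    # so color frames are converted before writing; single-channel masks pass through
    if flip and frame.ndim == 3:
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite(file_name, frame)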
After processing we get the foreground image shown below.

Foreground image after background subtraction
We can see some noise in the foreground image; it can be removed with standard filtering techniques.
Filtering
針對(duì)我們現(xiàn)在的情況,我們將需要以下濾波函數(shù):Threshold、Erode、Dilate、Opening、Closing。
First we use Closing to fill gaps inside the regions, then Opening to remove isolated noise pixels, and finally Dilate to thicken the objects so adjacent blobs merge. The code is as follows:
def filter_mask(img):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

    # Fill any small holes
    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations=2)

    # threshold: drop weak pixels (e.g. shadows marked by MOG2), keep only
    # confident foreground
    dilation[dilation < 240] = 0

    return dilation
The processed foreground looks like this:

Object detection using contours
We will use the cv2.findContours function to detect the contours. The parameters we use are:
cv2.RETR_EXTERNAL - retrieve only the outer (external) contours.
cv2.CHAIN_APPROX_TC89_L1 - use the Teh-Chin chain approximation algorithm (faster).
The code is as follows:
def get_centroid(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)

    cx = x + x1
    cy = y + y1

    return (cx, cy)


def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
    matches = []

    # finding external contours (cv2.findContours returns three values in OpenCV 3.x)
    im, contours, hierarchy = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

    # filtering by width and height
    for (i, contour) in enumerate(contours):
        (x, y, w, h) = cv2.boundingRect(contour)
        contour_valid = (w >= min_contour_width) and (
            h >= min_contour_height)

        if not contour_valid:
            continue

        # getting center of the bounding box
        centroid = get_centroid(x, y, w, h)

        matches.append(((x, y, w, h), centroid))

    return matches
Building a data-processing pipeline
We all know that in ML and CV there is no single algorithm that handles every problem. Even if such an algorithm existed, we would not use it, because it would be very hard to make it effective at scale. For example, a few years ago Netflix offered a 3 million dollar prize for the best movie recommendation algorithm. One team managed to build it, but their recommendation algorithm could not run at scale, so in practice it was useless to the company. Netflix still awarded them 1 million dollars.
Next, let's build a framework for our problem so that the data processing becomes easier to manage.
class PipelineRunner(object):
    '''
    Very simple pipeline.

    Just run passed processors in order with passing context from one to
    another.

    You can also set log level for processors.
    '''

    def __init__(self, pipeline=None, log_level=logging.DEBUG):
        self.pipeline = pipeline or []
        self.context = {}
        self.log = logging.getLogger(self.__class__.__name__)
        self.log.setLevel(log_level)
        self.log_level = log_level
        self.set_log_level()

    def set_context(self, data):
        self.context = data

    def add(self, processor):
        if not isinstance(processor, PipelineProcessor):
            raise Exception(
                'Processor should be an instance of PipelineProcessor.')
        processor.log.setLevel(self.log_level)
        self.pipeline.append(processor)

    def remove(self, name):
        for i, p in enumerate(self.pipeline):
            if p.__class__.__name__ == name:
                del self.pipeline[i]
                return True
        return False

    def set_log_level(self):
        for p in self.pipeline:
            p.log.setLevel(self.log_level)

    def run(self):
        for p in self.pipeline:
            self.context = p(self.context)

        self.log.debug("Frame #%d processed.", self.context['frame_number'])

        return self.context


class PipelineProcessor(object):
    '''
    Base class for processors.
    '''

    def __init__(self):
        self.log = logging.getLogger(self.__class__.__name__)

The runner takes a list of processors to run in order; each processor does part of the work, and they execute in sequence on a shared context to produce the final result.
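To make the processor contract concrete, here is a tiny illustrative processor (a sketch, not part of the original project): each stage reads what it needs from the shared context dict, does its work, and returns the context for the next stage.

class FrameLogger(PipelineProcessor):
    '''Illustrative example processor: just logs the current frame number.'''

    def __call__(self, context):
        # read whatever this stage needs from the shared context...
        self.log.debug("Saw frame #%d", context['frame_number'])
        # ...optionally add results to it, then hand it to the next processor
        return context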
First we create the contour detection processor. It simply merges the background subtraction, filtering, and contour detection steps from above into a single processor; the code is shown below:
class ContourDetection(PipelineProcessor):
    '''
    Detecting moving objects.

    Purpose of this processor is to subtract background, get moving objects,
    detect them with the cv2.findContours method, and then filter them
    by width and height.

    bg_subtractor - background subtractor instance.
    min_contour_width - min bounding rectangle width.
    min_contour_height - min bounding rectangle height.
    save_image - if True will save detected objects mask to file.
    image_dir - where to save images (must exist).
    '''

    def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35,
                 save_image=False, image_dir='images'):
        super(ContourDetection, self).__init__()

        self.bg_subtractor = bg_subtractor
        self.min_contour_width = min_contour_width
        self.min_contour_height = min_contour_height
        self.save_image = save_image
        self.image_dir = image_dir

    def filter_mask(self, img, a=None):
        '''
        These filters are hand-picked just based on visual tests
        '''
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

        # Fill any small holes
        closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
        # Remove noise
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
        # Dilate to merge adjacent blobs
        dilation = cv2.dilate(opening, kernel, iterations=2)

        return dilation

    def detect_vehicles(self, fg_mask, context):
        matches = []

        # finding external contours
        im2, contours, hierarchy = cv2.findContours(
            fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

        for (i, contour) in enumerate(contours):
            (x, y, w, h) = cv2.boundingRect(contour)
            contour_valid = (w >= self.min_contour_width) and (
                h >= self.min_contour_height)

            if not contour_valid:
                continue

            centroid = utils.get_centroid(x, y, w, h)

            matches.append(((x, y, w, h), centroid))

        return matches

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']

        fg_mask = self.bg_subtractor.apply(frame, None, 0.001)

        # just thresholding values
        fg_mask[fg_mask < 240] = 0

        fg_mask = self.filter_mask(fg_mask, frame_number)

        if self.save_image:
            utils.save_frame(fg_mask, self.image_dir +
                             "/mask_%04d.png" % frame_number, flip=False)

        context['objects'] = self.detect_vehicles(fg_mask, context)
        context['fg_mask'] = fg_mask

        return context
Now let's create a processor that links the objects detected on different frames into paths and counts the vehicles that reach the exit zone. The code is shown below:
class VehicleCounter(PipelineProcessor):
    '''
    Counting vehicles that entered the exit zone.

    Purpose of this class: based on detected objects and a local cache, create
    object paths and count those that entered the exit zone defined by exit masks.

    exit_masks - list of the exit masks.
    path_size - max number of points in a path.
    max_dst - max distance between two points.
    '''

    def __init__(self, exit_masks=[], path_size=10, max_dst=30,
                 x_weight=1.0, y_weight=1.0):
        super(VehicleCounter, self).__init__()

        self.exit_masks = exit_masks
        self.vehicle_count = 0
        self.path_size = path_size
        self.pathes = []
        self.max_dst = max_dst
        self.x_weight = x_weight
        self.y_weight = y_weight

    def check_exit(self, point):
        for exit_mask in self.exit_masks:
            try:
                if exit_mask[point[1]][point[0]] == 255:
                    return True
            except:
                return True
        return False

    def __call__(self, context):
        objects = context['objects']
        context['exit_masks'] = self.exit_masks
        context['pathes'] = self.pathes
        context['vehicle_count'] = self.vehicle_count
        if not objects:
            return context

        points = np.array(objects)[:, 0:2]
        points = points.tolist()

        # add new points if pathes is empty
        if not self.pathes:
            for match in points:
                self.pathes.append([match])
        else:
            # link new points with old pathes based on minimum distance between
            # points
            new_pathes = []

            for path in self.pathes:
                _min = 999999
                _match = None
                for p in points:
                    if len(path) == 1:
                        # distance from last point to current
                        d = utils.distance(p[0], path[-1][0])
                    else:
                        # based on 2 prev points predict next point and calculate
                        # distance from predicted next point to current
                        xn = 2 * path[-1][0][0] - path[-2][0][0]
                        yn = 2 * path[-1][0][1] - path[-2][0][1]
                        d = utils.distance(
                            p[0], (xn, yn),
                            x_weight=self.x_weight,
                            y_weight=self.y_weight
                        )

                    if d < _min:
                        _min = d
                        _match = p

                if _match and _min <= self.max_dst:
                    points.remove(_match)
                    path.append(_match)
                    new_pathes.append(path)

                # do not drop path if current frame has no matches
                if _match is None:
                    new_pathes.append(path)

            self.pathes = new_pathes

            # add new pathes
            if len(points):
                for p in points:
                    # do not add points that already should be counted
                    if self.check_exit(p[1]):
                        continue
                    self.pathes.append([p])

        # save only last N points in path
        for i, _ in enumerate(self.pathes):
            self.pathes[i] = self.pathes[i][self.path_size * -1:]

        # count vehicles and drop counted pathes:
        new_pathes = []
        for i, path in enumerate(self.pathes):
            d = path[-2:]

            if (
                # need at least two points to count
                len(d) >= 2 and
                # prev point not in exit zone
                not self.check_exit(d[0][1]) and
                # current point in exit zone
                self.check_exit(d[1][1]) and
                # path length is bigger than the minimum
                self.path_size <= len(path)
            ):
                self.vehicle_count += 1
            else:
                # prevent linking with a path that is already in the exit zone
                add = True
                for p in path:
                    if self.check_exit(p[1]):
                        add = False
                        break
                if add:
                    new_pathes.append(path)

        self.pathes = new_pathes

        context['pathes'] = self.pathes
        context['objects'] = objects
        context['vehicle_count'] = self.vehicle_count

        self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

        return context
The code above is somewhat involved, so let's go through it part by part.

The green region in the image above is the exit zone. This is where we count vehicles; a vehicle is only counted once its path contains more than 3 points.
We use masks for this because it is much more efficient and simpler than working with vector algebra: a simple binary "and" against the mask is enough to test whether a point lies inside the zone. The masks are set up as follows:
EXIT_PTS = np.array([
    [[732, 720], [732, 590], [1280, 500], [1280, 720]],
    [[0, 400], [645, 400], [645, 0], [0, 0]]
])

base = np.zeros(SHAPE + (3,), dtype='uint8')
exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
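Checking whether a detected centroid lies in the exit zone then reduces to a single mask lookup, which is exactly what check_exit in VehicleCounter above does; the sample coordinates here are arbitrary:

# point-in-exit-zone test via the mask (sample centroid chosen arbitrarily)
cx, cy = 1000, 650
in_exit_zone = exit_mask[cy][cx] == 255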
Now let's link the detected points into paths.
For the first frame, we add every detected point as a new path.
Next, if len(path) == 1, for each path we find the newly detected object whose centroid is closest to the path's last point.
If len(path) > 1, we use the last two points of the path to predict a new point on the same line, and then compute the distance between this predicted point and each candidate point. For example, if the last two centroids of a path are (100, 200) and (110, 205), the predicted next centroid is (2*110 - 100, 2*205 - 200) = (120, 210).
The candidate with the minimum distance is appended to the end of the current path and removed from the list of new points. Any points left over after this are added as new paths. During this process we also limit the number of points kept in each path.
new_pathes = []

for path in self.pathes:
    _min = 999999
    _match = None
    for p in points:
        if len(path) == 1:
            # distance from last point to current
            d = utils.distance(p[0], path[-1][0])
        else:
            # based on 2 prev points predict next point and calculate
            # distance from predicted next point to current
            xn = 2 * path[-1][0][0] - path[-2][0][0]
            yn = 2 * path[-1][0][1] - path[-2][0][1]
            d = utils.distance(
                p[0], (xn, yn),
                x_weight=self.x_weight,
                y_weight=self.y_weight
            )

        if d < _min:
            _min = d
            _match = p

    if _match and _min <= self.max_dst:
        points.remove(_match)
        path.append(_match)
        new_pathes.append(path)

    # do not drop path if current frame has no matches
    if _match is None:
        new_pathes.append(path)

self.pathes = new_pathes

# add new pathes
if len(points):
    for p in points:
        # do not add points that already should be counted
        if self.check_exit(p[1]):
            continue
        self.pathes.append([p])

# save only last N points in path
for i, _ in enumerate(self.pathes):
    self.pathes[i] = self.pathes[i][self.path_size * -1:]
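The linking code calls utils.distance with optional per-axis weights, but the utils module is not shown in the article. A plausible sketch of that helper (an assumption, not the author's actual code) is a weighted Euclidean distance:

import math

def distance(p1, p2, x_weight=1.0, y_weight=1.0):
    # weighted Euclidean distance between two points; dividing by a weight
    # makes that axis contribute less to the distance as the weight grows
    return math.sqrt(
        float((p1[0] - p2[0]) ** 2) / x_weight +
        float((p1[1] - p2[1]) ** 2) / y_weight)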
Now we try to count the vehicles that enter the exit zone. To do this, we take the last 2 points of each path, check that the last one is inside the exit zone while the previous one is not, and check that len(path) is above the minimum length.
# count vehicles and drop counted pathes:
new_pathes = []
for i, path in enumerate(self.pathes):
    d = path[-2:]

    if (
        # need at least two points to count
        len(d) >= 2 and
        # prev point not in exit zone
        not self.check_exit(d[0][1]) and
        # current point in exit zone
        self.check_exit(d[1][1]) and
        # path length is bigger than the minimum
        self.path_size <= len(path)
    ):
        self.vehicle_count += 1
    else:
        # prevent linking with a path that is already in the exit zone
        add = True
        for p in path:
            if self.check_exit(p[1]):
                add = False
                break
        if add:
            new_pathes.append(path)

self.pathes = new_pathes

context['pathes'] = self.pathes
context['objects'] = objects
context['vehicle_count'] = self.vehicle_count

self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

return context

The last two processors are a CSV writer, which produces the report file, and a visualizer used for debugging and for producing nice annotated pictures.
# note: these processors additionally require `import csv` at module level, and the
# colour constants (CAR_COLOURS, BOUNDING_BOX_COLOUR, CENTROID_COLOUR, EXIT_COLOR)
# must be defined elsewhere in the project


class CsvWriter(PipelineProcessor):

    def __init__(self, path, name, start_time=0, fps=15):
        super(CsvWriter, self).__init__()

        self.fp = open(os.path.join(path, name), 'w')
        self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
        self.writer.writeheader()
        self.start_time = start_time
        self.fps = fps
        self.path = path
        self.name = name
        self.prev = None

    def __call__(self, context):
        frame_number = context['frame_number']
        count = _count = context['vehicle_count']
        if self.prev:
            _count = count - self.prev

        time = ((self.start_time + int(frame_number / self.fps)) * 100
                + int(100.0 / self.fps) * (frame_number % self.fps))
        self.writer.writerow({'time': time, 'vehicles': _count})
        self.prev = count

        return context


class Visualizer(PipelineProcessor):

    def __init__(self, save_image=True, image_dir='images'):
        super(Visualizer, self).__init__()

        self.save_image = save_image
        self.image_dir = image_dir

    def check_exit(self, point, exit_masks=[]):
        for exit_mask in exit_masks:
            if exit_mask[point[1]][point[0]] == 255:
                return True
        return False

    def draw_pathes(self, img, pathes):
        if not img.any():
            return

        for i, path in enumerate(pathes):
            path = np.array(path)[:, 1].tolist()
            for point in path:
                cv2.circle(img, point, 2, CAR_COLOURS[0], -1)
            cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)

        return img

    def draw_boxes(self, img, pathes, exit_masks=[]):
        for (i, match) in enumerate(pathes):

            contour, centroid = match[-1][:2]
            if self.check_exit(centroid, exit_masks):
                continue

            x, y, w, h = contour

            cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),
                          BOUNDING_BOX_COLOUR, 1)
            cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)

        return img

    def draw_ui(self, img, vehicle_count, exit_masks=[]):

        # this just adds a green mask with opacity to the image
        for exit_mask in exit_masks:
            _img = np.zeros(img.shape, img.dtype)
            _img[:, :] = EXIT_COLOR
            mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
            cv2.addWeighted(mask, 1, img, 1, 0, img)

        # drawing top block with counts
        cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
        cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)),
                    (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)

        return img

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']
        pathes = context['pathes']
        exit_masks = context['exit_masks']
        vehicle_count = context['vehicle_count']

        frame = self.draw_ui(frame, vehicle_count, exit_masks)
        frame = self.draw_pathes(frame, pathes)
        frame = self.draw_boxes(frame, pathes, exit_masks)

        utils.save_frame(frame, self.image_dir +
                         "/processed_%04d.png" % frame_number)

        return context
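With all of the processors defined, here is a minimal sketch of how they could be wired together and run over the video. The exit_mask, train_bg_subtractor, and the module-level constants come from the earlier snippets; the specific constructor arguments (for example y_weight=2.0 and the report file name) are illustrative assumptions, not values taken from the article.

def run():
    # background subtractor shared by training and the ContourDetection processor
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500, detectShadows=True)

    pipeline = PipelineRunner(pipeline=[
        ContourDetection(bg_subtractor=bg_subtractor,
                         save_image=True, image_dir=IMAGE_DIR),
        # exit_mask is the mask built from EXIT_PTS above
        VehicleCounter(exit_masks=[exit_mask], y_weight=2.0),
        Visualizer(image_dir=IMAGE_DIR),
        CsvWriter(path='./', name='report.csv'),
    ], log_level=logging.DEBUG)

    cap = skvideo.io.vreader(VIDEO_SOURCE)

    # let the background model settle before counting anything
    train_bg_subtractor(bg_subtractor, cap, num=500)

    frame_number = -1
    for frame in cap:
        if not frame.any():
            break
        frame_number += 1

        pipeline.set_context({
            'frame': frame,
            'frame_number': frame_number,
        })
        pipeline.run()

Because every stage only communicates through the shared context dict, processors can be added, removed, or reordered without touching the others; the Visualizer, for instance, can be dropped in production to save disk I/O.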