« 月の土地、買いました | トップページ | 長さ5メートルの自撮り棒 »

2021年7月26日 (月)

Yolov4で物体検出した対象にIDつけて追いかけ(トラッキング)つつ、ある場所を通過した人をカウントするやつ作ってみた

まるでどこかのラノベのような長いタイトルで申し訳ないですが、表題の通りのもの、作りました。

Track02  

用途としては、ある場所を通過した人の人数を数える、という、イベントなどでは使いたくなるやつです。

(ただし、今回のは出入り方向のどちらも区別なくカウントします)

もちろん、一から作ったわけではなく、ベースの仕組みがあったので、それを改造して作成。

ベースとしたコードは、こちら。

GitHub - LeonLok/Deep-SORT-YOLOv4: People detection and optional tracking with Tensorflow backend.

「Deep Sort Yolov4」という、Yolov4を使って物体検出(デフォルトでは人)をさせて、それぞれにIDをつけて追跡する、いわゆる「物体追跡」のコードです。

物体検出には、検出精度が高くて定評な(かつ商用利用も安心な)Yolov4を使ってます。

しかもこのコード、TensorFlow 1.14とKeras 2.2.4という、我が家の環境でも動きます。

(なお、TensorFlow 2.0版も同梱されてます)

ここでは、Windows 10+Python 3.7.9+TensorFlow 1.14+Keras 2.2.4で話を進めます。

まず、上のコードをZIPでダウンロードし、展開。

Deep-SORT-YOLOv4-master」というフォルダができるので、その中の「tensorflow1.14」‐「deep-sort-yolov4」に入ります。

で、まずはREADME.mdのQuick startに従い、既存の学習モデルを入手、変換します。

ここから「yolov4.weights」が入手できるので、それを上のフォルダに入れて、

> python convert.py

と入力し、実行。

yolo4.h5」というファイルができるので、それを「model_data」というフォルダに移動。

ここでまず、

> python demo.py

を動かしてみます。

Track03

多分、デフォルトではこんな感じの絵が出てくるはずです。

歩行者にそれぞれ個別のIDが割り振られて、ちゃんと追跡されているのが分かるかと思います。

読み込む動画を変えたいときは、「demo.py」の中の41行目にある

file_path = 'video.webm'

のイコールの後ろ、「video.webm」のところを処理させたい動画ファイル名に変えてやればOKです。

Track01

こんな感じになります。

今回の目的は、ある場所の通過地点の人の数をカウントすることが目的なため、上側から撮影した俯瞰動画の方が都合がいいです。

ということで、ここでは以前に書いた「Jetson Nanoの固定カメラで人の動きを分析する「動線分析」をやらせてみた: EeePCの軌跡」で、Jetson Nanoの中に入っていた「pedestrians.mp4」を使うことにします。

さて、このままでは、全然人をカウントしてくれません。そこで、このdemo.pyを以下のように書き換えます。

demo.py


#! /usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division, print_function, absolute_import

from timeit import time
import warnings
import cv2
import numpy as np
from PIL import Image
from yolo import YOLO

from deep_sort import preprocessing
from deep_sort import nn_matching
from deep_sort.detection import Detection
from deep_sort.detection_yolo import Detection_YOLO
from deep_sort.tracker import Tracker
from tools import generate_detections as gdet
import imutils.video
from videocaptureasync import VideoCaptureAsync

warnings.filterwarnings('ignore')

def main(yolo):
  with open('test.csv','w'as f:
    # Definition of the parameters
    max_cosine_distance = 0.3
    nn_budget = None
    nms_max_overlap = 1.0
    
    # Deep SORT
    model_filename = 'model_data/mars-small128.pb'
    encoder = gdet.create_box_encoder(model_filename, batch_size=1)
    
    metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
    tracker = Tracker(metric)

    tracking = True
    writeVideo_flag = True
    asyncVideo_flag = False

    file_path = 'pedestrians.mp4'
    if asyncVideo_flag :
        video_capture = VideoCaptureAsync(file_path)
    else:
        video_capture = cv2.VideoCapture(file_path)

    if asyncVideo_flag:
        video_capture.start()

    if writeVideo_flag:
        if asyncVideo_flag:
            w = int(video_capture.cap.get(3))
            h = int(video_capture.cap.get(4))
        else:
            w = int(video_capture.get(3))
            h = int(video_capture.get(4))
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter('output_yolov4.avi', fourcc, 30, (w, h))
        frame_index = -1

    fps = 0.0
    fps_imutils = imutils.video.FPS().start()

    count = 0

    while True:
        count+=1
        ret, frame = video_capture.read()  # frame shape 640*480*3
        if ret != True:
             break

        t1 = time.time()

        if count==1:
            cv2.imwrite('test.jpg',frame)

        image = Image.fromarray(frame[...,::-1])  # bgr to rgb
        boxes, confidence, classes = yolo.detect_image(image)

        if tracking:
            features = encoder(frame, boxes)

            detections = [Detection(bbox, confidence, cls, feature) for bbox, confidence, cls, feature in
                          zip(boxes, confidence, classes, features)]
        else:
            detections = [Detection_YOLO(bbox, confidence, clsfor bbox, confidence, cls in
                          zip(boxes, confidence, classes)]

        # Run non-maxima suppression.
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = preprocessing.non_max_suppression(boxes, nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        for det in detections:
            bbox = det.to_tlbr()
            score = "%.2f" % round(det.confidence * 1002) + "%"
            cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (25500), 2)
            if len(classes) > 0:
                cls = det.cls
                cv2.putText(frame, str(cls) + " " + score, (int(bbox[0]), int(bbox[3])), 0,
                            1e-3 * frame.shape[0], (02550), 1)

        if tracking:
            # Call the tracker
            tracker.predict()
            tracker.update(detections)

            for track in tracker.tracks:
                if not track.is_confirmed() or track.time_since_update > 1:
                    continue
                bbox = track.to_tlbr()
                cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (255255255), 2)
                cv2.putText(frame, "ID: " + str(track.track_id), (int(bbox[0]), int(bbox[1])), 0,
                            1e-3 * frame.shape[0], (02550), 1)
                track_info = str(count) + ' , ' + str(track.track_id) + ' , ' + str((int(bbox[0]) + int(bbox[2]))/2) + ' , ' + str((int(bbox[1]) + int(bbox[3]))/2)
                print(track_info,file=f)
                print(track_info)

        cv2.imshow('', frame)

        if writeVideo_flag: # and not asyncVideo_flag:
            # save a frame
            out.write(frame)
            frame_index = frame_index + 1

        fps_imutils.update()

        if not asyncVideo_flag:
            fps = (fps + (1./(time.time()-t1))) / 2
            print("FPS = %f"%(fps))
        
        # Press Q to stop!
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    fps_imutils.stop()
    print('imutils FPS: {}'.format(fps_imutils.fps()))

    if asyncVideo_flag:
        video_capture.stop()
    else:
        video_capture.release()

    if writeVideo_flag:
        out.release()

    cv2.destroyAllWindows()

if __name__ == '__main__':
    main(YOLO())

ところどころ書き換えてますが、要するにこれ、最初の1枚目の画像を出力し、画像のコマ数、トラッキングIDとボックスの中心座標を、CSVファイルに吐き出すようにしただけです。

Track04

こんなのが出てきます。左から順に「コマ数」「トラッキングID」「検出ボックス中心のX座標(ピクセル)」「〃 Y座標」。

それを可視化するコードは、こちら。

plot_count.py


import numpy as np
import matplotlib.pyplot as plt
import csv
import math
import cv2

cross_id = []

# 通過判定のボックス範囲
bminx = 673
bmaxx = 711
bminy = 196
bmaxy = 324

# カラーマップを作成
np.random.seed(133)

col_b = np.random.randint(0,255,256)
col_g = np.random.randint(0,255,256)
col_r = np.random.randint(0,255,256)

# 1枚目の画像を読み取り
img = cv2.imread('test.jpg')

with open("./test.csv","r"as f:
  with open('cross_id.csv''w'newline=''as file:
    reader = csv.reader(f)
    for line in reader:
        id = math.floor(float(line[1]))
        x = math.floor(float(line[2]))
        y = math.floor(float(line[3]))

        # id書き込み
        if x>bminx and x<bmaxx and y>bminy and y<bmaxy:
            print(id,file=file)
            cross_id.append(id)

        # 点をプロット
        cv2.circle(img, (x, y), 2color=(int(col_b[id%255]), int(col_g[id%255]) ,int(col_r[id%255]) ), thickness=2)

# 四角形
cv2.rectangle(img, (bminx ,bminy), (bmaxx, bmaxy), (00255),thickness=2)
# 画像書き込み
cv2.imwrite('image.png',img)

# IDの重複を削除
count_id = list(set(cross_id))

# 通過数のカウント表示
print(len(count_id))

# 重複を除いたIDを書き込み
file = open('count_id.csv''w'newline=''
writer = csv.writer(file)
for ele in count_id:
    writer.writerow([ele])

# 結果を表示する。
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.show()

これで得られた画像は、以下。

Track02

各々の人の軌跡の画像が、こんな感じに得られます。

で、真ん中ちょっと右に赤い四角がありますが、このコードは同時に、ここを通過する人の人数をカウントしてます。

赤枠の座標は、上のプログラム中の「bminx」「bmaxx」「bminy」「bmaxy」でそれぞれX、Y座標を指定してます。

で、上のプログラム中に「cross_id」という変数(リスト)がありますが、そこにこの赤枠内を通過したdeep sortが取得したIDが記録されます。

が、実はこの赤枠内を通過している間がすべて記録されるため、同じIDがいくつも記録されてます。

Track05

こんな感じ。

で、この重複部分を削除したのが「count_id」というリストで、

Track06

こうなります。

上の図でも見てわかると思いますが、この赤枠内を通過した人は3人。

で、IDも3つ。

ちゃんと、カウントされていることが分かるかと思います。

なお、実行ターミナル上にもこの「3」という数字が出てきます。

固定カメラで得られた動画と、どこを通過した人数をカウントしたいか、さえ決めれば、カウントしてくれます。

ただし、入場、出場の人数分けをしたいときは、今のところできませんね。またそれは、いずれ考えようかと。

ところで、私がやりたかったのは、実は「人」ではなく「車」のカウント。

ある場所を通過する車の台数をカウントする必要があり、それをさせるためにこれを作りました。

が、このままでこいつ、「人」しかカウントしません。

これを「車」用に変えるには、同じフォルダ内にある「yolo4.py」を開き、その104行目の


if predicted_class != 'person':


if predicted_class != 'car':

に変えてやれば、車をカウントできるようになります。

なお、ラベル名は「model_data」フォルダの中にある「coco_classes.txt」を参照してください。

このラベル内にないものを検出させたい場合は……頑張って、独自データで学習させるしかありませんね。

まあ、たいていは人か車をカウントすることが多いので、その用途でなら上の方法で行けます。

ところで、今回は上の「demo.py」と「plot_cound.py」を別々にしてます。

一緒にしようと思えばできたのですが、トラッキングしたデータをカウントする場所をトライアンドエラーすることを想定し、敢えてコードを分けておきました。

これが一緒だと、試行錯誤するたびに毎回動画処理をすることとなり、ものすごく時間がかかります。

さらに、数か所をはかることになりそうなので、その場合は「plot_cound.py」をコピーして、それぞれの場所のカウントを個別に取る、なんてことも対応できます。

ついでに言うと、入退場を別々に処理させたい場合も、いったんは別々に作ってからやった方がプログラムの試行錯誤もやりやすいかと思ったわけです。

例によって、このブログに載ったコードは、自由に使っていただいてOKです。元々、ベースのコードもそうですし。

ただし利用は、自己責任でお願いいたします。


TensorFlowはじめました3 Object Detection ─ 物体検出 (NextPublishing)

« 月の土地、買いました | トップページ | 長さ5メートルの自撮り棒 »

科学・技術」カテゴリの記事

コメント

コメントを書く

(ウェブ上には掲載しません)

« 月の土地、買いました | トップページ | 長さ5メートルの自撮り棒 »

無料ブログはココログ

スポンサード リンク

ブログ村