From cff1d4eac72f0a09e5f6e3ecd4d11b0fb637cb7a Mon Sep 17 00:00:00 2001 From: matlabbe Date: Mon, 30 Nov 2015 00:27:57 -0500 Subject: [PATCH] Added "--tcp_threads" argument for multi-threaded detections (on multiple ports) --- app/CMakeLists.txt | 12 ++- app/TcpServerPool.h | 130 ++++++++++++++++++++++++++ app/main.cpp | 77 +++++++++------ include/find_object/FindObject.h | 2 +- include/find_object/TcpServer.h | 6 +- src/FindObject.cpp | 15 +-- src/TcpServer.cpp | 10 ++ tools/tcpClient/CMakeLists.txt | 2 - tools/tcpImagesServer/CMakeLists.txt | 2 - tools/tcpRequest/CMakeLists.txt | 2 - tools/tcpRequest/main.cpp | 134 ++++++++++++++++++--------- tools/tcpService/main.cpp | 2 +- 12 files changed, 304 insertions(+), 90 deletions(-) create mode 100644 app/TcpServerPool.h diff --git a/app/CMakeLists.txt b/app/CMakeLists.txt index e4c76fc9..454079ad 100644 --- a/app/CMakeLists.txt +++ b/app/CMakeLists.txt @@ -1,4 +1,14 @@ +SET(headers_ui + TcpServerPool.h +) + +IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4") + QT4_WRAP_CPP(moc_srcs ${headers_ui}) +ELSE() + QT5_WRAP_CPP(moc_srcs ${headers_ui}) +ENDIF() + SET(INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/../include ${CMAKE_CURRENT_SOURCE_DIR} @@ -17,7 +27,7 @@ SET(LIBRARIES #include files INCLUDE_DIRECTORIES(${INCLUDE_DIRS}) -SET(SRC_FILES main.cpp) +SET(SRC_FILES main.cpp ${moc_srcs} ) # For Apple set the icns file containing icons IF(APPLE AND BUILD_AS_BUNDLE) diff --git a/app/TcpServerPool.h b/app/TcpServerPool.h new file mode 100644 index 00000000..c4d4e326 --- /dev/null +++ b/app/TcpServerPool.h @@ -0,0 +1,130 @@ +/* + * TcpServerPool.h + * + * Created on: Nov 29, 2015 + * Author: mathieu + */ + +#ifndef TCPSERVERPOOL_H_ +#define TCPSERVERPOOL_H_ + +#include +#include +#include +#include +#include + +class FindObjectWorker : public QObject +{ + Q_OBJECT; + +public: + FindObjectWorker( + find_object::FindObject * sharedFindObject, + QSemaphore * sharedSemaphore, + int maxSemaphoreResources, + QObject * parent = 0) : + QObject(parent), + sharedFindObject_(sharedFindObject), + sharedSemaphore_(sharedSemaphore), + maxSemaphoreResources_(maxSemaphoreResources) + { + UASSERT(sharedFindObject != 0); + UASSERT(sharedSemaphore != 0); + UASSERT(maxSemaphoreResources > 0); + } + +public Q_SLOTS: + void detect(const cv::Mat & image) + { + sharedSemaphore_->acquire(1); + UINFO("Thread %p detecting...", (void *)this->thread()); + find_object::DetectionInfo info; + sharedFindObject_->detect(image, info); + Q_EMIT objectsFound(info); + sharedSemaphore_->release(1); + } + + void addObjectAndUpdate(const cv::Mat & image, int id, const QString & filePath) + { + //block everyone! + sharedSemaphore_->acquire(maxSemaphoreResources_); + UINFO("Thread %p adding object %d (%s)...", (void *)this->thread(), id, filePath.toStdString().c_str()); + sharedFindObject_->addObjectAndUpdate(image, id, filePath); + sharedSemaphore_->release(maxSemaphoreResources_); + } + void removeObjectAndUpdate(int id) + { + //block everyone! + sharedSemaphore_->acquire(maxSemaphoreResources_); + UINFO("Thread %p removing object %d...", (void *)this->thread(), id); + sharedFindObject_->removeObjectAndUpdate(id); + sharedSemaphore_->release(maxSemaphoreResources_); + } + +Q_SIGNALS: + void objectsFound(const find_object::DetectionInfo &); + +private: + find_object::FindObject * sharedFindObject_; //shared findobject + QSemaphore * sharedSemaphore_; + int maxSemaphoreResources_; +}; + +class TcpServerPool : public QObject +{ + Q_OBJECT; +public: + TcpServerPool(find_object::FindObject * sharedFindObject, int threads, int port) : + sharedFindObject_(sharedFindObject), + sharedSemaphore_(threads) + { + UASSERT(sharedFindObject != 0); + UASSERT(port!=0); + UASSERT(threads>=1); + + qRegisterMetaType("cv::Mat"); + + threadPool_.resize(threads); + for(int i=0; igetPort(), + tcpServer->getHostAddress().toString().toStdString().c_str()); + + threadPool_[i] = new QThread(this); + FindObjectWorker * worker = new FindObjectWorker(sharedFindObject, &sharedSemaphore_, threads); + + tcpServer->moveToThread(threadPool_[i]); + worker->moveToThread(threadPool_[i]); + connect(threadPool_[i], SIGNAL(finished()), tcpServer, SLOT(deleteLater())); + connect(threadPool_[i], SIGNAL(finished()), worker, SLOT(deleteLater())); + + // connect stuff: + QObject::connect(worker, SIGNAL(objectsFound(find_object::DetectionInfo)), tcpServer, SLOT(publishDetectionInfo(find_object::DetectionInfo))); + QObject::connect(tcpServer, SIGNAL(detectObject(const cv::Mat &)), worker, SLOT(detect(const cv::Mat &))); + QObject::connect(tcpServer, SIGNAL(addObject(const cv::Mat &, int, const QString &)), worker, SLOT(addObjectAndUpdate(const cv::Mat &, int, const QString &))); + QObject::connect(tcpServer, SIGNAL(removeObject(int)), worker, SLOT(removeObjectAndUpdate(int))); + threadPool_[i]->start(); + } + } + + virtual ~TcpServerPool() + { + for(int i=0; iquit(); + threadPool_[i]->wait(); + } + } + +private: + find_object::FindObject * sharedFindObject_; + QVector threadPool_; + QSemaphore sharedSemaphore_; +}; + + + +#endif /* TCPSERVERPOOL_H_ */ diff --git a/app/main.cpp b/app/main.cpp index 3a602739..74e6205d 100644 --- a/app/main.cpp +++ b/app/main.cpp @@ -37,6 +37,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "find_object/TcpServer.h" #include "find_object/JsonWriter.h" #include "find_object/utilite/ULogger.h" +#include "TcpServerPool.h" bool running = true; @@ -112,8 +113,13 @@ void showUsage() " and \"General/vocabularyFixed\" will be also enabled. Ignored if \"--session\" is set.\n" " --images_not_saved Don't keep images in RAM after the features are extracted (only\n" " in console mode). Images won't be saved if an output session is set.\n" + " --tcp_threads # Number of TCP threads (default 1, only in --console mode). \"--General/port\" parameter should not be 0.\n" + " Port numbers start from \"General/port\" value. \"Detect\" TCP service can be\n" + " executed at the same time by multiple threads. \"Add/Remove\" TCP services\n" + " cannot be called by multiple threads, so calling these services on a port\n " + " will block all other threads on the other ports.\n" " --debug Show debug log.\n" - " --debug-time Show debug log with time.\n" + " --log-time Show log with time.\n" " --params Show all parameters.\n" " --defaults Use default parameters (--config is ignored).\n" " --My/Parameter \"value\" Set find-Object's parameter (look --params for parameters' name).\n" @@ -146,6 +152,7 @@ int main(int argc, char* argv[]) QString jsonPath; find_object::ParametersMap customParameters; bool imagesSaved = true; + int tcpThreads = 1; for(int i=1; i= 1!\n"); + showUsage(); + } + } + else + { + showUsage(); + } + continue; + } if(strcmp(argv[i], "--params") == 0) { find_object::ParametersMap parameters = find_object::Settings::getDefaultParameters(); @@ -578,31 +603,28 @@ int main(int argc, char* argv[]) } else { - find_object::Camera camera; - find_object::TcpServer tcpServer(find_object::Settings::getGeneral_port()); - UINFO("Detection sent on port: %d (IP=%s)", tcpServer.getPort(), tcpServer.getHostAddress().toString().toStdString().c_str()); + TcpServerPool tcpServerPool(findObject, tcpThreads, find_object::Settings::getGeneral_port()); - // connect stuff: - // [FindObject] ---ObjectsDetected---> [TcpServer] - QObject::connect(findObject, SIGNAL(objectsFound(find_object::DetectionInfo)), &tcpServer, SLOT(publishDetectionInfo(find_object::DetectionInfo))); - - // [Camera] ---Image---> [FindObject] - QObject::connect(&camera, SIGNAL(imageReceived(const cv::Mat &)), findObject, SLOT(detect(const cv::Mat &))); - QObject::connect(&camera, SIGNAL(finished()), &app, SLOT(quit())); - - //connect services - QObject::connect(&tcpServer, SIGNAL(addObject(const cv::Mat &, int, const QString &)), findObject, SLOT(addObjectAndUpdate(const cv::Mat &, int, const QString &))); - QObject::connect(&tcpServer, SIGNAL(removeObject(int)), findObject, SLOT(removeObjectAndUpdate(int))); - - //use camera in settings setupQuitSignal(); - // start processing! - while(running && !camera.start()) + //If TCP camera is used + find_object::Camera * camera = 0; + if(find_object::Settings::getCamera_6useTcpCamera()) { - UERROR("Camera initialization failed!"); - running = false; + camera = new find_object::Camera(); + + // [Camera] ---Image---> [FindObject] + QObject::connect(camera, SIGNAL(imageReceived(const cv::Mat &)), findObject, SLOT(detect(const cv::Mat &))); + QObject::connect(camera, SIGNAL(finished()), &app, SLOT(quit())); + + if(!camera->start()) + { + UERROR("Camera initialization failed!"); + running = false; + } } + + // start processing! if(running) { app.exec(); @@ -626,8 +648,11 @@ int main(int argc, char* argv[]) } // cleanup - camera.stop(); - tcpServer.close(); + if(camera) + { + camera->stop(); + delete camera; + } } delete findObject; diff --git a/include/find_object/FindObject.h b/include/find_object/FindObject.h index 8ccb32f7..cbcd1a88 100644 --- a/include/find_object/FindObject.h +++ b/include/find_object/FindObject.h @@ -78,7 +78,7 @@ public: void removeObject(int id); void removeAllObjects(); - bool detect(const cv::Mat & image, find_object::DetectionInfo & info); + bool detect(const cv::Mat & image, find_object::DetectionInfo & info) const; void updateDetectorExtractor(); void updateObjects(const QList & ids = QList()); diff --git a/include/find_object/TcpServer.h b/include/find_object/TcpServer.h index 96038889..5a82f9ad 100644 --- a/include/find_object/TcpServer.h +++ b/include/find_object/TcpServer.h @@ -45,8 +45,9 @@ class FINDOBJECT_EXP TcpServer : public QTcpServer public: enum Service { - kAddObject, // id fileName imageSize image - kRemoveObject // id + kAddObject, // id fileName imageSize image + kRemoveObject, // id + kDetectObject // image }; public: @@ -67,6 +68,7 @@ private Q_SLOTS: Q_SIGNALS: void addObject(const cv::Mat &, int, const QString &); void removeObject(int); + void detectObject(const cv::Mat &); private: QMap blockSizes_; diff --git a/src/FindObject.cpp b/src/FindObject.cpp index 17557a69..acc3b4e6 100644 --- a/src/FindObject.cpp +++ b/src/FindObject.cpp @@ -1250,10 +1250,6 @@ void FindObject::detect(const cv::Mat & image) time.start(); DetectionInfo info; this->detect(image, info); - if(info.objDetected_.size() > 0 || Settings::getGeneral_sendNoObjDetectedEvents()) - { - Q_EMIT objectsFound(info); - } if(info.objDetected_.size() > 1) { @@ -1275,9 +1271,14 @@ void FindObject::detect(const cv::Mat & image) QTime::currentTime().toString("HH:mm:ss.zzz").toStdString().c_str(), time.elapsed()); } + + if(info.objDetected_.size() > 0 || Settings::getGeneral_sendNoObjDetectedEvents()) + { + Q_EMIT objectsFound(info); + } } -bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info) +bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info) const { QTime totalTime; totalTime.start(); @@ -1348,7 +1349,7 @@ bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info info.sceneWords_ = words; } - for(QMap::iterator iter=objects_.begin(); iter!=objects_.end(); ++iter) + for(QMap::const_iterator iter=objects_.begin(); iter!=objects_.end(); ++iter) { info.matches_.insert(iter.key(), QMultiMap()); } @@ -1434,7 +1435,7 @@ bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info } else { - QMap::iterator iter = dataRange_.lowerBound(i); + QMap::const_iterator iter = dataRange_.lowerBound(i); int objectId = iter.value(); int fisrtObjectDescriptorIndex = (iter == dataRange_.begin())?0:(--iter).key()+1; int objectDescriptorIndex = i - fisrtObjectDescriptorIndex; diff --git a/src/TcpServer.cpp b/src/TcpServer.cpp index dfc5a290..a0d8bbc5 100644 --- a/src/TcpServer.cpp +++ b/src/TcpServer.cpp @@ -80,6 +80,7 @@ void TcpServer::publishDetectionInfo(const DetectionInfo & info) QList clients = this->findChildren(); if(clients.size()) { + UINFO("TCP server: Publish detected objects"); QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_0); @@ -156,6 +157,15 @@ void TcpServer::readReceivedData() UINFO("TCP service: Remove %d", id); Q_EMIT removeObject(id); } + else if(serviceType == kDetectObject) + { + std::vector buf(blockSizes_[client->socketDescriptor()]); + in.readRawData((char*)buf.data(), blockSizes_[client->socketDescriptor()]-sizeof(quint32)); + cv::Mat image = cv::imdecode(buf, cv::IMREAD_UNCHANGED); + + UINFO("TCP service: Detect object"); + Q_EMIT detectObject(image); + } else { UERROR("Unknown service type called %d", serviceType); diff --git a/tools/tcpClient/CMakeLists.txt b/tools/tcpClient/CMakeLists.txt index 11beb5b7..425e895b 100644 --- a/tools/tcpClient/CMakeLists.txt +++ b/tools/tcpClient/CMakeLists.txt @@ -4,9 +4,7 @@ SET(headers_ui ) IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4") - #This will generate moc_* for Qt QT4_WRAP_CPP(moc_srcs ${headers_ui}) - ### Qt Gui stuff end### ELSE() QT5_WRAP_CPP(moc_srcs ${headers_ui}) ENDIF() diff --git a/tools/tcpImagesServer/CMakeLists.txt b/tools/tcpImagesServer/CMakeLists.txt index f4ee7b29..84f82c88 100644 --- a/tools/tcpImagesServer/CMakeLists.txt +++ b/tools/tcpImagesServer/CMakeLists.txt @@ -4,9 +4,7 @@ SET(headers_ui ) IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4") - #This will generate moc_* for Qt QT4_WRAP_CPP(moc_srcs ${headers_ui}) - ### Qt Gui stuff end### ELSE() QT5_WRAP_CPP(moc_srcs ${headers_ui}) ENDIF() diff --git a/tools/tcpRequest/CMakeLists.txt b/tools/tcpRequest/CMakeLists.txt index 6e5085d3..3bc7d7e5 100644 --- a/tools/tcpRequest/CMakeLists.txt +++ b/tools/tcpRequest/CMakeLists.txt @@ -4,9 +4,7 @@ SET(headers_ui ) IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4") - #This will generate moc_* for Qt QT4_WRAP_CPP(moc_srcs ${headers_ui}) - ### Qt Gui stuff end### ELSE() QT5_WRAP_CPP(moc_srcs ${headers_ui}) ENDIF() diff --git a/tools/tcpRequest/main.cpp b/tools/tcpRequest/main.cpp index 21aa3cd9..820f187c 100644 --- a/tools/tcpRequest/main.cpp +++ b/tools/tcpRequest/main.cpp @@ -30,14 +30,17 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include "TcpResponse.h" #include "find_object/JsonWriter.h" void showUsage() { printf("\ntcpRequest [options] --scene image.png --out # --in #\n" + "\ntcpRequest [options] --scene image.png --port #\n" " \"out\" is the port to which the image is sent.\n" " \"in\" is the port from which the detection is received.\n" + " \"port\" is the bidirectional port from which the image is sent AND the detection is received.\n" " Options:\n" " --host #.#.#.# Set host address.\n" " --json \"path\" Path to an output JSON file.\n" @@ -52,6 +55,7 @@ int main(int argc, char * argv[]) QString jsonPath; quint16 portOut = 0; quint16 portIn = 0; + quint16 bidrectionalPort = 0; for(int i=1; iseek(0); out << (quint64)(block.size() - sizeof(quint64)); - if(request.waitForReadyRead()) + qint64 bytes = requestPtr->write(block); + printf("Image published (%d bytes), waiting for response...\n", (int)bytes); + + QTime time; + time.start(); + + // wait for response + if(bidrectionalPort) + { + requestPtr->waitForBytesWritten(); + response.waitForReadyRead(); + } + else { - qint64 bytes = request.write(block); - printf("Image published (%d bytes), waiting for response...\n", (int)bytes); - QTime time; - time.start(); - - // wait for response app.exec(); + } - if(response.dataReceived()) + if(response.dataReceived()) + { + printf("Response received! (%d ms)\n", time.elapsed()); + // print detected objects + if(response.info().objDetected_.size()) { - printf("Response received! (%d ms)\n", time.elapsed()); - // print detected objects - if(response.info().objDetected_.size()) + QList ids = response.info().objDetected_.uniqueKeys(); + for(int i=0; i ids = response.info().objDetected_.uniqueKeys(); - for(int i=0; i