Added "--tcp_threads" argument for multi-threaded detections (on multiple ports)

This commit is contained in:
matlabbe 2015-11-30 00:27:57 -05:00
parent 164da72169
commit cff1d4eac7
12 changed files with 304 additions and 90 deletions

View File

@ -1,4 +1,14 @@
SET(headers_ui
TcpServerPool.h
)
IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4")
QT4_WRAP_CPP(moc_srcs ${headers_ui})
ELSE()
QT5_WRAP_CPP(moc_srcs ${headers_ui})
ENDIF()
SET(INCLUDE_DIRS
${CMAKE_CURRENT_SOURCE_DIR}/../include
${CMAKE_CURRENT_SOURCE_DIR}
@ -17,7 +27,7 @@ SET(LIBRARIES
#include files
INCLUDE_DIRECTORIES(${INCLUDE_DIRS})
SET(SRC_FILES main.cpp)
SET(SRC_FILES main.cpp ${moc_srcs} )
# For Apple set the icns file containing icons
IF(APPLE AND BUILD_AS_BUNDLE)

130
app/TcpServerPool.h Normal file
View File

@ -0,0 +1,130 @@
/*
* TcpServerPool.h
*
* Created on: Nov 29, 2015
* Author: mathieu
*/
#ifndef TCPSERVERPOOL_H_
#define TCPSERVERPOOL_H_
#include <find_object/FindObject.h>
#include <find_object/TcpServer.h>
#include <find_object/utilite/ULogger.h>
#include <QtCore/QThread>
#include <QtCore/QSemaphore>
class FindObjectWorker : public QObject
{
Q_OBJECT;
public:
FindObjectWorker(
find_object::FindObject * sharedFindObject,
QSemaphore * sharedSemaphore,
int maxSemaphoreResources,
QObject * parent = 0) :
QObject(parent),
sharedFindObject_(sharedFindObject),
sharedSemaphore_(sharedSemaphore),
maxSemaphoreResources_(maxSemaphoreResources)
{
UASSERT(sharedFindObject != 0);
UASSERT(sharedSemaphore != 0);
UASSERT(maxSemaphoreResources > 0);
}
public Q_SLOTS:
void detect(const cv::Mat & image)
{
sharedSemaphore_->acquire(1);
UINFO("Thread %p detecting...", (void *)this->thread());
find_object::DetectionInfo info;
sharedFindObject_->detect(image, info);
Q_EMIT objectsFound(info);
sharedSemaphore_->release(1);
}
void addObjectAndUpdate(const cv::Mat & image, int id, const QString & filePath)
{
//block everyone!
sharedSemaphore_->acquire(maxSemaphoreResources_);
UINFO("Thread %p adding object %d (%s)...", (void *)this->thread(), id, filePath.toStdString().c_str());
sharedFindObject_->addObjectAndUpdate(image, id, filePath);
sharedSemaphore_->release(maxSemaphoreResources_);
}
void removeObjectAndUpdate(int id)
{
//block everyone!
sharedSemaphore_->acquire(maxSemaphoreResources_);
UINFO("Thread %p removing object %d...", (void *)this->thread(), id);
sharedFindObject_->removeObjectAndUpdate(id);
sharedSemaphore_->release(maxSemaphoreResources_);
}
Q_SIGNALS:
void objectsFound(const find_object::DetectionInfo &);
private:
find_object::FindObject * sharedFindObject_; //shared findobject
QSemaphore * sharedSemaphore_;
int maxSemaphoreResources_;
};
class TcpServerPool : public QObject
{
Q_OBJECT;
public:
TcpServerPool(find_object::FindObject * sharedFindObject, int threads, int port) :
sharedFindObject_(sharedFindObject),
sharedSemaphore_(threads)
{
UASSERT(sharedFindObject != 0);
UASSERT(port!=0);
UASSERT(threads>=1);
qRegisterMetaType<cv::Mat>("cv::Mat");
threadPool_.resize(threads);
for(int i=0; i<threads; ++i)
{
find_object::TcpServer * tcpServer = new find_object::TcpServer(port++);
UINFO("TcpServer set on port: %d (IP=%s)",
tcpServer->getPort(),
tcpServer->getHostAddress().toString().toStdString().c_str());
threadPool_[i] = new QThread(this);
FindObjectWorker * worker = new FindObjectWorker(sharedFindObject, &sharedSemaphore_, threads);
tcpServer->moveToThread(threadPool_[i]);
worker->moveToThread(threadPool_[i]);
connect(threadPool_[i], SIGNAL(finished()), tcpServer, SLOT(deleteLater()));
connect(threadPool_[i], SIGNAL(finished()), worker, SLOT(deleteLater()));
// connect stuff:
QObject::connect(worker, SIGNAL(objectsFound(find_object::DetectionInfo)), tcpServer, SLOT(publishDetectionInfo(find_object::DetectionInfo)));
QObject::connect(tcpServer, SIGNAL(detectObject(const cv::Mat &)), worker, SLOT(detect(const cv::Mat &)));
QObject::connect(tcpServer, SIGNAL(addObject(const cv::Mat &, int, const QString &)), worker, SLOT(addObjectAndUpdate(const cv::Mat &, int, const QString &)));
QObject::connect(tcpServer, SIGNAL(removeObject(int)), worker, SLOT(removeObjectAndUpdate(int)));
threadPool_[i]->start();
}
}
virtual ~TcpServerPool()
{
for(int i=0; i<threadPool_.size(); ++i)
{
threadPool_[i]->quit();
threadPool_[i]->wait();
}
}
private:
find_object::FindObject * sharedFindObject_;
QVector<QThread*> threadPool_;
QSemaphore sharedSemaphore_;
};
#endif /* TCPSERVERPOOL_H_ */

View File

@ -37,6 +37,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "find_object/TcpServer.h"
#include "find_object/JsonWriter.h"
#include "find_object/utilite/ULogger.h"
#include "TcpServerPool.h"
bool running = true;
@ -112,8 +113,13 @@ void showUsage()
" and \"General/vocabularyFixed\" will be also enabled. Ignored if \"--session\" is set.\n"
" --images_not_saved Don't keep images in RAM after the features are extracted (only\n"
" in console mode). Images won't be saved if an output session is set.\n"
" --tcp_threads # Number of TCP threads (default 1, only in --console mode). \"--General/port\" parameter should not be 0.\n"
" Port numbers start from \"General/port\" value. \"Detect\" TCP service can be\n"
" executed at the same time by multiple threads. \"Add/Remove\" TCP services\n"
" cannot be called by multiple threads, so calling these services on a port\n "
" will block all other threads on the other ports.\n"
" --debug Show debug log.\n"
" --debug-time Show debug log with time.\n"
" --log-time Show log with time.\n"
" --params Show all parameters.\n"
" --defaults Use default parameters (--config is ignored).\n"
" --My/Parameter \"value\" Set find-Object's parameter (look --params for parameters' name).\n"
@ -146,6 +152,7 @@ int main(int argc, char* argv[])
QString jsonPath;
find_object::ParametersMap customParameters;
bool imagesSaved = true;
int tcpThreads = 1;
for(int i=1; i<argc; ++i)
{
@ -322,11 +329,10 @@ int main(int argc, char* argv[])
ULogger::setLevel(ULogger::kDebug);
continue;
}
if(strcmp(argv[i], "-debug-time") == 0 ||
strcmp(argv[i], "--debug-time") == 0)
if(strcmp(argv[i], "-log-time") == 0 ||
strcmp(argv[i], "--log-time") == 0)
{
ULogger::setPrintWhere(true);
ULogger::setLevel(ULogger::kDebug);
ULogger::setPrintTime(true);
continue;
}
@ -353,6 +359,25 @@ int main(int argc, char* argv[])
}
continue;
}
if(strcmp(argv[i], "-tcp_threads") == 0 ||
strcmp(argv[i], "--tcp_threads") == 0)
{
++i;
if(i < argc)
{
tcpThreads = atoi(argv[i]);
if(tcpThreads < 1)
{
printf("tcp_threads should be >= 1!\n");
showUsage();
}
}
else
{
showUsage();
}
continue;
}
if(strcmp(argv[i], "--params") == 0)
{
find_object::ParametersMap parameters = find_object::Settings::getDefaultParameters();
@ -578,31 +603,28 @@ int main(int argc, char* argv[])
}
else
{
find_object::Camera camera;
find_object::TcpServer tcpServer(find_object::Settings::getGeneral_port());
UINFO("Detection sent on port: %d (IP=%s)", tcpServer.getPort(), tcpServer.getHostAddress().toString().toStdString().c_str());
TcpServerPool tcpServerPool(findObject, tcpThreads, find_object::Settings::getGeneral_port());
// connect stuff:
// [FindObject] ---ObjectsDetected---> [TcpServer]
QObject::connect(findObject, SIGNAL(objectsFound(find_object::DetectionInfo)), &tcpServer, SLOT(publishDetectionInfo(find_object::DetectionInfo)));
// [Camera] ---Image---> [FindObject]
QObject::connect(&camera, SIGNAL(imageReceived(const cv::Mat &)), findObject, SLOT(detect(const cv::Mat &)));
QObject::connect(&camera, SIGNAL(finished()), &app, SLOT(quit()));
//connect services
QObject::connect(&tcpServer, SIGNAL(addObject(const cv::Mat &, int, const QString &)), findObject, SLOT(addObjectAndUpdate(const cv::Mat &, int, const QString &)));
QObject::connect(&tcpServer, SIGNAL(removeObject(int)), findObject, SLOT(removeObjectAndUpdate(int)));
//use camera in settings
setupQuitSignal();
// start processing!
while(running && !camera.start())
//If TCP camera is used
find_object::Camera * camera = 0;
if(find_object::Settings::getCamera_6useTcpCamera())
{
UERROR("Camera initialization failed!");
running = false;
camera = new find_object::Camera();
// [Camera] ---Image---> [FindObject]
QObject::connect(camera, SIGNAL(imageReceived(const cv::Mat &)), findObject, SLOT(detect(const cv::Mat &)));
QObject::connect(camera, SIGNAL(finished()), &app, SLOT(quit()));
if(!camera->start())
{
UERROR("Camera initialization failed!");
running = false;
}
}
// start processing!
if(running)
{
app.exec();
@ -626,8 +648,11 @@ int main(int argc, char* argv[])
}
// cleanup
camera.stop();
tcpServer.close();
if(camera)
{
camera->stop();
delete camera;
}
}
delete findObject;

View File

@ -78,7 +78,7 @@ public:
void removeObject(int id);
void removeAllObjects();
bool detect(const cv::Mat & image, find_object::DetectionInfo & info);
bool detect(const cv::Mat & image, find_object::DetectionInfo & info) const;
void updateDetectorExtractor();
void updateObjects(const QList<int> & ids = QList<int>());

View File

@ -45,8 +45,9 @@ class FINDOBJECT_EXP TcpServer : public QTcpServer
public:
enum Service {
kAddObject, // id fileName imageSize image
kRemoveObject // id
kAddObject, // id fileName imageSize image
kRemoveObject, // id
kDetectObject // image
};
public:
@ -67,6 +68,7 @@ private Q_SLOTS:
Q_SIGNALS:
void addObject(const cv::Mat &, int, const QString &);
void removeObject(int);
void detectObject(const cv::Mat &);
private:
QMap<int, quint64> blockSizes_;

View File

@ -1250,10 +1250,6 @@ void FindObject::detect(const cv::Mat & image)
time.start();
DetectionInfo info;
this->detect(image, info);
if(info.objDetected_.size() > 0 || Settings::getGeneral_sendNoObjDetectedEvents())
{
Q_EMIT objectsFound(info);
}
if(info.objDetected_.size() > 1)
{
@ -1275,9 +1271,14 @@ void FindObject::detect(const cv::Mat & image)
QTime::currentTime().toString("HH:mm:ss.zzz").toStdString().c_str(),
time.elapsed());
}
if(info.objDetected_.size() > 0 || Settings::getGeneral_sendNoObjDetectedEvents())
{
Q_EMIT objectsFound(info);
}
}
bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info)
bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info) const
{
QTime totalTime;
totalTime.start();
@ -1348,7 +1349,7 @@ bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info
info.sceneWords_ = words;
}
for(QMap<int, ObjSignature*>::iterator iter=objects_.begin(); iter!=objects_.end(); ++iter)
for(QMap<int, ObjSignature*>::const_iterator iter=objects_.begin(); iter!=objects_.end(); ++iter)
{
info.matches_.insert(iter.key(), QMultiMap<int, int>());
}
@ -1434,7 +1435,7 @@ bool FindObject::detect(const cv::Mat & image, find_object::DetectionInfo & info
}
else
{
QMap<int, int>::iterator iter = dataRange_.lowerBound(i);
QMap<int, int>::const_iterator iter = dataRange_.lowerBound(i);
int objectId = iter.value();
int fisrtObjectDescriptorIndex = (iter == dataRange_.begin())?0:(--iter).key()+1;
int objectDescriptorIndex = i - fisrtObjectDescriptorIndex;

View File

@ -80,6 +80,7 @@ void TcpServer::publishDetectionInfo(const DetectionInfo & info)
QList<QTcpSocket*> clients = this->findChildren<QTcpSocket*>();
if(clients.size())
{
UINFO("TCP server: Publish detected objects");
QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_0);
@ -156,6 +157,15 @@ void TcpServer::readReceivedData()
UINFO("TCP service: Remove %d", id);
Q_EMIT removeObject(id);
}
else if(serviceType == kDetectObject)
{
std::vector<unsigned char> buf(blockSizes_[client->socketDescriptor()]);
in.readRawData((char*)buf.data(), blockSizes_[client->socketDescriptor()]-sizeof(quint32));
cv::Mat image = cv::imdecode(buf, cv::IMREAD_UNCHANGED);
UINFO("TCP service: Detect object");
Q_EMIT detectObject(image);
}
else
{
UERROR("Unknown service type called %d", serviceType);

View File

@ -4,9 +4,7 @@ SET(headers_ui
)
IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4")
#This will generate moc_* for Qt
QT4_WRAP_CPP(moc_srcs ${headers_ui})
### Qt Gui stuff end###
ELSE()
QT5_WRAP_CPP(moc_srcs ${headers_ui})
ENDIF()

View File

@ -4,9 +4,7 @@ SET(headers_ui
)
IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4")
#This will generate moc_* for Qt
QT4_WRAP_CPP(moc_srcs ${headers_ui})
### Qt Gui stuff end###
ELSE()
QT5_WRAP_CPP(moc_srcs ${headers_ui})
ENDIF()

View File

@ -4,9 +4,7 @@ SET(headers_ui
)
IF("${FINDOBJECT_QT_VERSION}" STREQUAL "4")
#This will generate moc_* for Qt
QT4_WRAP_CPP(moc_srcs ${headers_ui})
### Qt Gui stuff end###
ELSE()
QT5_WRAP_CPP(moc_srcs ${headers_ui})
ENDIF()

View File

@ -30,14 +30,17 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <QtCore/QFile>
#include <QtCore/QTime>
#include <opencv2/opencv.hpp>
#include <find_object/TcpServer.h>
#include "TcpResponse.h"
#include "find_object/JsonWriter.h"
void showUsage()
{
printf("\ntcpRequest [options] --scene image.png --out # --in #\n"
"\ntcpRequest [options] --scene image.png --port #\n"
" \"out\" is the port to which the image is sent.\n"
" \"in\" is the port from which the detection is received.\n"
" \"port\" is the bidirectional port from which the image is sent AND the detection is received.\n"
" Options:\n"
" --host #.#.#.# Set host address.\n"
" --json \"path\" Path to an output JSON file.\n"
@ -52,6 +55,7 @@ int main(int argc, char * argv[])
QString jsonPath;
quint16 portOut = 0;
quint16 portIn = 0;
quint16 bidrectionalPort = 0;
for(int i=1; i<argc; ++i)
{
@ -111,6 +115,20 @@ int main(int argc, char * argv[])
}
continue;
}
if(strcmp(argv[i], "--port") == 0 || strcmp(argv[i], "-port") == 0)
{
++i;
if(i < argc)
{
bidrectionalPort = std::atoi(argv[i]);
}
else
{
printf("error parsing --port\n");
showUsage();
}
continue;
}
if(strcmp(argv[i], "--json") == 0 || strcmp(argv[i], "-json") == 0)
{
@ -137,18 +155,25 @@ int main(int argc, char * argv[])
showUsage();
}
if(portOut == 0)
if(bidrectionalPort == 0 && portOut == 0 && portIn == 0)
{
printf("Arguments --port or [--in and --out] should be set.\n");
showUsage();
}
else if(portOut == 0 && bidrectionalPort == 0)
{
printf("Argument --out should be set.\n");
showUsage();
}
else if(portIn == 0)
else if(portIn == 0 && bidrectionalPort == 0)
{
printf("Argument --in should be set.\n");
showUsage();
}
else if(scenePath.isEmpty())
{
printf("Argument --scene should be set.\n");
showUsage();
}
if(ipAddress.isEmpty())
@ -169,22 +194,34 @@ int main(int argc, char * argv[])
QObject::connect(&response, SIGNAL(detectionReceived()), &app, SLOT(quit()));
QObject::connect(&response, SIGNAL(disconnected()), &app, SLOT(quit()));
QObject::connect(&request, SIGNAL(disconnected()), &app, SLOT(quit()));
QObject::connect(&response, SIGNAL(error(QAbstractSocket::SocketError)), &app, SLOT(quit()));
QObject::connect(&request, SIGNAL(error(QAbstractSocket::SocketError)), &app, SLOT(quit()));
request.connectToHost(ipAddress, portOut);
response.connectToHost(ipAddress, portIn);
if(!request.waitForConnected())
QTcpSocket * requestPtr = &request;
if(bidrectionalPort == 0)
{
printf("ERROR: Unable to connect to %s:%d\n", ipAddress.toStdString().c_str(), portOut);
return -1;
QObject::connect(&request, SIGNAL(disconnected()), &app, SLOT(quit()));
QObject::connect(&request, SIGNAL(error(QAbstractSocket::SocketError)), &app, SLOT(quit()));
request.connectToHost(ipAddress, portOut);
response.connectToHost(ipAddress, portIn);
if(!request.waitForConnected())
{
printf("ERROR: Unable to connect to %s:%d\n", ipAddress.toStdString().c_str(), portOut);
return -1;
}
}
else
{
printf("Using bidirectional port\n");
requestPtr = &response;
response.connectToHost(ipAddress, bidrectionalPort);
}
if(!response.waitForConnected())
{
printf("ERROR: Unable to connect to %s:%d\n", ipAddress.toStdString().c_str(), portIn);
printf("ERROR: Unable to connect to %s:%d\n", ipAddress.toStdString().c_str(), bidrectionalPort?bidrectionalPort:portIn);
return -1;
}
@ -196,60 +233,65 @@ int main(int argc, char * argv[])
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_0);
out << (quint64)0;
if(bidrectionalPort)
{
out << (quint32)find_object::TcpServer::kDetectObject;
}
out.writeRawData((char*)buf.data(), (int)buf.size());
out.device()->seek(0);
out << (quint64)(block.size() - sizeof(quint64));
if(request.waitForReadyRead())
qint64 bytes = requestPtr->write(block);
printf("Image published (%d bytes), waiting for response...\n", (int)bytes);
QTime time;
time.start();
// wait for response
if(bidrectionalPort)
{
requestPtr->waitForBytesWritten();
response.waitForReadyRead();
}
else
{
qint64 bytes = request.write(block);
printf("Image published (%d bytes), waiting for response...\n", (int)bytes);
QTime time;
time.start();
// wait for response
app.exec();
}
if(response.dataReceived())
if(response.dataReceived())
{
printf("Response received! (%d ms)\n", time.elapsed());
// print detected objects
if(response.info().objDetected_.size())
{
printf("Response received! (%d ms)\n", time.elapsed());
// print detected objects
if(response.info().objDetected_.size())
QList<int> ids = response.info().objDetected_.uniqueKeys();
for(int i=0; i<ids.size(); ++i)
{
QList<int> ids = response.info().objDetected_.uniqueKeys();
for(int i=0; i<ids.size(); ++i)
int count = response.info().objDetected_.count(ids[i]);
if(count == 1)
{
int count = response.info().objDetected_.count(ids[i]);
if(count == 1)
{
printf("Object %d detected.\n", ids[i]);
}
else
{
printf("Object %d detected %d times.\n", ids[i], count);
}
printf("Object %d detected.\n", ids[i]);
}
else
{
printf("Object %d detected %d times.\n", ids[i], count);
}
}
else
{
printf("No objects detected.\n");
}
// write json
if(!jsonPath.isEmpty())
{
find_object::JsonWriter::write(response.info(), jsonPath);
printf("JSON written to \"%s\"\n", jsonPath.toStdString().c_str());
}
}
else
{
printf("Failed to receive a response...\n");
return -1;
printf("No objects detected.\n");
}
// write json
if(!jsonPath.isEmpty())
{
find_object::JsonWriter::write(response.info(), jsonPath);
printf("JSON written to \"%s\"\n", jsonPath.toStdString().c_str());
}
}
else
{
printf("Server is busy...\n");
printf("Failed to receive a response...\n");
return -1;
}

View File

@ -155,7 +155,6 @@ int main(int argc, char * argv[])
QObject::connect(&request, SIGNAL(disconnected()), &app, SLOT(quit()));
QObject::connect(&request, SIGNAL(error(QAbstractSocket::SocketError)), &app, SLOT(quit()));
QObject::connect(&request, SIGNAL(bytesWritten(quint64)), &app, SLOT(quit()));
printf("Connecting to \"%s:%d\"...\n", ipAddress.toStdString().c_str(), port);
request.connectToHost(ipAddress, port);
@ -201,6 +200,7 @@ int main(int argc, char * argv[])
qint64 bytes = request.write(block);
printf("Service published (%d bytes)!\n", (int)bytes);
request.waitForBytesWritten();
request.waitForReadyRead();
return 0;