
#include "workerthread.h"

//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
// Job implementation
//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/

// Constructor 
Job::Job ( const char *q, const hbEventContext *c )
{
    _query   = 0 ;
    _context = 0 ;

    if ( q ) 
    {
        _query = new char [ strlen( q ) + 1 ];
        if ( _query )
        {
            strcpy ( _query, q );
        }
    }
    if ( c ) 
    {
        _context = new hbEventContext ( *c );
    }
}

// Destructor 
Job::~Job ()
{
    delete _query ; 
    delete _context;
}

// Copy Constructor 
Job::Job ( Job  &arg )
{
    if ( &arg == this )
    {
        return;
    }

    _query   = 0 ;
    _context = 0 ;

    if ( arg._query ) 
    {
        _query = new char [ strlen( arg._query ) + 1 ];
        if ( _query )
        {
            strcpy ( _query, arg._query );
        }
    }

    if ( arg._context ) 
    {
        _context = new hbEventContext ( *arg._context );
    }
}



//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
// JobQueue implementation
//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/

// Constructor
JobQueue::JobQueue()
{
    _elems     = 0;
    _topIdx    = 0;
    _bottomIdx = 0;
}

//
// Destructor
// frees up all memory
JobQueue::~JobQueue()
{
    Job  *p = 0;
    do 
    { 
        p = get( 0 );
        if ( p ) delete p;
    }  while ( p ); 
}

//
// Add a job to the queue, thread safe
//
JobQueue::add( Job  &arg )
{
    int rc = 0;

    // 
    // copy thread argument and put it into queue
    // 

    _mutex.Lock();

    if ( _elems < MAX_JOBS ) 
    {
        Job  *newarg = new Job ( arg );

        _queue[ _topIdx ] = newarg;

        ++_elems ; 
        _topIdx = (_topIdx+1) %  MAX_JOBS;

        // now send a signal to potential waiting threads, 
        // that a new element is available in the queue

        _elementAvailable.SignalEvent();
    }
    else
    {
        rc = -1;
        hbAPI::writeLog("Cannot add job to JobQueue: Queue full\n");
    }

    _mutex.Release();
 
    return rc;
}

//
// Get a job from the queue, thread safe
//
// The "wait" parameter controls wether the function
// blocks until an element is available or if it just
// returns NULL if no element is available.
// Please note that this function might also return NULL,
// if "wait" is true, because the _elementAvailable signal is 
// also sent by the JobQueue.stop() function.
//

Job  * JobQueue::get( int wait )
{
    int         rc           = 0;
    Job        *result       = 0;

    _mutex.Lock();

    if ( _elems <= 0 && wait ) 
    {
        //
        // no elements available 
        // wait for the appropriate signal
        //
#ifdef _WIN32
        // the eventMutex behaviour under Windows
        // is (of course :-() different to the posix
        // variant. In the posix variant the system
        // assures that _mutex is released before waiting
        // for the condition and then again the mutex is locked

        _mutex.Release();
        _elementAvailable.WaitForEvent( _mutex );
        _mutex.Lock();

#else
        _elementAvailable.WaitForEvent( _mutex );
#endif
    }

    if ( _elems > 0 ) 
    {
        result = _queue[ _bottomIdx ] ;
 
        --_elems ; 
  
       _bottomIdx = (_bottomIdx+1) %  MAX_JOBS;
    }

    _mutex.Release();

    return result;
}

//
// Wake up threads waiting for new elements in the queue
// This is to give the workerthread loop the chance
// to terminate.
//

void JobQueue::stop()
{
    _mutex.Lock();
    _elementAvailable.SignalEvent();
    _mutex.Release();
}



//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/
// WorkerThread implementation
//_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/_/

//
// Constructor
// has the thread's main function as parameter
//
#ifdef _WIN32
WorkerThread::WorkerThread( void (*function) (void *) )
#else 
WorkerThread::WorkerThread( void *(*function) (void *) )
#endif 
{
    _stop     = 0;
    _function = function;
}

//
// start the thread
//
int WorkerThread::start()
{
    int rc;

    if ( ! _function )
    {
       return -1;
    }

#ifdef _WIN32

    _theThread = _beginthread( _function, 0, this );
    if ( _theThread == -1 ) rc = -1;
    else rc = 0;

#else

    rc = pthread_create( &_theThread, NULL, _function, this );

#endif

    return rc;
}

//
// stop the thread
//
void WorkerThread::stop()
{
  // We could do a pthread_cancel() or Windows equivalent here
  // but for the sake of simplicity in this sample we prefer a "soft" 
  // termination of the thread, i.e. the worker thread main function's 
  // loop always checks if the thread has to be stopped by called stopped();
  // Please note that we do not wait for the actual termination of the thread,
  // though this should be done in a production system.

  // first we set the thread stop flag
  _stop = 1;

  // then we wake up threads waiting for the _elementsAvailable
  // event in the job queue.

  _jobs.stop();
}
