20 #if !defined(__MITSUBA_CORE_SCHED_H_)
21 #define __MITSUBA_CORE_SCHED_H_
50 virtual void set(
const WorkUnit *workUnit) = 0;
53 virtual void load(
Stream *stream) = 0;
56 virtual void save(
Stream *stream)
const = 0;
59 virtual std::string
toString()
const = 0;
83 virtual void load(
Stream *stream) = 0;
86 virtual void save(
Stream *stream)
const = 0;
89 virtual std::string
toString()
const = 0;
146 virtual void prepare() = 0;
157 const bool &stop) = 0;
235 virtual EStatus generateWork(
WorkUnit *unit,
int worker) = 0;
254 virtual void processResult(
const WorkResult *result,
263 virtual void handleCancellation();
292 virtual void bindResource(
const std::string &name,
int id);
301 virtual bool isLocal()
const;
322 virtual std::vector<std::string> getRequiredPlugins();
380 return cancel(proc,
false);
403 int registerMultiResource(std::vector<SerializableObject *> &resources);
413 void retainResource(
int resourceID);
423 bool unregisterResource(
int id);
433 void registerWorker(
Worker *processor);
436 void unregisterWorker(
Worker *processor);
439 size_t getWorkerCount()
const;
442 size_t getLocalWorkerCount()
const;
445 Worker *getWorker(
int index);
466 bool hasLocalWorkers()
const;
469 bool hasRemoteWorkers()
const;
490 struct ProcessRecord {
509 : id(
id), inflight(0), morework(
true), cancelled(
false),
510 active(
true), logLevel(logLevel) {
533 inline Item() : id(-1), workerIndex(-1), coreOffset(-1),
534 proc(NULL), rec(NULL), stop(false) {
537 std::string toString()
const;
540 struct ResourceRecord {
541 std::vector<SerializableObject *> resources;
546 inline ResourceRecord(SerializableObject *resource)
547 : resources(1), refCount(1), multi(false) {
548 resources[0] = resource;
551 inline ResourceRecord(std::vector<SerializableObject *> resources)
552 : resources(resources), refCount(1), multi(true) {
568 SerializableObject *getResource(
int id,
int coreIndex = -1);
571 const MemoryStream *getResourceStream(
int id);
577 bool isMultiResource(
int id)
const;
583 virtual ~Scheduler();
589 EStatus acquireWork(Item &item,
bool local,
bool onlyTry,
bool keepLock);
595 ProcessRecord *rec = item.rec;
597 item.proc->processResult(item.workResult, item.stop);
598 }
catch (
const std::exception &ex) {
599 Log(
EWarn,
"Caught an exception - canceling process %i: %s",
601 cancel(item.proc,
true);
607 if (rec->inflight == 0 && !rec->morework && !item.stop)
608 signalProcessTermination(item.proc, item.rec);
627 Log(
EError,
"Process %i is not locally known!",
id);
631 item.rec = m_processes[proc];
634 for (ParallelProcess::ResourceBindings::const_iterator it = bindings.begin();
635 it != bindings.end(); ++it)
636 item.wp->m_resources[(*it).first] = m_scheduler->getResource((*it).second, item.coreOffset);
639 item.workUnit = item.wp->createWorkUnit();
640 item.workResult = item.wp->createWorkResult();
641 }
catch (std::exception &) {
647 void signalProcessTermination(
ParallelProcess *proc, ProcessRecord *rec);
656 std::deque<int> m_localQueue, m_remoteQueue;
658 std::map<const ParallelProcess *, ProcessRecord *> m_processes;
660 std::map<int, ParallelProcess *> m_idToProcess;
662 std::map<int, ResourceRecord *> m_resources;
664 std::vector<Worker *> m_workers;
665 int m_resourceCounter, m_processCounter;
687 Worker(
const std::string &name);
690 virtual void clear();
694 int workerIndex,
int coreOffset);
702 virtual void signalResourceExpiration(
int id) = 0;
709 virtual void signalProcessCancellation(
int id) = 0;
715 virtual void signalProcessTermination(
int id) = 0;
719 bool onlyTry =
false,
bool keepLock =
false) {
720 return m_scheduler->acquireWork(m_schedItem, local, onlyTry, keepLock);
724 return m_scheduler->releaseLock();
729 m_scheduler->releaseWork(item);
734 return m_scheduler->setProcessByID(item,
id);
742 inline void cancel(
bool reduceInflight) {
743 m_scheduler->cancel(m_schedItem.proc, reduceInflight);
785 virtual
void signalResourceExpiration(
int id);
786 virtual
void signalProcessCancellation(
int id);
787 virtual
void signalProcessTermination(
int id);
796 void set(
const WorkUnit *workUnit);
797 void load(
Stream *stream);
798 void save(
Stream *stream)
const;
void releaseLock()
Release the main scheduler lock – internally used by the remote worker.
Definition: sched.h:592
Temporarily, no work units can be created.
Definition: sched.h:206
WorkProcessor(Stream *stream, InstanceManager *manager)
Definition: sched.h:165
ELogLevel getLogLevel() const
Return the log level for events associated with this process.
Definition: sched.h:308
void setProcessByID(Scheduler::Item &item, int id)
Initialize the m_schedItem data structure when only the process ID is known.
Definition: sched.h:733
bool isRemoteWorker() const
Is this a remote worker?
Definition: sched.h:679
Condition variable synchronization primitive. Can be used to wait for a condition to become true in a safe way.
Definition: lock.h:106
ELogLevel m_logLevel
Definition: sched.h:334
static void staticInitialization()
Initializes the built-in reference count debugger (if enabled)
std::map< std::string, SerializableObject * > m_resources
Definition: sched.h:176
EStatus getReturnStatus() const
Query the return status of a process after its execution has finished.
Definition: sched.h:273
Base class of all reference-counted objects with serialization support.
Definition: serialization.h:35
EStatus
Return codes used by generateWork() and getReturnStatus()
Definition: sched.h:204
bool isRunning() const
Has the scheduler been started?
Definition: sched.h:475
Scheduler::EStatus acquireWork(bool local, bool onlyTry=false, bool keepLock=false)
Definition: sched.h:718
#define MTS_EXPORT_CORE
Definition: getopt.h:29
EStatus m_returnStatus
Definition: sched.h:333
Debug message, usually turned off.
Definition: formatter.h:30
The process finished / a piece of work was generated.
Definition: sched.h:207
Abstract parallelizable task.
Definition: sched.h:197
virtual ~ParallelProcess()
Virtual destructor.
Definition: sched.h:330
Simple RAII-style locking of a Mutex. On construction it locks the mutex and unlocks it on destruction.
Definition: lock.h:170
Abstract work result – represents the result of a processed WorkUnit instance.
Definition: sched.h:80
Dummy work unit without contents.
Definition: sched.h:794
bool cancel(ParallelProcess *proc)
Cancel the execution of a parallelizable process.
Definition: sched.h:379
void releaseSchedulerLock()
Definition: sched.h:723
Abstract work unit – represents a small amount of information that encodes part of a larger processing task.
Definition: sched.h:47
MTS_EXPORT_CORE int getCoreCount()
Determine the number of available CPU cores.
#define Log(level, fmt,...)
Write a Log message to the console (to be used within subclasses of Object)
Definition: logger.h:35
void releaseWork(Item &item)
Definition: sched.h:594
void setProcessByID(Item &item, int id)
Definition: sched.h:623
Unknown return status.
Definition: sched.h:205
ResourceBindings m_bindings
Definition: sched.h:332
void start()
Start the thread.
EThreadPriority
Possible priority values for Thread::setPriority()
Definition: thread.h:37
Abstract seekable stream class.
Definition: stream.h:58
#define MTS_DECLARE_CLASS()
This macro must be used in the initial definition in classes that derive from Object.
Definition: class.h:158
Reference counting helper.
Definition: ref.h:40
Warning message.
Definition: formatter.h:32
const ResourceBindings & getResourceBindings() const
Return a list of all bound resources.
Definition: sched.h:313
Acquires work from the scheduler and executes it locally.
Definition: sched.h:759
Cross-platform thread implementation.
Definition: thread.h:34
ELogLevel
Available Log message types.
Definition: formatter.h:28
Error message, causes an exception to be thrown.
Definition: formatter.h:33
Scheduler::Item m_schedItem
Definition: sched.h:747
virtual ref< WorkProcessor > createWorkProcessor() const =0
Create an instance of the algorithm responsible for executing the work units of this parallel process.
Wait flag synchronization primitive. Can be used to wait for a certain event to occur.
Definition: lock.h:61
Scheduler * m_scheduler
Definition: sched.h:746
void cancel(bool reduceInflight)
Definition: sched.h:742
WorkProcessor()
Protected constructors.
Definition: sched.h:164
static Scheduler * getInstance()
Return a pointer to the scheduler of this process.
Definition: sched.h:472
void releaseWork(Scheduler::Item &item)
Release a processed work unit.
Definition: sched.h:728
Base class of all worker implementations.
Definition: sched.h:673
Parent of all Mitsuba classes.
Definition: object.h:38
bool m_isRemote
Definition: sched.h:749
Coordinates the serialization and unserialization of object graphs.
Definition: serialization.h:65
virtual std::string toString() const
Return a human-readable string representation of the object's contents.
Centralized task scheduler implementation.
Definition: sched.h:351
static void staticShutdown()
Free the memory taken by staticInitialization()
size_t getCoreCount() const
Return the number of cores exposed by this worker.
Definition: sched.h:677
std::map< std::string, int > ResourceBindings
Binding from local resource names to global resource IDs.
Definition: sched.h:201
size_t m_coreCount
Definition: sched.h:748
Thin wrapper around the recursive boost thread lock.
Definition: lock.h:34
Abstract work processor – takes work units and turns them into WorkResult instances.
Definition: sched.h:118
virtual std::string toString() const
Return a string representation.