Mitsuba Renderer  0.5.0
sched.h
/*
    This file is part of Mitsuba, a physically based rendering system.

    Copyright (c) 2007-2014 by Wenzel Jakob and others.

    Mitsuba is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License Version 3
    as published by the Free Software Foundation.

    Mitsuba is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#pragma once
#if !defined(__MITSUBA_CORE_SCHED_H_)
#define __MITSUBA_CORE_SCHED_H_

#include <mitsuba/core/serialization.h>
#include <mitsuba/core/lock.h>
#include <deque>

/**
 * Uncomment this to enable scheduling debug messages
 */
//#define DEBUG_SCHED 1

MTS_NAMESPACE_BEGIN

/**
 * \brief Abstract work unit -- represents a small amount of information
 * that encodes part of a larger processing task.
 *
 * Instead of the usual serialization function and unserialization
 * constructor, implementations of this class supply \ref load()
 * and \ref save() methods that can be used for essentially the
 * same purpose, but without requiring any memory allocations.
 *
 * \sa WorkProcessor
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE WorkUnit : public Object {
public:
    /// Copy the content of another work unit of the same type
    virtual void set(const WorkUnit *workUnit) = 0;

    /// Fill the work unit with content acquired from a binary data stream
    virtual void load(Stream *stream) = 0;

    /// Serialize a work unit to a binary data stream
    virtual void save(Stream *stream) const = 0;

    /// Return a string representation
    virtual std::string toString() const = 0;

    MTS_DECLARE_CLASS()
protected:
    /// Virtual destructor
    virtual ~WorkUnit() { }
};
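
/* Illustrative sketch (not part of sched.h): a minimal WorkUnit subclass that
 * stores an integer interval [start, end) and implements set()/load()/save()
 * without any dynamic memory allocation, as required by the interface above.
 * The name "RangeWorkUnit" is hypothetical; it assumes std::ostringstream is
 * available and that a real implementation would also add the matching
 * MTS_IMPLEMENT_CLASS() registration in its translation unit. */
class RangeWorkUnit : public WorkUnit {
public:
    /// Copy the range stored in another work unit of the same type
    void set(const WorkUnit *workUnit) {
        const RangeWorkUnit *wu = static_cast<const RangeWorkUnit *>(workUnit);
        m_start = wu->m_start;
        m_end = wu->m_end;
    }

    /// Read the range from a binary stream (no allocations needed)
    void load(Stream *stream) {
        m_start = stream->readInt();
        m_end = stream->readInt();
    }

    /// Write the range to a binary stream
    void save(Stream *stream) const {
        stream->writeInt(m_start);
        stream->writeInt(m_end);
    }

    std::string toString() const {
        std::ostringstream oss;
        oss << "RangeWorkUnit[start=" << m_start << ", end=" << m_end << "]";
        return oss.str();
    }

    inline void setRange(int start, int end) { m_start = start; m_end = end; }
    inline int getStart() const { return m_start; }
    inline int getEnd() const { return m_end; }

    MTS_DECLARE_CLASS()
private:
    int m_start, m_end;
};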

/**
 * \brief Abstract work result -- represents the result of a
 * processed \ref WorkUnit instance.
 *
 * Instead of the usual serialization function and unserialization
 * constructor, implementations of this class supply \ref load()
 * and \ref save() methods that can be used for essentially the
 * same purpose, but without requiring any memory allocations.
 *
 * \sa WorkProcessor
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE WorkResult : public Object {
public:
    /// Fill the work result with content acquired from a binary data stream
    virtual void load(Stream *stream) = 0;

    /// Serialize a work result to a binary data stream
    virtual void save(Stream *stream) const = 0;

    /// Return a string representation
    virtual std::string toString() const = 0;

    MTS_DECLARE_CLASS()
protected:
    /// Virtual destructor
    virtual ~WorkResult() { }
};
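
/* Illustrative sketch (not part of sched.h): the WorkResult counterpart to the
 * hypothetical RangeWorkUnit above. It carries a single partial sum that can be
 * shipped back to the scheduling server without allocations. */
class SumWorkResult : public WorkResult {
public:
    void load(Stream *stream)       { m_sum = stream->readDouble(); }
    void save(Stream *stream) const { stream->writeDouble(m_sum); }

    std::string toString() const {
        std::ostringstream oss;
        oss << "SumWorkResult[sum=" << m_sum << "]";
        return oss.str();
    }

    inline void setSum(double sum) { m_sum = sum; }
    inline double getSum() const { return m_sum; }

    MTS_DECLARE_CLASS()
private:
    double m_sum;
};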

/**
 * \brief Abstract work processor -- takes work units and turns them into
 * \ref WorkResult instances.
 *
 * When executing a parallel task using Mitsuba's scheduling system, the
 * actual work is done in an implementation of this interface.
 *
 * The class is serializable so that it can be sent over the network if
 * required. It is possible to keep local state in \ref WorkProcessor
 * instances (e.g. scratch space for computations), though anything not
 * returned in the form of a \ref WorkResult will eventually be lost.
 * Each \ref Worker (both locally and remotely) has its own \ref WorkProcessor,
 * and therefore no form of locking is required within instances of this class.
 *
 * \sa WorkUnit
 * \sa WorkResult
 * \sa ParallelProcess
 * \sa Scheduler
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE WorkProcessor : public SerializableObject {
    friend class Scheduler;
public:
    /// Create a work unit of the proper type and size
    virtual ref<WorkUnit> createWorkUnit() const = 0;

    /// Create a work result of the proper type and size
    virtual ref<WorkResult> createWorkResult() const = 0;

    /**
     * \brief Create a copy of this work processor instance.
     *
     * \remark In practice, before the cloned work processor
     * is actually used, its \ref prepare() method will be called.
     * Therefore, any state that is initialized in \ref prepare()
     * does not have to be copied.
     */
    virtual ref<WorkProcessor> clone() const = 0;

    /**
     * \brief Called once before processing starts.
     *
     * This is useful for allocating scratch space or resolving references
     * to resource objects. Lengthy computations should be performed in
     * process() instead of here, since this method will be called
     * while the central scheduler lock is held. A thrown exception will
     * lead to the termination of the parallel process.
     */
    virtual void prepare() = 0;

    /**
     * \brief Process a work unit and store the computed results.
     *
     * The <tt>stop</tt> parameter can be used to signal a premature
     * stop of the execution flow. In this case, the work result is allowed
     * to be undefined (it will simply be ignored). A thrown exception will
     * lead to the termination of the parallel process.
     */
    virtual void process(const WorkUnit *workUnit, WorkResult *workResult,
        const bool &stop) = 0;

    MTS_DECLARE_CLASS()
protected:
    /// Virtual destructor
    virtual ~WorkProcessor() { }
    /// Protected constructors
    inline WorkProcessor() { }
    inline WorkProcessor(Stream *stream, InstanceManager *manager)
        : SerializableObject(stream, manager) { }

    /**
     * \brief Look up a named resource, which has been bound to
     * the associated parallel process.
     *
     * Throws an exception if the resource is not known / bound.
     */
    SerializableObject *getResource(const std::string &name);
protected:
    std::map<std::string, SerializableObject *> m_resources;
};
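
/* Illustrative sketch (not part of sched.h): a WorkProcessor that ties the two
 * hypothetical classes above together by summing the squares of all integers
 * in the received range. clone() and the unserialization constructor exist so
 * that the processor can be replicated onto every local and remote worker; the
 * matching MTS_IMPLEMENT_CLASS_S() registration in a .cpp file is omitted. */
class SumProcessor : public WorkProcessor {
public:
    SumProcessor() : WorkProcessor() { }
    SumProcessor(Stream *stream, InstanceManager *manager)
        : WorkProcessor(stream, manager) { }

    ref<WorkUnit> createWorkUnit() const { return new RangeWorkUnit(); }
    ref<WorkResult> createWorkResult() const { return new SumWorkResult(); }
    ref<WorkProcessor> clone() const { return new SumProcessor(); }

    void prepare() {
        /* Nothing to do here -- this is where scratch space would be set up
           or a bound resource would be fetched via getResource("name"). */
    }

    void process(const WorkUnit *workUnit, WorkResult *workResult,
            const bool &stop) {
        const RangeWorkUnit *unit = static_cast<const RangeWorkUnit *>(workUnit);
        SumWorkResult *result = static_cast<SumWorkResult *>(workResult);
        double sum = 0;
        /* Honor the 'stop' flag so that cancellation does not block */
        for (int i = unit->getStart(); i < unit->getEnd() && !stop; ++i)
            sum += (double) i * (double) i;
        result->setSum(sum);
    }

    void serialize(Stream *stream, InstanceManager *manager) const {
        /* This sketch has no additional state to transfer over the network */
    }

    MTS_DECLARE_CLASS()
};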

/**
 * \brief Abstract parallelizable task.
 *
 * Instances of this class model a larger piece of work that can be split
 * into independent `units' and subsequently farmed out over a cluster or
 * processed locally. After the work units have been completed, the results
 * are pieced back together to a solution of the original large-scale problem.
 *
 * This class implements the core logic running on the central scheduling
 * server, i.e. the part that is responsible for generating work units and
 * accepting their results. The module that performs the actual computation
 * is an instance of \ref WorkProcessor, which is also specified here.
 * Finally, this class references `resources', which denote
 * chunks of globally shared read-only data required during execution.
 *
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE ParallelProcess : public Object {
    friend class Scheduler;
public:
    /// Binding from local resource names to global resource IDs
    typedef std::map<std::string, int> ResourceBindings;

    /// Return codes used by generateWork() and getReturnStatus()
    enum EStatus {
        EUnknown, ///< Unknown return status
        EPause,   ///< Temporarily, no work units can be created
        ESuccess, ///< The process finished / a piece of work was generated
        EFailure  ///< The process failed / no more work is available
    };

    /**
     * \brief Generate a piece of work.
     *
     * Takes a pre-allocated \ref WorkUnit instance of
     * the appropriate sub-type and size (as specified by
     * \ref ParallelProcess::getWorkUnitName()) and
     * fills it with the appropriate content. Returns ESuccess
     * on success and EFailure or EPause when no more work is
     * left -- in that case, the work unit will be ignored and
     * the process completed (\ref EFailure) or temporarily
     * paused (\ref EPause). When \ref EPause was used,
     * resubmission via \ref Scheduler::schedule() will
     * be required once more work is available. In some cases, it
     * is useful to distribute 'nearby' pieces of work to the same
     * processor -- the \c worker parameter can be used to
     * implement this.
     * This function should run as quickly as possible, since it
     * will be executed while the scheduler mutex is held. A
     * thrown exception will lead to the termination of the
     * parallel process.
     *
     * \param unit Work unit data structure to be filled
     * \param worker ID of the worker executing this function
     */
    virtual EStatus generateWork(WorkUnit *unit, int worker) = 0;

    /**
     * \brief Called whenever a work unit has been completed.
     *
     * Note that this function may concurrently be executed by
     * multiple threads. Also, processing of work results will
     * generally be out of order with respect to the creation
     * in \ref generateWork().
     *
     * When a work unit is only partially completed due to
     * a call to \ref Scheduler::cancel(), the second
     * parameter is set to true.
     * A thrown exception will lead to the termination of
     * the parallel process.
     *
     * \param result Work result to be processed
     * \param cancelled Was the associated work unit not fully completed?
     */
    virtual void processResult(const WorkResult *result,
        bool cancelled) = 0;

    /**
     * \brief Called when the parallel process is canceled by
     * \ref Scheduler::cancel().
     *
     * The default implementation does nothing.
     */
    virtual void handleCancellation();

    /**
     * \brief Query the return status of a process after its
     * execution has finished.
     *
     * Returns one of \ref ESuccess, \ref EFailure or \ref EUnknown.
     * (\ref EUnknown means that the process is either still running
     * or has never been scheduled).
     */
    inline EStatus getReturnStatus() const { return m_returnStatus; }

    /**
     * \brief Create an instance of the algorithm responsible
     * for executing the work units of this parallel process.
     */
    virtual ref<WorkProcessor> createWorkProcessor() const = 0;

    /**
     * \brief Bind a resource to this parallel process.
     *
     * Takes a resource ID as given by the scheduler and associates it
     * with a name. This name can later be used by the work processor
     * to access the resource data.
     *
     * \param name Process-specific name of the resource
     * \param id Resource ID as returned by \ref Scheduler::registerResource()
     * \sa WorkProcessor::getResource
     */
    virtual void bindResource(const std::string &name, int id);

    /**
     * \brief Is this process strictly local?
     *
     * If a process is marked as local, it shouldn't be distributed
     * to remote processing nodes. The default implementation
     * returns false.
     */
    virtual bool isLocal() const;

    /**
     * \brief Return the log level for events associated with this process.
     *
     * By default, this is set to EDebug.
     */
    inline ELogLevel getLogLevel() const { return m_logLevel; }

    /**
     * \brief Return a list of all bound resources
     */
    inline const ResourceBindings &getResourceBindings() const { return m_bindings; }

    /**
     * \brief Return a list of plugins required by this parallel process.
     *
     * This is required so that remote machines can load the plugins before
     * they accept work from this process. The default implementation just
     * returns all plugins that are loaded in the current application.
     */
    virtual std::vector<std::string> getRequiredPlugins();

    MTS_DECLARE_CLASS()
protected:
    /// Protected constructor
    inline ParallelProcess() : m_returnStatus(EUnknown),
        m_logLevel(EDebug) { }
    /// Virtual destructor
    virtual ~ParallelProcess() { }
protected:
    ResourceBindings m_bindings;
    EStatus m_returnStatus;
    ELogLevel m_logLevel;
};
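
/* Illustrative sketch (not part of sched.h): a ParallelProcess that splits the
 * interval [0, total) into fixed-size chunks, hands them out via generateWork()
 * and accumulates the partial sums returned by the workers. Names such as
 * "SumProcess" and the chunk size are hypothetical. generateWork() runs with
 * the scheduler mutex held, so m_next needs no extra locking; processResult()
 * may run concurrently, hence the mutex around the accumulator. */
class SumProcess : public ParallelProcess {
public:
    SumProcess(int total) : m_total(total), m_next(0), m_sum(0) {
        m_mutex = new Mutex();
    }

    ref<WorkProcessor> createWorkProcessor() const {
        return new SumProcessor();
    }

    EStatus generateWork(WorkUnit *unit, int worker) {
        const int chunkSize = 10000;
        if (m_next >= m_total)
            return EFailure; /* No more work is available */
        int end = m_next + chunkSize;
        if (end > m_total)
            end = m_total;
        static_cast<RangeWorkUnit *>(unit)->setRange(m_next, end);
        m_next = end;
        return ESuccess;
    }

    void processResult(const WorkResult *result, bool cancelled) {
        if (cancelled) /* Partial results are simply dropped in this sketch */
            return;
        const SumWorkResult *partial = static_cast<const SumWorkResult *>(result);
        LockGuard lock(m_mutex); /* May be called by several threads at once */
        m_sum += partial->getSum();
    }

    inline double getSum() const { return m_sum; }

    MTS_DECLARE_CLASS()
private:
    int m_total, m_next;
    double m_sum;
    ref<Mutex> m_mutex;
};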

class Worker;

/**
 * \brief Centralized task scheduler implementation.
 *
 * Accepts parallelizable jobs and distributes their computational load
 * both locally and remotely. This is done by associating different types
 * of \ref Worker instances with the scheduler. These try to acquire work
 * units from the scheduler, which are then executed on the current machine
 * or sent to remote nodes over a network connection.
 *
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE Scheduler : public Object {
    friend class Worker;
public:
    /**
     * \brief Schedule a parallelizable process for execution.
     *
     * If the scheduler is currently running and idle, its execution
     * will begin immediately. Returns \c false if the process
     * is already scheduled and has not yet terminated and \c true
     * in any other case.
     */
    bool schedule(ParallelProcess *process);

    /**
     * \brief Block until the process has successfully been completed
     * or canceled prematurely.
     *
     * Returns false if the process does not exist or has already
     * finished by the time \ref wait() is invoked.
     */
    bool wait(const ParallelProcess *process);

    /**
     * \brief Cancel the execution of a parallelizable process.
     *
     * Upon return, no more work from this process is running.
     * Returns false if the process does not exist (anymore).
     */
    inline bool cancel(ParallelProcess *proc) {
        return cancel(proc, false);
    }

    /**
     * \brief Register a serializable resource with the scheduler.
     *
     * A resource should be thought of as a constant state that is shared
     * amongst all processing nodes. Resources can be reused by
     * subsequent parallel processes, and consequently do not have to be
     * re-transmitted over the network. Returns a resource ID, which can be
     * used to reference the associated data.
     */
    int registerResource(SerializableObject *resource);

    /**
     * \brief Register a \a multiple resource with the scheduler.
     *
     * \a Multi means that in comparison to the previous method, a separate
     * instance is provided for every core. An example where this is useful
     * is to distribute random generator state when performing parallel
     * Monte Carlo simulations. \c resources must be a vector whose
     * length is equal to \ref getCoreCount().
     */
    int registerMultiResource(std::vector<SerializableObject *> &resources);

    /**
     * \brief Increase the reference count of a previously registered resource.
     *
     * The resource must be unregistered an additional time after calling
     * this function.
     *
     * \sa unregisterResource
     */
    void retainResource(int resourceID);

    /**
     * \brief Unregister a resource from the scheduler.
     *
     * Note that the resource won't be removed until all processes using
     * it have terminated.
     *
     * \return \c false if the resource could not be found
     */
    bool unregisterResource(int id);

    /**
     * \brief Return the ID of a registered resource.
     *
     * Throws an exception if the resource cannot be found.
     */
    int getResourceID(const SerializableObject *resource) const;

    /// Register a worker with the scheduler
    void registerWorker(Worker *processor);

    /// Unregister a worker from the scheduler
    void unregisterWorker(Worker *processor);

    /// Get the number of workers
    size_t getWorkerCount() const;

    /// Get the number of local workers
    size_t getLocalWorkerCount() const;

    /// Retrieve one of the workers by index
    Worker *getWorker(int index);

    /// Start all workers and begin execution of any scheduled processes
    void start();

    /**
     * \brief Pause the distribution of work units and shut down all
     * running workers.
     *
     * Any currently scheduled work units are still completed.
     * Processing can be resumed via \ref start().
     */
    void pause();

    /// Cancel all running processes and free memory used by resources
    void stop();

    /// Return the total number of cores exposed through this scheduler
    size_t getCoreCount() const;

    /// Does the scheduler have one or more local workers?
    bool hasLocalWorkers() const;

    /// Does the scheduler have one or more remote workers?
    bool hasRemoteWorkers() const;

    /// Return a pointer to the scheduler of this process
    inline static Scheduler *getInstance() { return m_scheduler; }

    /// Has the scheduler been started?
    inline bool isRunning() const { return m_running; }

    /// Is the scheduler currently executing work?
    bool isBusy() const;

    /// Initialize the scheduler of this process -- called once in main()
    static void staticInitialization();

    /// Free the memory taken by staticInitialization()
    static void staticShutdown();

    MTS_DECLARE_CLASS()
public:
    // Public, but shouldn't be part of the documentation
    /// \cond
    struct ProcessRecord {
        /* Unique ID value assigned to this process */
        int id;
        /* Current number of in-flight work units */
        int inflight;
        /* Is the parallel process still generating work */
        bool morework;
        /* Was the process cancelled using \c cancel()? */
        bool cancelled;
        /* Is the process currently in the queue? */
        bool active;
        /* Signaled every time a work unit arrives */
        ref<ConditionVariable> cond;
        /* Set when the process is done/canceled */
        ref<WaitFlag> done;
        /* Log level for events associated with this process */
        ELogLevel logLevel;

        inline ProcessRecord(int id, ELogLevel logLevel, Mutex *mutex)
            : id(id), inflight(0), morework(true), cancelled(false),
              active(true), logLevel(logLevel) {
            cond = new ConditionVariable(mutex);
            done = new WaitFlag();
        }
    };

    /**
     * Data structure, which contains a piece of work
     * as well as the information required to either execute
     * it locally or submit it to a processing node over the
     * network.
     */
    struct Item {
        int id;
        int workerIndex;
        int coreOffset;
        ParallelProcess *proc;
        ProcessRecord *rec;
        ref<WorkProcessor> wp;
        ref<WorkUnit> workUnit;
        ref<WorkResult> workResult;
        bool stop;

        inline Item() : id(-1), workerIndex(-1), coreOffset(-1),
            proc(NULL), rec(NULL), stop(false) {
        }

        std::string toString() const;
    };

    struct ResourceRecord {
        std::vector<SerializableObject *> resources;
        ref<MemoryStream> stream;
        int refCount;
        bool multi;

        inline ResourceRecord(SerializableObject *resource)
            : resources(1), refCount(1), multi(false) {
            resources[0] = resource;
        }

        inline ResourceRecord(std::vector<SerializableObject *> resources)
            : resources(resources), refCount(1), multi(true) {
        }
    };

    /// A list of status codes returned by acquireWork()
    enum EStatus {
        /// Successfully acquired a work unit
        EOK,
        /// There is currently no work (and onlyTry was set to true)
        ENone,
        /// The scheduler is shutting down
        EStop
    };
    /// \endcond

    /// Look up a resource by ID & core index
    SerializableObject *getResource(int id, int coreIndex = -1);

    /// Return a resource in the form of a binary data stream
    const MemoryStream *getResourceStream(int id);

    /**
     * \brief Test whether this is a multi-resource,
     * i.e. different for every core.
     */
    bool isMultiResource(int id) const;
protected:
    /// Protected constructor
    Scheduler();

    /// Virtual destructor
    virtual ~Scheduler();

    /**
     * Acquire a piece of work from the scheduler -- internally
     * used by the different worker implementations.
     */
    EStatus acquireWork(Item &item, bool local, bool onlyTry, bool keepLock);

    /// Release the main scheduler lock -- internally used by the remote worker
    inline void releaseLock() { m_mutex->unlock(); }

    inline void releaseWork(Item &item) {
        ProcessRecord *rec = item.rec;
        try {
            item.proc->processResult(item.workResult, item.stop);
        } catch (const std::exception &ex) {
            Log(EWarn, "Caught an exception - canceling process %i: %s",
                item.id, ex.what());
            cancel(item.proc, true);
            return;
        }
        LockGuard lock(m_mutex);
        --rec->inflight;
        rec->cond->signal();
        if (rec->inflight == 0 && !rec->morework && !item.stop)
            signalProcessTermination(item.proc, item.rec);
    }

    /**
     * Cancel the execution of a parallelizable process. Upon
     * return, no more work from this process is running. When
     * the second parameter is set to true, the number of in-flight
     * work units for this process is reduced by one.
     */
    bool cancel(ParallelProcess *proc, bool reduceInflight);

    /**
     * Internally used to prepare a Scheduler::Item structure
     * when only the process ID is known.
     */
    inline void setProcessByID(Item &item, int id) {
        LockGuard lock(m_mutex);
        ParallelProcess *proc = m_idToProcess[id];
        if (proc == NULL) {
            Log(EError, "Process %i is not locally known!", id);
        }
        item.proc = proc;
        item.id = id;
        item.rec = m_processes[proc];
        item.wp = proc->createWorkProcessor();
        const ParallelProcess::ResourceBindings &bindings = item.proc->getResourceBindings();
        for (ParallelProcess::ResourceBindings::const_iterator it = bindings.begin();
                it != bindings.end(); ++it)
            item.wp->m_resources[(*it).first] = m_scheduler->getResource((*it).second, item.coreOffset);
        try {
            item.wp->prepare();
            item.workUnit = item.wp->createWorkUnit();
            item.workResult = item.wp->createWorkResult();
        } catch (std::exception &) {
            throw;
        }
    }

    /// Announces the termination of a process
    void signalProcessTermination(ParallelProcess *proc, ProcessRecord *rec);
private:
    /// Global scheduler instance
    static ref<Scheduler> m_scheduler;
    /// Mutex, which protects local data structures
    mutable ref<Mutex> m_mutex;
    /// CV used to signal the availability of work
    ref<ConditionVariable> m_workAvailable;
    /// Scheduled processes in FIFO order
    std::deque<int> m_localQueue, m_remoteQueue;
    /// Set of all currently scheduled processes
    std::map<const ParallelProcess *, ProcessRecord *> m_processes;
    /// Maps process IDs to processes
    std::map<int, ParallelProcess *> m_idToProcess;
    /// List of shared resources
    std::map<int, ResourceRecord *> m_resources;
    /// List of all active workers
    std::vector<Worker *> m_workers;
    int m_resourceCounter, m_processCounter;
    bool m_running;
};
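
/* Illustrative sketch (not part of sched.h): how the hypothetical SumProcess
 * above would typically be run against the global scheduler instance. This
 * assumes the scheduler has already been populated with workers and started
 * (see the LocalWorker sketch further below). */
inline double runSumProcessExample() {
    Scheduler *sched = Scheduler::getInstance();

    /* Optional: share read-only data with all workers. The returned ID could
       then be bound to the process by name via proc->bindResource("data", id). */
    // int id = sched->registerResource(someSerializableObject);

    ref<SumProcess> proc = new SumProcess(1000000);
    sched->schedule(proc); /* Begins executing immediately if the scheduler is idle */
    sched->wait(proc);     /* Block until completion or cancellation */

    if (proc->getReturnStatus() != ParallelProcess::ESuccess)
        SLog(EWarn, "The parallel process did not complete successfully");
    return proc->getSum();
}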

/**
 * \brief Base class of all worker implementations
 * \ingroup libpython
 */
class MTS_EXPORT_CORE Worker : public Thread {
    friend class Scheduler;
public:
    /// Return the number of cores exposed by this worker
    inline size_t getCoreCount() const { return m_coreCount; }
    /// Is this a remote worker?
    inline bool isRemoteWorker() const { return m_isRemote; }

    MTS_DECLARE_CLASS()
protected:
    /// Virtual destructor
    virtual ~Worker() { }

    /// Protected constructor
    Worker(const std::string &name);

    /* Decrement reference counts to any referenced objects */
    virtual void clear();

    /// Used internally by the scheduler
    virtual void start(Scheduler *scheduler,
        int workerIndex, int coreOffset);

    /**
     * \brief Called to inform a worker that a resource is no longer in use.
     *
     * The remote worker uses this to notify the machine on the other end that
     * the memory used by this resource can now be released.
     */
    virtual void signalResourceExpiration(int id) = 0;

    /**
     * \brief Called to inform a worker that a process has been cancelled.
     *
     * Guaranteed to be called while the Scheduler's main lock is held.
     */
    virtual void signalProcessCancellation(int id) = 0;

    /**
     * \brief Called to inform a worker that a process has successfully been
     * completed and any associated resources can be freed.
     */
    virtual void signalProcessTermination(int id) = 0;

    /* Inline functions to access protected members of Scheduler */
    inline Scheduler::EStatus acquireWork(bool local,
            bool onlyTry = false, bool keepLock = false) {
        return m_scheduler->acquireWork(m_schedItem, local, onlyTry, keepLock);
    }

    inline void releaseSchedulerLock() {
        return m_scheduler->releaseLock();
    }

    /// Release a processed work unit
    inline void releaseWork(Scheduler::Item &item) {
        m_scheduler->releaseWork(item);
    }

    /// Initialize the m_schedItem data structure when only the process ID is known
    void setProcessByID(Scheduler::Item &item, int id) {
        return m_scheduler->setProcessByID(item, id);
    }

    /**
     * Cancel the currently scheduled parallel process and possibly
     * reduce the number of in-flight work units.
     * Returns false if the process does not exist (anymore).
     */
    inline void cancel(bool reduceInflight) {
        m_scheduler->cancel(m_schedItem.proc, reduceInflight);
    }
protected:
    Scheduler *m_scheduler;
    Scheduler::Item m_schedItem;
    size_t m_coreCount;
    bool m_isRemote;
};

/**
 * \brief Acquires work from the scheduler and executes
 * it locally.
 *
 * \ingroup libcore
 * \ingroup libpython
 */
class MTS_EXPORT_CORE LocalWorker : public Worker {
public:
    /**
     * \brief Create a new local worker thread
     *
     * \param coreID
     *     When a CPU core ID (>=0) is specified here, the worker
     *     thread will attempt to register core affinity with the
     *     operating system. Passing -1 disables this.
     *
     * \param name
     *     An identifying string for this thread
     *
     * \param priority
     *     The desired thread priority (not supported on some
     *     operating systems)
     */
    LocalWorker(int coreID, const std::string &name,
        Thread::EThreadPriority priority = Thread::ENormalPriority);

    MTS_DECLARE_CLASS()
protected:
    /// Virtual destructor
    virtual ~LocalWorker();
    /* Worker implementation */
    virtual void run();
    virtual void signalResourceExpiration(int id);
    virtual void signalProcessCancellation(int id);
    virtual void signalProcessTermination(int id);
};
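
/* Illustrative sketch (not part of sched.h): creating one local worker per CPU
 * core and starting the scheduler. It assumes the free functions getCoreCount()
 * and formatString() from mitsuba/core/util.h are available; standalone tools
 * normally combine this with Scheduler::staticInitialization() during startup. */
inline void startLocalWorkersExample() {
    Scheduler *sched = Scheduler::getInstance();
    size_t coreCount = getCoreCount(); /* Number of available CPU cores */
    for (size_t i = 0; i < coreCount; ++i)
        sched->registerWorker(new LocalWorker((int) i,
            formatString("wrk%i", (int) i)));
    sched->start(); /* Begin executing any scheduled processes */
}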

/**
 * \brief Dummy work unit without contents
 * \ingroup libcore
 */
class MTS_EXPORT_CORE DummyWorkUnit : public WorkUnit {
public:
    void set(const WorkUnit *workUnit);
    void load(Stream *stream);
    void save(Stream *stream) const;
    std::string toString() const;

    MTS_DECLARE_CLASS()
protected:
    virtual ~DummyWorkUnit() { }
};

MTS_NAMESPACE_END

#endif /* __MITSUBA_CORE_SCHED_H_ */