Mitsuba Renderer  0.5.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sched_remote.h
Go to the documentation of this file.
1 /*
2  This file is part of Mitsuba, a physically based rendering system.
3 
4  Copyright (c) 2007-2014 by Wenzel Jakob and others.
5 
6  Mitsuba is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License Version 3
8  as published by the Free Software Foundation.
9 
10  Mitsuba is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18 
19 #pragma once
20 #if !defined(__MITSUBA_CORE_SCHED_REMOTE_H_)
21 #define __MITSUBA_CORE_SCHED_REMOTE_H_
22 
23 #include <mitsuba/core/sched.h>
24 #include <set>
25 
26 /// Default port of <tt>mtssrv</tt>
27 #define MTS_DEFAULT_PORT 7554
28 
29 /** How many work units should be sent to a remote worker
30  at a time? This is a multiple of the worker's core count */
31 #define MTS_BACKLOG_FACTOR 3
32 
33 /** Once the backlog drops below this value (also a
34  multiple of the worker's core count), the stream processor will
35  continue sending batches of work units */
36 #define MTS_CONTINUE_FACTOR 2
37 
39 
40 class RemoteWorkerReader;
41 class StreamBackend;
42 
43 /**
44  * \brief Acquires work from the scheduler and forwards
45  * it to a processing node reachable through a \ref Stream.
46  *
47  * \ingroup libcore
48  * \ingroup libpython
49  */
51  friend class RemoteWorkerReader;
52 public:
53  /**
54  * \brief Construct a new remote worker with the given name and
55  * communication stream
56  */
57  RemoteWorker(const std::string &name, Stream *stream);
58 
59  /// Return the name of the node on the other side of the stream (the stored m_nodeName; returned by const reference, no copy)
60  inline const std::string &getNodeName() const { return m_nodeName; }
61 
63 protected:
64  /// Virtual destructor
65  virtual ~RemoteWorker();
66  /* Worker implementation */
67  virtual void run();
68  virtual void clear();
69  virtual void signalResourceExpiration(int id);
70  virtual void signalProcessCancellation(int id);
71  virtual void signalProcessTermination(int id);
72  virtual void start(Scheduler *scheduler, int workerIndex, int coreOffset);
73  void flush();
74  /// Record that one in-flight work unit has completed: decrements m_inFlight and signals m_finishCond, both while holding m_mutex
75  inline void signalCompletion() {
76  LockGuard lock(m_mutex);
77  m_inFlight--; // NOTE(review): size_t counter — presumably only called once per unit previously sent; confirm it cannot underflow
78  m_finishCond->signal(); // wake a waiter on m_finishCond (presumably flush()/run() waiting for the backlog to drain — confirm)
79  }
80 protected:
86 
87  /* List of processes and resources that are
88  currently active at the remote node */
89  std::set<int> m_resources;
90  std::set<int> m_processes;
91  std::set<std::string> m_plugins;
92  std::string m_nodeName;
93  size_t m_inFlight;
94 };
95 
96 /**
97  * \brief Communication helper thread required by \ref RemoteWorker.
98  *
99  * Constantly waits for finished work units sent by the processing node.
100  */
102  friend class RemoteWorker;
103 public:
105  /// Ask this reader thread to stop by setting m_shutdown (presumably polled by run() — confirm)
106  inline void shutdown() { m_shutdown = true; } // NOTE(review): m_shutdown is a plain bool, likely read by the reader thread concurrently; consider std::atomic<bool> — confirm
107 
109 protected:
110  /// Virtual destructor (protected; instances are held through ref<>, e.g. RemoteWorker::m_reader, rather than deleted directly)
111  virtual ~RemoteWorkerReader() { }
112  /// Thread body
113  void run();
114 private:
115  std::vector<Thread *> m_joinThreads;
116  RemoteWorker *m_parent;
117  ref<Stream> m_stream;
118  bool m_shutdown;
119  int m_currentID;
120  Scheduler::Item m_schedItem;
121 };
122 
123 /**
124  * \brief Parallel process facade used to insert work units from a
125  * remote scheduler into the local one.
126  *
127  * \ingroup libcore
128  */
130 public:
131  /**
132  * \brief Create a new remote process
133  *
134  * \param id Identification number for this process
135  * \param logLevel Log level for events associated with this process
136  * \param backend The responsible server-side communication backend
137  * \param proc Work processor instance for use with this process
138  */
139  RemoteProcess(int id, ELogLevel logLevel,
140  StreamBackend *backend, WorkProcessor *proc);
141 
142  /* ParallelProcess interface implementation */
143  EStatus generateWork(WorkUnit *unit, int worker);
144  void processResult(const WorkResult *result,
145  bool cancelled);
147  void handleCancellation();
148 
149  /// Get an empty work unit from the process (or create one)
151  ref<WorkUnit> wu;
152  LockGuard lock(m_mutex);
153  if (m_empty.empty()) {
154  wu = m_wp->createWorkUnit();
155  wu->incRef();
156  } else {
157  wu = m_empty.back();
158  m_empty.pop_back();
159  }
160  return wu;
161  }
162 
163  /// Make a full work unit available to the process (appended to the m_full queue under m_mutex; stored as a raw pointer, no reference is taken here)
164  inline void putFullWorkUnit(WorkUnit *wu) {
165  LockGuard lock(m_mutex);
166  m_full.push_back(wu);
167  }
168 
169  /// Mark the process as finished (sets m_done while holding m_mutex)
170  inline void setDone() {
171  LockGuard lock(m_mutex);
172  m_done = true;
173  }
174 
176 protected:
177  /// Virtual destructor
178  virtual ~RemoteProcess();
179 private:
180  int m_id;
181  ref<StreamBackend> m_backend;
182  std::vector<WorkUnit *> m_empty;
183  std::deque<WorkUnit *> m_full;
184  ref<WorkProcessor> m_wp;
185  ref<Mutex> m_mutex;
186  bool m_done;
187 };
188 
189 /**
190  * \brief Network processing communication backend
191  *
192  * Attaches to the end of a stream, accepts work units and forwards
193  * them to the local scheduler. Can be used to create network processing nodes.
194  *
195  * \ingroup libcore
196  */
198  friend class RemoteProcess;
199  friend class RemoteWorker;
200  friend class RemoteWorkerReader;
201 public:
202  /**
203  * \brief Create a new stream backend
204  *
205  * \param name
206  * Name of the created thread
207  * \param scheduler
208  * Scheduler instance used to process work units
209  * \param nodeName
210  * Exposed name of this node
211  * \param stream
212  * Stream used for communications
213  * \param detach
214  * Whether the associated thread should be detached instead of joinable
215  */
216  StreamBackend(const std::string &name, Scheduler *scheduler,
217  const std::string &nodeName, Stream *stream, bool detach);
218 
220 protected:
221  enum EMessage {
222  EUnknown = 0,
236  EHello = 0x1bcd
237  };
238 
239  /// Virtual destructor
240  virtual ~StreamBackend();
241  virtual void run();
242  void sendWorkResult(int id, const WorkResult *result, bool cancelled);
243  void sendCancellation(int id, int numLost);
244 private:
245  Scheduler *m_scheduler;
246  std::string m_nodeName;
247  ref<Stream> m_stream;
248  ref<MemoryStream> m_memStream;
249  std::map<int, RemoteProcess *> m_processes;
250  std::map<int, int> m_resources;
251  ref<Mutex> m_sendMutex;
252  bool m_detach;
253 };
254 
256 
257 #endif /* __MITSUBA_CORE_SCHED_REMOTE_H_ */
WorkUnit * getEmptyWorkUnit()
Get an empty work unit from the process (or create one)
Definition: sched_remote.h:150
Definition: sched_remote.h:226
void setDone()
Mark the process as finished.
Definition: sched_remote.h:170
virtual EStatus generateWork(WorkUnit *unit, int worker)=0
Generate a piece of work.
virtual void handleCancellation()
Called when the parallel process is canceled by Scheduler::cancel().
std::set< std::string > m_plugins
Definition: sched_remote.h:91
Network processing communication backend.
Definition: sched_remote.h:197
Definition: sched_remote.h:231
ref< MemoryStream > m_memStream
Definition: sched_remote.h:83
virtual void run()=0
The thread's run method.
virtual void processResult(const WorkResult *result, bool cancelled)=0
Called whenever a work unit has been completed.
Definition: sched_remote.h:235
EStatus
Return codes used by generateWork() and getReturnStatus()
Definition: sched.h:204
Parallel process facade used to insert work units from a remote scheduler into the local one.
Definition: sched_remote.h:129
#define MTS_EXPORT_CORE
Definition: getopt.h:29
Abstract parallelizable task.
Definition: sched.h:197
Definition: sched_remote.h:227
#define MTS_NAMESPACE_BEGIN
Definition: platform.h:137
std::set< int > m_processes
Definition: sched_remote.h:90
Simple RAII-style locking of a Mutex. On construction it locks the mutex and unlocks it on destruction.
Definition: lock.h:170
Abstract work result – represents the result of a processed WorkUnit instance.
Definition: sched.h:80
Definition: sched_remote.h:234
std::string m_nodeName
Definition: sched_remote.h:92
Definition: sched_remote.h:224
Abstract work unit – represents a small amount of information that encodes part of a larger processing task.
Definition: sched.h:47
EMessage
Definition: sched_remote.h:221
Definition: sched_remote.h:232
ref< Mutex > m_mutex
Definition: sched_remote.h:81
Abstract seekable stream class.
Definition: stream.h:58
#define MTS_DECLARE_CLASS()
This macro must be used in the initial definition in classes that derive from Object.
Definition: class.h:158
Reference counting helper.
Definition: ref.h:40
std::set< int > m_resources
Definition: sched_remote.h:89
Communication helper thread required by RemoteWorker.
Definition: sched_remote.h:101
Cross-platform thread implementation.
Definition: thread.h:34
Acquires work from the scheduler and forwards it to a processing node reachable through a Stream.
Definition: sched_remote.h:50
ELogLevel
Available Log message types.
Definition: formatter.h:28
ref< Stream > m_stream
Definition: sched_remote.h:84
virtual ref< WorkProcessor > createWorkProcessor() const =0
Create an instance of the algorithm responsible for executing the work units of this parallel process.
void shutdown()
Definition: sched_remote.h:106
Definition: sched_remote.h:233
void putFullWorkUnit(WorkUnit *wu)
Make a full work unit available to the process.
Definition: sched_remote.h:164
const std::string & getNodeName() const
Return the name of the node on the other side.
Definition: sched_remote.h:60
Definition: sched_remote.h:230
size_t m_inFlight
Definition: sched_remote.h:93
Base class of all worker implementations.
Definition: sched.h:673
ref< ConditionVariable > m_finishCond
Definition: sched_remote.h:82
Definition: sched_remote.h:225
Centralized task scheduler implementation.
Definition: sched.h:351
Definition: sched_remote.h:229
Definition: sched_remote.h:228
#define MTS_NAMESPACE_END
Definition: platform.h:138
Thin wrapper around the recursive boost thread lock.
Definition: lock.h:34
Abstract work processor – takes work units and turns them into WorkResult instances.
Definition: sched.h:118
ref< RemoteWorkerReader > m_reader
Definition: sched_remote.h:85
Definition: sched_remote.h:223