GridComputing
Job Management in Grid Computing
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
machine.h
Go to the documentation of this file.
1 #ifndef MACHINE_H_
2 #define MACHINE_H_
3 
4 #include <unordered_set>
5 #include <string>
6 #include <queue>
7 #include <map>
8 #include <set>
9 #include <stack>
10 #include <vector>
11 #include <mutex>
12 #include <iostream>
13 #include <stdexcept>
14 #include <iomanip>
15 #include <numeric>
16 
17 #include "utils.h"
18 #include "interfaces.h"
19 #include "software.h"
20 #include "menu.h"
21 #include "job.h"
22 #include "log.h"
23 
24 typedef std::unordered_set<Software, Software::Hash> SoftwareSet;
26 
29 class _Machine
30 {
31 public:
32  static uint GetLastJobId() { return _lastJobId; }
33  static void SetLastJobId(uint val) { _lastJobId = val; }
34 
35 protected:
36  _Machine() { }
37  static uint _lastJobId;
38 };
39 
41 typedef std::vector<const Job*> JobVector;
42 
44 
47 template <class Container>
48 class IMachine : public ISave, public IUpdate, public IPrint, public _Machine
49 {
50 public:
52  IMachine(uint id) : _id(id) { }
53 
54  typedef std::set<Job*, IdLess<Job>> JobSet;
55 
57  IMachine(const std::string& machineName, uint maxJobs, double totalRAM, double totalDiskSpace) :
58  _id(0), _name(machineName), _maxJobs(maxJobs), _totalRAM(totalRAM), _totalDiskSpace(totalDiskSpace)
59  {
60  if (_name.length() > _maxNameLength)
61  _maxNameLength = _name.length();
62  }
63 
64  ~IMachine() { }
65 
66  // Name function members
67  const std::string& GetName() const { return _name; }
68  void SetName(const std::string& name);
69 
70  // Jobs function members
71  uint GetMaxJobs() const { return _maxJobs; }
72  void SetMaxJobs(uint val);
73 
74  // RAM function members
75  double GetAvailableRAM() const { return _totalRAM - _inUseRAM; }
76  double GetInUseRAM() const { return _inUseRAM; }
77  double GetTotalRAM() const { return _totalRAM; }
78  void SetTotalRAM(double val);
79 
80  // Disk Space function members
82  double GetInUseDiskSpace() const { return _inUseDiskSpace; }
83  double GetTotalDiskSpace() const { return _totalDiskSpace; }
84  void SetTotalDiskSpace(double val);
85 
86  // Id function members
87  uint GetId() const { return _id; }
88  void SetId(uint id) { _id = id; }
89 
90  // Software function members
91  void AddAvailableSoftware(const Software& sw) { _availableSoftware.insert(sw); }
92  void RemoveAvailableSoftware(const Software& sw) { _availableSoftware.erase(sw); }
94 
95  JobVector GetJobs() const;
96  bool AddJob(Job* job);
97  const Job* GetJob(uint id) const;
98  bool RemoveJob(uint id);
99  void RemoveAllJobs();
100  bool ChangeJobPriority(uint id, uint8 newPriority);
101  uint GetNumberOfCurrentJobs() const { return _currentJobs.size(); }
102 
103  bool Save(ByteBuffer& bb) const override;
104  void Update(uint diff) override;
105  void Print(std::ostream& os = std::cout) const override;
106 
107  // Static function members
108  static void PrintHeader(std::ostream& os = std::cout);
109  static Menu* GetMenu() { return _menu; }
110  static IMachine<Container>* Load(ByteBuffer& bb);
111 
112 protected:
113  bool SoftwareMeetsRequirements(const Software& sw) const { return _availableSoftware.find(sw) != _availableSoftware.end(); }
114 
115  double _totalRAM;
116  double _inUseRAM;
117 
120 
122 
124 
125  std::string _name;
127 
128  mutable std::mutex _mutex;
129 
130  static Menu* _menu;
132 
133  Container _currentJobs;
134 private: // no copying
135  IMachine(const IMachine&);
136  IMachine& operator =(IMachine const&);
137 };
138 
139 template <class Container>
141 {
142  _mutex.lock();
143  Container temp;
144 
145  bool found = false;
146 
147  for( ; !_currentJobs.empty() && !found; _currentJobs.pop())
148  {
149  if (_currentJobs.top() && _currentJobs.top()->GetId() == id)
150  {
151  found = true;
152 
153  Job* j = _currentJobs.top();
154  _currentJobs.pop();
155  j->SetPriority(newPriority);
156  _currentJobs.push(j);
157  }
158  else
159  temp.push(_currentJobs.top());
160  }
161 
162  while (!temp.empty())
163  {
164  _currentJobs.push(temp.top());
165  temp.pop();
166  }
167 
168  _mutex.unlock();
169  return found;
170 }
171 
172 
174 class MachineInExecution : public std::exception
175 {
176 public:
178  MachineInExecution(const uint mId) : exception("Machine in execution"), _machineId(mId) { }
179  uint GetMachineId() const { return _machineId; }
180 private:
182 };
183 
184 template <class Container> uint IMachine<Container>::_maxNameLength = 0;
185 template <class Container> Menu* IMachine<Container>::_menu = Loader<Menu>("machineMenu.txt").Load();
186 
187 template <class T>
188 class Queue : public std::queue<T>
189 {
190 public:
191  Queue() : std::queue<T>() { }
192  const value_type& top() const { return front(); }
193 };
194 
197 
198 template <class Container>
199 void IMachine<Container>::SetName(const std::string& name)
200 {
201  if (name.size() != 0)
202  {
203  _name = name;
204  if (_name.length() > _maxNameLength)
205  _maxNameLength = _name.length();
206  }
207 }
208 
209 template <class Container>
211 {
212  if (val < _currentJobs.size()) // If the value is smaller than the current number of jobs we can't modify this parameter.
213  throw MachineInExecution(_id);
214 
215  _maxJobs = val;
216 }
217 
218 template <class Container>
220 {
221  if (val < GetInUseRAM()) // If the value is smaller than the current needed RAM we can't modify this parameter.
222  throw MachineInExecution(_id);
223 
224  _totalRAM = val;
225 }
226 
227 template <class Container>
229 {
230  if (val < GetInUseDiskSpace()) // If the value is smaller than the current needed Disk Space we can't modify this parameter.
231  throw MachineInExecution(_id);
232 
233  _totalDiskSpace = val;
234 }
235 
236 template <class Container>
238 {
239  JobVector res;
240 
241  _mutex.lock();
242  Container temp(_currentJobs);
243  _mutex.unlock();
244 
245  while (!temp.empty())
246  {
247  res.push_back(temp.top());
248  temp.pop();
249  }
250  return res;
251 }
252 
253 template <class Container>
255 {
256  _mutex.lock();
257 
258  if (_currentJobs.size() >= _maxJobs)
259  {
260  _mutex.unlock();
261  return false;
262  }
263 
264  if (job->GetRequiredRAM() > GetAvailableRAM())
265  {
266  _mutex.unlock();
267  return false;
268  }
269 
270  if (job->GetRequiredDiskSpace() > GetAvailableDiskSpace())
271  {
272  _mutex.unlock();
273  return false;
274  }
275 
276  const SoftwareSet& requiredSoftware = job->GetRequiredSoftware();
277  for (const auto& it : requiredSoftware)
278  if (!SoftwareMeetsRequirements(it))
279  {
280  _mutex.unlock();
281  return false;
282  }
283 
284  job->SetId(_lastJobId);
285  _currentJobs.push(job);
286  _lastJobId++;
287  _inUseRAM += job->GetRequiredRAM();
288  _inUseDiskSpace += job->GetRequiredDiskSpace();
289 
290  _mutex.unlock();
291 
292  return true;
293 }
294 
295 template <class Container>
297 {
298  JobVector vec = GetJobs();
299 
300  auto it = std::find_if(vec.begin(), vec.end(), [id](const Job* j) { return j->GetId() == id; });
301 
302  if (it == vec.end())
303  return nullptr;
304 
305  return *it;
306 }
307 
308 template <class Container>
310 {
311  _mutex.lock();
312  Container temp(_currentJobs);
313  _mutex.unlock();
314 
315  bool found = false;
316 
317  for( ; !temp.empty(); temp.pop())
318  {
319  if (temp.top() && temp.top()->GetId() == id)
320  {
321  found = true;
322 
323  _mutex.lock();
324  temp.top()->Finish();
325  _mutex.unlock();
326  }
327  }
328 
329  return found;
330 }
331 
332 template <class Container>
334 {
335  _mutex.lock();
336 
337  for(Container temp(_currentJobs); !temp.empty(); temp.pop())
338  if (temp.top())
339  temp.top()->Finish();
340 
341  _mutex.unlock();
342 }
343 
344 template <class Container>
345 inline bool IMachine<Container>::Save(ByteBuffer& bb) const
346 {
347  bb.WriteUInt32(_id);
348  bb.WriteString(_name);
349  bb.WriteUInt32(_maxJobs);
350  bb.WriteDouble(_totalRAM);
351  bb.WriteDouble(_totalDiskSpace);
352 
353  bb.WriteUInt32(_currentJobs.size());
354  for (Container temp(_currentJobs); !temp.empty(); temp.pop())
355  temp.top()->Save(bb);
356 
357  bb.WriteUInt32(_availableSoftware.size());
358  for (auto sw : _availableSoftware)
359  sw.Save(bb);
360 
361  return true;
362 }
363 
364 template <class Container>
366 {
367  _mutex.lock();
368  Container temp;
369  for (; !_currentJobs.empty(); _currentJobs.pop())
370  {
371  _currentJobs.top()->Update(diff);
372 
373  if (_currentJobs.top()->Finished())
374  {
375  sLog(Console)->Log("Job %s removed from machine %s.", _currentJobs.top()->GetName().c_str(), _name.c_str());
376  Container::value_type tempJob = _currentJobs.top();
377  _inUseRAM -= tempJob->GetRequiredRAM();
378  _inUseDiskSpace -= tempJob->GetRequiredDiskSpace();
379  delete tempJob;
380  }
381  else
382  temp.push(_currentJobs.top());
383  }
384 
385  while (!temp.empty())
386  {
387  _currentJobs.push(temp.top());
388  temp.pop();
389  }
390 
391  _mutex.unlock();
392 }
393 
394 template <class Container>
395 inline void IMachine<Container>::Print(std::ostream& os = std::cout) const
396 {
397  os << "| " << std::setfill('0') << std::setw(4) << std::right << _id << " | "
398  << std::setfill(' ') << std::setw(_maxNameLength) << std::left << _name
399 
400  << " | " << std::setw(5) << std::left << GetAvailableRAM() << " / " << std::right << std::setw(5) << _totalRAM
401  << " | " << std::setw(5) << std::left << GetAvailableDiskSpace() << " / " << std::right << std::setw(5) << _totalDiskSpace
402  << " | " << std::setw(4) << std::left << _currentJobs.size() << " / " << std::right << std::setw(4) << _maxJobs << " |\n"
403  << "---------" << std::string(_maxNameLength, '-') << "------------------------------------------------\n";
404 }
405 
406 template <class Container>
407 void IMachine<Container>::PrintHeader(std::ostream& os = std::cout)
408 {
409  os << "---------" << std::string(_maxNameLength, '-') << "------------------------------------------------\n"
410  << "| Id | " << std::setw(_maxNameLength) << "Name" << " | RAM (MB) | Disk (MB) | Jobs |\n"
411  << "---------" << std::string(_maxNameLength, '-') << "------------------------------------------------\n";
412 }
413 
414 template <class Container>
416 {
417  uint32 id = bb.ReadUInt32();
418  std::string name = bb.ReadString();
419  uint32 maxJobs = bb.ReadUInt32();
420  double totalRAM = bb.ReadDouble();
421  double totalDiskSpace = bb.ReadDouble();
422 
423  IMachine* m = new IMachine<Container>(name, maxJobs, totalRAM, totalDiskSpace);
424  m->_id = id;
425 
426  uint32 jobCount = bb.ReadUInt32();
427  for (uint32 i = 0; i < jobCount; ++i)
428  {
429  Job* job = Job::Load(bb);
430  m->_currentJobs.push(job);
431  m->_inUseRAM += job->GetRequiredRAM();
433  }
434 
435  uint32 softwareCount = bb.ReadUInt32();
436  for (uint32 i = 0; i < softwareCount; ++i)
437  m->_availableSoftware.insert(Software::Load(bb));
438 
439  return m;
440 }
441 
442 #endif // MACHINE_H_
443