dmlite  0.6
TaskExec.h
Go to the documentation of this file.
1 /*
2  * Copyright 2015 CERN
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 #ifndef DMLITE_TASKEXEC_H
18 #define DMLITE_TASKEXEC_H
19 
20 /** @file dmTaskExec.h
21  * @brief A class that spawns commands that perform actions
22  * @author Fabrizio Furano
23  * @date Dec 2015
24  */
25 
26 
27 #include <boost/thread.hpp>
28 #include <signal.h>
29 #include <vector>
30 #include <string>
31 #include <algorithm>
32 #include <sstream>
33 #include <iterator>
34 #include <iostream>
36 
37 namespace dmlite {
38  class dmTaskExec;
39 
40  class dmTask: public boost::mutex {
41 
42  protected:
43  /// Threads waiting for result about this task will wait and synchronize here
44  /// using something like
45  /// boost::lock_guard< boost::mutex > l(workmutex);
46  ///
47  boost::condition_variable condvar;
48  public:
49  dmTask(dmTaskExec *wheretolog);
50  dmTask(const dmTask &o) {
51  key = o.key;
52  cmd = o.cmd;
53  for(unsigned int i = 0; i < 64; i++) parms[i] = NULL;
55  starttime = o.starttime;
56  endtime = o.endtime;
57  running = o.running;
58  finished = o.finished;
59  fd[0] = 0; fd[1] = 0; fd[2] = 0;
60  this->stdout = o.stdout;
61  this->loggerinst = o.loggerinst;
62  }
63 
64  ~dmTask();
65  int key;
66 
67  std::string cmd;
68  const char *parms[64];
69 
71 
72  time_t starttime, endtime;
73  bool running;
74  bool finished;
75 
76  int fd[3];
77  pid_t pid;
78  std::string stdout;
79 
80  /// Split che command string into the single parms
81  void splitCmd();
82 
83  /// Wait until the task has finished or the timeout is expired
84  int waitFinished(int tmout=5);
85 
86  void notifyAll() {
87  condvar.notify_all();
88  }
89 
91  };
92 
93 
94  /// Allows to spawn commands, useful for checksum calculations or file pulling
95  /// The spawned commands are pollable, i.e. in a given moment it's possible to
96  /// know the list of commands that are still running.
97  /// Objects belonging to this class in general are created in the disk nodes,
98  /// e.g. for running checksums or file copies and pulls
99  class dmTaskExec: public boost::recursive_mutex {
100 
101  public:
102  dmTaskExec();
103  ~dmTaskExec();
104  std::string instance;
105  /// Executes a command. Returns a positive integer as a key to reference
106  /// the execution status and the result
107  /// The mechanics is that a detached thread is started. This guy invokes popen3
108  /// and blocks waiting for the process to end. Upon end it updates the corresponding
109  /// instance of dmTask with the result and the stdout
110  int submitCmd(std::string cmd);
111 
112 
113  /// Executes a command. Returns a positive integer as a key to reference
114  // the execution status and the result
115  // The mechanics is that a detached thread is started. This guy invokes popen3
116  // and blocks waiting for the process to end. Upon end it updates the corresponding
117  // instance of dmTask with the result and the stdout
118  // -1 is returned in case of error in the submission
119  int submitCmd(std::vector<std::string> &args);
120 
121  /// Actually starts the thread corresponding to a command that was just submitted
122  /// Avoids race conditions
123  void goCmd(int id);
124 
125  /// Split che command string into the single parms
126  void assignCmd(dmTask *task, std::vector<std::string> &args);
127 
128  /// Get the results of a task.
129  /// Wait at max tmout seconds until the task finishes
130  /// Return 0 if the task has finished and there is a result
131  /// Return nonzero if the task is still running
132  int waitResult(int taskID, int tmout=5);
133 
134  //kill a specific task given the id
135  int killTask(int taskID);
136 
137  //get a dmTask given the id ( mainly for testing)
138  dmTask* getTask(int taskID);
139 
140  //get the current stdout of a task which may be running
141  int getTaskStdout(int taskID, std::string &stdout);
142 
143  /// Loops over all the tasks and:
144  /// - send a notification to the head node about all the processes that are running or that have finished
145  /// - garbage collect the task list.
146  /// - Task that are finished since long (e.g. 1 hour)
147  /// - Tasks that are stuck (e.g. 1 day)
148  void tick();
149 
150  int getTaskCounters(int &tot, int &running, int &finished);
151 
152 
153  /// Event invoked internally to log stuff
154  virtual void onLoggingRequest(Logger::Level lvl, std::string const & msg) = 0;
155  /// Event invoked internally to log stuff
156  virtual void onErrLoggingRequest(std::string const & msg) = 0;
157 
158  protected:
159 
160  /// event for immediate notifications when a task finishes
161  /// Subclasses can specialize this and apply app-dependent behavior to
162  /// perform actions when something has finished running
163  /// NOTE the signature. This passes copies of Task objects, not the originals
164  virtual void onTaskCompleted(dmTask &task);
165 
166  // event that notifies that a task is running
167  // This event can be invoked multiple times during the life of a task
168  /// NOTE the signature. This passes copies of Task objects, not the originals
169  virtual void onTaskRunning(dmTask &task);
170 
171 
172  private:
173 
174  int popen3(int fd[3], pid_t *pid, const char ** argv );
175 
176  /// Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big number
177  int taskcnt;
178  /// This map works like a sparse array :-)
179  std::map<int, dmTask*> tasks;
180 
181 
182  /// Here we invoke popen3
183  /// and block waiting for the process to end. Upon end it updates the corresponding
184  /// instance of dmTask with the result and the stdout
185  virtual void run(int key);
186 
187  //kill a specific task
188  int killTask(dmTask *task);
189  };
190 
191 
192 
193 }
194 
195 
196 
197 
198 
199 
200 
201 
202 
203 
204 
205 
206 
207 #endif
208 
std::map< int, dmTask * > tasks
This map works like a sparse array :-)
Definition: TaskExec.h:179
virtual void run(int key)
bool running
Definition: TaskExec.h:73
void splitCmd()
Split che command string into the single parms.
int key
Definition: TaskExec.h:65
Definition: TaskExec.h:40
int getTaskCounters(int &tot, int &running, int &finished)
dmTaskExec * loggerinst
Definition: TaskExec.h:90
Definition: TaskExec.h:99
virtual void onLoggingRequest(Logger::Level lvl, std::string const &msg)=0
Event invoked internally to log stuff.
int popen3(int fd[3], pid_t *pid, const char **argv)
int waitResult(int taskID, int tmout=5)
const char * parms[64]
Definition: TaskExec.h:68
bool finished
Definition: TaskExec.h:74
dmTask(dmTaskExec *wheretolog)
int waitFinished(int tmout=5)
Wait until the task has finished or the timeout is expired.
void goCmd(int id)
void assignCmd(dmTask *task, std::vector< std::string > &args)
Split che command string into the single parms.
int submitCmd(std::string cmd)
time_t starttime
Definition: TaskExec.h:72
int fd[3]
Definition: TaskExec.h:76
int taskcnt
Used to create keys to be inserted into the map. This has to be treated modulo MAXINT or similar big ...
Definition: TaskExec.h:177
void notifyAll()
Definition: TaskExec.h:86
boost::condition_variable condvar
Definition: TaskExec.h:47
std::string stdout
Definition: TaskExec.h:78
std::string instance
Definition: TaskExec.h:104
std::string cmd
Definition: TaskExec.h:67
int getTaskStdout(int taskID, std::string &stdout)
time_t endtime
Definition: TaskExec.h:72
pid_t pid
Definition: TaskExec.h:77
virtual void onTaskCompleted(dmTask &task)
int resultcode
Definition: TaskExec.h:70
virtual void onTaskRunning(dmTask &task)
NOTE the signature. This passes copies of Task objects, not the originals.
dmTask(const dmTask &o)
Definition: TaskExec.h:50
int killTask(int taskID)
Level
Definition: logger.h:88
virtual void onErrLoggingRequest(std::string const &msg)=0
Event invoked internally to log stuff.
dmTask * getTask(int taskID)