Changeset 928
- Timestamp:
- 5/1/2009 12:10:25 AM (4 years ago)
- File:
-
- 1 edited
-
branches/eraser6/Manager/RemoteExecutor.cs (modified) (9 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/eraser6/Manager/RemoteExecutor.cs
r927 r928 37 37 /// </summary> 38 38 [Serializable] 39 internal class Remote Request39 internal class RemoteExecutorRequest 40 40 { 41 /// <summary>42 /// List of supported functions43 /// </summary>44 public enum Function : uint45 {46 ADD_TASK = 0,47 GET_TASK,48 GET_TASKS,49 CANCEL_TASK,50 DELETE_TASK,51 QUEUE_TASK,52 REPLACE_TASK,53 SCHEDULE_TASK,54 SAVE_TASK_LIST,55 LOAD_TASK_LIST,56 QUEUE_RESTART_TASK,57 }58 59 41 /// <summary> 60 42 /// Constructor. … … 63 45 /// <param name="data">The parameters for the command, serialised using a 64 46 /// BinaryFormatter</param> 65 public Remote Request(Function func, byte[] data)47 public RemoteExecutorRequest(RemoteExecutorFunction func, params object[] data) 66 48 { 67 49 Func = func; … … 72 54 /// The function that this request is meant to call. 73 55 /// </summary> 74 public Function Func;56 public RemoteExecutorFunction Func { get; set; } 75 57 76 58 /// <summary> 77 59 /// The parameters associated with the function call. 78 60 /// </summary> 79 public byte[] Data;61 public object[] Data { get; private set; } 80 62 }; 63 64 /// <summary> 65 /// List of supported functions 66 /// </summary> 67 public enum RemoteExecutorFunction : uint 68 { 69 QueueTask, 70 ScheduleTask, 71 UnqueueTask, 72 73 AddTask, 74 DeleteTask, 75 //UpdateTask, 76 GetTaskCount, 77 GetTask 78 } 81 79 82 80 /// <summary> … … 94 92 95 93 /// <summary> 96 /// The thread which will answer pipe connections97 /// </summary>98 private Thread thread;99 100 /// <summary>101 94 /// Constructor. 102 95 /// </summary> … … 104 97 { 105 98 thread = new Thread(Main); 99 serverLock = new Semaphore(maxServerInstances, maxServerInstances); 100 106 101 thread.Start(); 107 102 Thread.Sleep(0); … … 121 116 while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested) 122 117 { 123 //Wait for a connection to be established 118 //Wait for a new server instance to be available. 119 if (!serverLock.WaitOne()) 120 continue; 121 122 //Otherwise, a new instance can be created. Create it and wait for connections. 124 123 NamedPipeServerStream server = new NamedPipeServerStream(ServerName, 125 PipeDirection.InOut, 4, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); 126 IAsyncResult asyncWait = server.BeginWaitForConnection( 127 EndWaitForConnection, server); 128 129 //Wait for a connection before moving on to create another listening server 130 asyncWait.AsyncWaitHandle.WaitOne(); 131 } 132 } 124 PipeDirection.InOut, maxServerInstances, PipeTransmissionMode.Message, 125 PipeOptions.Asynchronous); 126 server.BeginWaitForConnection(EndWaitForConnection, server); 127 } 128 } 129 130 /// <summary> 131 /// Handles the arguments passed to the server and calls the real function. 132 /// </summary> 133 /// <param name="arguments">The arguments to the function.</param> 134 /// <returns>Te result of the function call.</returns> 135 private delegate object RequestHandler(object[] arguments); 133 136 134 137 /// <summary> … … 139 142 private void EndWaitForConnection(IAsyncResult result) 140 143 { 141 using (NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState) 144 NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState; 145 146 try 142 147 { 143 148 //We're done waiting for the connection 144 149 server.EndWaitForConnection(result); 145 150 146 //Read the request into the buffer. 147 RemoteRequest request = null; 148 using (MemoryStream mstream = new MemoryStream()) 151 while (server.IsConnected) 149 152 { 150 byte[] buffer = new byte[65536]; 151 server.Read(buffer, 0, sizeof(int)); 152 int messageSize = BitConverter.ToInt32(buffer, 0); 153 while (messageSize > 0) 153 //Read the request into the buffer. 154 RemoteExecutorRequest request = null; 155 using (MemoryStream mstream = new MemoryStream()) 154 156 { 155 int lastRead = server.Read(buffer, 0, Math.Min(messageSize, buffer.Length)); 156 messageSize -= lastRead; 157 mstream.Write(buffer, 0, lastRead); 157 byte[] buffer = new byte[65536]; 158 159 do 160 { 161 int lastRead = server.Read(buffer, 0, buffer.Length); 162 mstream.Write(buffer, 0, lastRead); 163 } 164 while (!server.IsMessageComplete); 165 166 //Ignore the request if the client disconnected from us. 167 if (!server.IsConnected) 168 return; 169 170 //Deserialise the header of the request. 171 mstream.Position = 0; 172 try 173 { 174 request = (RemoteExecutorRequest)new BinaryFormatter().Deserialize( 175 new MemoryStream(buffer)); 176 } 177 catch (SerializationException) 178 { 179 //We got a unserialisation issue but we can't do anything about it. 180 return; 181 } 158 182 } 159 183 160 //Ignore the request if the client disconnected from us. 161 if (!server.IsConnected) 162 return; 163 164 //Deserialise the header of the request. 165 mstream.Position = 0; 166 try 167 { 168 request = (RemoteRequest)new BinaryFormatter().Deserialize( 169 new MemoryStream(buffer)); 170 } 171 catch (SerializationException) 172 { 173 //We got a unserialisation issue but we can't do anything about it. 174 return; 175 } 176 } 177 178 #region Deserialise 179 object parameter = null; 180 switch (request.Func) 181 { 182 // void \+ task 183 case RemoteRequest.Function.CANCEL_TASK: 184 case RemoteRequest.Function.QUEUE_TASK: 185 case RemoteRequest.Function.REPLACE_TASK: 186 case RemoteRequest.Function.SCHEDULE_TASK: 187 case RemoteRequest.Function.ADD_TASK: 188 using (MemoryStream mStream = new MemoryStream(request.Data)) 189 parameter = new BinaryFormatter().Deserialize(mStream); 190 break; 191 192 // bool \+ taskid 193 case RemoteRequest.Function.DELETE_TASK: 194 // task \+ taskid 195 case RemoteRequest.Function.GET_TASK: 196 using (MemoryStream mStream = new MemoryStream(request.Data)) 197 parameter = new BinaryFormatter().Deserialize(mStream); 198 break; 199 200 // void \+ stream 201 case RemoteRequest.Function.LOAD_TASK_LIST: 202 case RemoteRequest.Function.SAVE_TASK_LIST: 203 using (MemoryStream mStream = new MemoryStream(request.Data)) 204 parameter = new BinaryFormatter().Deserialize(mStream); 205 break; 206 207 // list<task> \+ void 208 case RemoteRequest.Function.GET_TASKS: 209 // void \+ void 210 case RemoteRequest.Function.QUEUE_RESTART_TASK: 211 break; 212 213 default: 214 throw new FatalException("Unknown RemoteExecutorClient.Function"); 215 } 216 #endregion 217 218 #region Invoke 219 object returnValue = null; 220 switch (request.Func) 221 { 222 // void \+ task 223 case RemoteRequest.Function.CANCEL_TASK: 224 CancelTask((Task)parameter); 225 break; 226 227 // void \+ task 228 case RemoteRequest.Function.QUEUE_TASK: 229 QueueTask((Task)parameter); 230 break; 231 232 // void \+ task 233 case RemoteRequest.Function.REPLACE_TASK: 234 ReplaceTask((Task)parameter); 235 break; 236 237 // void \+ task 238 case RemoteRequest.Function.SCHEDULE_TASK: 239 ScheduleTask((Task)parameter); 240 break; 241 242 // void \+ ref task 243 case RemoteRequest.Function.ADD_TASK: 244 { 245 Task task = (Task)parameter; 246 AddTask(task); 247 break; 248 } 249 250 // bool \+ taskid 251 case RemoteRequest.Function.DELETE_TASK: 252 returnValue = DeleteTask((uint)parameter); 253 break; 254 255 // task \+ taskid 256 case RemoteRequest.Function.GET_TASK: 257 returnValue = GetTask((uint)parameter); 258 break; 259 260 // void \+ stream 261 case RemoteRequest.Function.LOAD_TASK_LIST: 262 LoadTaskList((Stream)parameter); 263 break; 264 265 // void \+ stream 266 case RemoteRequest.Function.SAVE_TASK_LIST: 267 SaveTaskList((Stream)parameter); 268 break; 269 270 // list<task> \+ void 271 case RemoteRequest.Function.GET_TASKS: 272 returnValue = GetTasks(); 273 break; 274 275 // void \+ void 276 case RemoteRequest.Function.QUEUE_RESTART_TASK: 277 QueueRestartTasks(); 278 break; 279 280 default: 281 throw new FatalException("Unknown RemoteExecutorClient.Function"); 282 } 283 #endregion 284 285 //Return the result of the invoked function, if any. 286 if (returnValue != null) 184 //Map the deserialisation function to a real function 185 Dictionary<RemoteExecutorFunction, RequestHandler> functionMap = 186 new Dictionary<RemoteExecutorFunction, RequestHandler>(); 187 functionMap.Add(RemoteExecutorFunction.QueueTask, 188 delegate(object[] args) { QueueTask((Task)args[0]); return null; }); 189 functionMap.Add(RemoteExecutorFunction.ScheduleTask, 190 delegate(object[] args) { ScheduleTask((Task)args[0]); return null; }); 191 functionMap.Add(RemoteExecutorFunction.UnqueueTask, 192 delegate(object[] args) { UnqueueTask((Task)args[0]); return null; }); 193 194 functionMap.Add(RemoteExecutorFunction.AddTask, 195 delegate(object[] args) 196 { 197 Tasks.Add((Task)args[0]); 198 return null; 199 }); 200 functionMap.Add(RemoteExecutorFunction.DeleteTask, 201 delegate(object[] args) 202 { 203 Tasks.Remove((Task)args[0]); 204 return null; 205 }); 206 functionMap.Add(RemoteExecutorFunction.GetTaskCount, 207 delegate(object[] args) { return Tasks.Count; }); 208 functionMap.Add(RemoteExecutorFunction.GetTask, 209 delegate(object[] args) { return Tasks[(int)args[0]]; }); 210 211 //Execute the function 212 object returnValue = functionMap[request.Func](request.Data); 213 214 //Return the result of the invoked function. 287 215 using (MemoryStream mStream = new MemoryStream()) 288 216 { 289 new BinaryFormatter().Serialize(mStream, returnValue); 290 byte[] buffer = mStream.ToArray(); 291 byte[] bufferLength = BitConverter.GetBytes(buffer.Length); 292 server.Write(bufferLength, 0, sizeof(int)); 293 server.Write(buffer, 0, buffer.Length); 217 if (returnValue != null) 218 { 219 byte[] header = BitConverter.GetBytes((Int32)1); 220 byte[] buffer = null; 221 new BinaryFormatter().Serialize(mStream, returnValue); 222 223 server.Write(header, 0, header.Length); 224 server.Write(buffer, 0, buffer.Length); 225 } 226 else 227 { 228 byte[] header = BitConverter.GetBytes((Int32)0); 229 server.Write(header, 0, header.Length); 230 } 294 231 } 295 else 296 { 297 byte[] buffer = BitConverter.GetBytes(0); 298 server.Write(buffer, 0, sizeof(int)); 232 233 server.WaitForPipeDrain(); 299 234 } 300 235 } 301 } 236 finally 237 { 238 server.Close(); 239 serverLock.Release(); 240 } 241 } 242 243 /// <summary> 244 /// The thread which will answer pipe connections 245 /// </summary> 246 private Thread thread; 247 248 /// <summary> 249 /// Counts the number of available server instances. 250 /// </summary> 251 private Semaphore serverLock; 252 253 /// <summary> 254 /// The maximum number of server instances existing concurrently. 255 /// </summary> 256 private const int maxServerInstances = 4; 302 257 } 303 258 259 /// <summary> 260 /// The RemoteExecutorServer class is the client half required for remote execution 261 /// of tasks, sending requests to the server running on the local computer. 262 /// </summary> 304 263 public class RemoteExecutorClient : Executor 305 264 { 306 private NamedPipeClientStream client;307 308 265 public RemoteExecutorClient() 309 266 { 310 267 client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName, 311 268 PipeDirection.InOut); 269 Tasks = new RemoteExecutorClientTasksCollection(this); 312 270 } 313 271 … … 336 294 } 337 295 338 private object SendRequest(RemoteRequest header) 296 public override void Run() 297 { 298 } 299 300 /// <summary> 301 /// Sends a request to the executor server. 302 /// </summary> 303 /// <typeparam name="ReturnType">The expected return type of the request.</typeparam> 304 /// <param name="function">The requested operation.</param> 305 /// <param name="args">The arguments for the operation.</param> 306 /// <returns>The return result from the object as if it were executed locally.</returns> 307 internal ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args) 339 308 { 340 309 //Connect to the server … … 344 313 { 345 314 //Serialise the request 346 new BinaryFormatter().Serialize(mStream, header);315 new BinaryFormatter().Serialize(mStream, new RemoteExecutorRequest(function, args)); 347 316 348 317 //Write the request to the pipe 349 318 byte[] buffer = mStream.ToArray(); 350 byte[] bufferLength = BitConverter.GetBytes(buffer.Length);351 client.Write(bufferLength, 0, sizeof(int));352 319 client.Write(buffer, 0, buffer.Length); 353 320 354 321 //Read the response from the pipe 355 322 mStream.Position = 0; 356 buffer = new byte[32768]; 357 client.Read(buffer, 0, sizeof(int)); 358 int responseLength = BitConverter.ToInt32(buffer, 0); 359 while (responseLength > 0) 360 responseLength -= client.Read(buffer, 0, Math.Min(buffer.Length, responseLength)); 361 362 //Deserialise the response 363 mStream.Position = 0; 364 if (mStream.Length > 0) 365 result = new BinaryFormatter().Deserialize(mStream); 366 } 367 368 return result; 369 } 370 371 public override bool DeleteTask(uint taskId) 372 { 373 MemoryStream mStream = new MemoryStream(); 374 new BinaryFormatter().Serialize(mStream, taskId); 375 return (bool)SendRequest(new RemoteRequest(RemoteRequest.Function.DELETE_TASK, 376 mStream.GetBuffer())); 377 } 378 379 public override ICollection<Task> GetTasks() 380 { 381 MemoryStream mStream = new MemoryStream(); 382 new BinaryFormatter().Serialize(mStream, null); 383 return (List<Task>)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASKS, 384 mStream.GetBuffer())); 385 } 386 387 public override Task GetTask(uint taskId) 388 { 389 MemoryStream mStream = new MemoryStream(); 390 new BinaryFormatter().Serialize(mStream, taskId); 391 return (Task)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASK, 392 mStream.GetBuffer())); 393 } 394 395 public override void LoadTaskList(Stream stream) 396 { 397 MemoryStream mStream = new MemoryStream(); 398 new BinaryFormatter().Serialize(mStream, stream); 399 SendRequest(new RemoteRequest(RemoteRequest.Function.LOAD_TASK_LIST, 400 mStream.GetBuffer())); 401 } 402 403 public override void AddTask(Task task) 404 { 405 MemoryStream mStream = new MemoryStream(); 406 new BinaryFormatter().Serialize(mStream, task); 407 SendRequest(new RemoteRequest(RemoteRequest.Function.ADD_TASK, 408 mStream.GetBuffer())); 409 } 410 411 public override void CancelTask(Task task) 412 { 413 MemoryStream mStream = new MemoryStream(); 414 new BinaryFormatter().Serialize(mStream, task); 415 SendRequest(new RemoteRequest(RemoteRequest.Function.CANCEL_TASK, 416 mStream.GetBuffer())); 323 buffer = new byte[65536]; 324 client.ReadMode = PipeTransmissionMode.Message; 325 do 326 { 327 int lastRead = client.Read(buffer, 0, buffer.Length); 328 mStream.Write(buffer, 0, lastRead); 329 } 330 while (!client.IsMessageComplete); 331 332 //Check if the server says there is a response. If so, read it. 333 if (BitConverter.ToInt32(mStream.ToArray(), 0) == 1) 334 { 335 mStream.Position = 0; 336 do 337 { 338 int lastRead = client.Read(buffer, 0, buffer.Length); 339 mStream.Write(buffer, 0, lastRead); 340 } 341 while (!client.IsMessageComplete); 342 343 //Deserialise the response 344 mStream.Position = 0; 345 if (mStream.Length > 0) 346 result = new BinaryFormatter().Deserialize(mStream); 347 } 348 } 349 350 return (ReturnType)result; 351 } 352 353 public override void QueueTask(Task task) 354 { 355 SendRequest<object>(RemoteExecutorFunction.QueueTask, task); 356 } 357 358 public override void ScheduleTask(Task task) 359 { 360 SendRequest<object>(RemoteExecutorFunction.ScheduleTask, task); 361 } 362 363 public override void UnqueueTask(Task task) 364 { 365 SendRequest<object>(RemoteExecutorFunction.UnqueueTask, task); 417 366 } 418 367 419 368 public override void QueueRestartTasks() 420 369 { 421 MemoryStream mStream = new MemoryStream(); 422 new BinaryFormatter().Serialize(mStream, null); 423 SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_RESTART_TASK, 424 mStream.GetBuffer())); 425 } 426 427 public override void QueueTask(Task task) 428 { 429 MemoryStream mStream = new MemoryStream(); 430 new BinaryFormatter().Serialize(mStream, task); 431 SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_TASK, 432 mStream.GetBuffer())); 433 } 434 435 public override void ReplaceTask(Task task) 436 { 437 MemoryStream mStream = new MemoryStream(); 438 new BinaryFormatter().Serialize(mStream, task); 439 SendRequest(new RemoteRequest(RemoteRequest.Function.REPLACE_TASK, 440 mStream.GetBuffer())); 441 } 442 443 public override void Run() 444 { 445 } 446 447 public override void ScheduleTask(Task task) 448 { 449 MemoryStream mStream = new MemoryStream(); 450 new BinaryFormatter().Serialize(mStream, task); 451 SendRequest(new RemoteRequest(RemoteRequest.Function.SCHEDULE_TASK, 452 mStream.GetBuffer())); 453 } 454 455 public override void SaveTaskList(Stream stream) 456 { 457 MemoryStream mStream = new MemoryStream(); 458 new BinaryFormatter().Serialize(mStream, stream); 459 SendRequest(new RemoteRequest(RemoteRequest.Function.SAVE_TASK_LIST, 460 mStream.GetBuffer())); 370 throw new NotImplementedException(); 371 } 372 373 public override ExecutorTasksCollection Tasks { get; protected set; } 374 375 /// <summary> 376 /// The named pipe used to connect to another running instance of Eraser. 377 /// </summary> 378 private NamedPipeClientStream client; 379 } 380 381 public class RemoteExecutorClientTasksCollection : ExecutorTasksCollection 382 { 383 /// <summary> 384 /// Constructor. 385 /// </summary> 386 /// <param name="executor">The <see cref="RemoteExecutor"/> object owning 387 /// this list.</param> 388 public RemoteExecutorClientTasksCollection(RemoteExecutorClient executor) 389 : base(executor) 390 { 391 } 392 393 /// <summary> 394 /// Sends a request to the executor server. 395 /// </summary> 396 /// <typeparam name="ReturnType">The expected return type of the request.</typeparam> 397 /// <param name="function">The requested operation.</param> 398 /// <param name="args">The arguments for the operation.</param> 399 /// <returns>The return result from the object as if it were executed locally.</returns> 400 private ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args) 401 { 402 RemoteExecutorClient client = (RemoteExecutorClient)Owner; 403 return client.SendRequest<ReturnType>(function, args); 404 } 405 406 #region IList<Task> Members 407 public override int IndexOf(Task item) 408 { 409 throw new NotSupportedException(); 410 } 411 412 public override void Insert(int index, Task item) 413 { 414 throw new NotSupportedException(); 415 } 416 417 public override void RemoveAt(int index) 418 { 419 throw new NotSupportedException(); 420 } 421 422 public override Task this[int index] 423 { 424 get 425 { 426 return SendRequest<Task>(RemoteExecutorFunction.GetTask, index); 427 } 428 set 429 { 430 throw new NotSupportedException(); 431 } 432 } 433 #endregion 434 435 #region ICollection<Task> Members 436 public override void Add(Task item) 437 { 438 item.Executor = Owner; 439 SendRequest<object>(RemoteExecutorFunction.AddTask, item); 440 441 //Call all the event handlers who registered to be notified of tasks 442 //being added. 443 Owner.OnTaskAdded(item); 444 } 445 446 public override void Clear() 447 { 448 throw new NotSupportedException(); 449 } 450 451 public override bool Contains(Task item) 452 { 453 throw new NotSupportedException(); 454 } 455 456 public override void CopyTo(Task[] array, int arrayIndex) 457 { 458 throw new NotSupportedException(); 459 } 460 461 public override int Count 462 { 463 get { return SendRequest<int>(RemoteExecutorFunction.GetTaskCount); } 464 } 465 466 public override bool Remove(Task item) 467 { 468 item.Cancel(); 469 item.Executor = null; 470 SendRequest<object>(RemoteExecutorFunction.DeleteTask, item); 471 472 //Call all event handlers registered to be notified of task deletions. 473 Owner.OnTaskDeleted(item); 474 return true; 475 } 476 #endregion 477 478 #region IEnumerable<Task> Members 479 public override IEnumerator<Task> GetEnumerator() 480 { 481 throw new NotSupportedException(); 482 } 483 #endregion 484 485 public override void SaveToStream(Stream stream) 486 { 487 throw new NotSupportedException(); 488 } 489 490 public override void LoadFromStream(Stream stream) 491 { 492 throw new NotSupportedException(); 461 493 } 462 494 }
Note: See TracChangeset
for help on using the changeset viewer.
