source: branches/eraser6/Manager/RemoteExecutor.cs @ 971

Revision 971, 13.5 KB checked in by lowjoel, 5 years ago (diff)

Catch the OperationCanceledException? since on XP this is raised during program shutdown (and is harmless in this context)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.Text;
25using System.IO;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Serialization.Formatters.Binary;
32
33namespace Eraser.Manager
34{
35    /// <summary>
36    /// Represents a request to the RemoteExecutorServer instance
37    /// </summary>
38    [Serializable]
39    internal class RemoteExecutorRequest
40    {
41        /// <summary>
42        /// Constructor.
43        /// </summary>
44        /// <param name="func">The function this command is wanting to execute.</param>
45        /// <param name="data">The parameters for the command, serialised using a
46        /// BinaryFormatter</param>
47        public RemoteExecutorRequest(RemoteExecutorFunction func, params object[] data)
48        {
49            Func = func;
50            Data = data;
51        }
52
53        /// <summary>
54        /// The function that this request is meant to call.
55        /// </summary>
56        public RemoteExecutorFunction Func { get; set; }
57
58        /// <summary>
59        /// The parameters associated with the function call.
60        /// </summary>
61        public object[] Data { get; private set; }
62    };
63
64    /// <summary>
65    /// List of supported functions
66    /// </summary>
67    public enum RemoteExecutorFunction
68    {
69        QueueTask,
70        ScheduleTask,
71        UnqueueTask,
72
73        AddTask,
74        DeleteTask,
75        //UpdateTask,
76        GetTaskCount,
77        GetTask
78    }
79
80    /// <summary>
81    /// The RemoteExecutorServer class is the server half required for remote execution
82    /// of tasks.
83    /// </summary>
84    public class RemoteExecutorServer : DirectExecutor
85    {
86        /// <summary>
87        /// Our Remote Server name, prevent collisions!
88        /// </summary>
89        public static readonly string ServerName =
90            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
91            System.Security.Principal.WindowsIdentity.GetCurrent().User.ToString();
92
93        /// <summary>
94        /// Constructor.
95        /// </summary>
96        public RemoteExecutorServer()
97        {
98            thread = new Thread(Main);
99            serverLock = new Semaphore(maxServerInstances, maxServerInstances);
100
101            thread.Start();
102            Thread.Sleep(0);
103        }
104
105        protected override void Dispose(bool disposing)
106        {
107            if (disposing)
108            {
109                //Close the polling thread that creates new server instances
110                thread.Abort();
111
112                //Acquire all available locks to ensure no more server instances exist,
113                //then destroy the semaphore
114                for (int i = 0; i < maxServerInstances; ++i)
115                    serverLock.WaitOne();
116                serverLock.Close();
117
118                base.Dispose(disposing);
119            }
120        }
121
122        /// <summary>
123        /// The polling loop dealing with every server connection.
124        /// </summary>
125        private void Main()
126        {
127            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
128            {
129                //Wait for a new server instance to be available.
130                if (!serverLock.WaitOne())
131                    continue;
132
133                //Otherwise, a new instance can be created. Create it and wait for connections.
134                NamedPipeServerStream server = new NamedPipeServerStream(ServerName,
135                    PipeDirection.InOut, maxServerInstances, PipeTransmissionMode.Message,
136                    PipeOptions.Asynchronous);
137                server.BeginWaitForConnection(EndWaitForConnection, server);
138            }
139        }
140
141        /// <summary>
142        /// Handles the arguments passed to the server and calls the real function.
143        /// </summary>
144        /// <param name="arguments">The arguments to the function.</param>
145        /// <returns>Te result of the function call.</returns>
146        private delegate object RequestHandler(object[] arguments);
147
148        /// <summary>
149        /// Waits for a connection from a client.
150        /// </summary>
151        /// <param name="result">The AsyncResult object associated with this asynchronous
152        /// operation.</param>
153        private void EndWaitForConnection(IAsyncResult result)
154        {
155            NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState;
156
157            try
158            {
159                //We're done waiting for the connection
160                server.EndWaitForConnection(result);
161
162                while (server.IsConnected)
163                {
164                    //Read the request into the buffer.
165                    RemoteExecutorRequest request = null;
166                    using (MemoryStream mstream = new MemoryStream())
167                    {
168                        byte[] buffer = new byte[65536];
169
170                        do
171                        {
172                            int lastRead = server.Read(buffer, 0, buffer.Length);
173                            mstream.Write(buffer, 0, lastRead);
174                        }
175                        while (!server.IsMessageComplete);
176
177                        //Ignore the request if the client disconnected from us.
178                        if (!server.IsConnected)
179                            return;
180
181                        //Deserialise the header of the request.
182                        mstream.Position = 0;
183                        try
184                        {
185                            request = (RemoteExecutorRequest)new BinaryFormatter().Deserialize(
186                                new MemoryStream(buffer));
187                        }
188                        catch (SerializationException)
189                        {
190                            //We got a unserialisation issue but we can't do anything about it.
191                            return;
192                        }
193                    }
194
195                    //Map the deserialisation function to a real function
196                    Dictionary<RemoteExecutorFunction, RequestHandler> functionMap =
197                        new Dictionary<RemoteExecutorFunction, RequestHandler>();
198                    functionMap.Add(RemoteExecutorFunction.QueueTask,
199                        delegate(object[] args) { QueueTask((Task)args[0]); return null; });
200                    functionMap.Add(RemoteExecutorFunction.ScheduleTask,
201                        delegate(object[] args) { ScheduleTask((Task)args[0]); return null; });
202                    functionMap.Add(RemoteExecutorFunction.UnqueueTask,
203                        delegate(object[] args) { UnqueueTask((Task)args[0]); return null; });
204
205                    functionMap.Add(RemoteExecutorFunction.AddTask,
206                        delegate(object[] args)
207                        {
208                            Tasks.Add((Task)args[0]);
209                            return null;
210                        });
211                    functionMap.Add(RemoteExecutorFunction.DeleteTask,
212                        delegate(object[] args)
213                        {
214                            Tasks.Remove((Task)args[0]);
215                            return null;
216                        });
217                    functionMap.Add(RemoteExecutorFunction.GetTaskCount,
218                        delegate(object[] args) { return Tasks.Count; });
219                    functionMap.Add(RemoteExecutorFunction.GetTask,
220                        delegate(object[] args) { return Tasks[(int)args[0]]; });
221
222                    //Execute the function
223                    object returnValue = functionMap[request.Func](request.Data);
224
225                    //Return the result of the invoked function.
226                    using (MemoryStream mStream = new MemoryStream())
227                    {
228                        if (returnValue != null)
229                        {
230                            byte[] header = BitConverter.GetBytes((Int32)1);
231                            byte[] buffer = null;
232                            new BinaryFormatter().Serialize(mStream, returnValue);
233
234                            server.Write(header, 0, header.Length);
235                            server.Write(buffer, 0, buffer.Length);
236                        }
237                        else
238                        {
239                            byte[] header = BitConverter.GetBytes((Int32)0);
240                            server.Write(header, 0, header.Length);
241                        }
242                    }
243
244                    server.WaitForPipeDrain();
245                }
246            }
247            catch (OperationCanceledException)
248            {
249            }
250            finally
251            {
252                server.Close();
253                serverLock.Release();
254            }
255        }
256
257        /// <summary>
258        /// The thread which will answer pipe connections
259        /// </summary>
260        private Thread thread;
261
262        /// <summary>
263        /// Counts the number of available server instances.
264        /// </summary>
265        private Semaphore serverLock;
266
267        /// <summary>
268        /// The maximum number of server instances existing concurrently.
269        /// </summary>
270        private const int maxServerInstances = 4;
271    }
272
273    /// <summary>
274    /// The RemoteExecutorServer class is the client half required for remote execution
275    /// of tasks, sending requests to the server running on the local computer.
276    /// </summary>
277    public class RemoteExecutorClient : Executor
278    {
279        public RemoteExecutorClient()
280        {
281            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
282                PipeDirection.InOut);
283            Tasks = new RemoteExecutorClientTasksCollection(this);
284        }
285
286        protected override void Dispose(bool disposing)
287        {
288            if (disposing)
289            {
290                client.Close();
291                client.Dispose();
292            }
293
294            base.Dispose(disposing);
295        }
296
297        /// <summary>
298        /// Connects to the remote server.
299        /// </summary>
300        /// <returns>True if the connection to the remote server was established.</returns>
301        public bool Connect()
302        {
303            try
304            {
305                client.Connect(3000);
306            }
307            catch (TimeoutException)
308            {
309            }
310
311            return client.IsConnected;
312        }
313
314        public override void Run()
315        {
316        }
317
318        /// <summary>
319        /// Sends a request to the executor server.
320        /// </summary>
321        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
322        /// <param name="function">The requested operation.</param>
323        /// <param name="args">The arguments for the operation.</param>
324        /// <returns>The return result from the object as if it were executed locally.</returns>
325        internal ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
326        {
327            //Connect to the server
328            object result = null;
329
330            using (MemoryStream mStream = new MemoryStream())
331            {
332                //Serialise the request
333                new BinaryFormatter().Serialize(mStream, new RemoteExecutorRequest(function, args));
334
335                //Write the request to the pipe
336                byte[] buffer = mStream.ToArray();
337                client.Write(buffer, 0, buffer.Length);
338
339                //Read the response from the pipe
340                mStream.Position = 0;
341                buffer = new byte[65536];
342                client.ReadMode = PipeTransmissionMode.Message;
343                do
344                {
345                    int lastRead = client.Read(buffer, 0, buffer.Length);
346                    mStream.Write(buffer, 0, lastRead);
347                }
348                while (!client.IsMessageComplete);
349
350                //Check if the server says there is a response. If so, read it.
351                if (BitConverter.ToInt32(mStream.ToArray(), 0) == 1)
352                {
353                    mStream.Position = 0;
354                    do
355                    {
356                        int lastRead = client.Read(buffer, 0, buffer.Length);
357                        mStream.Write(buffer, 0, lastRead);
358                    }
359                    while (!client.IsMessageComplete);
360
361                    //Deserialise the response
362                    mStream.Position = 0;
363                    if (mStream.Length > 0)
364                        result = new BinaryFormatter().Deserialize(mStream);
365                }
366            }
367
368            return (ReturnType)result;
369        }
370
371        public override void QueueTask(Task task)
372        {
373            SendRequest<object>(RemoteExecutorFunction.QueueTask, task);
374        }
375
376        public override void ScheduleTask(Task task)
377        {
378            SendRequest<object>(RemoteExecutorFunction.ScheduleTask, task);
379        }
380
381        public override void UnqueueTask(Task task)
382        {
383            SendRequest<object>(RemoteExecutorFunction.UnqueueTask, task);
384        }
385
386        public override void QueueRestartTasks()
387        {
388            throw new NotImplementedException();
389        }
390
391        public override ExecutorTasksCollection Tasks { get; protected set; }
392
393        /// <summary>
394        /// The named pipe used to connect to another running instance of Eraser.
395        /// </summary>
396        private NamedPipeClientStream client;
397    }
398
399    public class RemoteExecutorClientTasksCollection : ExecutorTasksCollection
400    {
401        /// <summary>
402        /// Constructor.
403        /// </summary>
404        /// <param name="executor">The <see cref="RemoteExecutor"/> object owning
405        /// this list.</param>
406        public RemoteExecutorClientTasksCollection(RemoteExecutorClient executor)
407            : base(executor)
408        {
409        }
410
411        /// <summary>
412        /// Sends a request to the executor server.
413        /// </summary>
414        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
415        /// <param name="function">The requested operation.</param>
416        /// <param name="args">The arguments for the operation.</param>
417        /// <returns>The return result from the object as if it were executed locally.</returns>
418        private ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
419        {
420            RemoteExecutorClient client = (RemoteExecutorClient)Owner;
421            return client.SendRequest<ReturnType>(function, args);
422        }
423
424        #region IList<Task> Members
425        public override int IndexOf(Task item)
426        {
427            throw new NotSupportedException();
428        }
429
430        public override void Insert(int index, Task item)
431        {
432            throw new NotSupportedException();
433        }
434
435        public override void RemoveAt(int index)
436        {
437            throw new NotSupportedException();
438        }
439
440        public override Task this[int index]
441        {
442            get
443            {
444                return SendRequest<Task>(RemoteExecutorFunction.GetTask, index);
445            }
446            set
447            {
448                throw new NotSupportedException();
449            }
450        }
451        #endregion
452
453        #region ICollection<Task> Members
454        public override void Add(Task item)
455        {
456            item.Executor = Owner;
457            SendRequest<object>(RemoteExecutorFunction.AddTask, item);
458
459            //Call all the event handlers who registered to be notified of tasks
460            //being added.
461            Owner.OnTaskAdded(new TaskEventArgs(item));
462        }
463
464        public override void Clear()
465        {
466            throw new NotSupportedException();
467        }
468
469        public override bool Contains(Task item)
470        {
471            throw new NotSupportedException();
472        }
473
474        public override void CopyTo(Task[] array, int arrayIndex)
475        {
476            throw new NotSupportedException();
477        }
478
479        public override int Count
480        {
481            get { return SendRequest<int>(RemoteExecutorFunction.GetTaskCount); }
482        }
483
484        public override bool Remove(Task item)
485        {
486            item.Cancel();
487            item.Executor = null;
488            SendRequest<object>(RemoteExecutorFunction.DeleteTask, item);
489
490            //Call all event handlers registered to be notified of task deletions.
491            Owner.OnTaskDeleted(new TaskEventArgs(item));
492            return true;
493        }
494        #endregion
495
496        #region IEnumerable<Task> Members
497        public override IEnumerator<Task> GetEnumerator()
498        {
499            throw new NotSupportedException();
500        }
501        #endregion
502
503        public override void SaveToStream(Stream stream)
504        {
505            throw new NotSupportedException();
506        }
507
508        public override void LoadFromStream(Stream stream)
509        {
510            throw new NotSupportedException();
511        }
512    }
513}
Note: See TracBrowser for help on using the repository browser.