source: branches/eraser6/Manager/RemoteExecutor.cs @ 944

Revision 944, 13.5 KB checked in by lowjoel, 6 years ago (diff)

Fixed a few documentation inconsistencies and changed the event handler delegate types such that all event handlers use the EventArgs?<> generic.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.Text;
25using System.IO;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Serialization.Formatters.Binary;
32
33namespace Eraser.Manager
34{
35    /// <summary>
36    /// Represents a request to the RemoteExecutorServer instance
37    /// </summary>
38    [Serializable]
39    internal class RemoteExecutorRequest
40    {
41        /// <summary>
42        /// Constructor.
43        /// </summary>
44        /// <param name="func">The function this command is wanting to execute.</param>
45        /// <param name="data">The parameters for the command, serialised using a
46        /// BinaryFormatter</param>
47        public RemoteExecutorRequest(RemoteExecutorFunction func, params object[] data)
48        {
49            Func = func;
50            Data = data;
51        }
52
53        /// <summary>
54        /// The function that this request is meant to call.
55        /// </summary>
56        public RemoteExecutorFunction Func { get; set; }
57
58        /// <summary>
59        /// The parameters associated with the function call.
60        /// </summary>
61        public object[] Data { get; private set; }
62    };
63
64    /// <summary>
65    /// List of supported functions
66    /// </summary>
67    public enum RemoteExecutorFunction
68    {
69        QueueTask,
70        ScheduleTask,
71        UnqueueTask,
72
73        AddTask,
74        DeleteTask,
75        //UpdateTask,
76        GetTaskCount,
77        GetTask
78    }
79
80    /// <summary>
81    /// The RemoteExecutorServer class is the server half required for remote execution
82    /// of tasks.
83    /// </summary>
84    public class RemoteExecutorServer : DirectExecutor
85    {
86        /// <summary>
87        /// Our Remote Server name, prevent collisions!
88        /// </summary>
89        public static readonly string ServerName =
90            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
91            System.Security.Principal.WindowsIdentity.GetCurrent().User.ToString();
92
93        /// <summary>
94        /// Constructor.
95        /// </summary>
96        public RemoteExecutorServer()
97        {
98            thread = new Thread(Main);
99            serverLock = new Semaphore(maxServerInstances, maxServerInstances);
100
101            thread.Start();
102            Thread.Sleep(0);
103        }
104
105        protected override void Dispose(bool disposing)
106        {
107            if (disposing)
108            {
109                //Close the polling thread that creates new server instances
110                thread.Abort();
111
112                //Acquire all available locks to ensure no more server instances exist,
113                //then destroy the semaphore
114                for (int i = 0; i < maxServerInstances; ++i)
115                    serverLock.WaitOne();
116                serverLock.Close();
117
118                base.Dispose(disposing);
119            }
120        }
121
122        /// <summary>
123        /// The polling loop dealing with every server connection.
124        /// </summary>
125        private void Main()
126        {
127            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
128            {
129                //Wait for a new server instance to be available.
130                if (!serverLock.WaitOne())
131                    continue;
132
133                //Otherwise, a new instance can be created. Create it and wait for connections.
134                NamedPipeServerStream server = new NamedPipeServerStream(ServerName,
135                    PipeDirection.InOut, maxServerInstances, PipeTransmissionMode.Message,
136                    PipeOptions.Asynchronous);
137                server.BeginWaitForConnection(EndWaitForConnection, server);
138            }
139        }
140
141        /// <summary>
142        /// Handles the arguments passed to the server and calls the real function.
143        /// </summary>
144        /// <param name="arguments">The arguments to the function.</param>
145        /// <returns>Te result of the function call.</returns>
146        private delegate object RequestHandler(object[] arguments);
147
148        /// <summary>
149        /// Waits for a connection from a client.
150        /// </summary>
151        /// <param name="result">The AsyncResult object associated with this asynchronous
152        /// operation.</param>
153        private void EndWaitForConnection(IAsyncResult result)
154        {
155            NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState;
156
157            try
158            {
159                //We're done waiting for the connection
160                server.EndWaitForConnection(result);
161
162                while (server.IsConnected)
163                {
164                    //Read the request into the buffer.
165                    RemoteExecutorRequest request = null;
166                    using (MemoryStream mstream = new MemoryStream())
167                    {
168                        byte[] buffer = new byte[65536];
169
170                        do
171                        {
172                            int lastRead = server.Read(buffer, 0, buffer.Length);
173                            mstream.Write(buffer, 0, lastRead);
174                        }
175                        while (!server.IsMessageComplete);
176
177                        //Ignore the request if the client disconnected from us.
178                        if (!server.IsConnected)
179                            return;
180
181                        //Deserialise the header of the request.
182                        mstream.Position = 0;
183                        try
184                        {
185                            request = (RemoteExecutorRequest)new BinaryFormatter().Deserialize(
186                                new MemoryStream(buffer));
187                        }
188                        catch (SerializationException)
189                        {
190                            //We got a unserialisation issue but we can't do anything about it.
191                            return;
192                        }
193                    }
194
195                    //Map the deserialisation function to a real function
196                    Dictionary<RemoteExecutorFunction, RequestHandler> functionMap =
197                        new Dictionary<RemoteExecutorFunction, RequestHandler>();
198                    functionMap.Add(RemoteExecutorFunction.QueueTask,
199                        delegate(object[] args) { QueueTask((Task)args[0]); return null; });
200                    functionMap.Add(RemoteExecutorFunction.ScheduleTask,
201                        delegate(object[] args) { ScheduleTask((Task)args[0]); return null; });
202                    functionMap.Add(RemoteExecutorFunction.UnqueueTask,
203                        delegate(object[] args) { UnqueueTask((Task)args[0]); return null; });
204
205                    functionMap.Add(RemoteExecutorFunction.AddTask,
206                        delegate(object[] args)
207                        {
208                            Tasks.Add((Task)args[0]);
209                            return null;
210                        });
211                    functionMap.Add(RemoteExecutorFunction.DeleteTask,
212                        delegate(object[] args)
213                        {
214                            Tasks.Remove((Task)args[0]);
215                            return null;
216                        });
217                    functionMap.Add(RemoteExecutorFunction.GetTaskCount,
218                        delegate(object[] args) { return Tasks.Count; });
219                    functionMap.Add(RemoteExecutorFunction.GetTask,
220                        delegate(object[] args) { return Tasks[(int)args[0]]; });
221
222                    //Execute the function
223                    object returnValue = functionMap[request.Func](request.Data);
224
225                    //Return the result of the invoked function.
226                    using (MemoryStream mStream = new MemoryStream())
227                    {
228                        if (returnValue != null)
229                        {
230                            byte[] header = BitConverter.GetBytes((Int32)1);
231                            byte[] buffer = null;
232                            new BinaryFormatter().Serialize(mStream, returnValue);
233
234                            server.Write(header, 0, header.Length);
235                            server.Write(buffer, 0, buffer.Length);
236                        }
237                        else
238                        {
239                            byte[] header = BitConverter.GetBytes((Int32)0);
240                            server.Write(header, 0, header.Length);
241                        }
242                    }
243
244                    server.WaitForPipeDrain();
245                }
246            }
247            finally
248            {
249                server.Close();
250                serverLock.Release();
251            }
252        }
253
254        /// <summary>
255        /// The thread which will answer pipe connections
256        /// </summary>
257        private Thread thread;
258
259        /// <summary>
260        /// Counts the number of available server instances.
261        /// </summary>
262        private Semaphore serverLock;
263
264        /// <summary>
265        /// The maximum number of server instances existing concurrently.
266        /// </summary>
267        private const int maxServerInstances = 4;
268    }
269
270    /// <summary>
271    /// The RemoteExecutorServer class is the client half required for remote execution
272    /// of tasks, sending requests to the server running on the local computer.
273    /// </summary>
274    public class RemoteExecutorClient : Executor
275    {
276        public RemoteExecutorClient()
277        {
278            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
279                PipeDirection.InOut);
280            Tasks = new RemoteExecutorClientTasksCollection(this);
281        }
282
283        protected override void Dispose(bool disposing)
284        {
285            if (disposing)
286            {
287                client.Close();
288                client.Dispose();
289            }
290
291            base.Dispose(disposing);
292        }
293
294        /// <summary>
295        /// Connects to the remote server.
296        /// </summary>
297        /// <returns>True if the connection to the remote server was established.</returns>
298        public bool Connect()
299        {
300            try
301            {
302                client.Connect(3000);
303            }
304            catch (TimeoutException)
305            {
306            }
307
308            return client.IsConnected;
309        }
310
311        public override void Run()
312        {
313        }
314
315        /// <summary>
316        /// Sends a request to the executor server.
317        /// </summary>
318        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
319        /// <param name="function">The requested operation.</param>
320        /// <param name="args">The arguments for the operation.</param>
321        /// <returns>The return result from the object as if it were executed locally.</returns>
322        internal ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
323        {
324            //Connect to the server
325            object result = null;
326
327            using (MemoryStream mStream = new MemoryStream())
328            {
329                //Serialise the request
330                new BinaryFormatter().Serialize(mStream, new RemoteExecutorRequest(function, args));
331
332                //Write the request to the pipe
333                byte[] buffer = mStream.ToArray();
334                client.Write(buffer, 0, buffer.Length);
335
336                //Read the response from the pipe
337                mStream.Position = 0;
338                buffer = new byte[65536];
339                client.ReadMode = PipeTransmissionMode.Message;
340                do
341                {
342                    int lastRead = client.Read(buffer, 0, buffer.Length);
343                    mStream.Write(buffer, 0, lastRead);
344                }
345                while (!client.IsMessageComplete);
346
347                //Check if the server says there is a response. If so, read it.
348                if (BitConverter.ToInt32(mStream.ToArray(), 0) == 1)
349                {
350                    mStream.Position = 0;
351                    do
352                    {
353                        int lastRead = client.Read(buffer, 0, buffer.Length);
354                        mStream.Write(buffer, 0, lastRead);
355                    }
356                    while (!client.IsMessageComplete);
357
358                    //Deserialise the response
359                    mStream.Position = 0;
360                    if (mStream.Length > 0)
361                        result = new BinaryFormatter().Deserialize(mStream);
362                }
363            }
364
365            return (ReturnType)result;
366        }
367
368        public override void QueueTask(Task task)
369        {
370            SendRequest<object>(RemoteExecutorFunction.QueueTask, task);
371        }
372
373        public override void ScheduleTask(Task task)
374        {
375            SendRequest<object>(RemoteExecutorFunction.ScheduleTask, task);
376        }
377
378        public override void UnqueueTask(Task task)
379        {
380            SendRequest<object>(RemoteExecutorFunction.UnqueueTask, task);
381        }
382
383        public override void QueueRestartTasks()
384        {
385            throw new NotImplementedException();
386        }
387
388        public override ExecutorTasksCollection Tasks { get; protected set; }
389
390        /// <summary>
391        /// The named pipe used to connect to another running instance of Eraser.
392        /// </summary>
393        private NamedPipeClientStream client;
394    }
395
396    public class RemoteExecutorClientTasksCollection : ExecutorTasksCollection
397    {
398        /// <summary>
399        /// Constructor.
400        /// </summary>
401        /// <param name="executor">The <see cref="RemoteExecutor"/> object owning
402        /// this list.</param>
403        public RemoteExecutorClientTasksCollection(RemoteExecutorClient executor)
404            : base(executor)
405        {
406        }
407
408        /// <summary>
409        /// Sends a request to the executor server.
410        /// </summary>
411        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
412        /// <param name="function">The requested operation.</param>
413        /// <param name="args">The arguments for the operation.</param>
414        /// <returns>The return result from the object as if it were executed locally.</returns>
415        private ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
416        {
417            RemoteExecutorClient client = (RemoteExecutorClient)Owner;
418            return client.SendRequest<ReturnType>(function, args);
419        }
420
421        #region IList<Task> Members
422        public override int IndexOf(Task item)
423        {
424            throw new NotSupportedException();
425        }
426
427        public override void Insert(int index, Task item)
428        {
429            throw new NotSupportedException();
430        }
431
432        public override void RemoveAt(int index)
433        {
434            throw new NotSupportedException();
435        }
436
437        public override Task this[int index]
438        {
439            get
440            {
441                return SendRequest<Task>(RemoteExecutorFunction.GetTask, index);
442            }
443            set
444            {
445                throw new NotSupportedException();
446            }
447        }
448        #endregion
449
450        #region ICollection<Task> Members
451        public override void Add(Task item)
452        {
453            item.Executor = Owner;
454            SendRequest<object>(RemoteExecutorFunction.AddTask, item);
455
456            //Call all the event handlers who registered to be notified of tasks
457            //being added.
458            Owner.OnTaskAdded(new TaskEventArgs(item));
459        }
460
461        public override void Clear()
462        {
463            throw new NotSupportedException();
464        }
465
466        public override bool Contains(Task item)
467        {
468            throw new NotSupportedException();
469        }
470
471        public override void CopyTo(Task[] array, int arrayIndex)
472        {
473            throw new NotSupportedException();
474        }
475
476        public override int Count
477        {
478            get { return SendRequest<int>(RemoteExecutorFunction.GetTaskCount); }
479        }
480
481        public override bool Remove(Task item)
482        {
483            item.Cancel();
484            item.Executor = null;
485            SendRequest<object>(RemoteExecutorFunction.DeleteTask, item);
486
487            //Call all event handlers registered to be notified of task deletions.
488            Owner.OnTaskDeleted(new TaskEventArgs(item));
489            return true;
490        }
491        #endregion
492
493        #region IEnumerable<Task> Members
494        public override IEnumerator<Task> GetEnumerator()
495        {
496            throw new NotSupportedException();
497        }
498        #endregion
499
500        public override void SaveToStream(Stream stream)
501        {
502            throw new NotSupportedException();
503        }
504
505        public override void LoadFromStream(Stream stream)
506        {
507            throw new NotSupportedException();
508        }
509    }
510}
Note: See TracBrowser for help on using the repository browser.