source: branches/eraser6/Manager/RemoteExecutor.cs @ 928

Revision 928, 13.1 KB checked in by lowjoel, 6 years ago (diff)

Fundamentally redesigned the RemoteExecutor? classes to work with the new API.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.Text;
25using System.IO;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Serialization.Formatters.Binary;
32
33namespace Eraser.Manager
34{
35    /// <summary>
36    /// Represents a request to the RemoteExecutorServer instance
37    /// </summary>
38    [Serializable]
39    internal class RemoteExecutorRequest
40    {
41        /// <summary>
42        /// Constructor.
43        /// </summary>
44        /// <param name="func">The function this command is wanting to execute.</param>
45        /// <param name="data">The parameters for the command, serialised using a
46        /// BinaryFormatter</param>
47        public RemoteExecutorRequest(RemoteExecutorFunction func, params object[] data)
48        {
49            Func = func;
50            Data = data;
51        }
52
53        /// <summary>
54        /// The function that this request is meant to call.
55        /// </summary>
56        public RemoteExecutorFunction Func { get; set; }
57
58        /// <summary>
59        /// The parameters associated with the function call.
60        /// </summary>
61        public object[] Data { get; private set; }
62    };
63
64    /// <summary>
65    /// List of supported functions
66    /// </summary>
67    public enum RemoteExecutorFunction : uint
68    {
69        QueueTask,
70        ScheduleTask,
71        UnqueueTask,
72
73        AddTask,
74        DeleteTask,
75        //UpdateTask,
76        GetTaskCount,
77        GetTask
78    }
79
80    /// <summary>
81    /// The RemoteExecutorServer class is the server half required for remote execution
82    /// of tasks.
83    /// </summary>
84    public class RemoteExecutorServer : DirectExecutor
85    {
86        /// <summary>
87        /// Our Remote Server name, prevent collisions!
88        /// </summary>
89        public static readonly string ServerName =
90            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
91            System.Security.Principal.WindowsIdentity.GetCurrent().User.ToString();
92
93        /// <summary>
94        /// Constructor.
95        /// </summary>
96        public RemoteExecutorServer()
97        {
98            thread = new Thread(Main);
99            serverLock = new Semaphore(maxServerInstances, maxServerInstances);
100
101            thread.Start();
102            Thread.Sleep(0);
103        }
104
105        protected override void Dispose(bool disposing)
106        {
107            thread.Abort();
108            base.Dispose(disposing);
109        }
110
111        /// <summary>
112        /// The polling loop dealing with every server connection.
113        /// </summary>
114        private void Main()
115        {
116            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
117            {
118                //Wait for a new server instance to be available.
119                if (!serverLock.WaitOne())
120                    continue;
121
122                //Otherwise, a new instance can be created. Create it and wait for connections.
123                NamedPipeServerStream server = new NamedPipeServerStream(ServerName,
124                    PipeDirection.InOut, maxServerInstances, PipeTransmissionMode.Message,
125                    PipeOptions.Asynchronous);
126                server.BeginWaitForConnection(EndWaitForConnection, server);
127            }
128        }
129
130        /// <summary>
131        /// Handles the arguments passed to the server and calls the real function.
132        /// </summary>
133        /// <param name="arguments">The arguments to the function.</param>
134        /// <returns>Te result of the function call.</returns>
135        private delegate object RequestHandler(object[] arguments);
136
137        /// <summary>
138        /// Waits for a connection from a client.
139        /// </summary>
140        /// <param name="result">The AsyncResult object associated with this asynchronous
141        /// operation.</param>
142        private void EndWaitForConnection(IAsyncResult result)
143        {
144            NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState;
145
146            try
147            {
148                //We're done waiting for the connection
149                server.EndWaitForConnection(result);
150
151                while (server.IsConnected)
152                {
153                    //Read the request into the buffer.
154                    RemoteExecutorRequest request = null;
155                    using (MemoryStream mstream = new MemoryStream())
156                    {
157                        byte[] buffer = new byte[65536];
158
159                        do
160                        {
161                            int lastRead = server.Read(buffer, 0, buffer.Length);
162                            mstream.Write(buffer, 0, lastRead);
163                        }
164                        while (!server.IsMessageComplete);
165
166                        //Ignore the request if the client disconnected from us.
167                        if (!server.IsConnected)
168                            return;
169
170                        //Deserialise the header of the request.
171                        mstream.Position = 0;
172                        try
173                        {
174                            request = (RemoteExecutorRequest)new BinaryFormatter().Deserialize(
175                                new MemoryStream(buffer));
176                        }
177                        catch (SerializationException)
178                        {
179                            //We got a unserialisation issue but we can't do anything about it.
180                            return;
181                        }
182                    }
183
184                    //Map the deserialisation function to a real function
185                    Dictionary<RemoteExecutorFunction, RequestHandler> functionMap =
186                        new Dictionary<RemoteExecutorFunction, RequestHandler>();
187                    functionMap.Add(RemoteExecutorFunction.QueueTask,
188                        delegate(object[] args) { QueueTask((Task)args[0]); return null; });
189                    functionMap.Add(RemoteExecutorFunction.ScheduleTask,
190                        delegate(object[] args) { ScheduleTask((Task)args[0]); return null; });
191                    functionMap.Add(RemoteExecutorFunction.UnqueueTask,
192                        delegate(object[] args) { UnqueueTask((Task)args[0]); return null; });
193
194                    functionMap.Add(RemoteExecutorFunction.AddTask,
195                        delegate(object[] args)
196                        {
197                            Tasks.Add((Task)args[0]);
198                            return null;
199                        });
200                    functionMap.Add(RemoteExecutorFunction.DeleteTask,
201                        delegate(object[] args)
202                        {
203                            Tasks.Remove((Task)args[0]);
204                            return null;
205                        });
206                    functionMap.Add(RemoteExecutorFunction.GetTaskCount,
207                        delegate(object[] args) { return Tasks.Count; });
208                    functionMap.Add(RemoteExecutorFunction.GetTask,
209                        delegate(object[] args) { return Tasks[(int)args[0]]; });
210
211                    //Execute the function
212                    object returnValue = functionMap[request.Func](request.Data);
213
214                    //Return the result of the invoked function.
215                    using (MemoryStream mStream = new MemoryStream())
216                    {
217                        if (returnValue != null)
218                        {
219                            byte[] header = BitConverter.GetBytes((Int32)1);
220                            byte[] buffer = null;
221                            new BinaryFormatter().Serialize(mStream, returnValue);
222
223                            server.Write(header, 0, header.Length);
224                            server.Write(buffer, 0, buffer.Length);
225                        }
226                        else
227                        {
228                            byte[] header = BitConverter.GetBytes((Int32)0);
229                            server.Write(header, 0, header.Length);
230                        }
231                    }
232
233                    server.WaitForPipeDrain();
234                }
235            }
236            finally
237            {
238                server.Close();
239                serverLock.Release();
240            }
241        }
242
243        /// <summary>
244        /// The thread which will answer pipe connections
245        /// </summary>
246        private Thread thread;
247
248        /// <summary>
249        /// Counts the number of available server instances.
250        /// </summary>
251        private Semaphore serverLock;
252
253        /// <summary>
254        /// The maximum number of server instances existing concurrently.
255        /// </summary>
256        private const int maxServerInstances = 4;
257    }
258
259    /// <summary>
260    /// The RemoteExecutorServer class is the client half required for remote execution
261    /// of tasks, sending requests to the server running on the local computer.
262    /// </summary>
263    public class RemoteExecutorClient : Executor
264    {
265        public RemoteExecutorClient()
266        {
267            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
268                PipeDirection.InOut);
269            Tasks = new RemoteExecutorClientTasksCollection(this);
270        }
271
272        protected override void Dispose(bool disposing)
273        {
274            client.Close();
275            client.Dispose();
276            base.Dispose(disposing);
277        }
278
279        /// <summary>
280        /// Connects to the remote server.
281        /// </summary>
282        /// <returns>True if the connection to the remote server was established.</returns>
283        public bool Connect()
284        {
285            try
286            {
287                client.Connect(3000);
288            }
289            catch (TimeoutException)
290            {
291            }
292
293            return client.IsConnected;
294        }
295
296        public override void Run()
297        {
298        }
299
300        /// <summary>
301        /// Sends a request to the executor server.
302        /// </summary>
303        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
304        /// <param name="function">The requested operation.</param>
305        /// <param name="args">The arguments for the operation.</param>
306        /// <returns>The return result from the object as if it were executed locally.</returns>
307        internal ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
308        {
309            //Connect to the server
310            object result = null;
311
312            using (MemoryStream mStream = new MemoryStream())
313            {
314                //Serialise the request
315                new BinaryFormatter().Serialize(mStream, new RemoteExecutorRequest(function, args));
316
317                //Write the request to the pipe
318                byte[] buffer = mStream.ToArray();
319                client.Write(buffer, 0, buffer.Length);
320
321                //Read the response from the pipe
322                mStream.Position = 0;
323                buffer = new byte[65536];
324                client.ReadMode = PipeTransmissionMode.Message;
325                do
326                {
327                    int lastRead = client.Read(buffer, 0, buffer.Length);
328                    mStream.Write(buffer, 0, lastRead);
329                }
330                while (!client.IsMessageComplete);
331
332                //Check if the server says there is a response. If so, read it.
333                if (BitConverter.ToInt32(mStream.ToArray(), 0) == 1)
334                {
335                    mStream.Position = 0;
336                    do
337                    {
338                        int lastRead = client.Read(buffer, 0, buffer.Length);
339                        mStream.Write(buffer, 0, lastRead);
340                    }
341                    while (!client.IsMessageComplete);
342
343                    //Deserialise the response
344                    mStream.Position = 0;
345                    if (mStream.Length > 0)
346                        result = new BinaryFormatter().Deserialize(mStream);
347                }
348            }
349
350            return (ReturnType)result;
351        }
352
353        public override void QueueTask(Task task)
354        {
355            SendRequest<object>(RemoteExecutorFunction.QueueTask, task);
356        }
357
358        public override void ScheduleTask(Task task)
359        {
360            SendRequest<object>(RemoteExecutorFunction.ScheduleTask, task);
361        }
362
363        public override void UnqueueTask(Task task)
364        {
365            SendRequest<object>(RemoteExecutorFunction.UnqueueTask, task);
366        }
367
368        public override void QueueRestartTasks()
369        {
370            throw new NotImplementedException();
371        }
372
373        public override ExecutorTasksCollection Tasks { get; protected set; }
374
375        /// <summary>
376        /// The named pipe used to connect to another running instance of Eraser.
377        /// </summary>
378        private NamedPipeClientStream client;
379    }
380
381    public class RemoteExecutorClientTasksCollection : ExecutorTasksCollection
382    {
383        /// <summary>
384        /// Constructor.
385        /// </summary>
386        /// <param name="executor">The <see cref="RemoteExecutor"/> object owning
387        /// this list.</param>
388        public RemoteExecutorClientTasksCollection(RemoteExecutorClient executor)
389            : base(executor)
390        {
391        }
392
393        /// <summary>
394        /// Sends a request to the executor server.
395        /// </summary>
396        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
397        /// <param name="function">The requested operation.</param>
398        /// <param name="args">The arguments for the operation.</param>
399        /// <returns>The return result from the object as if it were executed locally.</returns>
400        private ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
401        {
402            RemoteExecutorClient client = (RemoteExecutorClient)Owner;
403            return client.SendRequest<ReturnType>(function, args);
404        }
405
406        #region IList<Task> Members
407        public override int IndexOf(Task item)
408        {
409            throw new NotSupportedException();
410        }
411
412        public override void Insert(int index, Task item)
413        {
414            throw new NotSupportedException();
415        }
416
417        public override void RemoveAt(int index)
418        {
419            throw new NotSupportedException();
420        }
421
422        public override Task this[int index]
423        {
424            get
425            {
426                return SendRequest<Task>(RemoteExecutorFunction.GetTask, index);
427            }
428            set
429            {
430                throw new NotSupportedException();
431            }
432        }
433        #endregion
434
435        #region ICollection<Task> Members
436        public override void Add(Task item)
437        {
438            item.Executor = Owner;
439            SendRequest<object>(RemoteExecutorFunction.AddTask, item);
440
441            //Call all the event handlers who registered to be notified of tasks
442            //being added.
443            Owner.OnTaskAdded(item);
444        }
445
446        public override void Clear()
447        {
448            throw new NotSupportedException();
449        }
450
451        public override bool Contains(Task item)
452        {
453            throw new NotSupportedException();
454        }
455
456        public override void CopyTo(Task[] array, int arrayIndex)
457        {
458            throw new NotSupportedException();
459        }
460
461        public override int Count
462        {
463            get { return SendRequest<int>(RemoteExecutorFunction.GetTaskCount); }
464        }
465
466        public override bool Remove(Task item)
467        {
468            item.Cancel();
469            item.Executor = null;
470            SendRequest<object>(RemoteExecutorFunction.DeleteTask, item);
471
472            //Call all event handlers registered to be notified of task deletions.
473            Owner.OnTaskDeleted(item);
474            return true;
475        }
476        #endregion
477
478        #region IEnumerable<Task> Members
479        public override IEnumerator<Task> GetEnumerator()
480        {
481            throw new NotSupportedException();
482        }
483        #endregion
484
485        public override void SaveToStream(Stream stream)
486        {
487            throw new NotSupportedException();
488        }
489
490        public override void LoadFromStream(Stream stream)
491        {
492            throw new NotSupportedException();
493        }
494    }
495}
Note: See TracBrowser for help on using the repository browser.