source: branches/eraser6/6.0/Eraser.Manager/RemoteExecutor.cs @ 1719

Revision 1719, 14.5 KB checked in by lowjoel, 5 years ago (diff)

Backported changes from trunk.

r1718: Author: lowjoel: When we have problems starting Eraser from the shell extension check the return code for ERROR_ELEVATION_REQUIRED; if we get that, then we should re-run the operation as an administrator.
r1717: Author: lowjoel: If the directory we are deleting does not exist, we should just return -- there's nothing to be deleted.
r1716: Author: lowjoel: Catch IOExceptions when we try to connect to other running instances and show a error message when one occurs.
r1715: Author: lowjoel: Set that files are not meant to be indexed when it is meant for deletion before we even set the file times.
r1714: Author: lowjoel: Fixed race condition potentially created by initialising the remote executor server thread immediately upon construction since Run is not yet called.
r1713: Author: lowjoel: Since we only force the creation of the SchedulerPanel?'s handle in the constructor, InvokeRequired? should be called on the panel itself, and not on subcontrols as they are still delay-constructed. Fixes crash when Eraser is started quietly and a task is created remotely.
r1712: Author: lowjoel: ThreadAbortExceptions? should not trigger BlackBox? report creation.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008-2010 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.Text;
25using System.IO;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Serialization.Formatters.Binary;
32using System.Security.Principal;
33using System.Security.AccessControl;
34
35namespace Eraser.Manager
36{
37    /// <summary>
38    /// Represents a request to the RemoteExecutorServer instance
39    /// </summary>
40    [Serializable]
41    internal class RemoteExecutorRequest
42    {
43        /// <summary>
44        /// Constructor.
45        /// </summary>
46        /// <param name="func">The function this command is wanting to execute.</param>
47        /// <param name="data">The parameters for the command, serialised using a
48        /// BinaryFormatter</param>
49        public RemoteExecutorRequest(RemoteExecutorFunction func, params object[] data)
50        {
51            Func = func;
52            Data = data;
53        }
54
55        /// <summary>
56        /// The function that this request is meant to call.
57        /// </summary>
58        public RemoteExecutorFunction Func { get; set; }
59
60        /// <summary>
61        /// The parameters associated with the function call.
62        /// </summary>
63        public object[] Data { get; private set; }
64    };
65
66    /// <summary>
67    /// List of supported functions
68    /// </summary>
69    public enum RemoteExecutorFunction
70    {
71        QueueTask,
72        ScheduleTask,
73        UnqueueTask,
74
75        AddTask,
76        DeleteTask,
77        //UpdateTask,
78        GetTaskCount,
79        GetTask
80    }
81
82    /// <summary>
83    /// The RemoteExecutorServer class is the server half required for remote execution
84    /// of tasks.
85    /// </summary>
86    public class RemoteExecutorServer : DirectExecutor
87    {
88        /// <summary>
89        /// Our Remote Server name, prevent collisions!
90        /// </summary>
91        public static readonly string ServerName =
92            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
93            WindowsIdentity.GetCurrent().User.ToString();
94
95        /// <summary>
96        /// Constructor.
97        /// </summary>
98        public RemoteExecutorServer()
99        {
100            thread = new Thread(Main);
101            serverLock = new Semaphore(maxServerInstances, maxServerInstances);
102        }
103
104        protected override void Dispose(bool disposing)
105        {
106            if (disposing)
107            {
108                //Close the polling thread that creates new server instances
109                thread.Abort();
110
111                //Close all waiting streams
112                lock (servers)
113                    foreach (NamedPipeServerStream server in servers)
114                        server.Close();
115
116                //Acquire all available locks to ensure no more server instances exist,
117                //then destroy the semaphore
118                for (int i = 0; i < maxServerInstances; ++i)
119                    serverLock.WaitOne();
120                serverLock.Close();
121            }
122
123            base.Dispose(disposing);
124        }
125
126        public override void Run()
127        {
128            thread.Start();
129            Thread.Sleep(0);
130            base.Run();
131        }
132
133        /// <summary>
134        /// The polling loop dealing with every server connection.
135        /// </summary>
136        private void Main()
137        {
138            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
139            {
140                //Wait for a new server instance to be available.
141                if (!serverLock.WaitOne())
142                    continue;
143
144                PipeSecurity security = new PipeSecurity();
145                security.AddAccessRule(new PipeAccessRule(
146                    WindowsIdentity.GetCurrent().User,
147                    PipeAccessRights.FullControl, AccessControlType.Allow));
148
149                //Otherwise, a new instance can be created. Create it and wait for connections.
150                NamedPipeServerStream server = new NamedPipeServerStream(ServerName,
151                    PipeDirection.InOut, maxServerInstances, PipeTransmissionMode.Message,
152                    PipeOptions.Asynchronous, 128, 128, security);
153                server.BeginWaitForConnection(EndWaitForConnection, server);
154               
155                lock (servers)
156                    servers.Add(server);
157            }
158        }
159
160        /// <summary>
161        /// Handles the arguments passed to the server and calls the real function.
162        /// </summary>
163        /// <param name="arguments">The arguments to the function.</param>
164        /// <returns>Te result of the function call.</returns>
165        private delegate object RequestHandler(object[] arguments);
166
167        /// <summary>
168        /// Waits for a connection from a client.
169        /// </summary>
170        /// <param name="result">The AsyncResult object associated with this asynchronous
171        /// operation.</param>
172        private void EndWaitForConnection(IAsyncResult result)
173        {
174            NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState;
175
176            try
177            {
178                //We're done waiting for the connection
179                server.EndWaitForConnection(result);
180
181                while (server.IsConnected)
182                {
183                    //Read the request into the buffer.
184                    RemoteExecutorRequest request = null;
185                    using (MemoryStream mstream = new MemoryStream())
186                    {
187                        byte[] buffer = new byte[65536];
188
189                        do
190                        {
191                            int lastRead = server.Read(buffer, 0, buffer.Length);
192                            mstream.Write(buffer, 0, lastRead);
193                        }
194                        while (!server.IsMessageComplete);
195
196                        //Ignore the request if the client disconnected from us.
197                        if (!server.IsConnected)
198                            return;
199
200                        //Deserialise the header of the request.
201                        mstream.Position = 0;
202                        try
203                        {
204                            request = (RemoteExecutorRequest)new BinaryFormatter().Deserialize(
205                                new MemoryStream(buffer));
206                        }
207                        catch (SerializationException)
208                        {
209                            //We got a unserialisation issue but we can't do anything about it.
210                            return;
211                        }
212                    }
213
214                    //Map the deserialisation function to a real function
215                    Dictionary<RemoteExecutorFunction, RequestHandler> functionMap =
216                        new Dictionary<RemoteExecutorFunction, RequestHandler>();
217                    functionMap.Add(RemoteExecutorFunction.QueueTask,
218                        delegate(object[] args) { QueueTask((Task)args[0]); return null; });
219                    functionMap.Add(RemoteExecutorFunction.ScheduleTask,
220                        delegate(object[] args) { ScheduleTask((Task)args[0]); return null; });
221                    functionMap.Add(RemoteExecutorFunction.UnqueueTask,
222                        delegate(object[] args) { UnqueueTask((Task)args[0]); return null; });
223
224                    functionMap.Add(RemoteExecutorFunction.AddTask,
225                        delegate(object[] args)
226                        {
227                            Tasks.Add((Task)args[0]);
228                            return null;
229                        });
230                    functionMap.Add(RemoteExecutorFunction.DeleteTask,
231                        delegate(object[] args)
232                        {
233                            Tasks.Remove((Task)args[0]);
234                            return null;
235                        });
236                    functionMap.Add(RemoteExecutorFunction.GetTaskCount,
237                        delegate(object[] args) { return Tasks.Count; });
238                    functionMap.Add(RemoteExecutorFunction.GetTask,
239                        delegate(object[] args) { return Tasks[(int)args[0]]; });
240
241                    //Execute the function
242                    object returnValue = functionMap[request.Func](request.Data);
243
244                    //Return the result of the invoked function.
245                    using (MemoryStream mStream = new MemoryStream())
246                    {
247                        if (returnValue != null)
248                        {
249                            byte[] header = BitConverter.GetBytes((Int32)1);
250                            byte[] buffer = null;
251                            new BinaryFormatter().Serialize(mStream, returnValue);
252
253                            server.Write(header, 0, header.Length);
254                            server.Write(buffer, 0, buffer.Length);
255                        }
256                        else
257                        {
258                            byte[] header = BitConverter.GetBytes((Int32)0);
259                            server.Write(header, 0, header.Length);
260                        }
261                    }
262
263                    server.WaitForPipeDrain();
264                }
265            }
266            catch (OperationCanceledException)
267            {
268            }
269            catch (ObjectDisposedException)
270            {
271            }
272            finally
273            {
274                lock (servers)
275                    servers.Remove(server);
276                server.Close();
277                serverLock.Release();
278            }
279        }
280
281        /// <summary>
282        /// The thread which will answer pipe connections
283        /// </summary>
284        private Thread thread;
285
286        /// <summary>
287        /// Counts the number of available server instances.
288        /// </summary>
289        private Semaphore serverLock;
290
291        /// <summary>
292        /// The list storing all available created server instances.
293        /// </summary>
294        private List<NamedPipeServerStream> servers = new List<NamedPipeServerStream>();
295
296        /// <summary>
297        /// The maximum number of server instances existing concurrently.
298        /// </summary>
299        private const int maxServerInstances = 4;
300    }
301
302    /// <summary>
303    /// The RemoteExecutorServer class is the client half required for remote execution
304    /// of tasks, sending requests to the server running on the local computer.
305    /// </summary>
306    public class RemoteExecutorClient : Executor
307    {
308        public RemoteExecutorClient()
309        {
310            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
311                PipeDirection.InOut);
312            tasks = new RemoteExecutorClientTasksCollection(this);
313        }
314
315        protected override void Dispose(bool disposing)
316        {
317            if (disposing)
318            {
319                client.Close();
320            }
321
322            base.Dispose(disposing);
323        }
324
325        public override void Run()
326        {
327            try
328            {
329                client.Connect(500);
330            }
331            catch (TimeoutException)
332            {
333            }
334        }
335
336        /// <summary>
337        /// Sends a request to the executor server.
338        /// </summary>
339        /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
340        /// <param name="function">The requested operation.</param>
341        /// <param name="args">The arguments for the operation.</param>
342        /// <returns>The return result from the object as if it were executed locally.</returns>
343        internal ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
344        {
345            //Connect to the server
346            object result = null;
347
348            using (MemoryStream mStream = new MemoryStream())
349            {
350                //Serialise the request
351                new BinaryFormatter().Serialize(mStream, new RemoteExecutorRequest(function, args));
352
353                //Write the request to the pipe
354                byte[] buffer = mStream.ToArray();
355                client.Write(buffer, 0, buffer.Length);
356
357                //Read the response from the pipe
358                mStream.Position = 0;
359                buffer = new byte[65536];
360                client.ReadMode = PipeTransmissionMode.Message;
361                do
362                {
363                    int lastRead = client.Read(buffer, 0, buffer.Length);
364                    mStream.Write(buffer, 0, lastRead);
365                }
366                while (!client.IsMessageComplete);
367
368                //Check if the server says there is a response. If so, read it.
369                if (BitConverter.ToInt32(mStream.ToArray(), 0) == 1)
370                {
371                    mStream.Position = 0;
372                    do
373                    {
374                        int lastRead = client.Read(buffer, 0, buffer.Length);
375                        mStream.Write(buffer, 0, lastRead);
376                    }
377                    while (!client.IsMessageComplete);
378
379                    //Deserialise the response
380                    mStream.Position = 0;
381                    if (mStream.Length > 0)
382                        result = new BinaryFormatter().Deserialize(mStream);
383                }
384            }
385
386            return (ReturnType)result;
387        }
388
389        public override void QueueTask(Task task)
390        {
391            SendRequest<object>(RemoteExecutorFunction.QueueTask, task);
392        }
393
394        public override void ScheduleTask(Task task)
395        {
396            SendRequest<object>(RemoteExecutorFunction.ScheduleTask, task);
397        }
398
399        public override void UnqueueTask(Task task)
400        {
401            SendRequest<object>(RemoteExecutorFunction.UnqueueTask, task);
402        }
403
404        public override void QueueRestartTasks()
405        {
406            throw new NotImplementedException();
407        }
408
409        internal override bool IsTaskQueued(Task task)
410        {
411            throw new NotImplementedException();
412        }
413
414        public override ExecutorTasksCollection Tasks
415        {
416            get
417            {
418                return tasks;
419            }
420        }
421
422        /// <summary>
423        /// Checks whether the executor instance has connected to a server.
424        /// </summary>
425        public bool IsConnected
426        {
427            get { return client.IsConnected; }
428        }
429
430        /// <summary>
431        /// The list of tasks belonging to this executor instance.
432        /// </summary>
433        private RemoteExecutorClientTasksCollection tasks;
434
435        /// <summary>
436        /// The named pipe used to connect to another running instance of Eraser.
437        /// </summary>
438        private NamedPipeClientStream client;
439
440        private class RemoteExecutorClientTasksCollection : ExecutorTasksCollection
441        {
442            /// <summary>
443            /// Constructor.
444            /// </summary>
445            /// <param name="executor">The <see cref="RemoteExecutor"/> object owning
446            /// this list.</param>
447            public RemoteExecutorClientTasksCollection(RemoteExecutorClient executor)
448                : base(executor)
449            {
450            }
451
452            /// <summary>
453            /// Sends a request to the executor server.
454            /// </summary>
455            /// <typeparam name="ReturnType">The expected return type of the request.</typeparam>
456            /// <param name="function">The requested operation.</param>
457            /// <param name="args">The arguments for the operation.</param>
458            /// <returns>The return result from the object as if it were executed locally.</returns>
459            private ReturnType SendRequest<ReturnType>(RemoteExecutorFunction function, params object[] args)
460            {
461                RemoteExecutorClient client = (RemoteExecutorClient)Owner;
462                return client.SendRequest<ReturnType>(function, args);
463            }
464
465            #region IList<Task> Members
466            public override int IndexOf(Task item)
467            {
468                throw new NotSupportedException();
469            }
470
471            public override void Insert(int index, Task item)
472            {
473                throw new NotSupportedException();
474            }
475
476            public override void RemoveAt(int index)
477            {
478                throw new NotSupportedException();
479            }
480
481            public override Task this[int index]
482            {
483                get
484                {
485                    return SendRequest<Task>(RemoteExecutorFunction.GetTask, index);
486                }
487                set
488                {
489                    throw new NotSupportedException();
490                }
491            }
492            #endregion
493
494            #region ICollection<Task> Members
495            public override void Add(Task item)
496            {
497                item.Executor = Owner;
498                SendRequest<object>(RemoteExecutorFunction.AddTask, item);
499
500                //Call all the event handlers who registered to be notified of tasks
501                //being added.
502                Owner.OnTaskAdded(new TaskEventArgs(item));
503            }
504
505            public override void Clear()
506            {
507                throw new NotSupportedException();
508            }
509
510            public override bool Contains(Task item)
511            {
512                throw new NotSupportedException();
513            }
514
515            public override void CopyTo(Task[] array, int arrayIndex)
516            {
517                throw new NotSupportedException();
518            }
519
520            public override int Count
521            {
522                get { return SendRequest<int>(RemoteExecutorFunction.GetTaskCount); }
523            }
524
525            public override bool Remove(Task item)
526            {
527                item.Cancel();
528                item.Executor = null;
529                SendRequest<object>(RemoteExecutorFunction.DeleteTask, item);
530
531                //Call all event handlers registered to be notified of task deletions.
532                Owner.OnTaskDeleted(new TaskEventArgs(item));
533                return true;
534            }
535            #endregion
536
537            #region IEnumerable<Task> Members
538            public override IEnumerator<Task> GetEnumerator()
539            {
540                throw new NotSupportedException();
541            }
542            #endregion
543
544            public override void SaveToStream(Stream stream)
545            {
546                throw new NotSupportedException();
547            }
548
549            public override void LoadFromStream(Stream stream)
550            {
551                throw new NotSupportedException();
552            }
553        }
554    }
555}
Note: See TracBrowser for help on using the repository browser.