source: branches/eraser6/Manager/RemoteExecutor.cs @ 737

Revision 737, 12.0 KB checked in by lowjoel, 5 years ago (diff)

-Allow the --method parameter to be optional
-If a RemoteExecutorServer? instance is not running, Program should detect it and start a new Eraser instance.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.IO;
25using System.Text;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Remoting.Messaging;
32using System.Runtime.Serialization.Formatters.Binary;
33
34namespace Eraser.Manager
35{
36    /// <summary>
37    /// Represents a request to the RemoteExecutorServer instance
38    /// </summary>
39    [Serializable]
40    internal class RemoteRequest
41    {
42        /// <summary>
43        /// List of supported functions
44        /// </summary>
45        public enum Function : uint
46        {
47            ADD_TASK = 0,
48            GET_TASK,
49            GET_TASKS,
50            CANCEL_TASK,
51            DELETE_TASK,
52            QUEUE_TASK,
53            REPLACE_TASK,
54            SCHEDULE_TASK,
55            SAVE_TASK_LIST,
56            LOAD_TASK_LIST,
57            QUEUE_RESTART_TASK,
58        }
59
60        /// <summary>
61        /// Constructor.
62        /// </summary>
63        /// <param name="func">The function this command is wanting to execute.</param>
64        /// <param name="data">The parameters for the command, serialised using a
65        /// BinaryFormatter</param>
66        public RemoteRequest(Function func, byte[] data)
67        {
68            Func = func;
69            Data = data;
70        }
71
72        /// <summary>
73        /// The function that this request is meant to call.
74        /// </summary>
75        public Function Func;
76
77        /// <summary>
78        /// The parameters associated with the function call.
79        /// </summary>
80        public byte[] Data;
81    };
82
83    /// <summary>
84    /// The RemoteExecutorServer class is the server half required for remote execution
85    /// of tasks.
86    /// </summary>
87    public class RemoteExecutorServer : DirectExecutor
88    {
89        /// <summary>
90        /// Our Remote Server name, prevent collisions!
91        /// </summary>
92        public const string ServerName = "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor";
93
94        /// <summary>
95        /// The thread which will answer pipe connections
96        /// </summary>
97        private Thread thread = null;
98
99        /// <summary>
100        /// Our pipe instance which handles connections.
101        /// </summary>
102        private NamedPipeServerStream server =
103            new NamedPipeServerStream(ServerName, PipeDirection.InOut, 4,
104                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
105
106        /// <summary>
107        /// Constructor.
108        /// </summary>
109        public RemoteExecutorServer()
110        {
111            thread = new Thread(Main);
112            thread.Start();
113
114            Thread.Sleep(0);
115        }
116
117        public override void Dispose()
118        {
119            thread.Abort();
120            base.Dispose();
121        }
122
123        /// <summary>
124        /// The polling loop dealing with every server connection.
125        /// </summary>
126        private void Main()
127        {
128            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
129            {
130                //Wait for a connection to be established
131                if (!server.IsConnected)
132                {
133                    IAsyncResult asyncWait = server.BeginWaitForConnection(
134                        server.EndWaitForConnection, null);
135                    while (!server.IsConnected && !asyncWait.AsyncWaitHandle.WaitOne(15))
136                        if (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested)
137                            break;
138                }
139
140                //If we still aren't connected that means the connection failed to establish.
141                if (!server.IsConnected)
142                    continue;
143
144                //Read the request into the buffer.
145                RemoteRequest request = null;
146                using (MemoryStream mstream = new MemoryStream())
147                {
148                    byte[] buffer = new byte[65536];
149                    server.Read(buffer, 0, sizeof(int));
150                    int messageSize = BitConverter.ToInt32(buffer, 0);
151                    while (messageSize > 0)
152                    {
153                        int lastRead = server.Read(buffer, 0, Math.Min(messageSize, buffer.Length));
154                        messageSize -= lastRead;
155                        mstream.Write(buffer, 0, lastRead);
156                    }
157
158                    //Deserialise the header of the request.
159                    mstream.Position = 0;
160                    request = (RemoteRequest)new BinaryFormatter().Deserialize(new MemoryStream(buffer));
161                }
162
163                #region Deserialise
164                object parameter = null;
165                switch (request.Func)
166                {
167                    // void \+ task
168                    case RemoteRequest.Function.CANCEL_TASK:
169                    case RemoteRequest.Function.QUEUE_TASK:
170                    case RemoteRequest.Function.REPLACE_TASK:
171                    case RemoteRequest.Function.SCHEDULE_TASK:
172                    case RemoteRequest.Function.ADD_TASK:
173                        using (MemoryStream mStream = new MemoryStream(request.Data))
174                            parameter = new BinaryFormatter().Deserialize(mStream);
175                        break;
176
177                    // bool \+ taskid
178                    case RemoteRequest.Function.DELETE_TASK:
179                    // task \+ taskid
180                    case RemoteRequest.Function.GET_TASK:
181                        using (MemoryStream mStream = new MemoryStream(request.Data))
182                            parameter = new BinaryFormatter().Deserialize(mStream);
183                        break;
184
185                    // void \+ stream
186                    case RemoteRequest.Function.LOAD_TASK_LIST:
187                    case RemoteRequest.Function.SAVE_TASK_LIST:
188                        using (MemoryStream mStream = new MemoryStream(request.Data))
189                            parameter = new BinaryFormatter().Deserialize(mStream);
190                        break;
191
192                    // list<task> \+ void
193                    case RemoteRequest.Function.GET_TASKS:
194                    // void \+ void
195                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
196                        break;
197
198                    default:
199                        throw new FatalException("Unknown RemoteExecutorClient.Function");
200                }
201                #endregion
202
203                #region Invoke
204                object returnValue = null;
205                switch (request.Func)
206                {
207                    // void \+ task
208                    case RemoteRequest.Function.CANCEL_TASK:
209                        CancelTask((Task)parameter);
210                        break;
211
212                    // void \+ task
213                    case RemoteRequest.Function.QUEUE_TASK:
214                        QueueTask((Task)parameter);
215                        break;
216
217                    // void \+ task
218                    case RemoteRequest.Function.REPLACE_TASK:
219                        ReplaceTask((Task)parameter);
220                        break;
221
222                    // void \+ task
223                    case RemoteRequest.Function.SCHEDULE_TASK:
224                        ScheduleTask((Task)parameter);
225                        break;
226
227                    // void \+ ref task
228                    case RemoteRequest.Function.ADD_TASK:
229                    {
230                        Task task = (Task)parameter;
231                        AddTask(ref task);
232                        break;
233                    }
234
235                    // bool \+ taskid
236                    case RemoteRequest.Function.DELETE_TASK:
237                        returnValue = DeleteTask((uint)parameter);
238                        break;
239
240                    // task \+ taskid
241                    case RemoteRequest.Function.GET_TASK:
242                        returnValue = GetTask((uint)parameter);
243                        break;
244
245                    // void \+ stream
246                    case RemoteRequest.Function.LOAD_TASK_LIST:
247                        LoadTaskList((Stream)parameter);
248                        break;
249
250                    // void \+ stream
251                    case RemoteRequest.Function.SAVE_TASK_LIST:
252                        SaveTaskList((Stream)parameter);
253                        break;
254
255                    // list<task> \+ void
256                    case RemoteRequest.Function.GET_TASKS:
257                        returnValue = GetTasks();
258                        break;
259
260                    // void \+ void
261                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
262                        QueueRestartTasks();
263                        break;
264
265                    default:
266                        throw new FatalException("Unknown RemoteExecutorClient.Function");
267                #endregion
268                }
269
270                //Return the result of the invoked function, if any.
271                if (returnValue != null)
272                    using (MemoryStream mStream = new MemoryStream())
273                    {
274                        new BinaryFormatter().Serialize(mStream, returnValue);
275                        byte[] buffer = mStream.ToArray();
276                        byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
277                        server.Write(bufferLength, 0, sizeof(int));
278                        server.Write(buffer, 0, buffer.Length);
279                    }
280                else
281                {
282                    byte[] buffer = BitConverter.GetBytes(0);
283                    server.Write(buffer, 0, sizeof(int));
284                }
285
286                // we are done, disconnect
287                server.Disconnect();
288            }
289        }
290    }
291
292    public class RemoteExecutorClient : Executor
293    {
294        private NamedPipeClientStream client =
295            new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
296                PipeDirection.InOut);
297
298        public RemoteExecutorClient()
299        {
300        }
301
302        public override void Dispose()
303        {
304            client.Close();
305            client.Dispose();
306        }
307
308        /// <summary>
309        /// Connects to the remote server.
310        /// </summary>
311        /// <returns>True if the connection to the remote server was established.</returns>
312        public bool Connect()
313        {
314            try
315            {
316                client.Connect(250);
317            }
318            catch (TimeoutException)
319            {
320            }
321
322            return client.IsConnected;
323        }
324
325        private object SendRequest(RemoteRequest header)
326        {
327            //Connect to the server
328            object result = null;
329            client.Connect(5000);
330
331            using (MemoryStream mStream = new MemoryStream())
332            {
333                //Serialise the request
334                new BinaryFormatter().Serialize(mStream, header);
335
336                //Write the request to the pipe
337                byte[] buffer = mStream.ToArray();
338                byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
339                client.Write(bufferLength, 0, sizeof(int));
340                client.Write(buffer, 0, buffer.Length);
341
342                //Read the response from the pipe
343                mStream.Position = 0;
344                buffer = new byte[32768];
345                client.Read(buffer, 0, sizeof(int));
346                int responseLength = BitConverter.ToInt32(buffer, 0);
347                while (responseLength > 0)
348                    responseLength -= client.Read(buffer, 0, Math.Min(buffer.Length, responseLength));
349
350                //Deserialise the response
351                mStream.Position = 0;
352                if (mStream.Length > 0)
353                    result = new BinaryFormatter().Deserialize(mStream);
354            }
355
356            return result;
357        }
358
359        public override bool DeleteTask(uint taskId)
360        {
361            MemoryStream mStream = new MemoryStream();
362            new BinaryFormatter().Serialize(mStream, taskId);
363            return (bool)SendRequest(new RemoteRequest(RemoteRequest.Function.DELETE_TASK,
364                mStream.GetBuffer()));
365        }
366
367        public override List<Task> GetTasks()
368        {
369            MemoryStream mStream = new MemoryStream();
370            new BinaryFormatter().Serialize(mStream, null);
371            return (List<Task>)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASKS,
372                mStream.GetBuffer()));
373        }
374
375        public override Task GetTask(uint taskId)
376        {
377            MemoryStream mStream = new MemoryStream();
378            new BinaryFormatter().Serialize(mStream, taskId);
379            return (Task)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASK,
380                mStream.GetBuffer()));
381        }
382
383        public override void LoadTaskList(Stream stream)
384        {
385            MemoryStream mStream = new MemoryStream();
386            new BinaryFormatter().Serialize(mStream, stream);
387            SendRequest(new RemoteRequest(RemoteRequest.Function.LOAD_TASK_LIST,
388                mStream.GetBuffer()));
389        }
390
391        public override void AddTask(ref Task task)
392        {
393            MemoryStream mStream = new MemoryStream();
394            new BinaryFormatter().Serialize(mStream, task);
395            SendRequest(new RemoteRequest(RemoteRequest.Function.ADD_TASK,
396                mStream.GetBuffer()));
397        }
398
399        public override void CancelTask(Task task)
400        {
401            MemoryStream mStream = new MemoryStream();
402            new BinaryFormatter().Serialize(mStream, task);
403            SendRequest(new RemoteRequest(RemoteRequest.Function.CANCEL_TASK,
404                mStream.GetBuffer()));
405        }
406
407        public override void QueueRestartTasks()
408        {
409            MemoryStream mStream = new MemoryStream();
410            new BinaryFormatter().Serialize(mStream, null);
411            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_RESTART_TASK,
412                mStream.GetBuffer()));
413        }
414
415        public override void QueueTask(Task task)
416        {
417            MemoryStream mStream = new MemoryStream();
418            new BinaryFormatter().Serialize(mStream, task);
419            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_TASK,
420                mStream.GetBuffer()));
421        }
422
423        public override void ReplaceTask(Task task)
424        {
425            MemoryStream mStream = new MemoryStream();
426            new BinaryFormatter().Serialize(mStream, task);
427            SendRequest(new RemoteRequest(RemoteRequest.Function.REPLACE_TASK,
428                mStream.GetBuffer()));
429        }
430
431        public override void Run()
432        {
433        }
434
435        public override void ScheduleTask(Task task)
436        {
437            MemoryStream mStream = new MemoryStream();
438            new BinaryFormatter().Serialize(mStream, task);
439            SendRequest(new RemoteRequest(RemoteRequest.Function.SCHEDULE_TASK,
440                mStream.GetBuffer()));
441        }
442
443        public override void SaveTaskList(Stream stream)
444        {
445            MemoryStream mStream = new MemoryStream();
446            new BinaryFormatter().Serialize(mStream, stream);
447            SendRequest(new RemoteRequest(RemoteRequest.Function.SAVE_TASK_LIST,
448                mStream.GetBuffer()));
449        }
450    }
451}
Note: See TracBrowser for help on using the repository browser.