source: branches/eraser6/Manager/RemoteExecutor.cs @ 749

Revision 749, 12.9 KB checked in by lowjoel, 5 years ago (diff)

Implement pipe connections correctly.
-Create one new instance every time a pipe is used up and close it when it is done.
-Use a threadpool to execute the tasks asynchronously (and so the main thread can wait for more connections)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.IO;
25using System.Text;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Remoting.Messaging;
32using System.Runtime.Serialization.Formatters.Binary;
33
34namespace Eraser.Manager
35{
36    /// <summary>
37    /// Represents a request to the RemoteExecutorServer instance
38    /// </summary>
39    [Serializable]
40    internal class RemoteRequest
41    {
42        /// <summary>
43        /// List of supported functions
44        /// </summary>
45        public enum Function : uint
46        {
47            ADD_TASK = 0,
48            GET_TASK,
49            GET_TASKS,
50            CANCEL_TASK,
51            DELETE_TASK,
52            QUEUE_TASK,
53            REPLACE_TASK,
54            SCHEDULE_TASK,
55            SAVE_TASK_LIST,
56            LOAD_TASK_LIST,
57            QUEUE_RESTART_TASK,
58        }
59
60        /// <summary>
61        /// Constructor.
62        /// </summary>
63        /// <param name="func">The function this command is wanting to execute.</param>
64        /// <param name="data">The parameters for the command, serialised using a
65        /// BinaryFormatter</param>
66        public RemoteRequest(Function func, byte[] data)
67        {
68            Func = func;
69            Data = data;
70        }
71
72        /// <summary>
73        /// The function that this request is meant to call.
74        /// </summary>
75        public Function Func;
76
77        /// <summary>
78        /// The parameters associated with the function call.
79        /// </summary>
80        public byte[] Data;
81    };
82
83    /// <summary>
84    /// The RemoteExecutorServer class is the server half required for remote execution
85    /// of tasks.
86    /// </summary>
87    public class RemoteExecutorServer : DirectExecutor
88    {
89        /// <summary>
90        /// Our Remote Server name, prevent collisions!
91        /// </summary>
92        public static readonly string ServerName =
93            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
94            System.Security.Principal.WindowsIdentity.GetCurrent().User.ToString();
95
96        /// <summary>
97        /// The thread which will answer pipe connections
98        /// </summary>
99        private Thread thread = null;
100
101        /// <summary>
102        /// Constructor.
103        /// </summary>
104        public RemoteExecutorServer()
105        {
106            thread = new Thread(Main);
107            thread.Start();
108            Thread.Sleep(0);
109        }
110
111        public override void Dispose()
112        {
113            thread.Abort();
114            base.Dispose();
115        }
116
117        /// <summary>
118        /// The polling loop dealing with every server connection.
119        /// </summary>
120        private void Main()
121        {
122            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
123            {
124                //Wait for a connection to be established
125                NamedPipeServerStream server = new NamedPipeServerStream(ServerName,
126                    PipeDirection.InOut, 4, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
127                IAsyncResult asyncWait = server.BeginWaitForConnection(
128                    EndWaitForConnection, server);
129
130                while (!server.IsConnected && !asyncWait.AsyncWaitHandle.WaitOne(15))
131                    if (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested)
132                        break;
133
134                //Execute the handler if the server was connected.
135                if (server.IsConnected)
136                    ThreadPool.QueueUserWorkItem(ProcessConnection, server);
137            }
138        }
139
140        /// <summary>
141        /// Waits for a connection from a client.
142        /// </summary>
143        /// <param name="result">The AsyncResult object associated with this asynchronous
144        /// operation.</param>
145        private void EndWaitForConnection(IAsyncResult result)
146        {
147            NamedPipeServerStream server = (NamedPipeServerStream)result.AsyncState;
148            server.WaitForConnection();
149            if (server.IsConnected)
150                server.EndWaitForConnection(result);
151        }
152
153        /// <summary>
154        /// Handles a new connection from the client.
155        /// </summary>
156        /// <param name="param">The connected NamedPipeServerStream instance.</param>
157        private void ProcessConnection(object param)
158        {
159            //Get the Server instance.
160            using (NamedPipeServerStream server = (NamedPipeServerStream)param)
161            {
162                //Read the request into the buffer.
163                RemoteRequest request = null;
164                using (MemoryStream mstream = new MemoryStream())
165                {
166                    byte[] buffer = new byte[65536];
167                    server.Read(buffer, 0, sizeof(int));
168                    int messageSize = BitConverter.ToInt32(buffer, 0);
169                    while (messageSize > 0)
170                    {
171                        int lastRead = server.Read(buffer, 0, Math.Min(messageSize, buffer.Length));
172                        messageSize -= lastRead;
173                        mstream.Write(buffer, 0, lastRead);
174                    }
175
176                    //Ignore the request if the client disconnected from us.
177                    if (!server.IsConnected)
178                        return;
179
180                    //Deserialise the header of the request.
181                    mstream.Position = 0;
182                    try
183                    {
184                        request = (RemoteRequest)new BinaryFormatter().Deserialize(
185                            new MemoryStream(buffer));
186                    }
187                    catch (SerializationException)
188                    {
189                        //We got a unserialisation issue but we can't do anything about it.
190                        return;
191                    }
192                }
193
194                #region Deserialise
195                object parameter = null;
196                switch (request.Func)
197                {
198                    // void \+ task
199                    case RemoteRequest.Function.CANCEL_TASK:
200                    case RemoteRequest.Function.QUEUE_TASK:
201                    case RemoteRequest.Function.REPLACE_TASK:
202                    case RemoteRequest.Function.SCHEDULE_TASK:
203                    case RemoteRequest.Function.ADD_TASK:
204                        using (MemoryStream mStream = new MemoryStream(request.Data))
205                            parameter = new BinaryFormatter().Deserialize(mStream);
206                        break;
207
208                    // bool \+ taskid
209                    case RemoteRequest.Function.DELETE_TASK:
210                    // task \+ taskid
211                    case RemoteRequest.Function.GET_TASK:
212                        using (MemoryStream mStream = new MemoryStream(request.Data))
213                            parameter = new BinaryFormatter().Deserialize(mStream);
214                        break;
215
216                    // void \+ stream
217                    case RemoteRequest.Function.LOAD_TASK_LIST:
218                    case RemoteRequest.Function.SAVE_TASK_LIST:
219                        using (MemoryStream mStream = new MemoryStream(request.Data))
220                            parameter = new BinaryFormatter().Deserialize(mStream);
221                        break;
222
223                    // list<task> \+ void
224                    case RemoteRequest.Function.GET_TASKS:
225                    // void \+ void
226                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
227                        break;
228
229                    default:
230                        throw new FatalException("Unknown RemoteExecutorClient.Function");
231                }
232                #endregion
233
234                #region Invoke
235                object returnValue = null;
236                switch (request.Func)
237                {
238                    // void \+ task
239                    case RemoteRequest.Function.CANCEL_TASK:
240                        CancelTask((Task)parameter);
241                        break;
242
243                    // void \+ task
244                    case RemoteRequest.Function.QUEUE_TASK:
245                        QueueTask((Task)parameter);
246                        break;
247
248                    // void \+ task
249                    case RemoteRequest.Function.REPLACE_TASK:
250                        ReplaceTask((Task)parameter);
251                        break;
252
253                    // void \+ task
254                    case RemoteRequest.Function.SCHEDULE_TASK:
255                        ScheduleTask((Task)parameter);
256                        break;
257
258                    // void \+ ref task
259                    case RemoteRequest.Function.ADD_TASK:
260                        {
261                            Task task = (Task)parameter;
262                            AddTask(ref task);
263                            break;
264                        }
265
266                    // bool \+ taskid
267                    case RemoteRequest.Function.DELETE_TASK:
268                        returnValue = DeleteTask((uint)parameter);
269                        break;
270
271                    // task \+ taskid
272                    case RemoteRequest.Function.GET_TASK:
273                        returnValue = GetTask((uint)parameter);
274                        break;
275
276                    // void \+ stream
277                    case RemoteRequest.Function.LOAD_TASK_LIST:
278                        LoadTaskList((Stream)parameter);
279                        break;
280
281                    // void \+ stream
282                    case RemoteRequest.Function.SAVE_TASK_LIST:
283                        SaveTaskList((Stream)parameter);
284                        break;
285
286                    // list<task> \+ void
287                    case RemoteRequest.Function.GET_TASKS:
288                        returnValue = GetTasks();
289                        break;
290
291                    // void \+ void
292                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
293                        QueueRestartTasks();
294                        break;
295
296                    default:
297                        throw new FatalException("Unknown RemoteExecutorClient.Function");
298                #endregion
299                }
300
301                //Return the result of the invoked function, if any.
302                if (returnValue != null)
303                    using (MemoryStream mStream = new MemoryStream())
304                    {
305                        new BinaryFormatter().Serialize(mStream, returnValue);
306                        byte[] buffer = mStream.ToArray();
307                        byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
308                        server.Write(bufferLength, 0, sizeof(int));
309                        server.Write(buffer, 0, buffer.Length);
310                    }
311                else
312                {
313                    byte[] buffer = BitConverter.GetBytes(0);
314                    server.Write(buffer, 0, sizeof(int));
315                }
316            }
317        }
318    }
319
320    public class RemoteExecutorClient : Executor
321    {
322        private NamedPipeClientStream client;
323
324        public RemoteExecutorClient()
325        {
326            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
327                PipeDirection.InOut);
328        }
329
330        public override void Dispose()
331        {
332            client.Close();
333            client.Dispose();
334        }
335
336        /// <summary>
337        /// Connects to the remote server.
338        /// </summary>
339        /// <returns>True if the connection to the remote server was established.</returns>
340        public bool Connect()
341        {
342            try
343            {
344                client.Connect(3000);
345            }
346            catch (TimeoutException)
347            {
348            }
349
350            return client.IsConnected;
351        }
352
353        private object SendRequest(RemoteRequest header)
354        {
355            //Connect to the server
356            object result = null;
357
358            using (MemoryStream mStream = new MemoryStream())
359            {
360                //Serialise the request
361                new BinaryFormatter().Serialize(mStream, header);
362
363                //Write the request to the pipe
364                byte[] buffer = mStream.ToArray();
365                byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
366                client.Write(bufferLength, 0, sizeof(int));
367                client.Write(buffer, 0, buffer.Length);
368
369                //Read the response from the pipe
370                mStream.Position = 0;
371                buffer = new byte[32768];
372                client.Read(buffer, 0, sizeof(int));
373                int responseLength = BitConverter.ToInt32(buffer, 0);
374                while (responseLength > 0)
375                    responseLength -= client.Read(buffer, 0, Math.Min(buffer.Length, responseLength));
376
377                //Deserialise the response
378                mStream.Position = 0;
379                if (mStream.Length > 0)
380                    result = new BinaryFormatter().Deserialize(mStream);
381            }
382
383            return result;
384        }
385
386        public override bool DeleteTask(uint taskId)
387        {
388            MemoryStream mStream = new MemoryStream();
389            new BinaryFormatter().Serialize(mStream, taskId);
390            return (bool)SendRequest(new RemoteRequest(RemoteRequest.Function.DELETE_TASK,
391                mStream.GetBuffer()));
392        }
393
394        public override List<Task> GetTasks()
395        {
396            MemoryStream mStream = new MemoryStream();
397            new BinaryFormatter().Serialize(mStream, null);
398            return (List<Task>)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASKS,
399                mStream.GetBuffer()));
400        }
401
402        public override Task GetTask(uint taskId)
403        {
404            MemoryStream mStream = new MemoryStream();
405            new BinaryFormatter().Serialize(mStream, taskId);
406            return (Task)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASK,
407                mStream.GetBuffer()));
408        }
409
410        public override void LoadTaskList(Stream stream)
411        {
412            MemoryStream mStream = new MemoryStream();
413            new BinaryFormatter().Serialize(mStream, stream);
414            SendRequest(new RemoteRequest(RemoteRequest.Function.LOAD_TASK_LIST,
415                mStream.GetBuffer()));
416        }
417
418        public override void AddTask(ref Task task)
419        {
420            MemoryStream mStream = new MemoryStream();
421            new BinaryFormatter().Serialize(mStream, task);
422            SendRequest(new RemoteRequest(RemoteRequest.Function.ADD_TASK,
423                mStream.GetBuffer()));
424        }
425
426        public override void CancelTask(Task task)
427        {
428            MemoryStream mStream = new MemoryStream();
429            new BinaryFormatter().Serialize(mStream, task);
430            SendRequest(new RemoteRequest(RemoteRequest.Function.CANCEL_TASK,
431                mStream.GetBuffer()));
432        }
433
434        public override void QueueRestartTasks()
435        {
436            MemoryStream mStream = new MemoryStream();
437            new BinaryFormatter().Serialize(mStream, null);
438            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_RESTART_TASK,
439                mStream.GetBuffer()));
440        }
441
442        public override void QueueTask(Task task)
443        {
444            MemoryStream mStream = new MemoryStream();
445            new BinaryFormatter().Serialize(mStream, task);
446            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_TASK,
447                mStream.GetBuffer()));
448        }
449
450        public override void ReplaceTask(Task task)
451        {
452            MemoryStream mStream = new MemoryStream();
453            new BinaryFormatter().Serialize(mStream, task);
454            SendRequest(new RemoteRequest(RemoteRequest.Function.REPLACE_TASK,
455                mStream.GetBuffer()));
456        }
457
458        public override void Run()
459        {
460        }
461
462        public override void ScheduleTask(Task task)
463        {
464            MemoryStream mStream = new MemoryStream();
465            new BinaryFormatter().Serialize(mStream, task);
466            SendRequest(new RemoteRequest(RemoteRequest.Function.SCHEDULE_TASK,
467                mStream.GetBuffer()));
468        }
469
470        public override void SaveTaskList(Stream stream)
471        {
472            MemoryStream mStream = new MemoryStream();
473            new BinaryFormatter().Serialize(mStream, stream);
474            SendRequest(new RemoteRequest(RemoteRequest.Function.SAVE_TASK_LIST,
475                mStream.GetBuffer()));
476        }
477    }
478}
Note: See TracBrowser for help on using the repository browser.