source: branches/eraser6/Manager/RemoteExecutor.cs @ 739

Revision 739, 12.2 KB checked in by lowjoel, 6 years ago (diff)

Ignore the request (or partial request) sent by clients who disconnect before the request is completely transferred

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.IO;
25using System.Text;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Remoting.Messaging;
32using System.Runtime.Serialization.Formatters.Binary;
33
34namespace Eraser.Manager
35{
36    /// <summary>
37    /// Represents a request to the RemoteExecutorServer instance
38    /// </summary>
39    [Serializable]
40    internal class RemoteRequest
41    {
42        /// <summary>
43        /// List of supported functions
44        /// </summary>
45        public enum Function : uint
46        {
47            ADD_TASK = 0,
48            GET_TASK,
49            GET_TASKS,
50            CANCEL_TASK,
51            DELETE_TASK,
52            QUEUE_TASK,
53            REPLACE_TASK,
54            SCHEDULE_TASK,
55            SAVE_TASK_LIST,
56            LOAD_TASK_LIST,
57            QUEUE_RESTART_TASK,
58        }
59
60        /// <summary>
61        /// Constructor.
62        /// </summary>
63        /// <param name="func">The function this command is wanting to execute.</param>
64        /// <param name="data">The parameters for the command, serialised using a
65        /// BinaryFormatter</param>
66        public RemoteRequest(Function func, byte[] data)
67        {
68            Func = func;
69            Data = data;
70        }
71
72        /// <summary>
73        /// The function that this request is meant to call.
74        /// </summary>
75        public Function Func;
76
77        /// <summary>
78        /// The parameters associated with the function call.
79        /// </summary>
80        public byte[] Data;
81    };
82
83    /// <summary>
84    /// The RemoteExecutorServer class is the server half required for remote execution
85    /// of tasks.
86    /// </summary>
87    public class RemoteExecutorServer : DirectExecutor
88    {
89        /// <summary>
90        /// Our Remote Server name, prevent collisions!
91        /// </summary>
92        public static readonly string ServerName =
93            "Eraser-FB6C5A7D-E47F-475f-ABA4-58F4D24BB67E-RemoteExecutor-" +
94            System.Security.Principal.WindowsIdentity.GetCurrent().User.ToString();
95
96        /// <summary>
97        /// The thread which will answer pipe connections
98        /// </summary>
99        private Thread thread = null;
100
101        /// <summary>
102        /// Our pipe instance which handles connections.
103        /// </summary>
104        private NamedPipeServerStream server;
105
106        /// <summary>
107        /// Constructor.
108        /// </summary>
109        public RemoteExecutorServer()
110        {
111            server = new NamedPipeServerStream(ServerName, PipeDirection.InOut, 4,
112                PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
113
114            thread = new Thread(Main);
115            thread.Start();
116            Thread.Sleep(0);
117        }
118
119        public override void Dispose()
120        {
121            thread.Abort();
122            base.Dispose();
123        }
124
125        /// <summary>
126        /// The polling loop dealing with every server connection.
127        /// </summary>
128        private void Main()
129        {
130            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
131            {
132                //Wait for a connection to be established
133                if (!server.IsConnected)
134                {
135                    IAsyncResult asyncWait = server.BeginWaitForConnection(
136                        server.EndWaitForConnection, null);
137                    while (!server.IsConnected && !asyncWait.AsyncWaitHandle.WaitOne(15))
138                        if (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested)
139                            break;
140                }
141
142                //If we still aren't connected that means the connection failed to establish.
143                if (!server.IsConnected)
144                    continue;
145
146                //Read the request into the buffer.
147                RemoteRequest request = null;
148                using (MemoryStream mstream = new MemoryStream())
149                {
150                    byte[] buffer = new byte[65536];
151                    server.Read(buffer, 0, sizeof(int));
152                    int messageSize = BitConverter.ToInt32(buffer, 0);
153                    while (messageSize > 0)
154                    {
155                        int lastRead = server.Read(buffer, 0, Math.Min(messageSize, buffer.Length));
156                        messageSize -= lastRead;
157                        mstream.Write(buffer, 0, lastRead);
158                    }
159
160                    //Ignore the request if the client disconnected from us.
161                    if (!server.IsConnected)
162                        continue;
163
164                    //Deserialise the header of the request.
165                    mstream.Position = 0;
166                    request = (RemoteRequest)new BinaryFormatter().Deserialize(new MemoryStream(buffer));
167                }
168
169                #region Deserialise
170                object parameter = null;
171                switch (request.Func)
172                {
173                    // void \+ task
174                    case RemoteRequest.Function.CANCEL_TASK:
175                    case RemoteRequest.Function.QUEUE_TASK:
176                    case RemoteRequest.Function.REPLACE_TASK:
177                    case RemoteRequest.Function.SCHEDULE_TASK:
178                    case RemoteRequest.Function.ADD_TASK:
179                        using (MemoryStream mStream = new MemoryStream(request.Data))
180                            parameter = new BinaryFormatter().Deserialize(mStream);
181                        break;
182
183                    // bool \+ taskid
184                    case RemoteRequest.Function.DELETE_TASK:
185                    // task \+ taskid
186                    case RemoteRequest.Function.GET_TASK:
187                        using (MemoryStream mStream = new MemoryStream(request.Data))
188                            parameter = new BinaryFormatter().Deserialize(mStream);
189                        break;
190
191                    // void \+ stream
192                    case RemoteRequest.Function.LOAD_TASK_LIST:
193                    case RemoteRequest.Function.SAVE_TASK_LIST:
194                        using (MemoryStream mStream = new MemoryStream(request.Data))
195                            parameter = new BinaryFormatter().Deserialize(mStream);
196                        break;
197
198                    // list<task> \+ void
199                    case RemoteRequest.Function.GET_TASKS:
200                    // void \+ void
201                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
202                        break;
203
204                    default:
205                        throw new FatalException("Unknown RemoteExecutorClient.Function");
206                }
207                #endregion
208
209                #region Invoke
210                object returnValue = null;
211                switch (request.Func)
212                {
213                    // void \+ task
214                    case RemoteRequest.Function.CANCEL_TASK:
215                        CancelTask((Task)parameter);
216                        break;
217
218                    // void \+ task
219                    case RemoteRequest.Function.QUEUE_TASK:
220                        QueueTask((Task)parameter);
221                        break;
222
223                    // void \+ task
224                    case RemoteRequest.Function.REPLACE_TASK:
225                        ReplaceTask((Task)parameter);
226                        break;
227
228                    // void \+ task
229                    case RemoteRequest.Function.SCHEDULE_TASK:
230                        ScheduleTask((Task)parameter);
231                        break;
232
233                    // void \+ ref task
234                    case RemoteRequest.Function.ADD_TASK:
235                    {
236                        Task task = (Task)parameter;
237                        AddTask(ref task);
238                        break;
239                    }
240
241                    // bool \+ taskid
242                    case RemoteRequest.Function.DELETE_TASK:
243                        returnValue = DeleteTask((uint)parameter);
244                        break;
245
246                    // task \+ taskid
247                    case RemoteRequest.Function.GET_TASK:
248                        returnValue = GetTask((uint)parameter);
249                        break;
250
251                    // void \+ stream
252                    case RemoteRequest.Function.LOAD_TASK_LIST:
253                        LoadTaskList((Stream)parameter);
254                        break;
255
256                    // void \+ stream
257                    case RemoteRequest.Function.SAVE_TASK_LIST:
258                        SaveTaskList((Stream)parameter);
259                        break;
260
261                    // list<task> \+ void
262                    case RemoteRequest.Function.GET_TASKS:
263                        returnValue = GetTasks();
264                        break;
265
266                    // void \+ void
267                    case RemoteRequest.Function.QUEUE_RESTART_TASK:
268                        QueueRestartTasks();
269                        break;
270
271                    default:
272                        throw new FatalException("Unknown RemoteExecutorClient.Function");
273                #endregion
274                }
275
276                //Return the result of the invoked function, if any.
277                if (returnValue != null)
278                    using (MemoryStream mStream = new MemoryStream())
279                    {
280                        new BinaryFormatter().Serialize(mStream, returnValue);
281                        byte[] buffer = mStream.ToArray();
282                        byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
283                        server.Write(bufferLength, 0, sizeof(int));
284                        server.Write(buffer, 0, buffer.Length);
285                    }
286                else
287                {
288                    byte[] buffer = BitConverter.GetBytes(0);
289                    server.Write(buffer, 0, sizeof(int));
290                }
291
292                // we are done, disconnect
293                server.Disconnect();
294            }
295        }
296    }
297
298    public class RemoteExecutorClient : Executor
299    {
300        private NamedPipeClientStream client;
301
302        public RemoteExecutorClient()
303        {
304            client = new NamedPipeClientStream(".", RemoteExecutorServer.ServerName,
305                PipeDirection.InOut);
306        }
307
308        public override void Dispose()
309        {
310            client.Close();
311            client.Dispose();
312        }
313
314        /// <summary>
315        /// Connects to the remote server.
316        /// </summary>
317        /// <returns>True if the connection to the remote server was established.</returns>
318        public bool Connect()
319        {
320            try
321            {
322                client.Connect(250);
323            }
324            catch (TimeoutException)
325            {
326            }
327
328            return client.IsConnected;
329        }
330
331        private object SendRequest(RemoteRequest header)
332        {
333            //Connect to the server
334            object result = null;
335            client.Connect(5000);
336
337            using (MemoryStream mStream = new MemoryStream())
338            {
339                //Serialise the request
340                new BinaryFormatter().Serialize(mStream, header);
341
342                //Write the request to the pipe
343                byte[] buffer = mStream.ToArray();
344                byte[] bufferLength = BitConverter.GetBytes(buffer.Length);
345                client.Write(bufferLength, 0, sizeof(int));
346                client.Write(buffer, 0, buffer.Length);
347
348                //Read the response from the pipe
349                mStream.Position = 0;
350                buffer = new byte[32768];
351                client.Read(buffer, 0, sizeof(int));
352                int responseLength = BitConverter.ToInt32(buffer, 0);
353                while (responseLength > 0)
354                    responseLength -= client.Read(buffer, 0, Math.Min(buffer.Length, responseLength));
355
356                //Deserialise the response
357                mStream.Position = 0;
358                if (mStream.Length > 0)
359                    result = new BinaryFormatter().Deserialize(mStream);
360            }
361
362            return result;
363        }
364
365        public override bool DeleteTask(uint taskId)
366        {
367            MemoryStream mStream = new MemoryStream();
368            new BinaryFormatter().Serialize(mStream, taskId);
369            return (bool)SendRequest(new RemoteRequest(RemoteRequest.Function.DELETE_TASK,
370                mStream.GetBuffer()));
371        }
372
373        public override List<Task> GetTasks()
374        {
375            MemoryStream mStream = new MemoryStream();
376            new BinaryFormatter().Serialize(mStream, null);
377            return (List<Task>)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASKS,
378                mStream.GetBuffer()));
379        }
380
381        public override Task GetTask(uint taskId)
382        {
383            MemoryStream mStream = new MemoryStream();
384            new BinaryFormatter().Serialize(mStream, taskId);
385            return (Task)SendRequest(new RemoteRequest(RemoteRequest.Function.GET_TASK,
386                mStream.GetBuffer()));
387        }
388
389        public override void LoadTaskList(Stream stream)
390        {
391            MemoryStream mStream = new MemoryStream();
392            new BinaryFormatter().Serialize(mStream, stream);
393            SendRequest(new RemoteRequest(RemoteRequest.Function.LOAD_TASK_LIST,
394                mStream.GetBuffer()));
395        }
396
397        public override void AddTask(ref Task task)
398        {
399            MemoryStream mStream = new MemoryStream();
400            new BinaryFormatter().Serialize(mStream, task);
401            SendRequest(new RemoteRequest(RemoteRequest.Function.ADD_TASK,
402                mStream.GetBuffer()));
403        }
404
405        public override void CancelTask(Task task)
406        {
407            MemoryStream mStream = new MemoryStream();
408            new BinaryFormatter().Serialize(mStream, task);
409            SendRequest(new RemoteRequest(RemoteRequest.Function.CANCEL_TASK,
410                mStream.GetBuffer()));
411        }
412
413        public override void QueueRestartTasks()
414        {
415            MemoryStream mStream = new MemoryStream();
416            new BinaryFormatter().Serialize(mStream, null);
417            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_RESTART_TASK,
418                mStream.GetBuffer()));
419        }
420
421        public override void QueueTask(Task task)
422        {
423            MemoryStream mStream = new MemoryStream();
424            new BinaryFormatter().Serialize(mStream, task);
425            SendRequest(new RemoteRequest(RemoteRequest.Function.QUEUE_TASK,
426                mStream.GetBuffer()));
427        }
428
429        public override void ReplaceTask(Task task)
430        {
431            MemoryStream mStream = new MemoryStream();
432            new BinaryFormatter().Serialize(mStream, task);
433            SendRequest(new RemoteRequest(RemoteRequest.Function.REPLACE_TASK,
434                mStream.GetBuffer()));
435        }
436
437        public override void Run()
438        {
439        }
440
441        public override void ScheduleTask(Task task)
442        {
443            MemoryStream mStream = new MemoryStream();
444            new BinaryFormatter().Serialize(mStream, task);
445            SendRequest(new RemoteRequest(RemoteRequest.Function.SCHEDULE_TASK,
446                mStream.GetBuffer()));
447        }
448
449        public override void SaveTaskList(Stream stream)
450        {
451            MemoryStream mStream = new MemoryStream();
452            new BinaryFormatter().Serialize(mStream, stream);
453            SendRequest(new RemoteRequest(RemoteRequest.Function.SAVE_TASK_LIST,
454                mStream.GetBuffer()));
455        }
456    }
457}
Note: See TracBrowser for help on using the repository browser.