source: branches/eraser6/Manager/RemoteExecutor.cs @ 615

Revision 615, 10.4 KB checked in by cjax, 6 years ago (diff)

Still Client could not connect to the server, however, the back bone is there.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1/*
2 * $Id$
3 * Copyright 2008 The Eraser Project
4 * Original Author: Kasra Nassiri <cjax@users.sourceforge.net>
5 * Modified By: Joel Low <lowjoel@users.sourceforge.net>
6 * Modified By:
7 *
8 * This file is part of Eraser.
9 *
10 * Eraser is free software: you can redistribute it and/or modify it under the
11 * terms of the GNU General Public License as published by the Free Software
12 * Foundation, either version 3 of the License, or (at your option) any later
13 * version.
14 *
15 * Eraser is distributed in the hope that it will be useful, but WITHOUT ANY
16 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
17 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
18 *
19 * A copy of the GNU General Public License can be found at
20 * <http://www.gnu.org/licenses/>.
21 */
22
23using System;
24using System.IO;
25using System.Text;
26using System.IO.Pipes;
27using System.Threading;
28using System.Collections.Generic;
29
30using System.Runtime.Serialization;
31using System.Runtime.Remoting.Messaging;
32using System.Runtime.Serialization.Formatters.Binary;
33
34namespace Eraser.Manager
35{
36    // we allways pass complete tasks accross our server/clinet
37    // streams
38    public class RemoteExecutorServer : DirectExecutor
39    {
40        public const string ServerName = "EraserRemoteExecutorServer";
41
42        private Thread thread = null;
43        private NamedPipeServerStream server =
44            new NamedPipeServerStream(ServerName, PipeDirection.InOut, 32,
45                PipeTransmissionMode.Message, PipeOptions.Asynchronous);
46
47        public RemoteExecutorServer()
48            : base()
49        {
50            thread = new Thread(Main);
51            thread.Start();
52
53            Thread.Sleep(0);
54        }
55
56        ~RemoteExecutorServer()
57        {
58            thread.Interrupt();
59            Abort();
60        }
61
62        public void Abort()
63        {
64            thread.Abort();
65        }
66
67        private void Main()
68        {
69            byte[] buffer = new byte[32768];
70            MemoryStream mstream = new MemoryStream();
71
72            while (Thread.CurrentThread.ThreadState != ThreadState.AbortRequested)
73            {
74                if (!server.IsConnected)
75                    server.WaitForConnection();
76
77                while (server.Position < server.Length)
78                    mstream.Write(buffer, 0, server.Read(buffer, 0, buffer.Length));
79
80                // the value should not stay null since we have to deserialise it
81                object returnValue = null;
82                using (RemoteExecutorClient.RemoteHeader data = (RemoteExecutorClient.RemoteHeader)
83                    new BinaryFormatter().Deserialize(mstream))
84                {
85                    data.SerializationStream.Position = 0;
86
87                    uint taskId = 0;
88                    Task task = null;
89                    Stream stream = null;
90
91                    #region Deserialise
92                    switch (data.Function)
93                    {
94                        // void \+ task
95                        case RemoteExecutorClient.Function.CANCEL_TASK:
96                        // void \+ task
97                        case RemoteExecutorClient.Function.QUEUE_TASK:
98                        // void \+ task
99                        case RemoteExecutorClient.Function.REPLACE_TASK:
100                        // void \+ task
101                        case RemoteExecutorClient.Function.SCHEDULE_TASK:
102                        // void \+ ref task
103                        case RemoteExecutorClient.Function.ADD_TASK:
104                            task = (Task)new BinaryFormatter().Deserialize(data.SerializationStream);
105                            returnValue = new object();
106                            break;
107
108                        // bool \+ taskid
109                        case RemoteExecutorClient.Function.DELETE_TASK:
110                        // task \+ taskid
111                        case RemoteExecutorClient.Function.GET_TASK:
112                            taskId = (uint)new BinaryFormatter().Deserialize(data.SerializationStream);
113                            break;
114
115                        // void \+ stream
116                        case RemoteExecutorClient.Function.LOAD_TASK_LIST:
117                        // void \+ stream
118                        case RemoteExecutorClient.Function.SAVE_TASK_LIST:
119                            stream = (Stream)new BinaryFormatter().Deserialize(data.SerializationStream);
120                            returnValue = new object();
121                            break;
122
123                        // list<task> \+ void
124                        case RemoteExecutorClient.Function.GET_TASKS:
125                        // void \+ void
126                        case RemoteExecutorClient.Function.QUEUE_RESTART_TASK:
127                        // void \+ void
128                        case RemoteExecutorClient.Function.RUN:
129                            returnValue = new object();
130                            break;
131
132                        default:
133                            throw new FatalException("Unknown RemoteExecutorClient.Function");
134                    }
135                    #endregion
136
137                    #region Invoke
138                    switch (data.Function)
139                    {
140                        // void \+ task
141                        case RemoteExecutorClient.Function.CANCEL_TASK:
142                            CancelTask(task);
143                            break;
144
145                        // void \+ task
146                        case RemoteExecutorClient.Function.QUEUE_TASK:
147                            QueueTask(task);
148                            break;
149
150                        // void \+ task
151                        case RemoteExecutorClient.Function.REPLACE_TASK:
152                            ReplaceTask(task);
153                            break;
154
155                        // void \+ task
156                        case RemoteExecutorClient.Function.SCHEDULE_TASK:
157                            ScheduleTask(task);
158                            break;
159
160                        // void \+ ref task
161                        case RemoteExecutorClient.Function.ADD_TASK:
162                            AddTask(ref task);
163                            break;
164
165                        // bool \+ taskid
166                        case RemoteExecutorClient.Function.DELETE_TASK:
167                            returnValue = DeleteTask(taskId);
168                            break;
169
170                        // task \+ taskid
171                        case RemoteExecutorClient.Function.GET_TASK:
172                            returnValue = GetTask(taskId);
173                            break;
174
175                        // void \+ stream
176                        case RemoteExecutorClient.Function.LOAD_TASK_LIST:
177                            LoadTaskList(stream);
178                            break;
179
180                        // void \+ stream
181                        case RemoteExecutorClient.Function.SAVE_TASK_LIST:
182                            SaveTaskList(stream);
183                            break;
184
185                        // list<task> \+ void
186                        case RemoteExecutorClient.Function.GET_TASKS:
187                            returnValue = GetTasks();
188                            break;
189
190                        // void \+ void
191                        case RemoteExecutorClient.Function.QUEUE_RESTART_TASK:
192                            QueueRestartTasks();
193                            break;
194
195                        // void \+ void
196                        case RemoteExecutorClient.Function.RUN:
197                            Run();
198                            break;
199
200                        default:
201                            throw new FatalException("Unknown RemoteExecutorClient.Function");
202                    }
203                    #endregion
204                }
205
206                // return the returnValue and disconnect
207                using (MemoryStream ms = new MemoryStream())
208                {
209                    new BinaryFormatter().Serialize(ms, returnValue);
210                    server.Write(ms.GetBuffer(), 0, ms.GetBuffer().Length);
211                }
212
213                // we are done, disconnect
214                server.Disconnect();
215            }
216        }
217    }
218
219    public class RemoteExecutorClient : Executor
220    {
221        public static int Instances = 0;
222        public const string ClientName = "EraserRemoteExecutorClient_";
223
224        private NamedPipeClientStream client = 
225            new NamedPipeClientStream(RemoteExecutorServer.ServerName,
226                ClientName + Instances.ToString(), PipeDirection.InOut);
227
228        public enum Function : uint
229        {
230            ADD_TASK = 0,
231            CANCEL_TASK,
232            DELETE_TASK,
233            GET_TASK,
234            GET_TASKS,
235            RUN,
236            QUEUE_RESTART_TASK,
237            QUEUE_TASK,
238            REPLACE_TASK,
239            LOAD_TASK_LIST,
240            SCHEDULE_TASK,
241            SAVE_TASK_LIST,
242        }
243
244        public RemoteExecutorClient()
245        {
246            Instances += 1;
247        }
248
249        public override void Dispose()
250        {
251            client.Close();
252            client.Dispose();
253        }
254
255        public class RemoteHeader : IDisposable
256        {
257            public void Dispose()
258            {
259            }
260
261            public Function Function;
262            public Stream SerializationStream = new MemoryStream();
263        };
264
265        private object Communicate(RemoteHeader header)
266        {
267            // initialise client and connect to the server
268            object results = null;
269            IAsyncResult asyncResult = null;
270            client = new NamedPipeClientStream(RemoteExecutorServer.ServerName,
271                ClientName + Instances.ToString(), PipeDirection.InOut);
272           
273            // wait for a connection for at least 5s
274            client.Connect(5000);   
275
276            // serialise the data
277            using (MemoryStream ms = new MemoryStream())
278            {
279                byte[] buffer = new byte[32768];
280
281                new BinaryFormatter().Serialize(ms, header);
282                long clinetPos = client.Position;
283
284                // write async
285                (asyncResult = client.BeginWrite(ms.GetBuffer(), 0, ms.GetBuffer().Length,
286                    delegate(IAsyncResult ar)
287                    {
288                        // completed, might throw
289                        client.EndWrite(ar);
290
291                        ms.Position = 0;
292                        ms.Capacity = (int)(client.Length - (client.Position = clinetPos));
293
294                        while (client.Position < client.Length)
295                            ms.Write(buffer, 0, client.Read(buffer, 0, buffer.Length));
296
297                        // deserialise the result
298                        results = new BinaryFormatter().Deserialize(ms);
299
300                    }, this)).AsyncWaitHandle.WaitOne();
301            }
302
303            return results;
304        }
305
306        public override bool DeleteTask(uint taskId)
307        {
308            RemoteHeader rh = new RemoteHeader();
309            rh.Function = Function.DELETE_TASK;
310            new BinaryFormatter().Serialize(rh.SerializationStream, taskId);
311            return (bool)Communicate(rh);
312        }
313
314        public override List<Task> GetTasks()
315        {
316            RemoteHeader rh = new RemoteHeader();
317            rh.Function = Function.GET_TASK;
318            new BinaryFormatter().Serialize(rh.SerializationStream, null);
319            return (List<Task>)Communicate(rh);
320        }
321
322        public override Task GetTask(uint taskId)
323        {
324            RemoteHeader rh = new RemoteHeader();
325            rh.Function = Function.GET_TASK;
326            new BinaryFormatter().Serialize(rh.SerializationStream, null);
327            return (Task)Communicate(rh);
328        }
329
330        public override void LoadTaskList(Stream stream)
331        {
332            RemoteHeader rh = new RemoteHeader();
333            rh.Function = Function.LOAD_TASK_LIST;
334            new BinaryFormatter().Serialize(rh.SerializationStream, stream);
335            Communicate(rh);
336        }
337
338        public override void AddTask(ref Task task)
339        {
340            RemoteHeader rh = new RemoteHeader();
341            rh.Function = Function.ADD_TASK;
342            new BinaryFormatter().Serialize(rh.SerializationStream, task);
343            Communicate(rh);
344        }
345
346        public override void CancelTask(Task task)
347        {
348            RemoteHeader rh = new RemoteHeader();
349            rh.Function = Function.CANCEL_TASK;
350            new BinaryFormatter().Serialize(rh.SerializationStream, task);
351            Communicate(rh);
352        }
353
354        public override void QueueRestartTasks()
355        {
356            RemoteHeader rh = new RemoteHeader();
357            rh.Function = Function.QUEUE_RESTART_TASK;
358            new BinaryFormatter().Serialize(rh.SerializationStream, null);
359            Communicate(rh);
360        }
361
362        public override void QueueTask(Task task)
363        {
364            RemoteHeader rh = new RemoteHeader();
365            rh.Function = Function.QUEUE_TASK;
366            new BinaryFormatter().Serialize(rh.SerializationStream, null);
367            Communicate(rh);
368        }
369
370        public override void ReplaceTask(Task task)
371        {
372            RemoteHeader rh = new RemoteHeader();
373            rh.Function = Function.REPLACE_TASK;
374            new BinaryFormatter().Serialize(rh.SerializationStream, null);
375            Communicate(rh);
376        }
377
378        public override void Run()
379        {
380            RemoteHeader rh = new RemoteHeader();
381            rh.Function = Function.RUN;
382            new BinaryFormatter().Serialize(rh.SerializationStream, null);
383            Communicate(rh);
384        }
385
386        public override void ScheduleTask(Task task)
387        {
388            RemoteHeader rh = new RemoteHeader();
389            rh.Function = Function.SCHEDULE_TASK;
390            new BinaryFormatter().Serialize(rh.SerializationStream, null);
391            Communicate(rh);
392        }
393
394        public override void SaveTaskList(Stream stream)
395        {
396            RemoteHeader rh = new RemoteHeader();
397            rh.Function = Function.SAVE_TASK_LIST;
398            new BinaryFormatter().Serialize(rh.SerializationStream, null);
399            Communicate(rh);
400        }
401    }
402}
Note: See TracBrowser for help on using the repository browser.