1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *
15 *  See the License for the specific language governing permissions and
16 *  limitations under the License.
17 */
18
19/**
20 * @author Anton V. Karnachuk
21 */
22
23/**
24 * Created on 16.03.2005
25 */
26package org.apache.harmony.jpda.tests.framework.jdwp;
27
28import java.util.List;
29import java.io.IOException;
30import java.io.InterruptedIOException;
31import java.util.ArrayList;
32import java.util.Enumeration;
33import java.util.Hashtable;
34
35import org.apache.harmony.jpda.tests.framework.LogWriter;
36import org.apache.harmony.jpda.tests.framework.TestOptions;
37import org.apache.harmony.jpda.tests.framework.jdwp.exceptions.TimeoutException;
38
/**
 * This class asynchronously sends JDWP commands and receives JDWP replies
 * and events through an established JDWP connection, and supports timeouts
 * for these operations.
 */
44public class PacketDispatcher extends Thread {
45
    /**
     * The variables below are intended only to help investigate test
     * failures. They turn on/off several kinds of trace during test
     * execution, which can clarify the details of a failure.
     *
     * commandsNumberForTrace and begCommandIdForTrace define the trace
     * of sent JDWP commands and of the replies received for them:
     * - begCommandIdForTrace defines the first command ID to trace
     *   (the first command has ID=1, the second has ID=2 and so on);
     *   if <= 0 it is treated as 1.
     * - commandsNumberForTrace defines the number of commands to trace;
     *   if <= 0 the command trace is off.
     *
     * - eventRequestIDForTrace defines the trace of received events
     *   according to the request ID value:
     *   if < 0 this trace is off;
     *   if = 0 all received events are traced;
     *   if > 0 only received events triggered by the request with this
     *          ID are traced.
     *
     * - eventKindForTrace defines the trace of received events
     *   according to the specified event kind;
     *   if = 0 this trace is off.
     *   See the JDWPConstants.EventKind class for the values of
     *   event kinds.
     */
72    int begCommandIdForTrace = 1;
73
74    int commandsNumberForTrace = 0;
75
76    int eventRequestIDForTrace = -1;
77
78    byte eventKindForTrace = 0;
79
80    /**
81     * Internal class to synchronize jdwp events. When an event is received it
82     * is stored in eventQueue. If there are any thread that waits for event it
83     * is notified.
84     */
85    private class EventsSynchronyzer {
86
87        /**
88         * List of received events.
89         */
90        private List<EventPacket> eventQueue;
91
92        /**
93         * A default constructor.
94         */
95        EventsSynchronyzer() {
96            // initialize eventQueue
97            eventQueue = new ArrayList<EventPacket>();
98        }
99
100        /**
101         * Notifies thread that the new event has been received.
102         *
103         * @param eventPacket
104         *            instance of EventPacket
105         * @throws InterruptedException
106         */
107        public void notifyThread(EventPacket eventPacket)
108                throws InterruptedException {
109
110            // use this object as lock
111            synchronized (this) {
112                // add the event to eventQueue
113                eventQueue.add(eventPacket);
114
115                // notify next waiting thread
116                this.notify();
117            }
118        }
119
120        /**
121         * Waits for new event during timeout.
122         *
123         * @param timeout
124         *            wait timeout
125         * @return EventPacket
126         * @throws InterruptedException
127         * @throws IOException
128         * @throws TimeoutException
129         *             if no event was received
130         */
131        public EventPacket waitForNextEvent(long timeout)
132                throws InterruptedException, IOException {
133
134            // use this object as lock
135            synchronized (this) {
136
137                // if there is already received event in eventQueue,
138                // then return it
139                synchronized (eventQueue) {
140                    if (!eventQueue.isEmpty()) {
141                        return (EventPacket) eventQueue.remove(0);
142                    }
143
144                    // if eventQueue is empty and connection is already closed
145                    // reraise the exception
146                    if (connectionException != null)
147                        throw connectionException;
148                }
149
150                // wait for the next event
151                this.wait(timeout);
152
153                // We have the following opportunities here -
154                // next event was received, exception in main cyrcle or timeout
155                // happens
156                synchronized (eventQueue) {
157                    if (!eventQueue.isEmpty()) {
158                        // event received
159                        EventPacket event = (EventPacket) eventQueue.remove(0);
160                        return event;
161                    }
162
163                    if (connectionException != null) {
164                        // if eventQueue is empty and connection is already
165                        // closed
166                        // reraise the exception
167                        throw connectionException;
168                    }
169                }
170            }
171
172            // no events were occurred during timeout
173            throw new TimeoutException(false);
174        }
175
176        /**
177         * This method is called when connection is closed. It notifies all the
178         * waiting threads.
179         */
180        public void terminate() {
181            synchronized (this) {
182                this.notifyAll();
183            }
184        }
185    }
186
187    /**
188     * Internal class to synchronize jdwp commands. It sends command packets
189     * through connection and returns replies.
190     */
191    class CommandsSynchronyzer {
192
193        private int commandId;
194
195        private Hashtable<Integer, CommandPacket> commands;
196
197        private Hashtable<Integer, ReplyPacket> replies;
198
199        /**
200         * A default constructor.
201         */
202        CommandsSynchronyzer() {
203            commands = new Hashtable<Integer, CommandPacket>();
204            replies = new Hashtable<Integer, ReplyPacket>();
205
206            // set first command id to 1
207            commandId = 1;
208        }
209
210        /**
211         * Gets the next new id for a command.
212         *
213         * @return int
214         */
215        private synchronized int getNextId() {
216            return commandId++;
217        }
218
219        /**
220         * Notifies thread that reply packet was received.
221         *
222         * @param replyPacket
223         *            instance of ReplyPacket
224         * @throws IOException
225         * @throws InterruptedException
226         */
227        public void notifyThread(ReplyPacket replyPacket) throws IOException,
228                InterruptedException {
229
230            synchronized (commands) {
231
232                // obtain the current command id
233                Integer Id = new Integer(replyPacket.getId());
234
235                // obtain the current command packet by command id
236                CommandPacket command = (CommandPacket) commands.remove(Id);
237                if (command == null) {
238                    // we received reply's id that does not correspond to any
239                    // command
240                    throw new IOException(
241                            "Reply id is corresponded to no command. Id = "
242                                    + Id);
243                }
244
245                synchronized (command) {
246                    // put the reply in replies queue
247                    synchronized (replies) {
248                        replies.put(new Integer(replyPacket.getId()),
249                                replyPacket);
250                    }
251                    // notify waiting thread
252                    command.notifyAll();
253                }
254            }
255        }
256
257        /**
258         * Sends command and waits for the reply during timeout.
259         *
260         * @param command
261         *            instance of CommandPacket
262         * @param timeout
263         *            reply wait timeout
264         * @return
265         * @throws TimeoutException
266         *             if no reply was received
267         */
268        public ReplyPacket waitForReply(CommandPacket command, long timeout)
269                throws InterruptedException, IOException {
270
271            synchronized (command) {
272
273                // if connection is already closed reraise the exception
274                if (connectionException != null)
275                    throw connectionException;
276
277                // obtain new command id
278                Integer Id = new Integer(getNextId());
279                command.setId(Id.intValue());
280
281                // add command into commands hashtable
282                synchronized (commands) {
283                    commands.put(Id, command);
284
285                    // below is trace for sent coomasnds
286                    if (commandsNumberForTrace > 0) {
287                        int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
288                                : 1;
289                        if (Id.intValue() >= begCommandId) {
290                            if ((Id.intValue() - begCommandId) < commandsNumberForTrace) {
291                                logWriter
292                                        .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = "
293                                                + Id.intValue()
294                                                + "; CommandSet = "
295                                                + command.getCommandSet()
296                                                + "; Command = "
297                                                + command.getCommand() + "...");
298                            }
299                        }
300                    }
301
302                    // write this package to connection
303                    connection.writePacket(command.toBytesArray());
304                }
305
306                // if connection is already closed reraise the exception
307                if (connectionException != null)
308                    throw connectionException;
309
310                // wait for reply
311                command.wait(timeout);
312
313                // receive the reply
314                ReplyPacket currentReply = null;
315                synchronized (replies) {
316                    currentReply = (ReplyPacket) replies.remove(Id);
317                }
318
319                // if reply is ok, return it
320                if (currentReply != null) {
321                    return currentReply;
322                }
323
324                // if connection is already closed reraise the exception
325                if (connectionException != null)
326                    throw connectionException;
327            }
328
329            // no event was occurred during timeout
330            throw new TimeoutException(false);
331        }
332
333        /**
334         * Sends command without waiting for the reply and returns id of the
335         * sent command.
336         *
337         * @param command
338         *            instance of CommandPacket
339         * @return command id
340         * @throws IOException
341         */
342        public int sendCommand(CommandPacket command) throws IOException {
343
344            // if connection is already closed reraise the exception
345            if (connectionException != null)
346                throw connectionException;
347
348            // obtain new command id
349            Integer Id = new Integer(getNextId());
350            command.setId(Id.intValue());
351
352            // add command into commands hashtable
353            synchronized (commands) {
354                commands.put(Id, command);
355
356                // below is trace for sent coomasnds
357                if (commandsNumberForTrace > 0) {
358                    int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
359                            : 1;
360                    if (Id.intValue() >= begCommandId) {
361                        if ((Id.intValue() - begCommandId) < commandsNumberForTrace) {
362                            logWriter
363                                    .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = "
364                                            + Id.intValue()
365                                            + "; CommandSet = "
366                                            + command.getCommandSet()
367                                            + "; Command = "
368                                            + command.getCommand() + "...");
369                        }
370                    }
371                }
372
373                // write this package to connection
374                connection.writePacket(command.toBytesArray());
375            }
376
377            // if connection is already closed reraise the exception
378            if (connectionException != null) {
379                throw connectionException;
380            }
381
382            return Id.intValue();
383
384        }
385
386        /**
387         * Receives the reply during timeout for command with specified command
388         * ID.
389         *
390         * @param commandId
391         *            id of previously sent commend
392         * @param timeout
393         *            receive timeout
394         * @return received ReplyPacket
395         * @throws TimeoutException
396         *             if no reply was received
397         */
398        public ReplyPacket receiveReply(int commandId, long timeout)
399                throws InterruptedException, IOException {
400
401            // if connection is already closed reraise the exception
402            if (connectionException != null)
403                throw connectionException;
404
405            // receive the reply
406            ReplyPacket currentReply = null;
407            long endTimeMlsecForWait = System.currentTimeMillis() + timeout;
408            synchronized (replies) {
409                while (true) {
410                    currentReply = (ReplyPacket) replies.remove(new Integer(
411                            commandId));
412                    // if reply is ok, return it
413                    if (currentReply != null) {
414                        return currentReply;
415                    }
416                    // if connection is already closed reraise the exception
417                    if (connectionException != null) {
418                        throw connectionException;
419                    }
420                    if (System.currentTimeMillis() >= endTimeMlsecForWait) {
421                        break;
422                    }
423                    replies.wait(100);
424                }
425            }
426            // no expected reply was found during timeout
427            throw new TimeoutException(false);
428        }
429
430        /**
431         * This method is called when connection is closed. It notifies all the
432         * waiting threads.
433         *
434         */
435        public void terminate() {
436
437            synchronized (commands) {
438                // enumerate all waiting commands
439                for (Enumeration en = commands.keys(); en.hasMoreElements();) {
440                    CommandPacket command = (CommandPacket) commands.get(en
441                            .nextElement());
442                    synchronized (command) {
443                        // notify the waiting object
444                        command.notifyAll();
445                    }
446                }
447            }
448        }
449    }
450
451    /** Transport which is used to sent and receive packets. */
452    private TransportWrapper connection;
453
454    /** Current test run configuration. */
455    TestOptions config;
456
457    private CommandsSynchronyzer commandsSynchronyzer;
458
459    private EventsSynchronyzer eventsSynchronyzer;
460
461    private LogWriter logWriter;
462
463    private IOException connectionException;
464
465    /**
466     * Creates new PacketDispatcher instance.
467     *
468     * @param connection
469     *            open connection for reading and writing packets
470     * @param config
471     *            test run options
472     * @param logWriter
473     *            LogWriter object
474     */
475    public PacketDispatcher(TransportWrapper connection, TestOptions config,
476            LogWriter logWriter) {
477
478        this.connection = connection;
479        this.config = config;
480        this.logWriter = logWriter;
481
482        commandsSynchronyzer = new CommandsSynchronyzer();
483        eventsSynchronyzer = new EventsSynchronyzer();
484
485        // make thread daemon
486        setDaemon(true);
487
488        // start the thread
489        start();
490    }
491
492    /**
493     * Reads packets from connection and dispatches them between waiting
494     * threads.
495     */
496    public void run() {
497
498        connectionException = null;
499
500        try {
501            // start listening for replies
502            while (!isInterrupted()) {
503
504                // read packet from transport
505                byte[] packet = connection.readPacket();
506
507                // break cycle if empty packet
508                if (packet == null || packet.length == 0)
509                    break;
510
511                // check flags
512                if (packet.length < Packet.FLAGS_INDEX) {
513                    logWriter
514                            .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet size = "
515                                    + packet.length);
516                } else {
517                    int flag = packet[Packet.FLAGS_INDEX] & 0xFF;
518                    if (flag != 0) {
519                        if (flag != Packet.REPLY_PACKET_FLAG) {
520                            logWriter
521                                    .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet flags = "
522                                            + Integer.toHexString(flag));
523                        }
524                    }
525                }
526
527                // check the reply flag
528                if (Packet.isReply(packet)) {
529                    // new reply
530                    ReplyPacket replyPacket = new ReplyPacket(packet);
531
532                    // check for received reply packet length
533                    int packetLength = replyPacket.getLength();
534                    if (packetLength < Packet.HEADER_SIZE) {
535                        logWriter
536                                .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet length = "
537                                        + packetLength);
538                    }
539
540                    // below is trace for received coomasnds
541                    if (commandsNumberForTrace > 0) {
542                        int replyID = replyPacket.getId();
543                        int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
544                                : 1;
545                        if (replyID >= begCommandId) {
546                            if ((replyID - begCommandId) < commandsNumberForTrace) {
547                                logWriter
548                                        .println(">>>>>>>>>> PacketDispatcher: Received REPLY ID = "
549                                                + replyID);
550                            }
551                        }
552                    }
553
554                    commandsSynchronyzer.notifyThread(replyPacket);
555                } else {
556                    // new event
557                    EventPacket eventPacket = new EventPacket(packet);
558                    // below is to check received events for correctness
559
560                    // below is trace for received events
561                    ParsedEvent[] parsedEvents = ParsedEvent
562                            .parseEventPacket(eventPacket);
563                    if ((eventRequestIDForTrace >= 0)
564                            || (eventKindForTrace > 0)) {
565                        for (int i = 0; i < parsedEvents.length; i++) {
566                            boolean trace = false;
567                            int eventRequestID = parsedEvents[i].getRequestID();
568                            if (eventRequestIDForTrace == 0) {
569                                trace = true;
570                            } else {
571                                if (eventRequestID == eventRequestIDForTrace) {
572                                    trace = true;
573                                }
574                            }
575                            byte eventKind = parsedEvents[i].getEventKind();
576                            if (eventKind == eventKindForTrace) {
577                                trace = true;
578                            }
579                            if (trace) {
580                                logWriter
581                                        .println(">>>>>>>>>> PacketDispatcher: Received_EVENT["
582                                                + i
583                                                + "]: eventRequestID= "
584                                                + eventRequestID
585                                                + "; eventKind =  "
586                                                + eventKind
587                                                + "("
588                                                + JDWPConstants.EventKind
589                                                        .getName(eventKind)
590                                                + ")");
591                            }
592                        }
593                    }
594                    eventsSynchronyzer.notifyThread(eventPacket);
595                }
596            }
597
598            // this exception is send for all waiting threads
599            connectionException = new TimeoutException(true);
600        } catch (IOException e) {
601            // connection exception is send for all waiting threads
602            connectionException = e;
603
604            // print stack trace
605            e.printStackTrace();
606        } catch (InterruptedException e) {
607            // connection exception is send for all waiting threads
608            connectionException = new InterruptedIOException(e.getMessage());
609            connectionException.initCause(e);
610
611            // print stack trace
612            e.printStackTrace();
613        }
614
615        // notify all the waiting threads
616        eventsSynchronyzer.terminate();
617        commandsSynchronyzer.terminate();
618    }
619
620    /**
621     * Receives event from event queue if there are any events or waits during
622     * timeout for any event occurrence. This method should not be used
623     * simultaneously from different threads. If there were no reply during the
624     * timeout, TimeoutException is thrown.
625     *
626     * @param timeout
627     *            timeout in milliseconds
628     * @return received event packet
629     * @throws IOException
630     *             is any connection error occurred
631     * @throws InterruptedException
632     *             if reading packet was interrupted
633     * @throws TimeoutException
634     *             if timeout exceeded
635     */
636    public EventPacket receiveEvent(long timeout) throws IOException,
637            InterruptedException, TimeoutException {
638
639        return eventsSynchronyzer.waitForNextEvent(timeout);
640    }
641
642    /**
643     * Sends JDWP command packet and waits for reply packet during default
644     * timeout. If there were no reply packet during the timeout,
645     * TimeoutException is thrown.
646     *
647     * @return received reply packet
648     * @throws InterruptedException
649     *             if reading packet was interrupted
650     * @throws IOException
651     *             if any connection error occurred
652     * @throws TimeoutException
653     *             if timeout exceeded
654     */
655    public ReplyPacket performCommand(CommandPacket command)
656            throws InterruptedException, IOException, TimeoutException {
657
658        return performCommand(command, config.getTimeout());
659    }
660
661    /**
662     * Sends JDWP command packet and waits for reply packet during certain
663     * timeout. If there were no reply packet during the timeout,
664     * TimeoutException is thrown.
665     *
666     * @param command
667     *            command packet to send
668     * @param timeout
669     *            timeout in milliseconds
670     * @return received reply packet
671     * @throws InterruptedException
672     *             if packet reading was interrupted
673     * @throws IOException
674     *             if any connection error occurred
675     * @throws TimeoutException
676     *             if timeout exceeded
677     */
678    public ReplyPacket performCommand(CommandPacket command, long timeout)
679            throws InterruptedException, IOException, TimeoutException {
680
681        return commandsSynchronyzer.waitForReply(command, timeout);
682    }
683
684    /**
685     * Sends CommandPacket to debuggee VM without waiting for the reply. This
686     * method is intended for special cases when there is need to divide
687     * command's performing into two actions: command's sending and receiving
688     * reply (e.g. for asynchronous JDWP commands' testing). After this method
689     * the 'receiveReply()' method must be used latter for receiving reply for
690     * sent command. It is NOT recommended to use this method for usual cases -
691     * 'performCommand()' method must be used.
692     *
693     * @param command
694     *            Command packet to be sent
695     * @return command ID of sent command
696     * @throws IOException
697     *             if any connection error occurred
698     */
699    public int sendCommand(CommandPacket command) throws IOException {
700        return commandsSynchronyzer.sendCommand(command);
701    }
702
703    /**
704     * Waits for reply for command which was sent before by 'sendCommand()'
705     * method. Specified timeout is used as time limit for waiting. This method
706     * (jointly with 'sendCommand()') is intended for special cases when there
707     * is need to divide command's performing into two actions: command's
708     * sending and receiving reply (e.g. for asynchronous JDWP commands'
709     * testing). It is NOT recommended to use 'sendCommand()- receiveReply()'
710     * pair for usual cases - 'performCommand()' method must be used.
711     *
712     * @param commandId
713     *            Command ID of sent before command, reply from which is
714     *            expected to be received
715     * @param timeout
716     *            Specified timeout in milliseconds to wait for reply
717     * @return received ReplyPacket
718     * @throws IOException
719     *             if any connection error occurred
720     * @throws InterruptedException
721     *             if reply packet's waiting was interrupted
722     * @throws TimeoutException
723     *             if timeout exceeded
724     */
725    public ReplyPacket receiveReply(int commandId, long timeout)
726            throws InterruptedException, IOException, TimeoutException {
727        return commandsSynchronyzer.receiveReply(commandId, timeout);
728    }
729
730}
731