package javacodebook.xml.transport.jms.pubSub; import javax.jms.*; /** * Die XMLListener-Klasse implementiert das MessageListener-Interface * indem sie die onMessage-Methode überschreibt. An dieser Stelle findet * die Verarbeitung des übertragenen XMLs statt. Hier müsste * geparst und entsprechend weitere Anwendungslogik angestossen werden. * Allesdings ist hier nur eine Pseudoverarbeitung implementiert, die das * empfangene XML schlichtweg in die Standardausgabe schreibt. */ public class XMLListener implements MessageListener { /** * diese Methode ist von dem MessageListener-Interface gefordert. * Ihr wird ein Parameter vom Typ Message übergeben. * Zunächst wird versucht das Message-Objekt in eine TextMessage * zu casten. Falls das gelingt wird schlichtweg der Inhalt der * Textnachricht, also das XML, in den Standardausgabestrom * geschrieben. */ public void onMessage(Message message) { TextMessage msg = null; try { if (message instanceof TextMessage) { // Das Message-Objekt wird in eine TextMessage gecastet. msg = (TextMessage) message; System.out.println("Nachricht empfangen: \n\n" + msg.getText() + "\n\n"); } else { System.out.println("Message of wrong type: " + message.getClass().getName()); } } catch (Exception e) { System.out.println("Ausnahmezustand in onMessage() aufgetreten: " + e.toString()); } } } --- Neue Klasse --- package javacodebook.xml.transport.jms.pubSub; import javax.jms.*; import javax.naming.*; import java.io.*; /** * Die XMLTopicPublisher-Klasse benutzt ein TopicSession-Objekt, um XML-Dokumente in From * von TextMessages zu veröffentlichen. Die XML-Dokumente werden dabei * von dem Dateisystem gelesen und als einfache Strings behandelt. * Das Parsen und eventuelle Validierung muss getrennt geschehen. */ public class XMLTopicPublisher { private static final String USAGE = "\nBenutzerhinweis: javacodebook.xml.transport.jms.pubSub.XMLTopicPublisher " + " [ ...]\n\nwobei\n\n\n" + "der Name der Themas unter dem die Nachrichten zu abbonieren sind\n\n [ ...]\n" + "ein odere mehrer durch Leerzeichen getrennte Dateinamen von XML-Dateien sind, die\n" + " veröffentlicht werden sollen"; /** * An die main-Methode müssen die Parameter und eine durch Leerzeichen getrennte Folge von * Dateinamen [ ...] übergeben werden. Es wird ein Objekt vom Typ * XMLTopicPublisher instanziiert und die Kommandozeilenparameter ausgelesen. * Nun wird auf das XMLTopicPublisher-Objekt die Methode sendDocuments() * aufgerufen, wobei das Thema als erster Parameter übergeben wird, und * die restlichen Parameter in Form eines String-Arrays von XML-Dateinamen übergeben werden. */ public static void main(String args[]) { XMLTopicPublisher xMLTopicPublisher = new XMLTopicPublisher(); if (args.length < 2) { System.out.println(getUsage()); System.exit(1); } else { String topic = args[0]; //Ein neuer String-Array für die XML-Dateinamen wird angelegt. String[] fileNames = new String[args.length - 1]; for (int i = 1; i < args.length; i++) { fileNames[i - 1] = args[i]; } // Das Thema und die XML-Dateinamen in Form eines String-Arrays werden // an die sendDocuments-Methode übergeben. xMLTopicPublisher.sendDocuments(topic, fileNames); } } public void sendDocuments(String topicString, String[] fileNames) { Context jndiContext = null; TopicConnectionFactory topicConnectionFactory = null; TopicConnection topicConnection = null; TopicSession topicSession = null; Topic topic = null; TopicPublisher topicPublisher = null; System.out.println("Das Thema heisst: " + topicString); // Erzeugung eines neuen JNDI API InitialContext Objektes try { jndiContext = new InitialContext(); } catch (NamingException e) { System.out.println("Es konnte kein JNDI API Kontext erstellt werden: " + e.toString()); System.exit(1); } // Look-up der Connection-Factory und des Themas. Falls eines der Objekte nicht // gefunden wird, soll die Anwendung verlassen werden. try { topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup( "TopicConnectionFactory"); topic = (Topic) jndiContext.lookup(topicString); } catch (NamingException e) { System.out.println("JNDI API lookup verfehlt: " + e.toString() + "\n\nentweder die TopicConnectionFactory oder die Warteschlange names " + topicString + " ist nicht beim Namensdienst registriert"); System.exit(1); } try { // Eine neue Connection wird erzeugt. topicConnection = topicConnectionFactory.createTopicConnection(); // Über die TopicConnection wird ein TopicSession-Objekt erzeugt. Dadurch dass true übergeben wird, // ist die Verbindung transaktional - bei false wäre sie das nicht. Als 2. Parameter wir 0 übergeben um // anzudeuten, dass er bei transaktionalen TopicSessions keine Rolle spielt. Falls keine transaktionale // TopicSession erzeugt wird, muss der 2. Parameter einer der Werte: // * Session.AUTO_ACKNOWLEDGE; // * Session.CLIENT_ACKNOWLEDGE; // * Session.DUPS_OK_ACKNOWLEDGE; // sein. topicSession = topicConnection.createTopicSession(true, 0); // Erzeugung eines TopicSender-Objektes über das TopicSession-Objekt topicPublisher = topicSession.createPublisher(topic); // Erzeugung einer TextMessage über das TopicSession-Objekt TextMessage message = topicSession.createTextMessage(); // Für jedes der zu verschickenden XML-Dokumente wird das TextMessage-Objekt // mit dem Text aus der XML-Datei belegt und über die send()-Methode des TopicSession-Objektes // publiziert. for (int i = 0; i < fileNames.length; i++) { String document = loadDocument(fileNames[i]); if (document != null) { message.setText(document); System.out.println("Publiziere Nachricht:\n" + message.getText()); topicPublisher.publish(message); } // Da das TopicSession-Objekt transaktional ist, können die Nachrichten, // die über dieses TopicSession-Objekt publiziert wurden von keinem Empfänger gelesen // werden, bevor nicht die commit()-Methode auf das TopicSession-Objekt aufgerufen wurde. topicSession.commit(); } } catch (JMSException e) { System.out.println("Ausnahmezustand aufgetreten: " + e.toString()); } finally { if (topicConnection != null) { try { // Zum Schluss muss die TopicConnection geschlossen werden, um nicht unnötig Resourcen zu belegen. topicConnection.close(); } catch (JMSException e) {} } } } /** * Diese Methode liest die Datei und gibt dessen Inhalt als * String zurück. */ private String loadDocument(String fileName) { InputStream is = null; String document = ""; try { is = new FileInputStream(fileName); BufferedReader br = new BufferedReader(new InputStreamReader(is)); String line = ""; while (line != null) { document = document + line; line = br.readLine(); } } catch (Exception e) { System.out.println("Probleme beim lesen des Dokuments " + fileName + ": " + e); return null; } return document; } public static String getUsage() { return USAGE; } } --- Neue Klasse --- package javacodebook.xml.transport.jms.pubSub; import java.io.*; import javax.jms.*; import javax.naming.*; /** * Die XMLTopicSubscriber-Klasse benutzt eine Session, um sich für den Empfang von * XML-Dokumente in Form von TextMessages zu subskribieren. Dabei werden XML-Dokumente als * reine Text-Dokumente behandelt. Das Parsen und eventuelle Validierung muss getrennt * geschehen. */ public class XMLTopicSubscriber { private static final String USAGE = "\nBenutzerhinweis: javacodebook.xml.transport.jms.pubSub.XMLTopicSubscriber " + "\n\nwobei\n\n\nddas Thema ist, das subskribiert werden soll"; /** * An die main-Methode muss der Parameter übergeben werden. * In der Methode wird ein XMLTopicSubscriber-Objekt instanziiert, es wird der Kommandozeilen-Parameter * ausgelesen und die receiveMessages()-Methode mit dem Parameter aufgerufen. * Diese Methode empfängt solange Nachrichten bis entweder 'a' oder 'A' in die Standardeingabe * eingegeben wird und mit bestätigt wird. */ public static void main(String args[]) { XMLTopicSubscriber xMLTopicSubscriber = new XMLTopicSubscriber(); String topic = "not set"; if (args.length != 1) { System.out.println(getUsage()); System.exit(1); } else { topic = args[0]; } xMLTopicSubscriber.receiveMessages(topic); } public void receiveMessages(String topicString) { Context jndiContext = null; TopicConnectionFactory topicConnectionFactory = null; TopicConnection topicConnection = null; TopicSession topicSession = null; Topic topic = null; TopicSubscriber topicSubscriber = null; XMLListener xmlListener = null; System.out.println("Das subskribierte Thema lautet: " + topicString); // Erzeugung eines neuen JNDI API InitialContext Objektes try { jndiContext = new InitialContext(); } catch (NamingException e) { System.out.println("Es konnte kein JNDI API Kontext erstellt werden: " + e.toString()); System.exit(1); } // Look-up der Connection-Factory und des Topics. Falls eines der Objekte nicht // gefunden wird, soll die Anwendung verlassen werden. try { topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup( "TopicConnectionFactory"); topic = (Topic) jndiContext.lookup(topicString); } catch (NamingException e) { System.out.println("JNDI API lookup verfehlt: " + e.toString() + "\n\nentweder die TopicConnectionFactory oder die Warteschlange namens " + topicString + " ist nicht beim Namensdienst registriert"); System.exit(1); } try { // Eine neue Connection wird erzeugt. topicConnection = topicConnectionFactory.createTopicConnection(); // Über die Connection wird ein TopicSession-Objekt erzeugt. Dadurch dass true übergeben wird, // ist die Verbindung transaktional - bei false wäre sie das nicht. Als 2. Parameter wir 0 übergeben um // anzudeuten, dass er bei transaktionalen TopicSessions keine Rolle spielt. Falls keine transaktionale // TopicSession erzeugt wird, muss der 2. Parameter einer der Werte: // * Session.AUTO_ACKNOWLEDGE; // * Session.CLIENT_ACKNOWLEDGE; // * Session.DUPS_OK_ACKNOWLEDGE; // sein. topicSession = topicConnection.createTopicSession(true, 0); // Erzeugung eines TopicSubscriber-Objektes über das TopicSession-Objekt topicSubscriber = topicSession.createSubscriber(topic); // ein neues XMLListener-Objekt wird erzeugt welches das MessageListener-Interface // implementiert xmlListener = new XMLListener(); // Das XMLListener-Object wird beim TopicSubscriber-Object registriert. topicSubscriber.setMessageListener(xmlListener); // Bevor die Connection benutzt werden kann, muss sie gestartet werden topicConnection.start(); System.out.println( "Zum Beenden der Anwendung bitte 'a' oder 'A' für abbrechen eingeben " + "dann bitte mit bestätigen"); InputStreamReader inputStreamReader = new InputStreamReader(System.in); char answer = 0; while (! ( (answer == 'a') || (answer == 'A'))) { try { answer = (char) inputStreamReader.read(); } catch (IOException e) { System.out.println("Ausnahmezustand beim Lesen der Standardeingabe: " + e.toString()); } } // Da die TopicSession transaktional ist, muss die commit()-Methode aufgerufen werden um // Locks von den Nachrichten zu entfernen, die über diese TopicSession veröffentlicht wurden. try { topicSession.commit(); } // Falls die Transaktion von Seiten des JMS-Providers nicht 'commited' werden konnte, wird eine JMSException geworfen, die // auch vom Typ der Unterklassen TransactionRolledBackException oder IllegalStateException sein kann. // Es empfielt sich, in solchen Fällen die empfangene Nachricht nicht weiter zu verarbeiten, da sie von dem Provider noch // mal ausgeliefert werden wird und somit eine doppelte Verarbeitung stattfinden würde. catch (JMSException e) { System.out.println("Ausnahmefall eingetreten: " + e); } } catch (JMSException e) { System.out.println("Ausnahmefall eingetreten: " + e.toString()); } finally { if (topicConnection != null) { try { // Die Connection wird geschlossen. topicConnection.close(); } catch (JMSException e) {} } } } public static String getUsage() { return USAGE; } }