// Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl> // Distributable under LGPL license. See terms of license at gnu.org. package nl.justobjects.pushlet.client; import nl.justobjects.pushlet.core.Event; import nl.justobjects.pushlet.core.EventParser; import nl.justobjects.pushlet.core.Protocol; import nl.justobjects.pushlet.util.PushletException; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.io.OutputStream; import java.net.*; import java.util.Map; /** * Client API for Java HTTP client applets or apps. * <p/> * Use this class within Java client applications or applets. * Implement a PushletClientListener to receive callbacks for * data-related Event objects pushed by the server. * * @author Just van den Broecke - Just Objects © * @version $Id: PushletClient.java,v 1.18 2007/11/10 13:52:47 justb Exp $ * @see PushletClientListener * @see nl.justobjects.pushlet.test.PushletApplet * @see nl.justobjects.pushlet.test.PushletPingApplication */ public class PushletClient implements Protocol { /** * Pushlet URL. */ private String pushletURL; /** * Debug flag for verbose output. */ private boolean debug; /** * Id gotten on join ack */ private String id; /** * Internal listener for data events pushed by server. */ private DataEventListener dataEventListener; /** * Constructor with full pushlet URL. */ public PushletClient(String aPushletURL) { pushletURL = aPushletURL; } /** * Constructor with host and port using default URI. */ public PushletClient(String aHost, int aPort) { this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI); } /** * Set proxy options and optional proxy authentication. * <p/> * Contributed by Dele Olajide * See http://groups.yahoo.com/group/pushlet/message/634 * <p/> * Usage: * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet"); * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....); * <p/> * use pushletClient further as normal */ public void setProxyOptions(String aProxyHost, String aProxyPort, String theNonProxyHosts, String aUserName, String aPassword, String anNTLMDomain) { // Enable proxying System.setProperty("http.proxySet", "true"); System.setProperty("http.proxyHost", aProxyHost); System.setProperty("http.proxyPort", aProxyPort); // Set optional non-proxy hosts if (theNonProxyHosts != null) { System.setProperty("http.nonProxyHosts", theNonProxyHosts); } // If user name specified configure proxy authentication if (aUserName != null) { System.setProperty("http.proxyUser", aUserName); System.setProperty("http.proxyPassword", aPassword); // See inner class below Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword)); // Optional NT domain if (anNTLMDomain != null) { System.setProperty("http.auth.ntlm.domain", anNTLMDomain); } } } /** * Join server, starts session. */ public void join() throws PushletException { Event event = new Event(E_JOIN); event.setField(P_FORMAT, FORMAT_XML); Event response = doControl(event); throwOnNack(response); // Join Ack received id = response.getField(P_ID); } /** * Leave server, stops session. */ public void leave() throws PushletException { stopListen(); throwOnInvalidSession(); Event event = new Event(E_LEAVE); event.setField(P_ID, id); Event response = doControl(event); throwOnNack(response); id = null; } /** * Open data channel. */ public void listen(PushletClientListener aListener) throws PushletException { listen(aListener, MODE_STREAM); } /** * Open data channel in stream or push mode. */ public void listen(PushletClientListener aListener, String aMode) throws PushletException { listen(aListener, aMode, null); } /** * Open data channel in stream or push mode with a subject. */ public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException { throwOnInvalidSession(); stopListen(); String listenURL = pushletURL + "?" + P_EVENT + "=" + E_LISTEN + "&" + P_ID + "=" + id + "&" + P_MODE + "=" + aMode; if (aSubject != null) { listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject; } // Start listener thread (sync call). startDataEventListener(aListener, listenURL); } /** * Immediate listener: joins/subscribes and listens in one action. */ public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException { stopListen(); String listenURL = pushletURL + "?" + P_EVENT + "=" + E_JOIN_LISTEN + "&" + P_FORMAT + "=" + FORMAT_XML + "&" + P_MODE + "=" + aMode + "&" + P_SUBJECT + "=" + aSubject; // Start listener thread (sync call). startDataEventListener(aListener, listenURL); } /** * Publish an event through server. */ public void publish(String aSubject, Map theAttributes) throws PushletException { throwOnInvalidSession(); Event event = new Event(E_PUBLISH, theAttributes); event.setField(P_SUBJECT, aSubject); event.setField(P_ID, id); Event response = doControl(event); throwOnNack(response); } /** * Subscribes, returning subscription id. */ public String subscribe(String aSubject, String aLabel) throws PushletException { throwOnInvalidSession(); Event event = new Event(E_SUBSCRIBE); event.setField(P_ID, id); event.setField(P_SUBJECT, aSubject); // Optional label, is returned in data events if (aLabel != null) { event.setField(P_SUBSCRIPTION_LABEL, aLabel); } // Send request Event response = doControl(event); throwOnNack(response); return response.getField(P_SUBSCRIPTION_ID); } /** * Subscribes, returning subscription id. */ public String subscribe(String aSubject) throws PushletException { return subscribe(aSubject, null); } /** * Unsubscribes with subscription id. */ public void unsubscribe(String aSubscriptionId) throws PushletException { throwOnInvalidSession(); Event event = new Event(E_UNSUBSCRIBE); event.setField(P_ID, id); // Optional subscription id if (aSubscriptionId != null) { event.setField(P_SUBSCRIPTION_ID, aSubscriptionId); } Event response = doControl(event); throwOnNack(response); } /** * Unsubscribes from all subjects. */ public void unsubscribe() throws PushletException { unsubscribe(null); } /** * Stop the listener. */ public void stopListen() throws PushletException { if (dataEventListener != null) { unsubscribe(); dataEventListener.stop(); dataEventListener = null; } } public void setDebug(boolean b) { debug = b; } /** * Starts DataEventListener and waits for its thread to start. */ protected void startDataEventListener(PushletClientListener aListener, String aListenURL) { // Suggestion by Jeff Nowakowski 29.oct.2006 dataEventListener = new DataEventListener(aListener, aListenURL); synchronized (dataEventListener) { dataEventListener.start(); try { // Wait for data event listener (thread) to start dataEventListener.wait(); } catch (InterruptedException e) { } } } private void throwOnNack(Event anEvent) throws PushletException { if (anEvent.getEventType().equals(E_NACK)) { throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON)); } } private void throwOnInvalidSession() throws PushletException { if (id == null) { throw new PushletException("Invalid pushlet session"); } } private Reader openURL(String aURL) throws PushletException { // Open URL connection with server try { p("Connecting to " + aURL); URL url = new URL(aURL); URLConnection urlConnection = url.openConnection(); // Disable any kind of caching. urlConnection.setUseCaches(false); urlConnection.setDefaultUseCaches(false); urlConnection.getRequestProperties(); // TODO: later version may use POST // Enable HTTP POST // urlConnection.setDoOutput(true); // Do the POST with Event in XML in body // OutputStream os = urlConnection.getOutputStream(); // os.write(anEvent.toXML().getBytes()); // os.flush(); // os.close(); // Get the stream from the server. // reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream())); // Note: somehow the client does not work with some JVMs when using // BufferedInputStream... So do unbuffered input. // p("Opening urlConnection inputstream"); return new InputStreamReader(urlConnection.getInputStream(),"UTF-8"); } catch (Throwable t) { warn("openURL() could not open " + aURL, t); throw new PushletException(" could not open " + aURL, t); } } private Event doControl(Event aControlEvent) throws PushletException { String controlURL = pushletURL + "?" + aControlEvent.toQueryString(); p("doControl to " + controlURL); // Open URL connection with server Reader reader = openURL(controlURL); // Get Pushlet event from stream Event event = null; try { p("Getting event..."); // Get next event from server event = EventParser.parse(reader); p("Event received " + event); return event; } catch (Throwable t) { // Stop and report error. warn("doControl() exception", t); throw new PushletException(" error parsing response from" + controlURL, t); } } /** * Util: print. */ private void p(String s) { if (debug) { System.out.println("[PushletClient] " + s); } } /** * Util: warn. */ private void warn(String s) { warn(s, null); } /** * Util: warn with exception. */ private void warn(String s, Throwable t) { System.err.println("[PushletClient] - WARN - " + s + " ex=" + t); if (t != null) { t.printStackTrace(); } } /** * Internal listener for the Pushlet data channel. */ private class DataEventListener implements Runnable { /** * Client's listener that gets called back on events. */ private PushletClientListener listener; /** * Receiver receiveThread. */ private Thread receiveThread = null; private Reader reader; private String refreshURL; private String listenURL; public DataEventListener(PushletClientListener aListener, String aListenURL) { listener = aListener; listenURL = aListenURL; } public void start() { // All ok: start a receiver receiveThread receiveThread = new Thread(this); receiveThread.start(); } /** * Stop listening; may restart later with start(). */ public void stop() { p("In stop()"); bailout(); } /** * Receive event objects from server and callback listener. */ public void run() { p("Start run()"); try { while (receiveThread != null && receiveThread.isAlive()) { // Connect to server reader = openURL(listenURL); synchronized (this) { // Inform the calling thread we're ready to receive events. // Suggestion by Jeff Nowakowski 29.oct.2006 this.notify(); } // Get events while we're alive. while (receiveThread != null && receiveThread.isAlive()) { Event event = null; try { // p("Getting event..."); // Get next event from server event = EventParser.parse(reader); p("Event received " + event); } catch (Throwable t) { // Stop and report error. // warn("Stop run() on exception", t); if (listener != null) { listener.onError("exception during receive: " + t); } break; } // Handle event by calling listener if (event != null && listener != null) { // p("received: " + event.toXML()); String eventType = event.getEventType(); if (eventType.equals(E_HEARTBEAT)) { listener.onHeartbeat(event); } else if (eventType.equals(E_DATA)) { listener.onData(event); } else if (eventType.equals(E_JOIN_LISTEN_ACK)) { id = event.getField(P_ID); listener.onJoinListenAck(event); } else if (eventType.equals(E_LISTEN_ACK)) { p("Listen ack ok"); listener.onListenAck(event); } else if (eventType.equals(E_REFRESH_ACK)) { listener.onRefresh_ack(event); } else if (eventType.equals(E_ABORT)) { listener.onAbort(event); listener = null; break; } else if (eventType.equals(E_REFRESH)) { refresh(event); listener.onRefresh(event); } else { warn("unsupported event type received: " + eventType); } } } } } catch (Throwable t) { warn("Exception in run() ", t); // bailout(); } } private void disconnect() { p("start disconnect()"); if (reader != null) { try { // this blocks, find another way // reader.close(); p("Closed reader ok"); } catch (Exception ignore) { } finally { reader = null; } } p("end disconnect()"); } /** * Stop receiver receiveThread. */ public void stopThread() { p("In stopThread()"); // Keep a reference such that we can kill it from here. Thread targetThread = receiveThread; receiveThread = null; // This should stop the main loop for this receiveThread. // Killing a receiveThread on a blcing read is tricky. // See also http://gee.cs.oswego.edu/dl/cpj/cancel.html if ((targetThread != null) && targetThread.isAlive()) { targetThread.interrupt(); try { // Wait for it to die targetThread.join(500); } catch (InterruptedException ignore) { } // If current receiveThread refuses to die, // take more rigorous methods. if (targetThread.isAlive()) { // Not preferred but may be needed // to stop during a blocking read. targetThread.stop(); // Wait for it to die try { targetThread.join(500); } catch (Throwable ignore) { } } p("Stopped receiveThread alive=" + targetThread.isAlive()); } } /** * Stop listening on stream from server. */ public void bailout() { p("In bailout()"); stopThread(); disconnect(); } /** * Handle refresh, by pausing. */ private void refresh(Event aRefreshEvent) throws PushletException { try { // Wait for specified time. Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT))); } catch (Throwable t) { warn("abort while refresing"); refreshURL = null; return; } // If stopped during sleep, don't proceed if (receiveThread == null) { return; } // Create url to refresh refreshURL = pushletURL + "?" + P_ID + "=" + id + "&" + P_EVENT + "=" + E_REFRESH ; if (reader != null) { try { reader.close(); } catch (IOException ignore) { } reader = null; } reader = openURL(refreshURL); } } /** * Authenticator */ private static class HTTPAuthenticateProxy extends Authenticator { /** * Contributed by Dele Olajide * See http://groups.yahoo.com/group/pushlet/message/634 */ private String thePassword = ""; private String theUser = ""; public HTTPAuthenticateProxy(String username, String password) { thePassword = password; theUser = username; } protected PasswordAuthentication getPasswordAuthentication() { // System.out.println("[HttpAuthenticateProxy] Username = " + theUser); // System.out.println("[HttpAuthenticateProxy] Password = " + thePassword); return new PasswordAuthentication(theUser, thePassword.toCharArray()); } } } /* * $Log: PushletClient.java,v $ * Revision 1.18 2007/11/10 13:52:47 justb * make startDataEventListener method protected to allow overriding * * Revision 1.17 2006/10/29 16:47:57 justb * included patch from Jeff Nowakowski: wait until listener thread runs * * Revision 1.16 2005/05/06 20:08:20 justb * client enhancements * * Revision 1.15 2005/03/27 17:42:27 justb * enhancements * * Revision 1.14 2005/03/25 23:54:04 justb * *** empty log message *** * * Revision 1.13 2005/02/28 16:59:40 justb * fixes for leave and disconnect * * Revision 1.12 2005/02/28 15:57:54 justb * added SimpleListener example * * Revision 1.11 2005/02/21 12:31:44 justb * added proxy contribution from Dele Olajide * * Revision 1.10 2005/02/20 13:05:32 justb * removed the Postlet (integrated in Pushlet protocol) * * Revision 1.9 2005/02/18 10:07:23 justb * many renamings of classes (make names compact) * * Revision 1.8 2005/02/18 09:54:12 justb * refactor: rename Publisher Dispatcher and single Subscriber class * * Revision 1.7 2005/02/15 15:46:30 justb * client API improves * * Revision 1.6 2005/02/15 13:28:56 justb * first quick rewrite adapt for v2 protocol * * Revision 1.5 2004/10/25 21:23:44 justb * *** empty log message *** * * Revision 1.4 2004/10/24 13:52:51 justb * small fixes in client lib * * Revision 1.3 2004/10/24 12:58:18 justb * revised client and test classes for new protocol * * Revision 1.2 2004/09/03 22:35:37 justb * Almost complete rewrite, just checking in now * * Revision 1.1 2004/03/10 20:14:17 justb * renamed all *JavaPushletClient* to *PushletClient* * * Revision 1.10 2004/03/10 15:45:55 justb * many cosmetic changes * * Revision 1.9 2003/08/17 20:30:20 justb * cosmetic changes * * Revision 1.8 2003/08/15 08:37:40 justb * fix/add Copyright+LGPL file headers and footers * * */
最近下载更多
羁鸟念旧林 LV1
2021年6月14日
黄河之水天上来 LV2
2019年7月7日
2252536772 LV21
2019年6月21日
zCOFFEE LV3
2019年3月31日
博博123 LV3
2017年8月1日
覃学谦 LV2
2017年7月5日
liuhaitie LV14
2017年3月22日
guveis LV2
2016年11月19日
ljcy122 LV1
2016年10月3日
lalalalala LV13
2016年6月21日
最近浏览更多
cksndh LV4
2023年8月16日
yybb7435100 LV2
2023年8月7日
康 LV1
2021年11月24日
13798956075 LV1
2021年10月8日
羁鸟念旧林 LV1
2021年6月14日
年轻的人 LV1
2021年6月12日
1005948011 LV7
2021年6月1日
xser3366 LV5
2021年4月13日
2251937068 LV5
2021年3月17日
eawdas LV5
2020年12月27日