factor out socket transport specific ipcTransport implementation.

git-svn-id: svn://10.0.0.236/trunk@133062 18797224-902f-48f8-a5cc-f745e15eee43
This commit is contained in:
darin%netscape.com 2002-11-06 01:47:07 +00:00
parent 386a683618
commit 811b906dfb
5 changed files with 393 additions and 271 deletions

View File

@ -62,6 +62,7 @@ CPPSRCS = \
ifeq ($(MOZ_WIDGET_TOOLKIT),windows)
else
CPPSRCS += ipcTransportUnix.cpp
CPPSRCS += ipcSocketProviderUnix.cpp
endif

View File

@ -37,8 +37,6 @@
#include "nsIServiceManager.h"
#include "nsIObserverService.h"
#include "nsISocketTransportService.h"
#include "nsISocketTransport.h"
#include "nsIInputStream.h"
#include "nsIOutputStream.h"
#include "nsIFile.h"
@ -58,9 +56,10 @@
#ifdef XP_UNIX
#include "ipcSocketProviderUnix.h"
#endif
#include "nsISocketTransportService.h"
static NS_DEFINE_CID(kSocketTransportServiceCID, NS_SOCKETTRANSPORTSERVICE_CID);
#endif
//-----------------------------------------------------------------------------
// ipcTransport
@ -68,8 +67,10 @@ static NS_DEFINE_CID(kSocketTransportServiceCID, NS_SOCKETTRANSPORTSERVICE_CID);
ipcTransport::~ipcTransport()
{
#ifdef XP_UNIX
if (mFD)
PR_Close(mFD);
#endif
}
nsresult
@ -78,10 +79,10 @@ ipcTransport::Init(const nsACString &appName,
ipcTransportObserver *obs)
{
mAppName = appName;
mSocketPath = socketPath;
mObserver = obs;
#ifdef XP_UNIX
mSocketPath = socketPath;
ipcSocketProviderUnix::SetSocketPath(socketPath);
#endif
@ -104,6 +105,7 @@ ipcTransport::Shutdown()
mHaveConnection = PR_FALSE;
#ifdef XP_UNIX
if (mReadRequest) {
mReadRequest->Cancel(NS_BINDING_ABORTED);
mReadRequest = nsnull;
@ -114,6 +116,7 @@ ipcTransport::Shutdown()
mWriteSuspended = PR_FALSE;
}
mTransport = nsnull;
#endif
return NS_OK;
}
@ -140,6 +143,7 @@ ipcTransport::SendMsg_Internal(ipcMessage *msg)
mSendQ.EnqueueMsg(msg);
#ifdef XP_UNIX
if (!mWriteRequest) {
if (!mTransport)
return NS_ERROR_FAILURE;
@ -152,6 +156,7 @@ ipcTransport::SendMsg_Internal(ipcMessage *msg)
mWriteRequest->Resume();
mWriteSuspended = PR_FALSE;
}
#endif
return NS_OK;
}
@ -167,12 +172,16 @@ ipcTransport::Connect()
return NS_ERROR_ABORT;
}
#ifdef XP_UNIX
rv = CreateTransport();
if (NS_FAILED(rv)) return rv;
rv = mTransport->AsyncRead(&mReceiver, nsnull, 0, PRUint32(-1), 0,
getter_AddRefs(mReadRequest));
return rv;
#else
return NS_OK;
#endif
}
void
@ -206,6 +215,8 @@ ipcTransport::OnMessageAvailable(const ipcMessage *rawMsg)
mObserver->OnMessageAvailable(rawMsg);
}
#ifdef XP_UNIX
void
ipcTransport::OnStartRequest(nsIRequest *req)
{
@ -297,6 +308,8 @@ ipcTransport::CreateTransport()
return rv;
}
#endif // !XP_UNIX
nsresult
ipcTransport::SpawnDaemon()
{
@ -347,197 +360,3 @@ ipcTransport::Observe(nsISupports *subject, const char *topic, const PRUnichar *
return NS_OK;
}
//-----------------------------------------------------------------------------
// ipcSendQueue
//-----------------------------------------------------------------------------
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::AddRef()
{
return mTransport->AddRef();
}
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::Release()
{
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcSendQueue, nsIStreamProvider, nsIRequestObserver)
NS_IMETHODIMP
ipcSendQueue::OnStartRequest(nsIRequest *request,
nsISupports *context)
{
LOG(("ipcSendQueue::OnStartRequest\n"));
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcSendQueue::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
struct ipcWriteState
{
ipcMessage *msg;
PRBool complete;
};
static NS_METHOD ipcWriteMessage(nsIOutputStream *stream,
void *closure,
char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countWritten)
{
ipcWriteState *state = (ipcWriteState *) closure;
if (state->msg->WriteTo(segment, count,
countWritten, &state->complete) != PR_SUCCESS)
return NS_ERROR_UNEXPECTED;
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnDataWritable(nsIRequest *request,
nsISupports *context,
nsIOutputStream *stream,
PRUint32 offset,
PRUint32 count)
{
PRUint32 n;
nsresult rv;
ipcWriteState state;
PRBool wroteSomething = PR_FALSE;
LOG(("ipcSendQueue::OnDataWritable\n"));
while (!mQueue.IsEmpty()) {
state.msg = mQueue.First();
state.complete = PR_FALSE;
rv = stream->WriteSegments(ipcWriteMessage, &state, count, &n);
if (NS_FAILED(rv))
break;
if (state.complete) {
LOG((" wrote message %u bytes\n", mQueue.First()->MsgLen()));
mQueue.DeleteFirst();
}
wroteSomething = PR_TRUE;
}
if (wroteSomething)
return NS_OK;
LOG((" suspending write request\n"));
mTransport->SetWriteSuspended(PR_TRUE);
return NS_BASE_STREAM_WOULD_BLOCK;
}
//----------------------------------------------------------------------------
// ipcReceiver
//----------------------------------------------------------------------------
NS_IMETHODIMP_(nsrefcnt)
ipcReceiver::AddRef()
{
return mTransport->AddRef();
}
NS_IMETHODIMP_(nsrefcnt)
ipcReceiver::Release()
{
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcReceiver, nsIStreamListener, nsIRequestObserver)
NS_IMETHODIMP
ipcReceiver::OnStartRequest(nsIRequest *request,
nsISupports *context)
{
LOG(("ipcReceiver::OnStartRequest\n"));
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcReceiver::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcReceiver::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
static NS_METHOD ipcReadMessage(nsIInputStream *stream,
void *closure,
const char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countRead)
{
ipcReceiver *receiver = (ipcReceiver *) closure;
return receiver->ReadSegment(segment, count, countRead);
}
NS_IMETHODIMP
ipcReceiver::OnDataAvailable(nsIRequest *request,
nsISupports *context,
nsIInputStream *stream,
PRUint32 offset,
PRUint32 count)
{
LOG(("ipcReceiver::OnDataAvailable [count=%u]\n", count));
PRUint32 countRead;
return stream->ReadSegments(ipcReadMessage, this, count, &countRead);
}
nsresult
ipcReceiver::ReadSegment(const char *ptr, PRUint32 count, PRUint32 *countRead)
{
*countRead = 0;
while (count) {
PRUint32 nread;
PRBool complete;
mMsg.ReadFrom(ptr, count, &nread, &complete);
if (complete) {
mTransport->OnMessageAvailable(&mMsg);
mMsg.Reset();
}
count -= nread;
ptr += nread;
*countRead += nread;
}
return NS_OK;
}

View File

@ -39,18 +39,18 @@
#define ipcTransport_h__
#include "nsIObserver.h"
#include "nsIStreamListener.h"
#include "nsIStreamProvider.h"
#include "nsITransport.h"
#include "nsITimer.h"
#include "nsString.h"
#include "nsCOMPtr.h"
#include "prio.h"
#include "ipcMessage.h"
#include "ipcMessageQ.h"
class ipcTransport;
#ifdef XP_UNIX
#include "prio.h"
#include "ipcTransportUnix.h"
#endif
//----------------------------------------------------------------------------
// ipcTransportObserver interface
@ -64,54 +64,6 @@ public:
virtual void OnMessageAvailable(const ipcMessage *) = 0;
};
//----------------------------------------------------------------------------
// ipcSendQueue
//----------------------------------------------------------------------------
class ipcSendQueue : public nsIStreamProvider
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMPROVIDER
ipcSendQueue(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcSendQueue() { }
void EnqueueMsg(ipcMessage *msg) { mQueue.Append(msg); }
PRBool IsEmpty() { return mQueue.IsEmpty(); }
private:
ipcTransport *mTransport;
ipcMessageQ mQueue;
};
//-----------------------------------------------------------------------------
// ipcReceiver
//-----------------------------------------------------------------------------
class ipcReceiver : public nsIStreamListener
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMLISTENER
ipcReceiver(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcReceiver() { }
nsresult ReadSegment(const char *, PRUint32 count, PRUint32 *countRead);
private:
ipcTransport *mTransport;
ipcMessage mMsg; // message in progress
};
//-----------------------------------------------------------------------------
// ipcTransport
//-----------------------------------------------------------------------------
@ -123,20 +75,23 @@ public:
NS_DECL_NSIOBSERVER
ipcTransport()
: mSendQ(this)
, mReceiver(this)
, mObserver(nsnull)
, mFD(nsnull)
, mWriteSuspended(PR_FALSE)
: mObserver(nsnull)
, mSentHello(PR_FALSE)
, mHaveConnection(PR_FALSE)
, mSpawnedDaemon(PR_FALSE)
, mConnectionAttemptCount(0)
{ }
#ifdef XP_UNIX
, mSendQ(this)
, mReceiver(this)
, mFD(nsnull)
, mWriteSuspended(PR_FALSE)
#endif
{ NS_INIT_ISUPPORTS(); }
virtual ~ipcTransport();
nsresult Init(const nsACString &appName,
const nsACString &socketPath,
const nsACString &socketPath, // ignored if not UNIX
ipcTransportObserver *observer);
nsresult Shutdown();
@ -146,12 +101,10 @@ public:
PRBool HaveConnection() const { return mHaveConnection; }
public:
// internal to implementation
//
// internal to implementation
//
void OnMessageAvailable(const ipcMessage *);
void SetWriteSuspended(PRBool val) { mWriteSuspended = val; }
void OnStartRequest(nsIRequest *req);
void OnStopRequest(nsIRequest *req, nsresult status);
PRFileDesc *FD() { return mFD; }
private:
//
@ -159,28 +112,43 @@ private:
//
nsresult Connect();
nsresult SendMsg_Internal(ipcMessage *msg);
nsresult CreateTransport();
nsresult SpawnDaemon();
//
// data
//
ipcSendQueue mSendQ;
ipcReceiver mReceiver;
ipcMessageQ mDelayedQ;
ipcTransportObserver *mObserver;
nsCOMPtr<nsITransport> mTransport;
nsCOMPtr<nsIRequest> mReadRequest;
nsCOMPtr<nsIRequest> mWriteRequest;
nsCOMPtr<nsITimer> mTimer;
nsCString mAppName;
nsCString mSocketPath;
PRFileDesc *mFD;
PRPackedBool mWriteSuspended;
PRPackedBool mSentHello;
PRPackedBool mHaveConnection;
PRPackedBool mSpawnedDaemon;
PRUint8 mConnectionAttemptCount;
#ifdef XP_UNIX
ipcSendQueue mSendQ;
ipcReceiver mReceiver;
nsCOMPtr<nsITransport> mTransport;
nsCOMPtr<nsIRequest> mReadRequest;
nsCOMPtr<nsIRequest> mWriteRequest;
nsCString mSocketPath;
PRFileDesc *mFD;
PRPackedBool mWriteSuspended;
nsresult CreateTransport();
public:
//
// internal helper methods for ipcSendQueue and ipcReceiver
//
void SetWriteSuspended(PRBool val) { mWriteSuspended = val; }
void OnStartRequest(nsIRequest *req);
void OnStopRequest(nsIRequest *req, nsresult status);
PRFileDesc *FD() { return mFD; }
#endif
};
#endif // !ipcTransport_h__

View File

@ -0,0 +1,238 @@
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla IPC.
*
* The Initial Developer of the Original Code is
* Netscape Communications Corporation.
* Portions created by the Initial Developer are Copyright (C) 2002
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
* Darin Fisher <darin@netscape.com>
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#include "nsIInputStream.h"
#include "nsIOutputStream.h"
#include "ipcConfig.h"
#include "ipcLog.h"
#include "ipcTransportUnix.h"
#include "ipcTransport.h"
//-----------------------------------------------------------------------------
// ipcSendQueue
//-----------------------------------------------------------------------------
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::AddRef()
{
return mTransport->AddRef();
}
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::Release()
{
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcSendQueue, nsIStreamProvider, nsIRequestObserver)
NS_IMETHODIMP
ipcSendQueue::OnStartRequest(nsIRequest *request,
nsISupports *context)
{
LOG(("ipcSendQueue::OnStartRequest\n"));
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcSendQueue::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
struct ipcWriteState
{
ipcMessage *msg;
PRBool complete;
};
static NS_METHOD ipcWriteMessage(nsIOutputStream *stream,
void *closure,
char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countWritten)
{
ipcWriteState *state = (ipcWriteState *) closure;
if (state->msg->WriteTo(segment, count,
countWritten, &state->complete) != PR_SUCCESS)
return NS_ERROR_UNEXPECTED;
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnDataWritable(nsIRequest *request,
nsISupports *context,
nsIOutputStream *stream,
PRUint32 offset,
PRUint32 count)
{
PRUint32 n;
nsresult rv;
ipcWriteState state;
PRBool wroteSomething = PR_FALSE;
LOG(("ipcSendQueue::OnDataWritable\n"));
while (!mQueue.IsEmpty()) {
state.msg = mQueue.First();
state.complete = PR_FALSE;
rv = stream->WriteSegments(ipcWriteMessage, &state, count, &n);
if (NS_FAILED(rv))
break;
if (state.complete) {
LOG((" wrote message %u bytes\n", mQueue.First()->MsgLen()));
mQueue.DeleteFirst();
}
wroteSomething = PR_TRUE;
}
if (wroteSomething)
return NS_OK;
LOG((" suspending write request\n"));
mTransport->SetWriteSuspended(PR_TRUE);
return NS_BASE_STREAM_WOULD_BLOCK;
}
//----------------------------------------------------------------------------
// ipcReceiver
//----------------------------------------------------------------------------
NS_IMETHODIMP_(nsrefcnt)
ipcReceiver::AddRef()
{
return mTransport->AddRef();
}
NS_IMETHODIMP_(nsrefcnt)
ipcReceiver::Release()
{
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcReceiver, nsIStreamListener, nsIRequestObserver)
NS_IMETHODIMP
ipcReceiver::OnStartRequest(nsIRequest *request,
nsISupports *context)
{
LOG(("ipcReceiver::OnStartRequest\n"));
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcReceiver::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcReceiver::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
static NS_METHOD ipcReadMessage(nsIInputStream *stream,
void *closure,
const char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countRead)
{
ipcReceiver *receiver = (ipcReceiver *) closure;
return receiver->ReadSegment(segment, count, countRead);
}
NS_IMETHODIMP
ipcReceiver::OnDataAvailable(nsIRequest *request,
nsISupports *context,
nsIInputStream *stream,
PRUint32 offset,
PRUint32 count)
{
LOG(("ipcReceiver::OnDataAvailable [count=%u]\n", count));
PRUint32 countRead;
return stream->ReadSegments(ipcReadMessage, this, count, &countRead);
}
nsresult
ipcReceiver::ReadSegment(const char *ptr, PRUint32 count, PRUint32 *countRead)
{
*countRead = 0;
while (count) {
PRUint32 nread;
PRBool complete;
mMsg.ReadFrom(ptr, count, &nread, &complete);
if (complete) {
mTransport->OnMessageAvailable(&mMsg);
mMsg.Reset();
}
count -= nread;
ptr += nread;
*countRead += nread;
}
return NS_OK;
}

View File

@ -0,0 +1,96 @@
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla IPC.
*
* The Initial Developer of the Original Code is
* Netscape Communications Corporation.
* Portions created by the Initial Developer are Copyright (C) 2002
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
* Darin Fisher <darin@netscape.com>
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#ifndef ipcTransportUnix_h__
#define ipcTransportUnix_h__
#include "nsIStreamListener.h"
#include "nsIStreamProvider.h"
#include "ipcMessageQ.h"
class ipcTransport;
//----------------------------------------------------------------------------
// ipcSendQueue
//----------------------------------------------------------------------------
class ipcSendQueue : public nsIStreamProvider
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMPROVIDER
ipcSendQueue(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcSendQueue() { }
void EnqueueMsg(ipcMessage *msg) { mQueue.Append(msg); }
PRBool IsEmpty() { return mQueue.IsEmpty(); }
private:
ipcTransport *mTransport;
ipcMessageQ mQueue;
};
//-----------------------------------------------------------------------------
// ipcReceiver
//-----------------------------------------------------------------------------
class ipcReceiver : public nsIStreamListener
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMLISTENER
ipcReceiver(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcReceiver() { }
nsresult ReadSegment(const char *, PRUint32 count, PRUint32 *countRead);
private:
ipcTransport *mTransport;
ipcMessage mMsg; // message in progress
};
#endif // !ipcTransportUnix_h__