Mozilla/mozilla/netwerk/cache/memcache/nsMemCacheChannel.cpp
dougt%netscape.com 175245e2de Relanding Necko Changes.
Revising nsIChannel to allow for overlapped i/o. This consists of three parts:

1. Factor nsIChannel into a protocol-specific part, the nsIChannel, and a socket-specific part, the nsITransport.
2. Derive the nsIChannel from a nsIRequest.
3. Change the notification system from necko and the URILoader to pass the nsIRequest interface instead of the nsIChannel interface.

This goal stems from wanting to be able to have active AsyncRead and AsyncWrite operations on nsSocketTransport.
This is desired because it would greatly simplify the task of maintaining persistent/reusable socket connections
for FTP, HTTP, and Imap (and potentially other protocols). The problem with the existing nsIChannel interface is
that it does not allow one to selectively suspend just one of the read or write operations while keeping the other active.

r=darin@netscape.com
sr=rpotts@netscape.com


git-svn-id: svn://10.0.0.236/trunk@87587 18797224-902f-48f8-a5cc-f745e15eee43
2001-02-21 20:38:08 +00:00

678 lines
20 KiB
C++

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* The contents of this file are subject to the Netscape Public
* License Version 1.1 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of
* the License at http://www.mozilla.org/NPL/
*
* Software distributed under the License is distributed on an "AS
* IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
* implied. See the License for the specific language governing
* rights and limitations under the License.
*
* The Original Code is Mozilla Communicator client code, released
* March 31, 1998.
*
* The Initial Developer of the Original Code is Netscape
* Communications Corporation. Portions created by Netscape are
* Copyright (C) 1998-1999 Netscape Communications Corporation. All
* Rights Reserved.
*
*/
#include "nsMemCache.h"
#include "nsMemCacheChannel.h"
#include "nsIStreamListener.h"
#include "nsIChannel.h"
#include "nsIStorageStream.h"
#include "nsIOutputStream.h"
#include "nsIServiceManager.h"
#include "nsIEventQueueService.h"
#include "nsNetUtil.h"
#include "nsILoadGroup.h"
// Class IDs passed to NS_WITH_SERVICE in AsyncReadStreamAdaptor::AsyncRead()
// to look up the I/O service and the event queue service.
static NS_DEFINE_CID(kIOServiceCID, NS_IOSERVICE_CID);
static NS_DEFINE_CID(kEventQueueService, NS_EVENTQUEUESERVICE_CID);
// nsISupports implementation for the channel; the interfaces listed are
// nsIChannel, nsIRequest and nsITransport.
NS_IMPL_THREADSAFE_ISUPPORTS3(nsMemCacheChannel, nsIChannel, nsIRequest, nsITransport)
// Adjust the owning cache's occupancy counter by aBytesUsed.  The delta is
// signed, so callers can report both growth and shrinkage of an entry.
void nsMemCacheChannel::NotifyStorageInUse(PRInt32 aBytesUsed)
{
    mRecord->mCache->mOccupancy = mRecord->mCache->mOccupancy + aBytesUsed;
}
/**
 * Adaptor that gives a synchronous input stream asynchronous-read behaviour.
 * It provides methods for initiating, suspending, resuming and cancelling an
 * async read.  The object plays two roles at once: it is the nsIInputStream
 * handed to the downstream consumer, and it is the nsIStreamListener that
 * receives the events it posts to itself through mEventQueueStreamListener.
 */
class AsyncReadStreamAdaptor : public nsIInputStream,
                               public nsIStreamListener
{
public:
    // Holds a strong reference on aChannel for the lifetime of the adaptor.
    // mRemaining starts at (PRUint32)-1 so that, until SetTransferCount() is
    // called, it never limits the amount reported by NextListenerEvent().
    AsyncReadStreamAdaptor(nsMemCacheChannel* aChannel, nsIInputStream *aSyncStream):
        mSyncStream(aSyncStream), mDataAvailCursor(0),
        mRemaining((PRUint32)-1), mAvailable(0), mChannel(aChannel), mAbortStatus(NS_OK), mSuspended(PR_FALSE)
    {
        NS_INIT_REFCNT();
        NS_ADDREF(mChannel);
    }

    // Clears the channel's pointer to this adaptor before dropping the
    // reference taken in the constructor.
    virtual ~AsyncReadStreamAdaptor() {
        mChannel->mAsyncReadStream = 0;
        NS_RELEASE(mChannel);
    }

    NS_DECL_ISUPPORTS

    // The read is pending while bytes remain to be reported and the read has
    // not been aborted.
    nsresult
    IsPending(PRBool* aIsPending) {
        *aIsPending = (mRemaining != 0) && NS_SUCCEEDED(mAbortStatus);
        return NS_OK;
    }

    // Records the abort status and queues a single OnStopRequest carrying it.
    // Returns the result of queuing that event, or |status| itself when there
    // is no proxied listener to notify.
    nsresult
    Cancel(nsresult status) {
        if (NS_SUCCEEDED(mAbortStatus)) {
            mAbortStatus = status;
            return mEventQueueStreamListener ?
                mEventQueueStreamListener->OnStopRequest(mChannel, mContext, status, nsnull):
                status;
        } else {
            // Cancel has already been called... Do not fire another OnStopRequest!
            return NS_OK;
        }
    }

    // Stops further OnDataAvailable events from being queued until Resume().
    nsresult
    Suspend(void) { mSuspended = PR_TRUE; return NS_OK; }

    // Restarts event delivery.  Fails with NS_ERROR_FAILURE if the read was
    // not suspended.
    nsresult
    Resume(void) {
        if (!mSuspended)
            return NS_ERROR_FAILURE;
        mSuspended = PR_FALSE;
        return NextListenerEvent();
    }

    // nsIStreamListener methods, all of which delegate to the real listener

    // This is the heart of this class.
    // The OnDataAvailable() method is always called from an event processed by
    // the system event queue.  The event is sent from an
    // AsyncReadStreamAdaptor object to itself.  This method both forwards the
    // event to the downstream listener and causes another OnDataAvailable()
    // event to be enqueued.
    NS_IMETHOD
    OnDataAvailable(nsIRequest *request, nsISupports *aContext,
                    nsIInputStream *inStr, PRUint32 sourceOffset, PRUint32 count) {
        // The downstream listener is dropped by OnStopRequest() and Close().
        // Handle its absence the same way the other listener methods do
        // instead of dereferencing a null pointer.
        NS_ASSERTION(mDownstreamListener, "no downstream listener");
        if (!mDownstreamListener)
            return NS_OK;

        nsresult rv;
        rv = mDownstreamListener->OnDataAvailable(mChannel, aContext, inStr, sourceOffset, count);
        if (NS_FAILED(rv)) {
            Cancel(rv);
            return rv;
        }
        // Keep the pump going unless the consumer suspended or aborted the
        // read while handling the notification above.
        if (!mSuspended && NS_SUCCEEDED(mAbortStatus)) {
            rv = NextListenerEvent();
            if (NS_FAILED(rv)) {
                Fail();
                return rv;
            }
        }
        return rv;
    }

    // Forwards the start notification downstream; a failure returned by the
    // listener cancels the read.
    NS_IMETHOD
    OnStartRequest(nsIRequest *request, nsISupports *aContext) {
        nsresult rv = NS_OK;
        NS_ASSERTION(mDownstreamListener, "no downstream listener");
        if (mDownstreamListener) {
            rv = mDownstreamListener->OnStartRequest(mChannel, aContext);
        }
        if (NS_FAILED(rv))
            Cancel(rv);
        return rv;
    }

    // Forwards the stop notification downstream and then drops both listener
    // references.
    NS_IMETHOD
    OnStopRequest(nsIRequest *request, nsISupports *aContext,
                  nsresult aStatus, const PRUnichar* aStatusArg) {
        nsresult rv = NS_OK;
        NS_ASSERTION(mDownstreamListener, "no downstream listener");
        if (mDownstreamListener) {
            rv = mDownstreamListener->OnStopRequest(mChannel, aContext, aStatus, aStatusArg);
            mDownstreamListener = 0;
        }
        // Tricky: causes this instance to be free'ed because mEventQueueStreamListener
        // has a circular reference back to this.
        mEventQueueStreamListener = 0;
        return rv;
    }

    // nsIInputStream methods

    // Reports the number of bytes announced through OnDataAvailable that the
    // consumer has not read yet.  (The previous code returned that count as
    // the nsresult and never stored it in the out-parameter.)
    NS_IMETHOD
    Available(PRUint32 *aNumBytes) {
        *aNumBytes = mAvailable;
        return NS_OK;
    }

    // Reads at most the number of bytes already announced to the consumer.
    // NS_BASE_STREAM_WOULD_BLOCK from the underlying stream is reported as
    // success; any other failure aborts the read.
    NS_IMETHOD
    Read(char* aBuf, PRUint32 aCount, PRUint32 *aBytesRead) {
        *aBytesRead = 0;
        // mSyncStream is null once Close() has been called.
        if (NS_FAILED(mAbortStatus) || !mSyncStream)
            return NS_BASE_STREAM_CLOSED;

        aCount = PR_MIN(aCount, mAvailable);
        nsresult rv = mSyncStream->Read(aBuf, aCount, aBytesRead);
        mAvailable -= *aBytesRead;

        if (NS_FAILED(rv) && (rv != NS_BASE_STREAM_WOULD_BLOCK)) {
            Fail();
            return rv;
        }
        return NS_OK;
    }

    // Segment-wise variant of Read().  It applies the same clamp and the same
    // mAvailable bookkeeping as Read(); without that, NextListenerEvent()
    // would subtract a stale mAvailable from the source stream's byte count
    // and the unsigned subtraction would wrap around.
    NS_IMETHOD ReadSegments(nsWriteSegmentFun writer, void * closure, PRUint32 count, PRUint32 *_retval) {
        *_retval = 0;
        if (NS_FAILED(mAbortStatus) || !mSyncStream)
            return NS_BASE_STREAM_CLOSED;

        count = PR_MIN(count, mAvailable);
        nsresult rv = mSyncStream->ReadSegments(writer, closure, count, _retval);
        mAvailable -= *_retval;
        return rv;
    }

    NS_IMETHOD GetNonBlocking(PRBool *aNonBlocking) {
        NS_NOTREACHED("GetNonBlocking");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD GetObserver(nsIInputStreamObserver * *aObserver) {
        NS_NOTREACHED("GetObserver");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD SetObserver(nsIInputStreamObserver * aObserver) {
        NS_NOTREACHED("SetObserver");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    // Closes the underlying stream and drops every reference held by the
    // adaptor.  Calling Close() again is a harmless no-op.
    NS_IMETHOD
    Close() {
        nsresult rv = NS_OK;
        if (mSyncStream) {
            rv = mSyncStream->Close();
            mSyncStream = 0;
        }
        mContext = 0;
        mDownstreamListener = 0;
        mEventQueueStreamListener = 0;
        return rv;
    }

    // Accessors for the number of bytes still to be reported to the consumer.
    NS_IMETHOD
    GetTransferCount(nsLoadFlags *aTransferCount) {
        *aTransferCount = mRemaining;
        return NS_OK;
    }

    NS_IMETHOD
    SetTransferCount(nsLoadFlags aTransferCount) {
        mRemaining = aTransferCount;
        return NS_OK;
    }

    // Starts the asynchronous read: wraps this object in a listener proxied
    // through the current thread's event queue, queues OnStartRequest, then
    // queues the first data (or stop) event.
    nsresult
    AsyncRead(nsIStreamListener* aListener, nsISupports* aContext) {
        nsresult rv;
        nsIEventQueue *eventQ;

        mContext = aContext;
        mDownstreamListener = aListener;

        NS_WITH_SERVICE(nsIIOService, serv, kIOServiceCID, &rv);
        if (NS_FAILED(rv)) return rv;

        NS_WITH_SERVICE(nsIEventQueueService, eventQService, kEventQueueService, &rv);
        if (NS_FAILED(rv)) return rv;

        rv = eventQService->GetThreadEventQueue(PR_CurrentThread(), &eventQ);
        if (NS_FAILED(rv)) return rv;

        rv = NS_NewAsyncStreamListener(getter_AddRefs(mEventQueueStreamListener),
                                       NS_STATIC_CAST(nsIStreamListener*, this), eventQ);
        NS_RELEASE(eventQ);
        if (NS_FAILED(rv)) return rv;

        rv = mEventQueueStreamListener->OnStartRequest(mChannel, aContext);
        if (NS_FAILED(rv)) return rv;

        return NextListenerEvent();
    }

protected:
    // Marks the read as aborted and queues an OnStopRequest carrying
    // NS_BINDING_FAILED.  If the proxied listener has already been dropped
    // (by OnStopRequest() or Close()) there is nobody left to notify, so only
    // the abort status is recorded.
    nsresult
    Fail(void) {
        mAbortStatus = NS_BINDING_ABORTED;
        if (!mEventQueueStreamListener)
            return NS_BINDING_FAILED;
        return mEventQueueStreamListener->OnStopRequest(mChannel, mContext, NS_BINDING_FAILED, nsnull);
    }

    // If more data remains in the source stream that the downstream consumer
    // has not yet been notified about, fire an OnDataAvailable event.
    // Otherwise, fire an OnStopRequest event.
    nsresult
    NextListenerEvent() {
        // Both pointers are cleared once the stream has been closed or the
        // stop notification delivered; a late Resume() must not crash on them.
        if (!mSyncStream || !mEventQueueStreamListener)
            return NS_BASE_STREAM_CLOSED;

        PRUint32 available;
        nsresult rv = mSyncStream->Available(&available);
        if (NS_FAILED(rv)) return rv;

        // Ignore bytes that were already announced but not yet consumed, and
        // never report more than the requested transfer count.
        available -= mAvailable;
        available = PR_MIN(available, mRemaining);

        if (available) {
            PRUint32 size = PR_MIN(available, MEM_CACHE_SEGMENT_SIZE);
            rv = mEventQueueStreamListener->OnDataAvailable(mChannel, mContext, this,
                                                            mDataAvailCursor, size);
            mDataAvailCursor += size;
            mRemaining -= size;
            mAvailable += size;
            return rv;
        } else {
            rv = mEventQueueStreamListener->OnStopRequest(mChannel, mContext, NS_OK, nsnull);
            // Drops one reference on this object now that the final event has
            // been queued.  NS_RELEASE nulls its argument, hence the alias.
            AsyncReadStreamAdaptor* thisAlias = this;
            NS_RELEASE(thisAlias);
            return rv;
        }
    }

private:
    nsCOMPtr<nsISupports>       mContext;                  // Opaque context passed to AsyncRead()
    nsCOMPtr<nsIStreamListener> mEventQueueStreamListener; // Stream listener that has been proxied
    nsCOMPtr<nsIStreamListener> mDownstreamListener;       // Original stream listener
    nsCOMPtr<nsIInputStream>    mSyncStream;               // Underlying synchronous stream that is
                                                           // being converted to an async stream
    PRUint32                    mDataAvailCursor;          // Offset passed with the next OnDataAvailable
    PRUint32                    mRemaining;                // Size of AsyncRead request less bytes for
                                                           // consumer OnDataAvailable's that were fired
    PRUint32                    mAvailable;                // Number of bytes for which OnDataAvailable fired
                                                           // and that have not been read yet
    nsMemCacheChannel*          mChannel;                  // Associated memory cache channel, strong link
                                                           // but can not use nsCOMPtr
    nsresult                    mAbortStatus;              // Failure code once Cancel()/Fail() has run
    PRBool                      mSuspended;                // Suspend() has been called
};

NS_IMPL_ISUPPORTS3(AsyncReadStreamAdaptor, nsIInputStream,
                   nsIStreamListener, nsIStreamObserver)
// Thin nsIOutputStream decorator.  All it adds to the wrapped stream is
// bookkeeping: every successful write bumps the cache's overall occupancy
// by the number of bytes that went into the cache entry.
class MemCacheWriteStreamWrapper : public nsIOutputStream {
public:
    // Keeps a strong reference on the channel until the wrapper dies.
    MemCacheWriteStreamWrapper(nsMemCacheChannel* aChannel, nsIOutputStream *aBaseStream)
        : mBaseStream(aBaseStream),
          mChannel(aChannel)
    {
        NS_INIT_REFCNT();
        NS_ADDREF(mChannel);
    }

    virtual ~MemCacheWriteStreamWrapper()
    {
        NS_RELEASE(mChannel);
    }

    // Factory: builds a wrapper around aBaseStream and hands back one
    // reference to it through aWrapper.
    static nsresult
    Create(nsMemCacheChannel* aChannel, nsIOutputStream *aBaseStream, nsIOutputStream* *aWrapper)
    {
        MemCacheWriteStreamWrapper *instance =
            new MemCacheWriteStreamWrapper(aChannel, aBaseStream);
        if (!instance) {
            return NS_ERROR_OUT_OF_MEMORY;
        }
        NS_ADDREF(instance);
        *aWrapper = instance;
        return NS_OK;
    }

    NS_DECL_ISUPPORTS

    // Close and Flush go straight to the wrapped stream.
    NS_IMETHOD Close()
    {
        return mBaseStream->Close();
    }

    NS_IMETHOD Flush()
    {
        return mBaseStream->Flush();
    }

    // Writes through to the wrapped stream, then reports however many bytes
    // were actually written to the channel's occupancy accounting.
    NS_IMETHOD Write(const char *aBuffer, PRUint32 aCount, PRUint32 *aNumWritten)
    {
        *aNumWritten = 0;
        const nsresult status = mBaseStream->Write(aBuffer, aCount, aNumWritten);
        mChannel->NotifyStorageInUse(*aNumWritten);
        return status;
    }

    // The remaining nsIOutputStream methods are unsupported.
    NS_IMETHOD WriteFrom(nsIInputStream *inStr, PRUint32 count, PRUint32 *_retval)
    {
        NS_NOTREACHED("WriteFrom");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD WriteSegments(nsReadSegmentFun reader, void * closure, PRUint32 count, PRUint32 *_retval)
    {
        NS_NOTREACHED("WriteSegments");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD GetNonBlocking(PRBool *aNonBlocking)
    {
        NS_NOTREACHED("GetNonBlocking");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD SetNonBlocking(PRBool aNonBlocking)
    {
        NS_NOTREACHED("SetNonBlocking");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD GetObserver(nsIOutputStreamObserver * *aObserver)
    {
        NS_NOTREACHED("GetObserver");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

    NS_IMETHOD SetObserver(nsIOutputStreamObserver * aObserver)
    {
        NS_NOTREACHED("SetObserver");
        return NS_ERROR_NOT_IMPLEMENTED;
    }

private:
    nsCOMPtr<nsIOutputStream> mBaseStream;  // Stream that really stores the data
    nsMemCacheChannel*        mChannel;     // Channel notified of written bytes (strong, manual refcount)
};

NS_IMPL_THREADSAFE_ISUPPORTS1(MemCacheWriteStreamWrapper, nsIOutputStream)
// Binds the channel to its cache record and registers it in the record's
// channel count.  aLoadGroup is accepted but not used here.
nsMemCacheChannel::nsMemCacheChannel(nsMemCacheRecord *aRecord, nsILoadGroup *aLoadGroup)
    : mRecord(aRecord),
      mStatus(NS_OK),
      mLoadAttributes(nsIChannel::LOAD_NORMAL)
{
    NS_INIT_REFCNT();
    ++mRecord->mNumChannels;
}
// Removes this channel from the record's channel count.
nsMemCacheChannel::~nsMemCacheChannel()
{
    --mRecord->mNumChannels;
}
// Unsupported on memory-cache channels: flags the call with NS_NOTREACHED
// and reports NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetName(PRUnichar* *result)
{
    NS_NOTREACHED("nsMemCacheChannel::GetName");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Reports whether an async read is still in flight.  With no async read
// stream attached the answer is always PR_FALSE.
NS_IMETHODIMP nsMemCacheChannel::IsPending(PRBool* aIsPending)
{
    *aIsPending = PR_FALSE;
    if (mAsyncReadStream) {
        return mAsyncReadStream->IsPending(aIsPending);
    }
    return NS_OK;
}
// Returns the status last recorded by Cancel() (NS_OK until then).
NS_IMETHODIMP nsMemCacheChannel::GetStatus(nsresult *status)
{
    *status = mStatus;
    return NS_OK;
}
// Remembers the cancellation status and forwards the cancel to the async
// read stream.  Fails with NS_ERROR_FAILURE when no async read is active
// (the status is still recorded in that case).
NS_IMETHODIMP nsMemCacheChannel::Cancel(nsresult status)
{
    mStatus = status;
    if (mAsyncReadStream) {
        return mAsyncReadStream->Cancel(status);
    }
    return NS_ERROR_FAILURE;
}
// Suspends the active async read; NS_ERROR_FAILURE if there is none.
NS_IMETHODIMP nsMemCacheChannel::Suspend(void)
{
    if (mAsyncReadStream) {
        return mAsyncReadStream->Suspend();
    }
    return NS_ERROR_FAILURE;
}
// Resumes the active async read; NS_ERROR_FAILURE if there is none.
NS_IMETHODIMP nsMemCacheChannel::Resume(void)
{
    if (mAsyncReadStream) {
        return mAsyncReadStream->Resume();
    }
    return NS_ERROR_FAILURE;
}
// Not needed for memory-cache channels; always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetOriginalURI(nsIURI* *aURI)
{
    NS_NOTREACHED("nsMemCacheChannel::GetOriginalURI");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Not needed for memory-cache channels; always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::SetOriginalURI(nsIURI* aURI)
{
    NS_NOTREACHED("nsMemCacheChannel::SetOriginalURI");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// The cache manager provides the URI, so this channel does not:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetURI(nsIURI* *aURI)
{
    NS_NOTREACHED("nsMemCacheChannel::GetURI");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// The cache manager provides the URI, so this channel does not:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::SetURI(nsIURI* aURI)
{
    NS_NOTREACHED("nsMemCacheChannel::SetURI");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Synchronous open: an input stream over the entry starting at offset 0,
// with the count argument set to -1 and no flags.
NS_IMETHODIMP nsMemCacheChannel::Open(nsIInputStream **aResult)
{
    const PRUint32 startOffset = 0;
    const PRUint32 readCount = (PRUint32)-1;
    const PRUint32 noFlags = 0;
    return OpenInputStream(startOffset, readCount, noFlags, aResult);
}
// Asynchronous open: starts an async read from offset 0 with the count
// argument set to -1 and no flags.  The request object handed back by
// AsyncRead() is not needed here and is released on return.
NS_IMETHODIMP nsMemCacheChannel::AsyncOpen(nsIStreamListener *aListener, nsISupports *aContext)
{
    nsCOMPtr<nsIRequest> unusedRequest;
    const PRUint32 startOffset = 0;
    const PRUint32 readCount = (PRUint32)-1;
    const PRUint32 noFlags = 0;
    return AsyncRead(aListener, aContext, startOffset, readCount, noFlags,
                     getter_AddRefs(unusedRequest));
}
// Creates the channel's input stream over the record's storage stream,
// positioned at |offset|, and returns one reference to it in aResult.
// Once mInputStream is set, further calls fail with NS_ERROR_NOT_AVAILABLE.
// |count| and |flags| are not consulted.
NS_IMETHODIMP nsMemCacheChannel::OpenInputStream(PRUint32 offset, PRUint32 count, PRUint32 flags,
                                                 nsIInputStream* *aResult)
{
    NS_ENSURE_ARG(aResult);

    if (mInputStream) {
        return NS_ERROR_NOT_AVAILABLE;
    }

    nsresult status =
        mRecord->mStorageStream->NewInputStream(offset, getter_AddRefs(mInputStream));
    if (NS_SUCCEEDED(status)) {
        *aResult = mInputStream;
        NS_ADDREF(*aResult);
    }
    return status;
}
// Opens an output stream that writes into the record's storage stream at
// |offset| and wraps it so that written bytes are added to the cache's
// occupancy.  |count| and |flags| are not consulted.
//
// Returns the failure code of GetLength()/GetOutputStream() if either fails,
// otherwise the result of creating the wrapper.
NS_IMETHODIMP
nsMemCacheChannel::OpenOutputStream(PRUint32 offset, PRUint32 count, PRUint32 flags,
                                    nsIOutputStream* *aResult)
{
    NS_ENSURE_ARG(aResult);

    // Capture the length before the output stream is opened.  The result is
    // checked so that a failed call can never feed an indeterminate length
    // into the occupancy adjustment below.
    PRUint32 oldLength = 0;
    nsresult rv = mRecord->mStorageStream->GetLength(&oldLength);
    if (NS_FAILED(rv)) return rv;

    nsCOMPtr<nsIOutputStream> outputStream;
    rv = mRecord->mStorageStream->GetOutputStream(offset, getter_AddRefs(outputStream));
    if (NS_FAILED(rv)) return rv;

    // Opening below the previous end of data: report the bytes between
    // |offset| and the old length as no longer in use.  The unsigned
    // difference wraps around, and the cast turns it into the intended
    // negative delta.
    if (offset < oldLength)
        NotifyStorageInUse(NS_STATIC_CAST(PRInt32, offset - oldLength));

    return MemCacheWriteStreamWrapper::Create(this, outputStream, aResult);
}
// Starts an asynchronous read of the cache entry.
//
// Opens the channel's input stream at |offset|, wraps it in an
// AsyncReadStreamAdaptor and asks the adaptor to start delivering events to
// aListener.  On success *aResult receives a reference to this channel,
// which acts as the request object.  On failure *aResult is null and no
// reference is handed out, so callers need not release anything.
NS_IMETHODIMP
nsMemCacheChannel::AsyncRead(nsIStreamListener *aListener, nsISupports *aContext,
                             PRUint32 offset, PRUint32 count, PRUint32 flags,
                             nsIRequest **aResult)
{
    NS_ENSURE_ARG_POINTER(aResult);
    *aResult = nsnull;

    nsCOMPtr<nsIInputStream> inputStream;
    nsresult rv = OpenInputStream(offset, count, flags, getter_AddRefs(inputStream));
    if (NS_FAILED(rv)) return rv;

    AsyncReadStreamAdaptor *asyncReadStreamAdaptor;
    asyncReadStreamAdaptor = new AsyncReadStreamAdaptor(this, inputStream);
    if (!asyncReadStreamAdaptor)
        return NS_ERROR_OUT_OF_MEMORY;

    // This reference keeps the adaptor alive while the read is in progress.
    NS_ADDREF(asyncReadStreamAdaptor);
    mAsyncReadStream = asyncReadStreamAdaptor;

    rv = asyncReadStreamAdaptor->AsyncRead(aListener, aContext);
    if (NS_FAILED(rv)) {
        mAsyncReadStream = nsnull;
        NS_RELEASE(asyncReadStreamAdaptor);
        // Previously the channel was AddRef'ed into *aResult even on this
        // path, leaking a reference for callers that ignore out-parameters
        // of failed calls.
        return rv;
    }

    NS_ADDREF(*aResult = this);
    return rv;
}
// Asynchronous writes are not supported by this channel:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::AsyncWrite(nsIStreamProvider *provider, nsISupports *ctxt,
                                            PRUint32 offset, PRUint32 count, PRUint32 flags,
                                            nsIRequest **aResult)
{
    NS_NOTREACHED("nsMemCacheChannel::AsyncWrite");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Accepted for interface compatibility; the value is ignored.
NS_IMETHODIMP nsMemCacheChannel::SetContentLength(PRInt32 aContentLength)
{
    return NS_OK;
}
// Returns the load flags stored on this channel.
NS_IMETHODIMP nsMemCacheChannel::GetLoadAttributes(nsLoadFlags *aLoadAttributes)
{
    *aLoadAttributes = mLoadAttributes;
    return NS_OK;
}
// Replaces the load flags stored on this channel.
NS_IMETHODIMP nsMemCacheChannel::SetLoadAttributes(nsLoadFlags aLoadAttributes)
{
    mLoadAttributes = aLoadAttributes;
    return NS_OK;
}
// Content type is handled by the cache manager, not by this channel:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetContentType(char* *aContentType)
{
    NS_NOTREACHED("nsMemCacheChannel::GetContentType");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Content type is handled by the cache manager, not by this channel:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::SetContentType(const char *aContentType)
{
    NS_NOTREACHED("nsMemCacheChannel::SetContentType");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Reports the record's stored content length, narrowed to PRInt32.  The
// length defaults to 0 if the record does not fill it in.
NS_IMETHODIMP nsMemCacheChannel::GetContentLength(PRInt32 *aContentLength)
{
    PRUint32 storedLength = 0;
    mRecord->GetStoredContentLength(&storedLength);
    *aContentLength = NS_STATIC_CAST(PRInt32, storedLength);
    return NS_OK;
}
// Returns an AddRef'ed pointer to the stored owner, or null if none is set.
NS_IMETHODIMP nsMemCacheChannel::GetOwner(nsISupports* *aOwner)
{
    nsISupports* owner = mOwner.get();
    NS_IF_ADDREF(owner);
    *aOwner = owner;
    return NS_OK;
}
// Stores aOwner on the channel so that GetOwner() can return it later.
NS_IMETHODIMP nsMemCacheChannel::SetOwner(nsISupports* aOwner)
{
    mOwner = aOwner;
    return NS_OK;
}
// Load groups are handled by the cache manager, not by this channel:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetLoadGroup(nsILoadGroup* *aLoadGroup)
{
    NS_NOTREACHED("nsMemCacheChannel::GetLoadGroup");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Load groups are handled by the cache manager, not by this channel:
// always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::SetLoadGroup(nsILoadGroup* aLoadGroup)
{
    NS_NOTREACHED("nsMemCacheChannel::SetLoadGroup");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Notification callbacks are handled by the cache manager, not by this
// channel: always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::GetNotificationCallbacks(nsIInterfaceRequestor* *aNotificationCallbacks)
{
    NS_NOTREACHED("nsMemCacheChannel::GetNotificationCallbacks");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Notification callbacks are handled by the cache manager, not by this
// channel: always NS_ERROR_NOT_IMPLEMENTED.
NS_IMETHODIMP nsMemCacheChannel::SetNotificationCallbacks(nsIInterfaceRequestor* aNotificationCallbacks)
{
    NS_NOTREACHED("nsMemCacheChannel::SetNotificationCallbacks");
    return NS_ERROR_NOT_IMPLEMENTED;
}
// This channel carries no security info: the out-parameter is always null.
NS_IMETHODIMP nsMemCacheChannel::GetSecurityInfo(nsISupports * *aSecurityInfo)
{
    *aSecurityInfo = nsnull;
    return NS_OK;
}
// Progress sinks are not supported; aSink is left untouched.
NS_IMETHODIMP nsMemCacheChannel::GetProgressEventSink(nsIProgressEventSink **aSink)
{
    return NS_ERROR_NOT_IMPLEMENTED;
}
// Progress sinks are not supported; aSink is ignored.
NS_IMETHODIMP nsMemCacheChannel::SetProgressEventSink(nsIProgressEventSink *aSink)
{
    return NS_ERROR_NOT_IMPLEMENTED;
}