fixes bug 210125 "need to be able to AsyncWait for closure only" r=dougt sr=bzbarsky

git-svn-id: svn://10.0.0.236/trunk@147597 18797224-902f-48f8-a5cc-f745e15eee43
This commit is contained in:
darin%meer.net
2003-10-06 01:46:31 +00:00
parent fa7cb8115d
commit bc1bb52b19
73 changed files with 2530 additions and 2745 deletions

View File

@@ -443,470 +443,3 @@ nsThread::Shutdown()
}
////////////////////////////////////////////////////////////////////////////////
/* nsThreadPoolBusyBody implements an increment/decrement of
nsThreadPool's mBusyThreads member variable
*/
class nsThreadPoolBusyBody {
public:
nsThreadPoolBusyBody(nsThreadPool *aPool);
~nsThreadPoolBusyBody();
private:
void* operator new(size_t) CPP_THROW_NEW { return 0; } // local variable, only!
nsThreadPool *mPool;
};
inline nsThreadPoolBusyBody::nsThreadPoolBusyBody(nsThreadPool *aPool) {
nsAutoLock lock(aPool->mLock);
#ifdef DEBUG
PRUint32 threadCount;
if (NS_SUCCEEDED(aPool->mThreads->Count(&threadCount)))
NS_ASSERTION(aPool->mBusyThreads < threadCount, "thread busy count exceeded thread count");
#endif
aPool->mBusyThreads++;
mPool = aPool;
}
inline nsThreadPoolBusyBody::~nsThreadPoolBusyBody() {
nsAutoLock lock(mPool->mLock);
NS_ASSERTION(mPool->mBusyThreads > 0, "thread busy count < 0");
mPool->mBusyThreads--;
}
////////////////////////////////////////////////////////////////////////////////
nsThreadPool::nsThreadPool()
: mLock(nsnull), mThreadExit(nsnull), mPendingRequestAdded(nsnull),
mPendingRequestsAtZero(nsnull), mMinThreads(0), mMaxThreads(0),
mBusyThreads(0), mShuttingDown(PR_TRUE)
{
}
nsThreadPool::~nsThreadPool()
{
if (mThreads) {
Shutdown();
}
if (mLock)
PR_DestroyLock(mLock);
if (mThreadExit)
PR_DestroyCondVar(mThreadExit);
if (mPendingRequestAdded)
PR_DestroyCondVar(mPendingRequestAdded);
if (mPendingRequestsAtZero)
PR_DestroyCondVar(mPendingRequestsAtZero);
}
NS_IMPL_THREADSAFE_ISUPPORTS1(nsThreadPool, nsIThreadPool)
NS_IMETHODIMP
nsThreadPool::DispatchRequest(nsIRunnable* runnable)
{
nsresult rv;
nsAutoLock lock(mLock);
#if defined(PR_LOGGING)
nsCOMPtr<nsIThread> th;
nsIThread::GetCurrent(getter_AddRefs(th));
#endif
NS_ASSERTION(mMinThreads > 0, "forgot to call Init");
if (mShuttingDown) {
rv = NS_ERROR_FAILURE;
}
else {
PRUint32 requestCnt, threadCount;
requestCnt = mPendingRequests.Count();
rv = mThreads->Count(&threadCount);
if (NS_FAILED(rv)) goto exit;
if (requestCnt >= threadCount-mBusyThreads && threadCount < mMaxThreads) {
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p: %d threads in pool, max = %d, requests = %d, creating new thread...\n",
th.get(), threadCount, mMaxThreads, requestCnt));
rv = AddThread();
if (NS_FAILED(rv)) goto exit;
}
rv = mPendingRequests.AppendObject(runnable) ? NS_OK : NS_ERROR_FAILURE;
if (NS_SUCCEEDED(rv)) {
if (PR_FAILURE == PR_NotifyCondVar(mPendingRequestAdded))
goto exit;
}
}
exit:
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p dispatched %p status %x\n", th.get(), runnable, rv));
return rv;
}
nsresult
nsThreadPool::RemoveThread(nsIThread* currentThread)
{
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p being removed\n",
currentThread));
nsresult rv = mThreads->DeleteLastElement(currentThread);
PR_NotifyCondVar(mThreadExit);
return rv;
}
void
nsThreadPool::RequestDone(nsIRunnable* request)
{
nsAutoLock lock(mLock);
mRunningRequests.RemoveObject(request);
}
nsIRunnable*
nsThreadPool::GetRequest(nsIThread* currentThread)
{
nsresult rv = NS_OK;
nsCOMPtr<nsIRunnable> request;
nsAutoLock lock(mLock);
PRInt32 requestCnt;
while (PR_TRUE) {
requestCnt = mPendingRequests.Count();
if (requestCnt > 0) {
PRInt32 pendingThread;
PRInt32 runningPos;
for (pendingThread = 0; pendingThread < requestCnt; ++pendingThread) {
request = mPendingRequests.ObjectAt(pendingThread);
if (!request) {
NS_ERROR("Bad request in pending request list");
// Make it look like we just found no runnable requests
pendingThread = requestCnt;
break;
}
// check to see if the request is not running.
runningPos = mRunningRequests.IndexOf(request);
if (runningPos == -1)
break;
}
if (pendingThread < requestCnt) {
// Found something to run
PRBool removed = mPendingRequests.RemoveObjectAt(pendingThread);
NS_ASSERTION(removed, "nsCOMArray broken");
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p got request %p\n",
currentThread, request.get()));
if (removed && requestCnt == 1)
PR_NotifyCondVar(mPendingRequestsAtZero);
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p got request %p\n",
currentThread, request.get()));
mRunningRequests.AppendObject(request);
return request;
}
}
if (mShuttingDown)
break;
// no requests, and we're not shutting down yet...
// if we have more than the minimum required threads already then
// then we may be able to go away.
PRUint32 threadCnt;
rv = mThreads->Count(&threadCnt);
if (NS_FAILED(rv)) break;
if (threadCnt > mMinThreads) {
// to avoid multiple thread spawns/exits, we need to
// wait for some period of time while waiting for any
// additional requests. If this this wait yeilds no
// request, then we can exit.
//
// TODO: determine what the optimal timeout value is.
// For now, just use 5 seconds.
PRIntervalTime interval = PR_SecondsToInterval(5);
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p waiting for %d seconds before exiting (%d threads in pool)\n",
currentThread, interval, threadCnt));
(void) PR_WaitCondVar( mPendingRequestAdded, interval);
requestCnt = mPendingRequests.Count();
if (requestCnt == 0) {
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p: %d threads in pool, min = %d, exiting...\n",
currentThread, threadCnt, mMinThreads));
RemoveThread(currentThread);
return nsnull; // causes nsThreadPoolRunnable::Run to quit
}
}
else
{
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p waiting (%d threads in pool)\n",
currentThread, threadCnt));
(void)PR_WaitCondVar(mPendingRequestAdded, PR_INTERVAL_NO_TIMEOUT);
}
}
// no requests, we are going to dump the thread.
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p -- no more requests, exiting...\n",
currentThread));
RemoveThread(currentThread);
return nsnull;
}
NS_METHOD
nsThreadPool::Create(nsISupports *aOuter, REFNSIID aIID, void **aResult)
{
nsThreadPool* pool = new nsThreadPool();
if (!pool) return NS_ERROR_OUT_OF_MEMORY;
nsresult rv = pool->QueryInterface(aIID, aResult);
if (NS_FAILED(rv)) delete pool;
return rv;
}
NS_IMETHODIMP
nsThreadPool::ProcessPendingRequests()
{
while (mPendingRequests.Count() != 0) {
(void)PR_WaitCondVar(mPendingRequestsAtZero, PR_INTERVAL_NO_TIMEOUT);
}
return NS_OK;
}
PRBool
nsThreadPool::InterruptThreads(nsISupports* aElement, void *aData)
{
nsCOMPtr<nsIThread> thread = do_QueryInterface(aElement);
NS_ASSERTION(thread, "bad thread in array");
(void) thread->Interrupt();
return PR_TRUE;
}
NS_IMETHODIMP
nsThreadPool::Shutdown()
{
nsresult rv = NS_OK;
PRUint32 count = 0;
#if defined(PR_LOGGING)
nsCOMPtr<nsIThread> th;
nsIThread::GetCurrent(getter_AddRefs(th));
#endif
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p shutting down\n", th.get()));
nsAutoLock lock(mLock);
if (mShuttingDown) {
NS_ERROR("Bug 166371 - Was Shutdown() called more than once,"
" or did someone forget to call Init()?");
return NS_OK;
}
mShuttingDown = PR_TRUE;
rv = ProcessPendingRequests();
NS_ASSERTION(NS_SUCCEEDED(rv), "ProcessPendingRequests failed");
// keep trying... don't bail with an error here
// fix Add assert that there are no more requests to be handled.
// then interrupt the threads
rv = mThreads->EnumerateForwards(nsThreadPool::InterruptThreads, nsnull);
NS_ASSERTION(NS_SUCCEEDED(rv), "Interruption failed");
if (NS_FAILED(rv)) return rv;
while (PR_TRUE) {
rv = mThreads->Count(&count);
NS_ASSERTION(NS_SUCCEEDED(rv), "Count failed");
if (NS_FAILED(rv)) return rv;
if (count == 0 )
break;
PR_WaitCondVar(mThreadExit, PR_INTERVAL_NO_TIMEOUT);
}
mThreads = nsnull;
return rv;
}
NS_IMETHODIMP
nsThreadPool::Init(PRUint32 minThreadCount,
PRUint32 maxThreadCount,
PRUint32 stackSize,
PRThreadPriority priority,
PRThreadScope scope)
{
nsresult rv;
mStackSize = stackSize;
mPriority = priority;
mScope = scope;
NS_ASSERTION(minThreadCount > 0 && minThreadCount <= maxThreadCount, "bad min/max values");
mMinThreads = minThreadCount;
mMaxThreads = maxThreadCount;
mShuttingDown = PR_FALSE;
rv = NS_NewISupportsArray(getter_AddRefs(mThreads));
if (NS_FAILED(rv)) return rv;
mLock = PR_NewLock();
if (mLock == nsnull)
goto cleanup;
mPendingRequestAdded = PR_NewCondVar(mLock);
if (mPendingRequestAdded == nsnull)
goto cleanup;
mThreadExit = PR_NewCondVar(mLock);
if (mThreadExit == nsnull)
goto cleanup;
mPendingRequestsAtZero = PR_NewCondVar(mLock);
if (mPendingRequestsAtZero == nsnull)
goto cleanup;
return NS_OK;
cleanup:
if (mLock) {
PR_DestroyLock(mLock);
mLock = nsnull;
}
if (mThreadExit) {
PR_DestroyCondVar(mThreadExit);
mThreadExit = nsnull;
}
if (mPendingRequestAdded) {
PR_DestroyCondVar(mPendingRequestAdded);
mPendingRequestAdded = nsnull;
}
if (mPendingRequestsAtZero) {
PR_DestroyCondVar(mPendingRequestsAtZero);
mPendingRequestsAtZero = nsnull;
}
return NS_ERROR_OUT_OF_MEMORY;
}
nsresult
nsThreadPool::AddThread()
{
nsresult rv;
#if defined(DEBUG) || defined(FORCE_PR_LOG)
PRUint32 cnt;
rv = mThreads->Count(&cnt);
if (NS_FAILED(rv)) return rv;
#endif
#ifdef DEBUG
if (cnt >= mMaxThreads)
return NS_ERROR_FAILURE;
#endif
nsThreadPoolRunnable* runnable = new nsThreadPoolRunnable(this);
if (runnable == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
NS_ADDREF(runnable);
nsCOMPtr<nsIThread> thread;
rv = NS_NewThread(getter_AddRefs(thread),
runnable,
mStackSize,
PR_UNJOINABLE_THREAD,
mPriority,
mScope);
// Let the thread own the runnable.
NS_RELEASE(runnable);
if (NS_FAILED(rv)) return rv;
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool adding new thread %p (%d total)\n",
thread.get(), cnt + 1));
rv = mThreads->AppendElement(thread) ? NS_OK : NS_ERROR_FAILURE;
return rv;
}
NS_COM nsresult
NS_NewThreadPool(nsIThreadPool* *result,
PRUint32 minThreads, PRUint32 maxThreads,
PRUint32 stackSize,
PRThreadPriority priority,
PRThreadScope scope)
{
nsresult rv;
nsThreadPool* pool = new nsThreadPool();
if (pool == nsnull)
return NS_ERROR_OUT_OF_MEMORY;
NS_ADDREF(pool);
rv = pool->Init(minThreads, maxThreads, stackSize, priority, scope);
if (NS_FAILED(rv)) {
NS_RELEASE(pool);
return rv;
}
*result = pool;
return NS_OK;
}
////////////////////////////////////////////////////////////////////////////////
nsThreadPoolRunnable::nsThreadPoolRunnable(nsThreadPool* pool)
: mPool(pool)
{
}
nsThreadPoolRunnable::~nsThreadPoolRunnable()
{
}
NS_IMPL_THREADSAFE_ISUPPORTS1(nsThreadPoolRunnable, nsIRunnable)
NS_IMETHODIMP
nsThreadPoolRunnable::Run()
{
nsresult rv = NS_OK;
nsCOMPtr<nsIRunnable> request;
nsCOMPtr<nsIThread> currentThread;
nsIThread::GetCurrent(getter_AddRefs(currentThread));
while ((request = mPool->GetRequest(currentThread)) != nsnull) {
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p running %p\n",
currentThread.get(), request.get()));
nsThreadPoolBusyBody bumpBusyCount(mPool);
rv = request->Run();
NS_ASSERTION(NS_SUCCEEDED(rv), "runnable failed");
// let the pool know that the request has finished running.
mPool->RequestDone(request);
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p completed %p status=%x\n",
currentThread.get(), request.get(), rv));
}
PR_LOG(nsIThreadLog, PR_LOG_DEBUG,
("nsIThreadPool thread %p quitting %p\n",
currentThread.get(), this));
return rv;
}
////////////////////////////////////////////////////////////////////////////////