Logo Search packages:      
Sourcecode: condor version File versions  Download package

int DaemonCore::Create_Thread ( ThreadStartFunc  start_func,
void *  arg = NULL,
Stream *  sock = NULL,
int  reaper_id = 1 
)

Create a "poor man's" thread. Works for NT (via CreateThread) and Unix (via fork). The new thread does not have a DaemonCore command socket. NOTE: if DoFakeCreateThread() is true, this will just call the specified function directly and then call the reaper later in a timer. Currently, this is the default under Windows until thread-safety can be assured.

Parameters:
start_func Function to execute in the thread. This function must not access any shared memory. The only DaemonCore command which may be used in this thread is Send_Signal. When the function returns, the thread exits and the reaper is called in the parent, giving the return value as the thread exit status. The function is called with two arguments, the (void *) arg and the (Stream *) sock described below, and must return an int. The value of the returned int should ONLY be 0 or 1, because the thread reaper can only distinguish between 0 and non-zero.
arg The (void *) argument to be passed to the thread. If not NULL, this must point to a buffer malloc()'ed by the caller. DaemonCore will free() this memory when appropriate.
sock A socket to be handed off to the thread. This socket is duplicated; one copy is given to the thread and the other is kept by the caller (parent). The caller must not access this socket after handing it off while the thread is still alive. DaemonCore will close the copy given to the thread when the thread exits; the caller (parent) is responsible for eventually closing the copy kept with the caller.
reaper_id The reaper number to use. Default = 1. IMPORTANT: the exit_status passed to the reaper will be a 0 if the start_func returned with 0, and the reaper exit_status will be non-zero if the start_func returns with a 1. Example: start_func returns 1, the reaper exit_status could be set to 1, or 128, or -1, or 255.... or anything except 0. Example: start_func returns 0, the reaper exit_status will be 0, and only 0.
Returns:
The tid of the newly created thread.

Definition at line 7347 of file daemon_core.cpp.

References Stream::CloneStream(), getpid(), and InfoCommandSinfulString().

{
      // Create a "poor man's" thread that runs start_func(arg, sock) and
      // arranges for reaper number reaper_id to be invoked when it finishes.
      //
      // Three strategies are implemented below:
      //   * DoFakeCreateThread() is true: start_func is called synchronously
      //     in this process and a FakeCreateThreadReaperCaller is created
      //     to deliver the exit status to the reaper.
      //   * WIN32: a real thread is started via _beginthreadex().
      //   * otherwise: the "thread" is a fork()'ed child process.
      //
      // Returns the tid (or fake thread id) on success, or FALSE when the
      // reaper_id is invalid, pipe()/fork() fails, or too many consecutive
      // pid collisions occur.
      //
      // NOTE(review): on the FALSE return paths arg is not freed here --
      // confirm whether callers are expected to retain ownership of arg
      // when Create_Thread fails.

      // check reaper_id validity
      if ( (reaper_id < 1) || (reaper_id > maxReap)
             || (reapTable[reaper_id - 1].num == 0) ) {
            dprintf(D_ALWAYS,"Create_Thread: invalid reaper_id\n");
            return FALSE;
      }

      if( DoFakeCreateThread() ) {
                  // Rather than creating a thread (or fork), we have been
                  // configured to just call the function directly in the
                  // current process, and then register a timer to call the
                  // reaper.

            // need to copy the sock because our caller is going to delete/close it
            Stream *s = sock ? sock->CloneStream() : (Stream *)NULL;

            priv_state saved_priv = get_priv();
            int exit_status = start_func(arg,s);

                  // The "thread" is finished.  Release the resources we
                  // handed to it: the cloned stream belongs to us, and arg
                  // is documented as a malloc()'ed buffer that DaemonCore
                  // frees.  Nothing below this point keeps a reference to
                  // either one, so without this they would be leaked on
                  // every fake thread.
            delete s;
            if (arg) free(arg);

#ifndef WIN32
                  // In unix, we need to make exit_status look like what
                  // wait()/waitpid() returns
            exit_status = exit_status<<8;
#endif

                  // The worker is not supposed to leave us in a different
                  // priv state; if it did, complain and restore it.
            priv_state new_priv = get_priv();
            if( saved_priv != new_priv ) {
                  char const *reaper =
                        reaper_id > 0 ? reapTable[reaper_id-1].handler_descrip : NULL;
                  dprintf(D_ALWAYS,
                              "Create_Thread: UNEXPECTED: priv state changed "
                              "during worker function: %d %d (%s)\n",
                              (int)saved_priv, (int)new_priv,
                              reaper ? reaper : "no reaper" );
                  set_priv(saved_priv);
            }

                  // NOTE(review): reaper_caller is not deleted here;
                  // presumably it manages its own lifetime once the reaper
                  // has been called -- confirm against its definition.
            FakeCreateThreadReaperCaller *reaper_caller =
                  new FakeCreateThreadReaperCaller( exit_status, reaper_id );

            return reaper_caller->FakeThreadID();
      }

      // Before we create the thread, call InfoCommandSinfulString once.
      // This makes certain that InfoCommandSinfulString has allocated its
      // buffer which will make it thread safe when called from SendSignal().
      (void)InfoCommandSinfulString();

#ifdef WIN32
      unsigned tid;
      HANDLE hThread;
      priv_state priv;
      // need to copy the sock because our caller is going to delete/close it
      Stream *s = sock ? sock->CloneStream() : (Stream *)NULL;

      // Bundle everything the new thread needs into a heap-allocated
      // thread_info that is passed to win32_thread_start_func.
      thread_info *tinfo = (thread_info *)malloc(sizeof(thread_info));
      tinfo->start_func = start_func;
      tinfo->arg = arg;
      tinfo->sock = s;
            // find out this threads priv state, so our new thread starts out
            // at the same priv state.  on Unix this is not a worry, since
            // priv_state on Unix is per process.  but on NT, it is per thread.
            // (set_condor_priv() returns the previous state, which we
            // immediately restore with set_priv().)
      priv = set_condor_priv();
      set_priv(priv);
      tinfo->priv = priv;
            // create the thread.
      hThread = (HANDLE) _beginthreadex(NULL, 1024,
                         (CRT_THREAD_HANDLER)win32_thread_start_func,
                         (void *)tinfo, 0, &tid);
      if ( hThread == NULL ) {
            EXCEPT("CreateThread failed");
      }
#else
            // we have to do the same checking for pid collision here as
            // we do in the Create_Process() case (see comments there).
      static int num_pid_collisions = 0;
      int max_pid_retry = 0;

            // The child uses this pipe to report a pid collision back to
            // the parent before it starts running start_func.
      int errorpipe[2];
      if (pipe(errorpipe) < 0) {
            dprintf( D_ALWAYS,
                         "Create_Thread: pipe() failed with errno %d (%s)\n",
                         errno, strerror(errno) );
            return FALSE;
      }
      int tid;
      tid = fork();
      if (tid == 0) {                     // new thread (i.e., child process)
            _condor_exit_with_exec = 1;
            // close the read end of our error pipe and set the
            // close-on-exec flag on the write end
            close(errorpipe[0]);
            fcntl(errorpipe[1], F_SETFD, FD_CLOEXEC);

            dprintf_init_fork_child();

            pid_t pid = ::getpid();
            PidEntry* pidinfo = NULL;
            if( (pidTable->lookup(pid, pidinfo) >= 0) ) {
                  // we've already got this pid in our table! we've got
                  // to bail out immediately so our parent can retry.
                  int child_errno = ERRNO_PID_COLLISION;
                  write(errorpipe[1], &child_errno, sizeof(child_errno));
                  close( errorpipe[1] );
                  exit(4);
            }
                  // if we got this far, we know we don't need the errorpipe
                  // anymore, so we can close it now...
            close( errorpipe[1] );
                  // The child is a separate process, so it can use the
                  // caller's sock directly; its exit code becomes the
                  // thread's exit status.
            exit(start_func(arg, sock));
      } else if ( tid > 0 ) {  // parent process
                  // close the write end of our error pipe
            close( errorpipe[1] );
                  // check our error pipe for any problems before the child
                  // starts its real work.  read() returns without data once
                  // the child has closed its write end.
            bool had_child_error = false;
            int child_errno = 0;
            if( read(errorpipe[0], &child_errno, sizeof(int)) == sizeof(int) ) {
                  had_child_error = true;
            }
            close( errorpipe[0] );
            if( had_child_error ) {
                        // If we were able to read the errno from the
                        // errorpipe before it was closed, then we know the
                        // child bailed out before calling start_func.  We
                        // need to reap the child and then either retry or
                        // return FALSE.
                  int child_status;
                  waitpid(tid, &child_status, 0);
                  if( child_errno != ERRNO_PID_COLLISION ) {
                        EXCEPT( "Impossible: Create_Thread child_errno (%d) is not "
                                    "ERRNO_PID_COLLISION!", child_errno );
                  }
                  dprintf( D_ALWAYS, "Create_Thread: child failed because "
                               "PID %d is still in use by DaemonCore\n",
                               tid );
                  num_pid_collisions++;
                  max_pid_retry = param_integer( "MAX_PID_COLLISION_RETRY",
                                                               DEFAULT_MAX_PID_COLLISIONS );
                  if( num_pid_collisions > max_pid_retry ) {
                        dprintf( D_ALWAYS, "Create_Thread: ERROR: we've had "
                                     "%d consecutive pid collisions, giving up! "
                                     "(%d PIDs being tracked internally.)\n",
                                     num_pid_collisions, pidTable->getNumElements() );
                        num_pid_collisions = 0;
                        return FALSE;
                  }
                        // if we're here, it means we had a pid collision,
                        // but it's not (yet) fatal, and we should just
                        // re-try the whole Create_Thread().  arg is still
                        // owned by us at this point, so it is safe to pass
                        // it along again.
                  dprintf( D_ALWAYS, "Re-trying Create_Thread() to avoid "
                               "PID re-use\n" );
                  return Create_Thread( start_func, arg, sock, reaper_id );
            }
      } else {  // fork() failure
            dprintf( D_ALWAYS, "Create_Thread: fork() failed: %s (%d)\n",
                         strerror(errno), errno );
            num_pid_collisions = 0;
            close( errorpipe[0] );
            close( errorpipe[1] );
            return FALSE;
      }
            // if we got here, there's no collision, so reset our counter
      num_pid_collisions = 0;
            // The child has its own copy of arg after fork(); release the
            // parent's copy.
      if (arg) free(arg);                 // arg should point to malloc()'ed data
#endif

      dprintf(D_DAEMONCORE,"Create_Thread: created new thread, tid=%d\n",tid);

      // store the thread info in our pidTable
      // -- this is safe on Unix since our thread is really a process but
      //    on NT we need to avoid conflicts between tids and pids -
      //      the DaemonCore reaper code handles this on NT by checking
      //      hProcess.  If hProcess is NULL, it is a thread, else a process.
      PidEntry *pidtmp = new PidEntry;
      pidtmp->new_process_group = FALSE;
      pidtmp->sinful_string[0] = '\0';
      pidtmp->is_local = TRUE;
      pidtmp->parent_is_local = TRUE;
      pidtmp->reaper_id = reaper_id;
      pidtmp->hung_tid = -1;
      pidtmp->was_not_responding = FALSE;
#ifdef WIN32
      // we lie here and set pidtmp->pid to equal the tid.  this allows
      // the DaemonCore WinNT pidwatcher code to remain mostly ignorant
      // that this is really a thread instead of a process.  we can get
      // away with this because currently WinNT pids and tids do not
      // conflict --- lets hope it stays that way!
      pidtmp->pid = tid;
      pidtmp->hProcess = NULL;      // setting this to NULL means this is a thread
      pidtmp->hThread = hThread;
      pidtmp->pipeEnd = NULL;
      pidtmp->tid = tid;
      pidtmp->hWnd = 0;
      pidtmp->deallocate = 0;
#else
      pidtmp->pid = tid;
#endif
      // The pid-collision check above (Unix) is what lets us expect this
      // insert to succeed.
      int insert_result = pidTable->insert(tid,pidtmp);
      assert( insert_result == 0 );
#ifdef WIN32
      WatchPid(pidtmp);
#endif
      return tid;
}


Generated by  Doxygen 1.6.0   Back to index