Logo Search packages:      
Sourcecode: condor version File versions  Download package

transfer_request.cpp

/***************************************************************
 *
 * Copyright (C) 1990-2007, Condor Team, Computer Sciences Department,
 * University of Wisconsin-Madison, WI.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License.  You may
 * obtain a copy of the License at
 * 
 *    http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 ***************************************************************/

#include "condor_common.h"
#include "condor_debug.h"
#include "MyString.h"
#include "extArray.h"
#include "simplelist.h"
#include "condor_classad.h"
#include "condor_attributes.h"
#include "condor_transfer_request.h"

const char ATTR_IP_PROTOCOL_VERSION[] = "ProtocolVersion";
const char ATTR_IP_NUM_TRANSFERS[] = "NumTransfers";
const char ATTR_IP_TRANSFER_SERVICE[] = "TransferService";
const char ATTR_IP_PEER_VERSION[] = "PeerVersion";

// This function assumes ownership of the pointer 
TransferRequest::TransferRequest(ClassAd *ip)
{
      ASSERT(ip != NULL);

      m_pre_push_func_desc = "None";
      m_pre_push_func = NULL;
      m_pre_push_func_this = NULL;

      m_post_push_func_desc = "None";
      m_post_push_func = NULL;
      m_post_push_func_this = NULL;

      m_update_func_desc = "None";
      m_update_func = NULL;
      m_update_func_this = NULL;

      m_reaper_func_desc = "None";
      m_reaper_func = NULL;
      m_reaper_func_this = NULL;

      m_ip = ip;

      m_rejected = false;

      /* Since this schema check happens here I don't need to check the
            existance of these attributes when I use them. */
      ASSERT(check_schema() == INFO_PACKET_SCHEMA_OK);
}

TransferRequest::TransferRequest()
{
      m_ip = new ClassAd();
}

TransferRequest::~TransferRequest()
{
      delete m_ip;
      m_ip = NULL;
}

SchemaCheck
TransferRequest::check_schema(void)
{
      int version;

      ASSERT(m_ip != NULL);

      /* ALL info packets MUST have a protocol version number */

      /* Check to make sure it exists */
      if (m_ip->Lookup(ATTR_IP_PROTOCOL_VERSION) == NULL) {
            EXCEPT("TransferRequest::check_schema() Failed due to missing %s attribute",
                  ATTR_IP_PROTOCOL_VERSION);
      }

      /* for now, this assumes version 0 of the protocol. */

      /* Check to make sure it resolves to an int, and determine it */
      if (m_ip->LookupInteger(ATTR_IP_PROTOCOL_VERSION, version) == 0) {
            EXCEPT("TransferRequest::check_schema() Failed. ATTR_IP_PROTOCOL_VERSION "
                        "must be an integer.");
      }

      /* for now, just check existance of attribute, but not type */
      if (m_ip->Lookup(ATTR_IP_NUM_TRANSFERS) == NULL) {
            EXCEPT("TransferRequest::check_schema() Failed due to missing %s "
                  "attribute", ATTR_IP_NUM_TRANSFERS);
      }

      if (m_ip->Lookup(ATTR_IP_TRANSFER_SERVICE) == NULL) {
            EXCEPT("TransferRequest::check_schema() Failed due to missing %s "
                  "attribute", ATTR_IP_TRANSFER_SERVICE);
      }

      if (m_ip->Lookup(ATTR_IP_PEER_VERSION) == NULL) {
            EXCEPT("TransferRequest::check_schema() Failed due to missing %s "
                  "attribute", ATTR_IP_PEER_VERSION);
      }


      // currently, this either excepts, or returns ok.
      return INFO_PACKET_SCHEMA_OK;
}

void
TransferRequest::set_client_sock(ReliSock *rsock)
{
      m_client_sock = rsock;
}

ReliSock*
TransferRequest::get_client_sock(void)
{
      return m_client_sock;
}

void
TransferRequest::append_task(ClassAd *ad)
{
      ASSERT(m_ip != NULL);
      m_todo_ads.Append(ad);
}

void
TransferRequest::set_procids(ExtArray<PROC_ID> *procs)
{
      ASSERT(m_ip != NULL);

      m_procids = procs;
}

// do not free this returned pointer
ExtArray<PROC_ID>*
TransferRequest::get_procids(void)
{
      ASSERT(m_ip != NULL);

      return m_procids;
}

void
TransferRequest::dprintf(unsigned int lvl)
{
      MyString pv;

      ASSERT(m_ip != NULL);

      pv = get_peer_version();

      ::dprintf(lvl, "TransferRequest Dump:\n");
      ::dprintf(lvl, "\tProtocol Version: %d\n", get_protocol_version());
      ::dprintf(lvl, "\tServer Mode: %u\n", get_transfer_service());
      ::dprintf(lvl, "\tNum Transfers: %d\n", get_num_transfers());
      ::dprintf(lvl, "\tPeer Version: %s\n", pv.Value());
}

void
TransferRequest::set_num_transfers(int nt)
{
      MyString str;

      ASSERT(m_ip != NULL);

      str += ATTR_IP_NUM_TRANSFERS;
      str += " = ";
      str += nt;

      m_ip->InsertOrUpdate(str.Value());
}

int
TransferRequest::get_num_transfers(void)
{
      int num;

      ASSERT(m_ip != NULL);

      m_ip->LookupInteger(ATTR_IP_NUM_TRANSFERS, num);

      return num;
}

void
TransferRequest::set_transfer_service(MyString &mode)
{
      ASSERT(m_ip != NULL);

      set_transfer_service(mode.Value());
}

void
TransferRequest::set_transfer_service(const char *mode)
{
      MyString str;

      ASSERT(m_ip != NULL);

      str += ATTR_IP_TRANSFER_SERVICE;
      str += " = \"";
      str += mode;
      str += "\"";

      m_ip->InsertOrUpdate(str.Value());
}

void
TransferRequest::set_transfer_service(TreqMode  /*mode*/)
{
      // XXX TODO
}


TreqMode
TransferRequest::get_transfer_service(void)
{
      MyString mode;
      MyString tmp;

      ASSERT(m_ip != NULL);

      m_ip->LookupString(ATTR_IP_TRANSFER_SERVICE, mode);

      return ::transfer_mode(mode);
}

void
TransferRequest::set_protocol_version(int pv)
{
      ASSERT(m_ip != NULL);

      MyString str;

      str += ATTR_IP_PROTOCOL_VERSION;
      str += " = ";
      str += pv;

      m_ip->InsertOrUpdate(str.Value());
}

int
TransferRequest::get_protocol_version(void)
{
      int version; 

      ASSERT(m_ip != NULL);

      m_ip->LookupInteger(ATTR_IP_PROTOCOL_VERSION, version);

      return version;
}

void
TransferRequest::set_xfer_protocol(int xp)
{
      ASSERT(m_ip != NULL);

      MyString str;

      str += ATTR_TREQ_FTP;
      str += " = ";
      str += xp;

      m_ip->InsertOrUpdate(str.Value());
}

int
TransferRequest::get_xfer_protocol(void)
{
      int xfer_protocol; 

      ASSERT(m_ip != NULL);

      m_ip->LookupInteger(ATTR_TREQ_FTP, xfer_protocol);

      return xfer_protocol;
}

void
TransferRequest::set_direction(int dir)
{
      ASSERT(m_ip != NULL);

      MyString str;

      str += ATTR_TREQ_DIRECTION;
      str += " = ";
      str += dir;

      m_ip->InsertOrUpdate(str.Value());
}

int
TransferRequest::get_direction(void)
{
      int dir; 

      ASSERT(m_ip != NULL);

      m_ip->LookupInteger(ATTR_TREQ_DIRECTION, dir);

      return dir;
}


void
TransferRequest::set_used_constraint(bool con)
{
      ASSERT(m_ip != NULL);

      MyString str;

      str += ATTR_TREQ_HAS_CONSTRAINT;
      str += " = ";
      str += con==true?"TRUE":"FALSE";

      m_ip->InsertOrUpdate(str.Value());
}

bool
TransferRequest::get_used_constraint(void)
{
      bool con; 

      ASSERT(m_ip != NULL);

      m_ip->LookupBool(ATTR_TREQ_HAS_CONSTRAINT, con);

      return con;
}

void
TransferRequest::set_peer_version(MyString &pv)
{
      MyString str;

      ASSERT(m_ip != NULL);

      str += ATTR_IP_PEER_VERSION;
      str += " = \"";
      str += pv;
      str += "\"";

      m_ip->InsertOrUpdate(str.Value());
}

void
TransferRequest::set_peer_version(char *pv)
{
      MyString str;
      ASSERT(m_ip != NULL);

      str = pv;

      set_peer_version(str);
}

// This will make a copy when you assign the return value to something.
MyString
TransferRequest::get_peer_version(void)
{
      MyString pv;

      ASSERT(m_ip != NULL);

      m_ip->LookupString(ATTR_IP_PEER_VERSION, pv);

      return pv;
}

// do NOT delete this pointer, it is an alias pointer into the transfer request
SimpleList<ClassAd*>*
TransferRequest::todo_tasks(void)
{
      ASSERT(m_ip != NULL);

      return &m_todo_ads;
}

void
TransferRequest::set_capability(MyString &capability)
{
      m_cap = capability;
}

MyString
TransferRequest::get_capability()
{
      return m_cap;
}

void
TransferRequest::set_rejected_reason(MyString &reason)
{
      m_rejected_reason = reason;
}

MyString
TransferRequest::get_rejected_reason()
{
      return m_rejected_reason;
}

void 
TransferRequest::set_rejected(bool val)
{
      m_rejected = val;
}

bool 
TransferRequest::get_rejected(void)
{
      return m_rejected;
}

void 
TransferRequest::set_pre_push_callback(MyString desc, 
      TreqPrePushCallback callback, Service *base)
{
      m_pre_push_func_desc = desc;
      m_pre_push_func = callback;
      m_pre_push_func_this = base;
}

TreqAction
TransferRequest::call_pre_push_callback(TransferRequest *treq, 
      TransferDaemon *td)
{
      return (m_pre_push_func_this->*(m_pre_push_func))(treq, td);
}

void
TransferRequest::set_post_push_callback(MyString desc, 
      TreqPostPushCallback callback, Service *base)
{
      m_post_push_func_desc = desc;
      m_post_push_func = callback;
      m_post_push_func_this = base;
}

TreqAction
TransferRequest::call_post_push_callback(TransferRequest *treq, 
      TransferDaemon *td)
{
      return (m_post_push_func_this->*(m_post_push_func))(treq, td);
}

void 
TransferRequest::set_update_callback(MyString desc, 
      TreqUpdateCallback callback, Service *base)
{
      m_update_func_desc = desc;
      m_update_func = callback;
      m_update_func_this = base;
}

TreqAction
TransferRequest::call_update_callback(TransferRequest *treq, 
      TransferDaemon *td, ClassAd *update)
{
      return (m_update_func_this->*(m_update_func))(treq, td, update);
}

void 
TransferRequest::set_reaper_callback(MyString desc, 
      TreqReaperCallback callback, Service *base)
{
      m_reaper_func_desc = desc;
      m_reaper_func = callback;
      m_reaper_func_this = base;
}

TreqAction
TransferRequest::call_reaper_callback(TransferRequest *treq)
{
      return (m_reaper_func_this->*(m_reaper_func))(treq);
}


// The transferd only needs a small amount of the information from this
// transfer request seriazed to it from the schedd. Basically the m_ip
// classad which represents the header information for this treq, and
// the job ads.
int
TransferRequest::put(Stream *sock)
{
      ClassAd *ad = NULL;

      sock->encode();

      // shove the internal header classad across
      m_ip->put(*sock);
      sock->eom();

      // now dump all of the jobads through
      m_todo_ads.Rewind();
      while(m_todo_ads.Next(ad))
      {
            ad->put(*sock);
            sock->eom();
      }

      return TRUE;
}

//////////////////////////////////////////////////////////////////////////////
// utility functions for enum conversions.

EncapMethod encap_method(MyString &line)
{
      if (line == "ENCAPSULATION_METHOD_OLD_CLASSADS") {
            return ENCAP_METHOD_OLD_CLASSADS;
      }

      return ENCAP_METHOD_UNKNOWN;
}

TreqMode transfer_mode(MyString mode)
{
      return transfer_mode(mode.Value());
}

TreqMode transfer_mode(const char *mode)
{
      if (mode == "Active") {
            return TREQ_MODE_ACTIVE;
      }

      if (mode == "ActiveShadow") {
            return TREQ_MODE_ACTIVE_SHADOW; /* XXX DEMO mode */
      }

      if (mode == "Passive") {
            return TREQ_MODE_PASSIVE;
      }

      return TREQ_MODE_UNKNOWN;
}


Generated by  Doxygen 1.6.0   Back to index