00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "resourcebase.h"
00022 #include "agentbase_p.h"
00023
00024 #include "resourceadaptor.h"
00025 #include "collectionsync.h"
00026 #include "itemsync.h"
00027 #include "resourcescheduler.h"
00028 #include "tracerinterface.h"
00029 #include "xdgbasedirs_p.h"
00030
00031 #include "changerecorder.h"
00032 #include "collectionfetchjob.h"
00033 #include "collectionmodifyjob.h"
00034 #include "itemfetchjob.h"
00035 #include "itemfetchscope.h"
00036 #include "itemmodifyjob.h"
00037 #include "itemmodifyjob_p.h"
00038 #include "session.h"
00039
00040 #include <kaboutdata.h>
00041 #include <kcmdlineargs.h>
00042 #include <kdebug.h>
00043 #include <klocale.h>
00044
00045 #include <QtCore/QDebug>
00046 #include <QtCore/QDir>
00047 #include <QtCore/QHash>
00048 #include <QtCore/QSettings>
00049 #include <QtCore/QTimer>
00050 #include <QtGui/QApplication>
00051 #include <QtDBus/QtDBus>
00052
00053 using namespace Akonadi;
00054
00055 class Akonadi::ResourceBasePrivate : public AgentBasePrivate
00056 {
00057 public:
00058 ResourceBasePrivate( ResourceBase *parent )
00059 : AgentBasePrivate( parent ),
00060 scheduler( 0 ),
00061 mItemSyncer( 0 )
00062 {
00063 mStatusMessage = defaultReadyMessage();
00064 }
00065
00066 Q_DECLARE_PUBLIC( ResourceBase )
00067
00068 void delayedInit()
00069 {
00070 if ( !QDBusConnection::sessionBus().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) )
00071 kFatal() << "Unable to register service at D-Bus: " << QDBusConnection::sessionBus().lastError().message();
00072 AgentBasePrivate::delayedInit();
00073 }
00074
00075 virtual void changeProcessed()
00076 {
00077 mMonitor->changeProcessed();
00078 if ( !mMonitor->isEmpty() )
00079 scheduler->scheduleChangeReplay();
00080 scheduler->taskDone();
00081 }
00082
00083 void slotDeliveryDone( KJob* job );
00084
00085 void slotCollectionSyncDone( KJob *job );
00086 void slotLocalListDone( KJob *job );
00087 void slotSynchronizeCollection( const Collection &col );
00088 void slotCollectionListDone( KJob *job );
00089
00090 void slotItemSyncDone( KJob *job );
00091
00092 void slotPercent( KJob* job, unsigned long percent );
00093
00094 QString mName;
00095
00096
00097 Collection currentCollection;
00098
00099 ResourceScheduler *scheduler;
00100 ItemSync *mItemSyncer;
00101 };
00102
00103 ResourceBase::ResourceBase( const QString & id )
00104 : AgentBase( new ResourceBasePrivate( this ), id )
00105 {
00106 Q_D( ResourceBase );
00107
00108 new ResourceAdaptor( this );
00109
00110 const QString name = d->mSettings->value( QLatin1String( "Resource/Name" ) ).toString();
00111 if ( !name.isEmpty() )
00112 d->mName = name;
00113
00114 d->scheduler = new ResourceScheduler( this );
00115
00116 d->mMonitor->setChangeRecordingEnabled( true );
00117 connect( d->mMonitor, SIGNAL(changesAdded()),
00118 d->scheduler, SLOT(scheduleChangeReplay()) );
00119
00120 d->mMonitor->setResourceMonitored( d->mId.toLatin1() );
00121
00122 connect( d->scheduler, SIGNAL(executeFullSync()),
00123 SLOT(retrieveCollections()) );
00124 connect( d->scheduler, SIGNAL(executeCollectionTreeSync()),
00125 SLOT(retrieveCollections()) );
00126 connect( d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)),
00127 SLOT(slotSynchronizeCollection(Akonadi::Collection)) );
00128 connect( d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
00129 SLOT(retrieveItem(Akonadi::Item,QSet<QByteArray>)) );
00130 connect( d->scheduler, SIGNAL( status( int, QString ) ),
00131 SIGNAL( status( int, QString ) ) );
00132 connect( d->scheduler, SIGNAL(executeChangeReplay()),
00133 d->mMonitor, SLOT(replayNext()) );
00134
00135 d->scheduler->setOnline( d->mOnline );
00136 if ( !d->mMonitor->isEmpty() )
00137 d->scheduler->scheduleChangeReplay();
00138 }
00139
00140 ResourceBase::~ResourceBase()
00141 {
00142 }
00143
00144 void ResourceBase::synchronize()
00145 {
00146 d_func()->scheduler->scheduleFullSync();
00147 }
00148
00149 void ResourceBase::setName( const QString &name )
00150 {
00151 Q_D( ResourceBase );
00152 if ( name == d->mName )
00153 return;
00154
00155
00156 d->mName = name;
00157
00158 if ( d->mName.isEmpty() || d->mName == d->mId )
00159 d->mSettings->remove( QLatin1String( "Resource/Name" ) );
00160 else
00161 d->mSettings->setValue( QLatin1String( "Resource/Name" ), d->mName );
00162
00163 d->mSettings->sync();
00164
00165 emit nameChanged( d->mName );
00166 }
00167
00168 QString ResourceBase::name() const
00169 {
00170 Q_D( const ResourceBase );
00171 if ( d->mName.isEmpty() )
00172 return d->mId;
00173 else
00174 return d->mName;
00175 }
00176
00177 static char* sAppName = 0;
00178
00179 QString ResourceBase::parseArguments( int argc, char **argv )
00180 {
00181 QString identifier;
00182 if ( argc < 3 ) {
00183 kDebug( 5250 ) << "Not enough arguments passed...";
00184 exit( 1 );
00185 }
00186
00187 for ( int i = 1; i < argc - 1; ++i ) {
00188 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) )
00189 identifier = QLatin1String( argv[ i + 1 ] );
00190 }
00191
00192 if ( identifier.isEmpty() ) {
00193 kDebug( 5250 ) << "Identifier argument missing";
00194 exit( 1 );
00195 }
00196
00197 sAppName = qstrdup( identifier.toLatin1().constData() );
00198 KCmdLineArgs::init( argc, argv, sAppName, 0,
00199 ki18nc("@title, application name", "Akonadi Resource"), "0.1",
00200 ki18nc("@title, application description", "Akonadi Resource") );
00201
00202 KCmdLineOptions options;
00203 options.add("identifier <argument>",
00204 ki18nc("@label, commandline option", "Resource identifier"));
00205 KCmdLineArgs::addCmdLineOptions( options );
00206
00207 return identifier;
00208 }
00209
00210 int ResourceBase::init( ResourceBase *r )
00211 {
00212 QApplication::setQuitOnLastWindowClosed( false );
00213 int rv = kapp->exec();
00214 delete r;
00215 delete[] sAppName;
00216 return rv;
00217 }
00218
00219 void ResourceBase::itemRetrieved( const Item &item )
00220 {
00221 Q_D( ResourceBase );
00222 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem );
00223 if ( !item.isValid() ) {
00224 QDBusMessage reply( d->scheduler->currentTask().dbusMsg );
00225 reply << false;
00226 QDBusConnection::sessionBus().send( reply );
00227 d->scheduler->taskDone();
00228 return;
00229 }
00230
00231 Item i( item );
00232 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
00233 foreach ( const QByteArray &part, requestedParts ) {
00234 if ( !item.loadedPayloadParts().contains( part ) ) {
00235 kWarning( 5250 ) << "Item does not provide part" << part;
00236 }
00237 }
00238
00239 ItemModifyJob *job = new ItemModifyJob( i );
00240
00241 job->disableRevisionCheck();
00242 connect( job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)) );
00243 }
00244
00245 void ResourceBasePrivate::slotDeliveryDone(KJob * job)
00246 {
00247 Q_Q( ResourceBase );
00248 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem );
00249 QDBusMessage reply( scheduler->currentTask().dbusMsg );
00250 if ( job->error() ) {
00251 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() );
00252 reply << false;
00253 } else {
00254 reply << true;
00255 }
00256 QDBusConnection::sessionBus().send( reply );
00257 scheduler->taskDone();
00258 }
00259
00260 void ResourceBase::changeCommitted(const Item& item)
00261 {
00262 Q_D( ResourceBase );
00263 ItemModifyJob *job = new ItemModifyJob( item );
00264 job->d_func()->setClean();
00265 job->disableRevisionCheck();
00266 d->changeProcessed();
00267 }
00268
00269 void ResourceBase::changeCommitted( const Collection &collection )
00270 {
00271 Q_D( ResourceBase );
00272 CollectionModifyJob *job = new CollectionModifyJob( collection );
00273 Q_UNUSED( job );
00274
00275 d->changeProcessed();
00276 }
00277
00278 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId,
00279 const QString &mimeType, const QStringList &_parts )
00280 {
00281 Q_D( ResourceBase );
00282 if ( !isOnline() ) {
00283 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) );
00284 return false;
00285 }
00286
00287 setDelayedReply( true );
00288
00289 Item item( uid );
00290 item.setMimeType( mimeType );
00291 item.setRemoteId( remoteId );
00292
00293 QSet<QByteArray> parts;
00294 Q_FOREACH( const QString &str, _parts )
00295 parts.insert( str.toLatin1() );
00296
00297 d->scheduler->scheduleItemFetch( item, parts, message().createReply() );
00298
00299 return true;
00300 }
00301
00302 void ResourceBase::collectionsRetrieved(const Collection::List & collections)
00303 {
00304 Q_D( ResourceBase );
00305 CollectionSync *syncer = new CollectionSync( d->mId );
00306 syncer->setRemoteCollections( collections );
00307 connect( syncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) );
00308 }
00309
00310 void ResourceBase::collectionsRetrievedIncremental(const Collection::List & changedCollections, const Collection::List & removedCollections)
00311 {
00312 Q_D( ResourceBase );
00313 CollectionSync *syncer = new CollectionSync( d->mId );
00314 syncer->setRemoteCollections( changedCollections, removedCollections );
00315 connect( syncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) );
00316 }
00317
00318 void ResourceBasePrivate::slotCollectionSyncDone(KJob * job)
00319 {
00320 Q_Q( ResourceBase );
00321 if ( job->error() ) {
00322 emit q->error( job->errorString() );
00323 } else {
00324 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) {
00325 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive );
00326 list->setResource( mId );
00327 q->connect( list, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)) );
00328 return;
00329 }
00330 }
00331 scheduler->taskDone();
00332 }
00333
00334 void ResourceBasePrivate::slotLocalListDone(KJob * job)
00335 {
00336 Q_Q( ResourceBase );
00337 if ( job->error() ) {
00338 emit q->error( job->errorString() );
00339 } else {
00340 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections();
00341 foreach ( const Collection &col, cols ) {
00342 scheduler->scheduleSync( col );
00343 }
00344 }
00345 scheduler->taskDone();
00346 }
00347
00348 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col )
00349 {
00350 Q_Q( ResourceBase );
00351 currentCollection = col;
00352
00353 QStringList contentTypes = currentCollection.contentMimeTypes();
00354 contentTypes.removeAll( Collection::mimeType() );
00355 if ( !contentTypes.isEmpty() ) {
00356 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) );
00357 q->retrieveItems( currentCollection );
00358 return;
00359 }
00360 scheduler->taskDone();
00361 }
00362
00363 void ResourceBase::itemsRetrievalDone()
00364 {
00365 Q_D( ResourceBase );
00366
00367 if ( d->mItemSyncer ) {
00368 d->mItemSyncer->deliveryDone();
00369 }
00370
00371 else {
00372 d->scheduler->taskDone();
00373 }
00374 }
00375
00376 Collection ResourceBase::currentCollection() const
00377 {
00378 Q_D( const ResourceBase );
00379 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
00380 "ResourceBase::currentCollection()",
00381 "Trying to access current collection although no item retrieval is in progress" );
00382 return d->currentCollection;
00383 }
00384
00385 Item ResourceBase::currentItem() const
00386 {
00387 Q_D( const ResourceBase );
00388 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
00389 "ResourceBase::currentItem()",
00390 "Trying to access current item although no item retrieval is in progress" );
00391 return d->scheduler->currentTask().item;
00392 }
00393
00394 void ResourceBase::synchronizeCollectionTree()
00395 {
00396 d_func()->scheduler->scheduleCollectionTreeSync();
00397 }
00398
00399 void ResourceBase::cancelTask()
00400 {
00401 Q_D( ResourceBase );
00402 switch ( d->scheduler->currentTask().type ) {
00403 case ResourceScheduler::FetchItem:
00404 itemRetrieved( Item() );
00405 break;
00406 case ResourceScheduler::ChangeReplay:
00407 d->changeProcessed();
00408 break;
00409 default:
00410 d->scheduler->taskDone();
00411 }
00412 }
00413
00414 void ResourceBase::cancelTask( const QString &msg )
00415 {
00416 cancelTask();
00417
00418 emit error( msg );
00419 }
00420
00421 void ResourceBase::doSetOnline( bool state )
00422 {
00423 d_func()->scheduler->setOnline( state );
00424 }
00425
00426 void ResourceBase::synchronizeCollection(qint64 collectionId )
00427 {
00428 CollectionFetchJob* job = new CollectionFetchJob( Collection(collectionId), CollectionFetchJob::Base );
00429 job->setResource( identifier() );
00430 connect( job, SIGNAL(result(KJob*)), SLOT(slotCollectionListDone(KJob*)) );
00431 }
00432
00433 void ResourceBasePrivate::slotCollectionListDone( KJob *job )
00434 {
00435 if ( !job->error() ) {
00436 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00437 if ( !list.isEmpty() ) {
00438 Collection col = list.first();
00439 scheduler->scheduleSync( col );
00440 }
00441 }
00442
00443 }
00444
00445 void ResourceBase::setTotalItems( int amount )
00446 {
00447 kDebug() << amount;
00448 Q_D( ResourceBase );
00449 setItemStreamingEnabled( true );
00450 d->mItemSyncer->setTotalItems( amount );
00451 }
00452
00453 void ResourceBase::setItemStreamingEnabled( bool enable )
00454 {
00455 Q_D( ResourceBase );
00456 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00457 "ResourceBase::setItemStreamingEnabled()",
00458 "Calling setItemStreamingEnabled() although no item retrieval is in progress" );
00459 if ( !d->mItemSyncer ) {
00460 d->mItemSyncer = new ItemSync( currentCollection() );
00461 connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) );
00462 connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) );
00463 }
00464 d->mItemSyncer->setStreamingEnabled( enable );
00465 }
00466
00467 void ResourceBase::itemsRetrieved( const Item::List &items )
00468 {
00469 Q_D( ResourceBase );
00470 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00471 "ResourceBase::itemsRetrieved()",
00472 "Calling itemsRetrieved() although no item retrieval is in progress" );
00473 if ( !d->mItemSyncer ) {
00474 d->mItemSyncer = new ItemSync( currentCollection() );
00475 connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) );
00476 connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) );
00477 }
00478 d->mItemSyncer->setFullSyncItems( items );
00479 }
00480
00481 void ResourceBase::itemsRetrievedIncremental(const Item::List &changedItems, const Item::List &removedItems)
00482 {
00483 Q_D( ResourceBase );
00484 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00485 "ResourceBase::itemsRetrievedIncremental()",
00486 "Calling itemsRetrievedIncremental() although no item retrieval is in progress" );
00487 if ( !d->mItemSyncer ) {
00488 d->mItemSyncer = new ItemSync( currentCollection() );
00489 connect( d->mItemSyncer, SIGNAL(percent(KJob*,unsigned long)), SLOT(slotPercent(KJob*,unsigned long)) );
00490 connect( d->mItemSyncer, SIGNAL(result(KJob*)), SLOT(slotItemSyncDone(KJob*)) );
00491 }
00492 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems );
00493 }
00494
00495 void ResourceBasePrivate::slotItemSyncDone( KJob *job )
00496 {
00497 mItemSyncer = 0;
00498 Q_Q( ResourceBase );
00499 if ( job->error() ) {
00500 emit q->error( job->errorString() );
00501 }
00502 scheduler->taskDone();
00503 }
00504
00505 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent )
00506 {
00507 Q_Q( ResourceBase );
00508 Q_UNUSED( job );
00509 emit q->percent( percent );
00510 }
00511
00512 #include "resourcebase.moc"