00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "itemsync.h"
00022
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031
00032 #include <kdebug.h>
00033
00034 #include <QtCore/QStringList>
00035
00036 using namespace Akonadi;
00037
00041 class ItemSync::Private
00042 {
00043 public:
00044 Private( ItemSync *parent ) :
00045 q( parent ),
00046 mTransactionMode( Single ),
00047 mCurrentTransaction( 0 ),
00048 mTransactionJobs( 0 ),
00049 mPendingJobs( 0 ),
00050 mProgress( 0 ),
00051 mTotalItems( -1 ),
00052 mTotalItemsProcessed( 0 ),
00053 mStreaming( false ),
00054 mIncremental( false ),
00055 mLocalListDone( false ),
00056 mDeliveryDone( false ),
00057 mFinished( false )
00058 {
00059
00060 mFetchScope.fetchFullPayload();
00061 mFetchScope.fetchAllAttributes();
00062 }
00063
00064 void createLocalItem( const Item &item );
00065 void checkDone();
00066 void slotLocalListDone( KJob* );
00067 void slotLocalDeleteDone( KJob* );
00068 void slotLocalChangeDone( KJob* );
00069 void execute();
00070 void processItems();
00071 void deleteItems( const Item::List &items );
00072 void slotTransactionResult( KJob *job );
00073 Job* subjobParent() const;
00074
00075 ItemSync *q;
00076 Collection mSyncCollection;
00077 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00078 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00079 QSet<Akonadi::Item> mUnprocessedLocalItems;
00080
00081
00082 enum TransactionMode {
00083 Single,
00084 Chunkwise,
00085 None
00086 };
00087 TransactionMode mTransactionMode;
00088 TransactionSequence *mCurrentTransaction;
00089 int mTransactionJobs;
00090
00091
00092 ItemFetchScope mFetchScope;
00093
00094
00095 Akonadi::Item::List mRemoteItems;
00096
00097
00098 Item::List mRemovedRemoteItems;
00099
00100
00101 int mPendingJobs;
00102 int mProgress;
00103 int mTotalItems;
00104 int mTotalItemsProcessed;
00105
00106 bool mStreaming;
00107 bool mIncremental;
00108 bool mLocalListDone;
00109 bool mDeliveryDone;
00110 bool mFinished;
00111 };
00112
00113 void ItemSync::Private::createLocalItem( const Item & item )
00114 {
00115
00116 if ( q->error() )
00117 return;
00118 mPendingJobs++;
00119 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00120 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00121 }
00122
00123 void ItemSync::Private::checkDone()
00124 {
00125 q->setProcessedAmount( KJob::Bytes, mProgress );
00126 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00127 return;
00128
00129 if ( !mFinished ) {
00130 mFinished = true;
00131 q->emitResult();
00132 }
00133 }
00134
00135 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00136 Job( parent ),
00137 d( new Private( this ) )
00138 {
00139 d->mSyncCollection = collection;
00140 }
00141
00142 ItemSync::~ItemSync()
00143 {
00144 delete d;
00145 }
00146
00147 void ItemSync::setFullSyncItems( const Item::List &items )
00148 {
00149 Q_ASSERT( !d->mIncremental );
00150 if ( !d->mStreaming )
00151 d->mDeliveryDone = true;
00152 d->mRemoteItems += items;
00153 d->mTotalItemsProcessed += items.count();
00154 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00155 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00156 if ( d->mTotalItemsProcessed == d->mTotalItems )
00157 d->mDeliveryDone = true;
00158 d->execute();
00159 }
00160
00161 void ItemSync::setTotalItems( int amount )
00162 {
00163 Q_ASSERT( !d->mIncremental );
00164 Q_ASSERT( amount >= 0 );
00165 setStreamingEnabled( true );
00166 kDebug() << amount;
00167 d->mTotalItems = amount;
00168 setTotalAmount( KJob::Bytes, amount );
00169 if ( d->mTotalItems == 0 ) {
00170 d->mDeliveryDone = true;
00171 d->execute();
00172 }
00173 }
00174
00175 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00176 {
00177 d->mIncremental = true;
00178 if ( !d->mStreaming )
00179 d->mDeliveryDone = true;
00180 d->mRemoteItems += changedItems;
00181 d->mRemovedRemoteItems += removedItems;
00182 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00183 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00184 if ( d->mTotalItemsProcessed == d->mTotalItems )
00185 d->mDeliveryDone = true;
00186 d->execute();
00187 }
00188
00189 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00190 {
00191 d->mFetchScope = fetchScope;
00192 }
00193
00194 ItemFetchScope &ItemSync::fetchScope()
00195 {
00196 return d->mFetchScope;
00197 }
00198
00199 void ItemSync::doStart()
00200 {
00201 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00202 job->setFetchScope( d->mFetchScope );
00203
00204
00205 job->fetchScope().setCacheOnly( true );
00206
00207 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00208 }
00209
00210 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00211 {
00212
00213 if ( error() )
00214 return false;
00215
00216
00217
00218
00219
00220
00221 if ( d->mIncremental )
00222 return true;
00223
00224
00225 if ( storedItem.flags() != newItem.flags() ) {
00226 kDebug() << "Stored flags " << storedItem.flags()
00227 << "new flags " << newItem.flags();
00228 return true;
00229 }
00230
00231
00232 QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00233 missingParts.subtract( storedItem.loadedPayloadParts() );
00234 if ( !missingParts.isEmpty() )
00235 return true;
00236
00237
00238
00239
00240 if ( newItem.hasPayload()
00241 && storedItem.payloadData() != newItem.payloadData() )
00242 return true;
00243
00244
00245 foreach ( Attribute* attr, newItem.attributes() ) {
00246 if ( !storedItem.hasAttribute( attr->type() ) )
00247 return true;
00248 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00249 return true;
00250 }
00251
00252 return false;
00253 }
00254
00255 void ItemSync::Private::slotLocalListDone( KJob * job )
00256 {
00257 if ( !job->error() ) {
00258 const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00259 foreach ( const Item &item, list ) {
00260 if ( item.remoteId().isEmpty() )
00261 continue;
00262 mLocalItemsById.insert( item.id(), item );
00263 mLocalItemsByRemoteId.insert( item.remoteId(), item );
00264 mUnprocessedLocalItems.insert( item );
00265 }
00266 }
00267
00268 mLocalListDone = true;
00269 execute();
00270 }
00271
00272 void ItemSync::Private::execute()
00273 {
00274 if ( !mLocalListDone )
00275 return;
00276
00277 if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00278 ++mTransactionJobs;
00279 mCurrentTransaction = new TransactionSequence( q );
00280 mCurrentTransaction->setAutomaticCommittingEnabled( false );
00281 connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00282 }
00283
00284 processItems();
00285 if ( !mDeliveryDone ) {
00286 if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00287 mCurrentTransaction->commit();
00288 mCurrentTransaction = 0;
00289 }
00290 return;
00291 }
00292
00293
00294 if ( !mIncremental ) {
00295 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00296 mUnprocessedLocalItems.clear();
00297 }
00298
00299 deleteItems( mRemovedRemoteItems );
00300 mLocalItemsById.clear();
00301 mLocalItemsByRemoteId.clear();
00302 mRemovedRemoteItems.clear();
00303
00304 if ( mCurrentTransaction ) {
00305 mCurrentTransaction->commit();
00306 mCurrentTransaction = 0;
00307 }
00308
00309 checkDone();
00310 }
00311
00312 void ItemSync::Private::processItems()
00313 {
00314
00315 foreach ( Item remoteItem, mRemoteItems ) {
00316 #ifndef NDEBUG
00317 if ( remoteItem.remoteId().isEmpty() ) {
00318 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00319 }
00320 #endif
00321
00322 Item localItem = mLocalItemsById.value( remoteItem.id() );
00323 if ( !localItem.isValid() )
00324 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00325 mUnprocessedLocalItems.remove( localItem );
00326
00327 if ( !localItem.isValid() ) {
00328 createLocalItem( remoteItem );
00329 continue;
00330 }
00331
00332 if ( q->updateItem( localItem, remoteItem ) ) {
00333 mPendingJobs++;
00334
00335 remoteItem.setId( localItem.id() );
00336 remoteItem.setRevision( localItem.revision() );
00337 remoteItem.setSize( localItem.size() );
00338 remoteItem.setRemoteId( localItem.remoteId() );
00339 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00340 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00341 } else {
00342 mProgress++;
00343 }
00344 }
00345 mRemoteItems.clear();
00346 }
00347
00348 void ItemSync::Private::deleteItems( const Item::List &items )
00349 {
00350
00351 if ( q->error() )
00352 return;
00353
00354 Item::List itemsToDelete;
00355 foreach ( const Item &item, items ) {
00356 Item delItem( item );
00357 if ( !item.isValid() ) {
00358 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00359 }
00360
00361 if ( !delItem.isValid() ) {
00362 #ifndef NDEBUG
00363 kWarning() << "Delete item (remoteeId=" << delItem.remoteId()
00364 << "mimeType=" << delItem.mimeType()
00365 << ") does not have a valid UID and no item with that remote ID exists either";
00366 #endif
00367 continue;
00368 }
00369
00370 if ( delItem.remoteId().isEmpty() ) {
00371
00372 continue;
00373 }
00374
00375 itemsToDelete.append ( delItem );
00376 }
00377
00378 if ( !itemsToDelete.isEmpty() ) {
00379 mPendingJobs++;
00380 ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00381 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00382
00383
00384
00385
00386
00387 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00388 if ( transaction )
00389 transaction->setIgnoreJobFailure( job );
00390 }
00391 }
00392
00393 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00394 {
00395 mPendingJobs--;
00396 mProgress++;
00397
00398 checkDone();
00399 }
00400
00401 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00402 {
00403 Q_UNUSED( job );
00404 mPendingJobs--;
00405 mProgress++;
00406
00407 checkDone();
00408 }
00409
00410 void ItemSync::Private::slotTransactionResult( KJob *job )
00411 {
00412 --mTransactionJobs;
00413 if ( mCurrentTransaction == job )
00414 mCurrentTransaction = 0;
00415
00416 checkDone();
00417 }
00418
00419 Job * ItemSync::Private::subjobParent() const
00420 {
00421 if ( mCurrentTransaction && mTransactionMode != None )
00422 return mCurrentTransaction;
00423 return q;
00424 }
00425
00426 void ItemSync::setStreamingEnabled(bool enable)
00427 {
00428 d->mStreaming = enable;
00429 }
00430
00431 void ItemSync::deliveryDone()
00432 {
00433 Q_ASSERT( d->mStreaming );
00434 d->mDeliveryDone = true;
00435 d->execute();
00436 }
00437
00438 void ItemSync::slotResult(KJob* job)
00439 {
00440 if ( job->error() ) {
00441
00442 Akonadi::Job::removeSubjob( job );
00443
00444 if ( !error() ) {
00445 setError( job->error() );
00446 setErrorText( job->errorText() );
00447 }
00448 } else {
00449 Akonadi::Job::slotResult( job );
00450 }
00451 }
00452
00453 void ItemSync::rollback()
00454 {
00455 setError( UserCanceled );
00456 if ( d->mCurrentTransaction )
00457 d->mCurrentTransaction->rollback();
00458 d->mDeliveryDone = true;
00459 d->execute();
00460 }
00461
00462
00463 #include "itemsync.moc"