replication.re 25.3 KB
Newer Older
ccacciari's avatar
ccacciari committed
1
2
3
################################################################################
#                                                                              #
# Module Replication:                                                          #
4
5
#        - enable transfer single file                                         #   
#        - enable transfer collection                                          #
6
#        - enable transfer all files whose transfers have previously failed    #
ccacciari's avatar
ccacciari committed
7
8
9
10
11
#                                                                              #
################################################################################

# List of the functions:
#
12
# EUDATUpdateLogging(*status_transfer_success, *source, *destination, *cause)
13
# EUDATCheckIntegrity(*source, *source_res, *destination, *dest_res, *logEnabled,*notification,*response)
14
# EUDATReplication(*source, *destination, *dest_res, *registered, *recursive, *response)
15
# EUDATTransferUsingFailLog(*buffer_length, *stats)
16
# EUDATRegDataRepl(*source, *destination, *dest_res, *recursive, *response)
17
# EUDATPIDRegistration(*source, *destination, *notification, *registration_response)
18
# EUDATSearchAndCreatePID(*path, *pid)
19
# EUDATSearchAndDefineField(*path, *pid, *key)
Claudio Cacciari's avatar
Claudio Cacciari committed
20
# EUDATCheckIntegrityColl(*sCollPath, *source_res, *dCollPath, *dest_res, *logEnabled, *check_response)
21
# EUDATCheckIntegrityDO(*source, *source_res, *destination, *dest_res, *logEnabled, *response)
22

23

24
# Update the logging files specific for EUDAT B2SAFE
25
26
# 
# Parameters:
27
#    *status_transfer_success    [IN] Status of the transferred file (true or false)
28
29
#    *source                     [IN] path of the transferred file in iRODS
#    *destination                [IN] path of the destination file in iRODS
30
#    *cause                      [IN] cause of the failed transfer
31
#
Long Phan's avatar
Long Phan committed
32
# Author: Long Phan, JSC
33
# Modified by Claudio Cacciari, Cineca
34
#-------------------------------------------------------------------------------
35
EUDATUpdateLogging(*status_transfer_success, *source, *destination, *cause) {

    # One log record per transfer, fields separated by "::" in the order
    # source, destination, status, cause.
    *logEntry = "*source::*destination::*status_transfer_success::*cause";

    if (*status_transfer_success) {
        *severity = "INFO";
    } else {
        # A failed transfer is additionally pushed onto the EUDAT queue
        # and logged with the higher severity.
        EUDATQueue("push", *logEntry, 0);
        *severity = "ERROR";
    }

    EUDATLog(*logEntry, *severity);
}

47
# Checks differences about checksum and size between two paths
48
49
# 
# Parameters:
50
#    *source         [IN] source path in iRODS
51
#    *source_res     [IN]  iRODS resource name of the source
52
#    *destination    [IN] destination path in iRODS
53
#    *dest_res       [IN]  iRODS resource name of the destination
54
55
#    *logEnabled     [IN] boolean value: "true" to enable the logging system, 
#                                        "false" to silence it.
56
#    *notification   [IN] value [0|1]: if 1 enable the notification via messaging system
57
58
#    *response       [OUT] the reason of the failure
#
Long Phan's avatar
Long Phan committed
59
# Author: Long Phan, JSC
60
# Modified by Claudio Cacciari, Cineca
61
#-------------------------------------------------------------------------------
62
63
EUDATCheckIntegrity(*source, *source_res, *destination, *dest_res, 
                    *logEnabled, *notification, *response) {

    # Starts as true; only a failing type-specific check below turns it false.
    *status_transfer_success = bool("true");

    # Dispatch on the object type of the source path:
    # '-d' is handled as a data object, '-c' as a collection.
    msiGetObjType(*source, *objType);
    if (*objType == '-d') {
        logDebug("source path *source is a data object");
        *status_transfer_success = EUDATCheckIntegrityDO(*source, *source_res,
                                                         *destination, *dest_res,
                                                         *logEnabled, *response);
    } else if (*objType == '-c') {
        logDebug("source path *source is a collection");
        *status_transfer_success = EUDATCheckIntegrityColl(*source, *source_res,
                                                           *destination, *dest_res,
                                                           *logEnabled, *response);
    }

    if (*status_transfer_success) {
        logInfo("[EUDATCheckIntegrity] *source and *destination are coherent");
    } else {
        logError("[EUDATCheckIntegrity] *source and *destination are not coherent: *response");
    }

    # Notification via the messaging system is currently disabled, so
    # *notification is not used by this rule.
#    if (*notification == 1) {
#        EUDATGetZoneNameFromPath(*source, *zone);
#        *queue = *zone ++ "_" ++ $userNameClient;
#        *message = "status:*status_transfer_success;response:*source::*destination::*response";
#        EUDATMessage(*queue, *message);
#    }

    # value of the rule: the outcome of the check
    *status_transfer_success;
}

96
EUDATCheckIntegrity(*source, *destination, *logEnabled, *notification, *response) {
    # Convenience variant without resource names: both resources are
    # passed on as "null" to the seven-argument rule.
    *srcResource = "null";
    *dstResource = "null";
    *checkOutcome = EUDATCheckIntegrity(*source, *srcResource, *destination, *dstResource,
                                        *logEnabled, *notification, *response);
    *checkOutcome;
}

104
# Data set replication
105
106
#
# Parameters:
107
#    *source      [IN] path of the source data set in iRODS
108
#    *destination [IN] destination of replication in iRODS
109
#    *dest_res    [IN] resource of the destination
110
#    *registered  [IN] boolean value: "true" for registered data, "false" otherwise
111
112
#    *recursive   [IN] boolean value: "true" to enable the recursive replication
#                      of registered data, "false" otherwise.
113
#    *response    [OUT]the result of the replication
114
# 
Long Phan's avatar
Long Phan committed
115
# Author: Long Phan, JSC
116
# Modified by Claudio Cacciari, Cineca
117
#-------------------------------------------------------------------------------
118
EUDATReplication(*source, *destination, *dest_res, *registered, *recursive, *response) {

    logInfo("[EUDATReplication] transfering *source to *destination"); 

    *response = "";
    *replicated = bool("true");

    # An empty or "None" destination resource is replaced by "null".
    if ((*dest_res == "None") || (*dest_res == "")) {
        *dest_res = "null";
    }

    # Catch Error CAT_NO_ACCESS_PERMISSION before replication
    if (errormsg(EUDATCatchErrorDataOwner(*source,*msg), *errmsg) >= 0) {

        logInfo("[EUDATReplication] *msg");
        if (!EUDATtoBoolean(*registered)) {
            # plain rsync of the path, no PID handling
            logDebug("[EUDATReplication] replicating data without PID registration");
            msiGetObjType(*source,*objType);
            if (*objType == '-d') {
                msiDataObjRsync(*source,"IRODS_TO_IRODS",*dest_res,*destination,*rsyncStatus);
            }
            else if (*objType == '-c') {
                msiCollRsync(*source,*destination,*dest_res,"IRODS_TO_IRODS",*rsyncStatus);
            }
            # a non-zero rsync status triggers an explicit size/checksum comparison
            if (*rsyncStatus != 0) {
                logDebug("[EUDATReplication] perform a further verification about checksum and size");
                *logEnabled = bool("true");
                *notification = 0;
                *source_res = "null";
                *replicated = EUDATCheckIntegrity(*source, *source_res, *destination, *dest_res,
                                                  *logEnabled, *notification, *response);
            }
        } else {
            logDebug("[EUDATReplication] replicating registered data");
            *replicated = EUDATRegDataRepl(*source, *destination, *dest_res, EUDATtoBoolean(*recursive), *response);
        }

    } else {

        # the ownership check failed: record the failure, nothing is replicated
        logDebug("[EUDATReplication] *errmsg");
        *replicated = bool("false");
        *response = "no access permission to the path *source for user $userNameClient";
        EUDATUpdateLogging(*replicated,*source,*destination,*response);

    }

    # On success the response is replaced by a summary of the request.
    if (*replicated) { 
        *response = "*source::*destination::*dest_res"
                 ++ "::registered=*registered::recursive=*recursive";
    }
    logDebug("[EUDATReplication] response = *response");

    *replicated;
}

170
EUDATReplication(*source, *destination, *registered, *recursive, *response) {
    # Variant without a destination resource: "null" is handed to the
    # six-argument rule in its place.
    *unspecifiedRes = "null";
    *replOutcome = EUDATReplication(*source, *destination, *unspecifiedRes, *registered, *recursive, *response);
    *replOutcome;
}

176
177
178
# Transfer again all the data objects whose failed transfers are saved in the
# logging system queue, with entries in the format: source::destination::status::cause.
#
179
# Parameters:
180
#   *buffer_length    [IN] max number of failed transfers to process.
181
#                          It has to be > 1.
182
#   *stats           [OUT] the number of successful transfers
183
#
Long Phan's avatar
Long Phan committed
184
# Author: Long Phan, JSC
185
# Modified by Claudio Cacciari, Cineca;
186
#-------------------------------------------------------------------------------
187
188
189
190
191
192
193
EUDATTransferUsingFailLog(*buffer_length, *stats) {

    logInfo("[EUDATTransferUsingFailLog] checking the last *buffer_length failed transfers");
    #TODO check the single error causes
    *success_counter = 0;
    *failure_counter = 0;
    #TODO add these parameters to the fail log messages
    *registered = "true";
    *recursive = "true";

    # Remember the queue size before popping, to compare with it afterwards.
    EUDATQueue("queuesize", *sizeBefore, 0);
    EUDATQueue("pop", *messages, *buffer_length);

    # The popped entries are comma separated, each one wrapped in single quotes.
    *entries = split("*messages",",");
    foreach (*message in *entries) {
        *message = triml(*message, "'");
        *message = trimr(*message, "'");
        logDebug("message: *message");

        # Fields of an entry are separated by "::":
        # index 0 = source, 1 = destination, 3 = cause (index 2 is skipped).
        *fields = split("*message","::");
        *fieldIdx = 0;
        foreach (*field in *fields) {
            if (*fieldIdx == 0) { *source = *field; }
            else if (*fieldIdx == 1) { *destination = *field; }
            else if (*fieldIdx == 3) { *cause = *field; }
            *fieldIdx = *fieldIdx + 1;
            # anything after the fourth field is ignored
            if (*fieldIdx == 4) { break; }
        }

        logDebug("cause:*cause");
        logDebug("source:*source; target:*destination");

        # retry the transfer and count the outcome
        *retryOk = EUDATReplication(*source, *destination, *registered, *recursive, *response);
        if (!*retryOk) {
            *failure_counter = *failure_counter + 1;
        } else {
            *success_counter = *success_counter + 1;
        }
    }

    *stats = "# of successes:*success_counter, # of failures:*failure_counter";

    # get size of queue-log after transfer.
    EUDATQueue("queuesize", *la, 0);
    logDebug("AFTER TRANSFER: Length of Queue = *la");
    if (int(*sizeBefore) == int(*la)) {
        logDebug("No Data Objects have been transfered");
    }

}

erastova's avatar
erastova committed
234
235
236
237
238
239
240
# Data object replication and PID management based on remote execution
# It is assumed that iRODS zone federation is established
#  and the replicated file is accessible from the source
#
# Parameters:
# *source       [IN] source path of the data object to be replicated
# *destination  [IN] destination path of the replicated data object
241
# *dest_res     [IN] resource of the destination
242
243
244
245
# *recursive    [IN] boolean value: "true" to consider all the sub-collections 
#                    and objects under the root collection, "false" to consider 
#                    only the root path.
# *response     [OUT] message about the reason of the failure
erastova's avatar
erastova committed
246
#
247
# Authors: Elena Erastova, RZG; Claudio Cacciari, Cineca
erastova's avatar
erastova committed
248
#-----------------------------------------------------------------------------
249
EUDATRegDataRepl(*source, *destination, *dest_res, *recursive, *response) {

    *response = "";
    *replOk = bool("true");

    # An empty or "None" destination resource is replaced by "null".
    if ((*dest_res == "None") || (*dest_res == "")) {
        *dest_res = "null";
    }

    # Look up (or create) the PID of the source; without it nothing is replicated.
    *sourcePID = "None";
    EUDATSearchAndCreatePID(*source, *sourcePID);
    if ((*sourcePID == "None") || (*sourcePID == "empty")) {
        *replOk = bool("false");
        *response = "PID is empty, no replication will be executed for *source";
        EUDATUpdateLogging(*replOk,*source,*destination,"empty PID");
        logDebug(*response);
    }
    else {
        logDebug("[EUDATRegDataRepl] PID exist for *source");
        msiGetObjType(*source,*objType);
        if (*objType != '-c') {

            # ---- anything that is not a collection is replicated as a data object ----
            logDebug("[EUDATRegDataRepl] Replication's beginning ...... ");
            msiDataObjRsync(*source,"IRODS_TO_IRODS",*dest_res,*destination,*rsyncStatus);
            # a non-zero rsync status triggers an explicit size/checksum comparison
            if (*rsyncStatus != 0) {
                logDebug("[EUDATRegDataRepl] perform a further verification about checksum and size");
                *logEnabled = bool("true");
                *notification = 0;
                *source_res = "null";
                *replOk = EUDATCheckIntegrity(*source, *source_res, *destination, *dest_res,
                                              *logEnabled, *notification, *response);
            }
            if (*replOk) {
                *notification = 0;
                # update the parent PID of the replica with the related child PID
                EUDATPIDRegistration(*source, *destination, *notification, *response);
                logDebug("[EUDATRegDataRepl] registration's response: *response");
                # any response other than "None" is treated as a failed registration
                if (*response != "None") { *replOk = bool("false"); }
            }

        } else {

            # ---- collection ----
            logDebug("[EUDATRegDataRepl] The path *source is a collection");
            logDebug("[EUDATRegDataRepl] Replication's beginning ...... ");
            msiCollRsync(*source,*destination,*dest_res,"IRODS_TO_IRODS",*rsyncStatus);
            if (*rsyncStatus != 0) {
                logDebug("[EUDATRegDataRepl] perform a further verification about checksum and size");
                *logEnabled = bool("true");
                *notification = 0;
                *source_res = "null";
                *replOk = EUDATCheckIntegrity(*source, *source_res, *destination, *dest_res,
                                              *logEnabled, *notification, *response);
            }

            if (*replOk) {
                # register the root collection first
                *notification = 0;
                EUDATPIDRegistration(*source, *destination, *notification, *response);
                if (*response != "None") { *replOk = bool("false"); }

                # update the parent PID of each replica with the related child PID
                if (*replOk && *recursive) {

                    # comma separated list of the per-path failures
                    *failures = "";
                    # length of the source path, used to cut the relative part
                    # out of every path found below it
                    msiStrlen(*source,*pathLength);

                    logDebug("[EUDATRegDataRepl] loop over the sub-collections");
                    foreach (*Row in SELECT COLL_NAME WHERE COLL_NAME like '*source/%') {
                        msiSubstr(*Row.COLL_NAME, str(int(*pathLength)+1), "null", *subCollection);
                        *target = "*destination/*subCollection";
                        EUDATPIDRegistration(*Row.COLL_NAME, *target, *notification, *singleRes);
                        if (*singleRes != "None") {
                            *failure = *Row.COLL_NAME ++ '::*target::false::*singleRes';
                            *failures = *failures ++ *failure ++ ",";
                        }
                        *replOk = (*singleRes == "None") && *replOk;
                    }

                    logDebug("[EUDATRegDataRepl] loop over the objects of the collection");
                    foreach (*Row in SELECT DATA_NAME,COLL_NAME WHERE COLL_NAME = '*source' || like '*source/%') {
                        *objPath = *Row.COLL_NAME ++ '/' ++ *Row.DATA_NAME;
                        msiSubstr(*objPath, str(int(*pathLength)+1), "null", *subCollection);
                        *target = "*destination/*subCollection";
                        EUDATPIDRegistration(*objPath, *target, *notification, *singleRes);
                        if (*singleRes != "None") {
                            *failure = "*objPath::*target::false::*singleRes";
                            *failures = *failures ++ *failure ++ ",";
                        }
                        *replOk = (*singleRes == "None") && *replOk;
                    }

                    # drop the trailing separator
                    *response = trimr(*failures, ",");
                }
            }
        }
    }

    *replOk;
}

348
EUDATRegDataRepl(*source, *destination, *recursive, *response) {
    # Variant without a destination resource: "null" is handed to the
    # five-argument rule in its place.
    *unspecifiedRes = "null";
    *replOutcome = EUDATRegDataRepl(*source, *destination, *unspecifiedRes, *recursive, *response);
    *replOutcome;
}

354
355
356
357
# Verify that a PID exist for a given path and optionally create it 
# if not found.
#
# Parameters:
358
359
360
361
# *source                [IN]  source iRODS path
# *destination           [IN]  target iRODS path
# *notification          [IN]  enable messaging for async call [0|1]
# *registration_response [OUT] a message containing the reason of the failure
362
363
364
#
# Author: Claudio, Cineca
#-----------------------------------------------------------------------------
365
EUDATPIDRegistration(*source, *destination, *notification, *registration_response) {

    logInfo("[EUDATPIDRegistration] registration of PIDs for replication from *source to *destination");

    # The response stays "None" unless one of the failure branches below sets it.
    *registration_response = "None";
    *childPID = "None";
    *parentPID = "None";
    *parentROR = "None";
    *fio = "None";
    *fixed = "False";

    # The connection host of the destination zone is needed for the remote call.
    EUDATGetZoneNameFromPath(*destination, *zoneName);
    EUDATGetZoneHostFromZoneName(*zoneName, *zoneConn);
    logDebug("[EUDATPIDRegistration] remote zone name: *zoneName, connection contact: *zoneConn");

    EUDATSearchAndCreatePID(*source, *parentPID);
    if ((*parentPID != "empty") && (*parentPID != "None")) {

        # EUDAT/ROR and EUDAT/FIO fall back to the parent PID when not defined.
        *parentROR = EUDATSearchAndDefineField(*source, *parentPID, "EUDAT/ROR");
        if (*parentROR == "None") {
            logDebug("EUDAT/ROR was empty, using parent PID");
            *parentROR = *parentPID;
        }
        *fio = EUDATSearchAndDefineField(*source, *parentPID, "EUDAT/FIO");
        if (*fio == "None") {
            logDebug("EUDAT/FIO was empty, using parent PID");
            *fio = *parentPID;
        }
        *fixed = EUDATSearchAndDefineField(*source, *parentPID, "EUDAT/FIXED_CONTENT");
        logDebug("[EUDATPIDRegistration] the path *destination has EUDAT/PARENT=*parentPID, "
                 ++ "EUDAT/ROR=*parentROR, EUDAT/FIO=*fio, EUDAT/FIXED_CONTENT=*fixed");

        # create a PID for the replica which is done on the remote server
        # using remote execution
        remote(*zoneConn,"null") {
            EUDATCreatePID(*parentPID, *destination, *parentROR, *fio, *fixed, *childPID);
        }

        if (*childPID == "None") {
            # no child PID came back from the remote creation
            *registration_response = "PID of destination *destination is None";
            logDebug(*registration_response);
            EUDATUpdateLogging(bool("false"),*source,*destination,*registration_response);
        }
        else {
            # update parent PID with the child one and keep track of it
            # in the EUDAT/REPLICA attribute of the source
            *replicaValue = EUDATUpdatePIDWithNewChild(*parentPID, *childPID);
            if (*replicaValue == "None") {
                *registration_response = "REPLICA attribute of source *source is None";
                logDebug(*registration_response);
                EUDATUpdateLogging(bool("false"),*source,*destination,*registration_response);
            }
            else {
                EUDATCreateAVU("EUDAT/REPLICA", *replicaValue, *source);
            }
        }
    }
    else {
        *registration_response = "PID of source *source is None";
        logDebug(*registration_response);
        # Update Logging (Statistic File and Failed Files)
        EUDATUpdateLogging(bool("false"),*source,*destination,*registration_response);
    }

    # Notification via the messaging system is currently disabled, so
    # *notification is not used by this rule.
#    if (*notification == 1) { 
#        if (*registration_response == "None") { *statusMsg = "true"; }
#        else { *statusMsg = "false"; }
#        EUDATGetZoneNameFromPath(*source, *zone);
#        *queue = *zone ++ "_" ++ $userNameClient;
#        *message = "status:*statusMsg;response:*source::*destination::*registration_response";
#        EUDATMessage(*queue, *message);
#    }
}
erastova's avatar
erastova committed
435

436
437
# Search PID for a given path and in case it is not found, 
# it creates a new PID.
erastova's avatar
erastova committed
438
439
#
# Parameters:
440
441
# *path    [IN] iRODS path
# *pid     [OUT] the related PID
erastova's avatar
erastova committed
442
#
443
# Author: Claudio, Cineca
erastova's avatar
erastova committed
444
#-----------------------------------------------------------------------------
445
EUDATSearchAndCreatePID(*path, *pid) {

    # First try: the "PID" attribute stored for the path.
    logDebug("[EUDATSearchAndCreatePID] query PID of path *path");
    EUDATgetLastAVU(*path, "PID", *pid);
    logDebug("[EUDATSearchAndCreatePID] retrieved the iCAT PID value *pid for path *path");

    # "None" means no PID was found: create a new one, without parent,
    # ROR or FIO, and register it for the path.
    if ("None" == *pid) {
        EUDATCreatePID("None", *path, "None", "None", "false", *pid);
        EUDATiPIDcreate(*path, *pid);
    }
}

457
# Search key/value pair for a given path
458
459
460
#
# Parameters:
# *path       [IN]  iRODS path
461
462
# *pid        [IN]  the PID associated to path
# *key        [IN]  The name of the PID field
463
464
465
#
# Author: Claudio, Cineca
#-----------------------------------------------------------------------------
466
467
468
469
470
EUDATSearchAndDefineField(*path, *pid, *key) {

    logDebug("[EUDATSearchAndDefineField] query PID *pid and path *path to get *key");

    # Step 1: look for *key among the attributes of *path.
    *val = "None";
    EUDATgetLastAVU(*path, *key, *val);
    logDebug("[EUDATSearchAndDefineField] got *key=*val from iCAT");

    # Step 2: still "None", so ask the PID service for the field of *pid.
    if (*val == "None") {
        *val = EUDATGeteValPid(*pid, *key);
        logDebug("[EUDATSearchAndDefineField] got *key=*val from PID service");
        # Step 3: a value found only in the PID record is copied onto
        # *path as an attribute.
        if (*val != "None") {
            EUDATCreateAVU(*key, *val, *path);
        }
    }

    # value of the rule: the field value, or "None" when none was found
    *val;
}

485
# Compare checksums of data objects in the source and destination
486
487
488
489
#  collection recursively
#
# Parameters:
# *sCollPath    [IN] path of the source collection
490
# *source_res   [IN]  iRODS resource name of the source
491
# *dCollPath    [IN] path of the destination collection
492
# *dest_res     [IN]  iRODS resource name of the destination
493
494
495
496
# *logEnabled   [IN] boolean value: "true" to enable the logging system, 
#                                     "false" to silence it.
# *response     [OUT] the reason of the failure
#
497
498
499
500
# 
# return a boolean value: true: checksums of the DOs in the collections match
#                         false: checksums of the DOS in the collections
#                                do not match
501
502
#
# Author: Elena Erastova, RZG
503
# Author: Claudio Cacciari, Cineca
504
#-----------------------------------------------------------------------------
505
EUDATCheckIntegrityColl(*sCollPath, *source_res, *dCollPath, *dest_res, *logEnabled, *check_response) {

    # An empty or "None" source resource means "any resource" ('%' wildcard).
    if (*source_res == "" || *source_res == "None") { *source_res = "%" }

    # Resource pattern used only by the catalog query below.
    # "null" is the placeholder passed by the rules of this module when no
    # resource is specified; used literally in the "like" condition it would
    # match no object, so the loop would never run and the collection would
    # be reported as coherent without any comparison. It is mapped to the
    # wildcard for the query, while *source_res itself is still handed on
    # unchanged to EUDATCheckIntegrityDO.
    *resc_pattern = *source_res;
    if (*source_res == "null") { *resc_pattern = "%"; }

    *check_response = "";
    logInfo("[EUDATCheckIntegrityColl] Compare the size and the checksum"
            ++ " of DOs in *sCollPath:*source_res: and *dCollPath:*dest_res:");

    # Verify that source input path is a collection
    msiGetObjType(*sCollPath, *type);
    if (*type != '-c') {
        # the message names the source path (it used to interpolate an
        # undefined variable)
        logError("Input path *sCollPath is not a collection");
        fail;
    }
    # Verify that destination input path is a collection
    msiGetObjType(*dCollPath, *type);
    if (*type != '-c') {
        logError("Input path *dCollPath is not a collection");
        fail;
    }

    *totalResult = bool("true");

    # Get the length of the source collection string
    msiStrlen(*sCollPath,*pathLength);
    # For each DO in the source collection recursively compare checksum and size
    # with the DO in the destination collection.
    # If they do not match, write an error in the b2safe log
    foreach(*Row in SELECT DATA_NAME, DATA_RESC_NAME, COLL_NAME 
            WHERE DATA_RESC_NAME like '*resc_pattern' and COLL_NAME = '*sCollPath' || like '*sCollPath/%') {
        *objPath = *Row.COLL_NAME ++ '/' ++ *Row.DATA_NAME;
        # path of the object relative to the source collection
        msiSubstr(*objPath, str(int(*pathLength)+1), "null", *subCollection);
        *destination = "*dCollPath/*subCollection";
        *result = EUDATCheckIntegrityDO(*objPath, *source_res, *destination, *dest_res, *logEnabled, *singleRes);
        if (!*result) {
            # collect one "source::destination::result::reason" entry per mismatch
            *contents = "*objPath::*destination::*result::*singleRes";
            *check_response = *check_response ++ *contents ++ ",";
        }
        *totalResult = *result && *totalResult;
    }
    # drop the trailing separator
    *check_response = trimr(*check_response, ",");

    *totalResult;
}

548
EUDATCheckIntegrityColl(*sCollPath, *dCollPath, *logEnabled, *check_response) {
    # Convenience variant without resource names: both resources are
    # passed on as "null" to the six-argument rule.
    *srcResource = "null";
    *dstResource = "null";
    *collOutcome = EUDATCheckIntegrityColl(*sCollPath, *srcResource, *dCollPath, *dstResource,
                                           *logEnabled, *check_response);
    *collOutcome;
}

556
557
558
559
560
# Checks differences about checksum and size between two Data Objects
# and log the result to the B2SAFE logging system
# 
# Parameters:
#    *source         [IN] path of source file in iRODS
561
#    *source_res     [IN]  iRODS resource name of the source
562
#    *destination    [IN] path of target file in iRODS
563
#    *dest_res       [IN]  iRODS resource name of the destination
564
565
566
#    *logEnabled     [IN] boolean value: "true" to enable the logging system, 
#                                        "false" to silence it.
#    *response       [OUT] the reason of the failure
567
568
#
# Author: Claudio Cacciari, Cineca
569
#-------------------------------------------------------------------------------
570
EUDATCheckIntegrityDO(*source, *source_res, *destination, *dest_res, *logEnabled, *response) {

    *response = "";
    # Pessimistic start: set to true only after every check has passed.
    *identical = bool("false");

    # The checks run in order and stop at the first failure:
    # existence of the destination, then size, then checksum.
    if (errorcode(msiObjStat(*destination, *objStat)) < 0) {
        *response = "missing replicated object";
    } else if (EUDATCatchErrorSize(*source, *source_res, *destination, *dest_res, *response)) {
        if (EUDATCatchErrorChecksum(*source, *source_res, *destination, *dest_res, *response)) {
            *identical = bool("true");
        }
    }

    # Record the outcome in the B2SAFE log when logging is enabled.
    if (*logEnabled) {
        EUDATUpdateLogging(*identical, *source, *destination, *response);
    }

    *identical;
}
590
591

EUDATCheckIntegrityDO(*source, *destination, *logEnabled, *response) {
    # Convenience variant without resource names: both resources are
    # passed on as "null" to the six-argument rule.
    *srcResource = "null";
    *dstResource = "null";
    *doOutcome = EUDATCheckIntegrityDO(*source, *srcResource, *destination, *dstResource,
                                       *logEnabled, *response);
    *doOutcome;
}