Skip to content

file_bulk_serializer#

PartialFileSerializer#

Bases: FileSerializer

File serializer that does not validate required fields.

All fields have required=False and id is writable. Null id is treated as missing id.

Source code in src/apps/files/serializers/file_bulk_serializer.py
class PartialFileSerializer(FileSerializer):
    """File serializer variant used for the individual items of a bulk request.

    Does not validate required fields: all fields have required=False
    and id is writable. A null id is treated the same as a missing id.
    """

    class Meta(FileSerializer.Meta):
        extra_kwargs = {
            "id": {"read_only": False, "allow_null": True, "required": False},
        }

    def to_internal_value(self, data):
        """Deserialize `data` and attach bookkeeping keys for bulk processing.

        The returned mapping never contains a null `id`. It additionally
        carries `original_data` (the raw input, kept for error reporting)
        and `errors` (an empty dict where validation errors are stored).
        """
        internal = super().to_internal_value(data)

        # An explicit null id means the same as not providing an id at all
        if internal.get("id") is None:
            internal.pop("id", None)

        internal.update(original_data=data, errors={})
        return internal

BulkFileFail#

Source code in src/apps/files/serializers/file_bulk_serializer.py
@dataclass
class BulkFileFail:
    """Input item that failed during a bulk file action, with its errors."""

    object: dict  # input data of the failed item
    errors: dict  # error details keyed by input field name

BulkAction#

Bases: Enum

Source code in src/apps/files/serializers/file_bulk_serializer.py
class BulkAction(Enum):
    """Actions that can be requested from the bulk file serializer."""

    INSERT = "insert"  # create new files
    UPDATE = "update"  # update existing files
    UPSERT = "upsert"  # create new files or replace already existing files
    DELETE = "delete"  # delete existing files

FileBulkSuccessSerializer#

Bases: Serializer

Source code in src/apps/files/serializers/file_bulk_serializer.py
class FileBulkSuccessSerializer(serializers.Serializer):
    """Output format for one successfully processed file of a bulk request."""

    # The file the action was applied to
    object = PartialFileSerializer(help_text=_("Serialized file from database."))
    # Name of the action that was performed on the file, e.g. "insert"
    action = serializers.CharField()

FileBulkFailSerializer#

Bases: Serializer

Source code in src/apps/files/serializers/file_bulk_serializer.py
class FileBulkFailSerializer(serializers.Serializer):
    """Output format for one failed item of a bulk request."""

    # The input data is echoed back as-is so the client can identify the item
    object = serializers.JSONField(help_text=_("Failed input data."))
    errors = serializers.JSONField(help_text=_("Errors organized by input field."))

FileBulkReturnValueSerializer#

Bases: Serializer

Source code in src/apps/files/serializers/file_bulk_serializer.py
class FileBulkReturnValueSerializer(serializers.Serializer):
    """Output format for the whole bulk request: succeeded and failed items."""

    success = FileBulkSuccessSerializer(many=True)
    failed = FileBulkFailSerializer(many=True)

FileBulkSerializer#

Bases: ListSerializer

Serializer for bulk file actions (insert, update, upsert and delete).

Action parameter should be one of BulkAction values: * insert: Create new files. * update: Update existing files. * upsert: Create new files or replace already existing files. * delete: Delete existing files. Update supports partial updating. Values omitted from the request are not changed.

If input file has an id, it's treated as an existing file. If input file has a storage_identifier, its existence is checked from the database. If input file has no id and no storage_identifier, it's treated as a new file.

Call serializer.save() to apply changes. After this, serializer.data will return an object in the format { "success": [ { "object": { ... }, action: "insert" } , ], "failed": [ { "object": { ... }, errors: { ... } } , ] }

When ignore_errors is enabled (False by default), changes will be committed even if there are errors for some objects. Otherwise, only failed objects will be returned.

Success objects contain the deserialized file object, and failed objects contain the corresponding input data. The order of input files is maintained, so success and failed objects are in the same order as in the request.

Source code in src/apps/files/serializers/file_bulk_serializer.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class FileBulkSerializer(serializers.ListSerializer):
    """Serializer for performing bulk actions on files.

    Action parameter should be one of BulkAction values:
    * insert: Create new files.
    * update: Update existing files.
    * upsert: Create new files or replace already existing files.
    * delete: Delete existing files.
    Update supports partial updating. Values omitted from the request are not changed.

    If input file has an id, it's treated as an existing file.
    If input file has a storage_identifier, its existence is checked from the database.
    If input file has no id and no storage_identifier, it's treated as a new file.

    Call serializer.save() to apply changes. After this,
    serializer.data will return an object in the format
    {
        "success": [
            { "object": { ... }, action: "insert" } ,
        ],
        "failed": [
            { "object": { ... }, errors: { ... } } ,
        ]
    }

    Success objects contain the deserialized file object
    and failed objects contain the corresponding input data.
    Order of input files is maintained so successes and failed objects
    are in the same order as in the request.

    When ignore_errors is enabled (False by default), changes will be committed even if
    there are errors for some objects. Otherwise, only failed objects will be returned.
    """

    BULK_INSERT_ACTIONS = {BulkAction.INSERT, BulkAction.UPSERT}
    BULK_UPDATE_ACTIONS = {BulkAction.UPDATE, BulkAction.UPSERT}
    BULK_DELETE_ACTIONS = {BulkAction.DELETE}

    def __init__(self, *args, action: BulkAction, ignore_errors=False, **kwargs):
        """Initialize the serializer for the given bulk action.

        Args:
            action: The bulk action to perform on the input files.
            ignore_errors: When true, valid files are saved even if
                some of the input files have errors.
        """
        self.action: BulkAction = action
        # The child serializer is created in patch mode unless inserting is allowed
        self.child = PartialFileSerializer(patch=action not in self.BULK_INSERT_ACTIONS)
        self.ignore_errors = ignore_errors
        super().__init__(*args, **kwargs)
        self.failed: List[BulkFileFail] = []

    def fail(self, object: dict, errors: dict):
        """Add object to list of failed items.

        Args:
            object: Input data of the failed item.
            errors: Errors by field name. Error details that are not
                organized by field (e.g. the list-valued detail DRF raises
                for a null item) are stored under the non-field errors key.
        """
        if not isinstance(errors, dict):
            errors = {api_settings.NON_FIELD_ERRORS_KEY: errors}

        # Build a new dict so the caller's error container is not mutated,
        # and simplify 1-length arrays from `[error]` to `error`
        simplified = {
            key: error[0] if isinstance(error, list) and len(error) == 1 else error
            for key, error in errors.items()
        }
        self.failed.append(BulkFileFail(object=object, errors=simplified))

    @property
    def failed_as_dicts(self) -> List[dict]:
        """Failed items converted from dataclass instances to dicts."""
        return [asdict(fail) for fail in self.failed]

    def check_id_field_allowed(self, files: List[dict]) -> List[dict]:
        """Check that id field is allowed."""
        update_allowed = self.action in self.BULK_UPDATE_ACTIONS
        deleting = self.action in self.BULK_DELETE_ACTIONS
        if not (update_allowed or deleting):
            for file in files:
                if "id" in file:
                    file["errors"].setdefault("id", _("Field not allowed for inserting files."))
        return files

    def check_ids_exist(self, files: List[dict]) -> List[dict]:
        """Check that all file ids point to existing files."""
        data_ids = [f["id"] for f in files if "id" in f]
        existing_ids = set(File.objects.filter(id__in=data_ids).values_list("id", flat=True))

        for file in files:
            if "id" in file and file["id"] not in existing_ids:
                file["errors"].setdefault("id", _("File with id not found."))
        return files

    def group_files_by_storage_service(
        self, files: List[dict]
    ) -> Dict[Optional[str], List[dict]]:
        """Return files grouped by storage service.

        Files without a storage_service value are grouped under the key None.
        """
        files_by_service: Dict[Optional[str], List[dict]] = {}
        for f in files:
            files_by_service.setdefault(f.get("storage_service"), []).append(f)
        return files_by_service

    def populate_id_from_external_identifier(self, files: List[dict]) -> List[dict]:
        """Add id value to files that already exist based on external id.

        Only files that have no id yet are considered. When inserting is not
        allowed, files missing storage_service or storage_identifier get an error.
        """

        insert_allowed = self.action in self.BULK_INSERT_ACTIONS
        files_missing_id = [f for f in files if "id" not in f]
        for storage_service, storage_files in self.group_files_by_storage_service(
            files_missing_id
        ).items():
            if not storage_service:
                if not insert_allowed:
                    for f in storage_files:
                        f["errors"][
                            "storage_service"
                        ] = "Either storage_service or id is required."
                continue

            files_by_external_id = {}

            for f in storage_files:
                if external_id := f.get("storage_identifier"):
                    files_by_external_id.setdefault(external_id, []).append(f)
                elif not insert_allowed:
                    f["errors"][
                        "storage_identifier"
                    ] = "Either storage_identifier or id is required."

            # Get all files with matching external id
            existing_files = File.available_objects.filter(
                storage__storage_service=storage_service,
                storage_identifier__in=files_by_external_id.keys(),
            ).values(
                "storage_identifier",
                "id",
                csc_project=F("storage__csc_project"),
            )

            for existing_file in existing_files:
                for file in files_by_external_id[existing_file["storage_identifier"]]:
                    file["id"] = existing_file["id"]
                    # Project id can also be determined from external id when storage_service is known
                    if not file.get("csc_project"):
                        file["csc_project"] = existing_file["csc_project"]

        return files

    def check_duplicate_ids(self, files: List[dict]) -> List[dict]:
        """Check that same files are not being modified multiple times.

        The first occurrence of an id is accepted, later ones get an error.
        """
        existing_files = [f for f in files if "id" in f]
        id_values = set()
        for f in existing_files:
            if f["id"] in id_values:
                f["errors"].setdefault("id", _("Duplicate file in request."))
            else:
                id_values.add(f["id"])
        return files

    def check_creating_new_allowed(self, files: List[dict]) -> List[dict]:
        """Check if inserting new files is allowed."""
        insert_allowed = self.action in self.BULK_INSERT_ACTIONS
        if not insert_allowed:
            # All files should be existing and have an id
            for file in files:
                if "id" not in file and "id" not in file["errors"]:
                    file["errors"]["id"] = "File not found."
        return files

    def check_changing_existing_allowed(self, files: List[dict]) -> List[dict]:
        """Check if modifying or deleting existing files is allowed."""
        update_allowed = self.action in self.BULK_UPDATE_ACTIONS
        deleting = self.action in self.BULK_DELETE_ACTIONS
        # An id is only allowed when updating or deleting
        if not (update_allowed or deleting):
            # All files should be new and not have an existing id
            for file in files:
                if "id" in file and "id" not in file["errors"]:
                    file["errors"]["id"] = _("File already exists.")
        return files

    def check_files_allowed_actions(self, files: List[dict]) -> List[dict]:
        """Check that only allowed actions are performed on files.

        Files containing an id field are assumed to exist."""
        files = self.check_creating_new_allowed(files)
        files = self.check_changing_existing_allowed(files)
        return files

    def assign_existing_storage_data(self, files: List[dict]) -> List[dict]:
        """Assign storage and related fields to existing file data."""
        existing_files_with_missing_project_data = [
            f for f in files if "id" in f and not ("storage_service" in f and "csc_project" in f)
        ]
        project_data = File.objects.filter(
            id__in=[f["id"] for f in existing_files_with_missing_project_data]
        ).values(
            "id",
            storage_service=F("storage__storage_service"),
            csc_project=F("storage__csc_project"),
        )
        project_data_by_id = {f["id"]: f for f in project_data}

        # Don't overwrite existing input values so user later gets an
        # error if attempting to modify the values
        for f in existing_files_with_missing_project_data:
            if project_data := project_data_by_id.get(f["id"]):
                if f.get("storage_service") is None:
                    f["storage_service"] = project_data["storage_service"]
                if f.get("csc_project") is None:
                    f["csc_project"] = project_data["csc_project"]
        return files

    def assign_storage_to_files(self, files: List[dict]) -> List[dict]:
        """Assign FileStorage instances to files."""
        if not files:
            return files

        # Creating new FileStorage instances is only allowed when inserting files
        allow_create = self.action in self.BULK_INSERT_ACTIONS
        files = FileStorage.objects.assign_to_file_data(
            files, allow_create=allow_create, raise_exception=False
        )
        return files

    def flush_file_errors(self, files: List[dict]) -> List[dict]:
        """Remove errors field and return only files that have no errors."""
        ok_values = []
        for f in files:
            if errors := f.pop("errors"):
                # Add invalid files to self.failed
                self.fail(object=f["original_data"], errors=errors)
            else:
                ok_values.append(f)
        return ok_values

    def to_internal_value(self, data) -> List[dict]:
        """Validate input items and return the ones that have no errors.

        Items that fail validation are collected into self.failed
        instead of raising an exception.

        Raises:
            serializers.ValidationError: If input data is not a list.
        """
        # Deserialize data and run basic validation
        if not isinstance(data, list):
            msg = self.error_messages["not_a_list"].format(input_type=type(data).__name__)
            raise serializers.ValidationError(
                {api_settings.NON_FIELD_ERRORS_KEY: [msg]}, code="not_a_list"
            )

        files = []
        for item in data:
            try:
                validated = self.child.run_validation(item)
                files.append(validated)
            except serializers.ValidationError as exc:
                self.fail(object=item, errors=exc.detail)

        # Identifier checks
        files = self.check_id_field_allowed(files)
        files = self.check_ids_exist(files)

        files = self.populate_id_from_external_identifier(files)
        files = self.check_duplicate_ids(files)

        # Checks for required and forbidden values
        files = self.check_files_allowed_actions(files)

        # Assign FileStorage and related values
        files = self.assign_existing_storage_data(files)
        files = self.assign_storage_to_files(files)

        # FileStorage-specific checks
        files = FileStorage.check_required_file_fields(files, raise_exception=False)
        files = FileStorage.check_file_data_conflicts(files, raise_exception=False)

        return self.flush_file_errors(files)

    def update_file_instance(self, instance, file_data) -> Optional[File]:
        """Update instance attributes of a File instance.

        Values of create-only fields are not assigned. Attempting to
        change one of them marks the file as failed.

        Returns the updated file, or None if there were errors."""
        errors = {}
        for field, value in file_data.items():
            if field in FileSerializer.create_only_fields:
                existing_value = getattr(instance, field, None)
                if field == "storage":
                    # Compare storage in the same form as the input value
                    existing_value = {"id": existing_value.id}
                if value != existing_value:
                    errors[field] = _("Cannot change value after creation")
            else:
                setattr(instance, field, value)  # assign new values
        if errors:
            # Report the failure using the original input data of the file
            self.fail(
                object=instance._original_data,
                errors=errors,
            )
            return None
        instance.record_modified = timezone.now()
        return instance

    def assign_characteristics(self, file: File, characteristics_data: Optional[dict]):
        """Assign characteristics data (not saved yet) to file.

        Empty characteristics data clears the characteristics of the file.
        The `_changed` attribute of the characteristics object tells whether
        it has values that need to be saved.
        """
        if not characteristics_data:
            file.characteristics = None
            return
        if characteristics := file.characteristics:
            file.characteristics._changed = False
            for field, value in characteristics_data.items():
                if getattr(characteristics, field) != value:
                    file.characteristics._changed = True
                    setattr(characteristics, field, value)
        else:
            file.characteristics = FileCharacteristics(**characteristics_data)
            file.characteristics._changed = True

    def get_file_instances(self, validated_data) -> List[File]:
        """Return not yet saved instances from validated data.

        Existing files that fail to update are added to self.failed
        and left out from the returned list.
        """
        system_creator = get_technical_metax_user()

        existing_files_by_id = (
            File.available_objects.prefetch_related("storage")
            .select_related("characteristics")
            .in_bulk([f["id"] for f in validated_data if "id" in f])
        )

        files = []
        for f in validated_data:
            # Use `empty` to tell omitted characteristics apart from null characteristics
            characteristics_data = f.pop("characteristics", serializers.empty)
            if "id" not in f:  # new file
                # Note: To determine if a File instance is not yet in the DB,
                # use instance._state.adding. For a UUID-style id, the id is
                # set on instantiation instead of on save, so checking
                # `id is None` won't work.
                original_data = f.pop("original_data")
                f.pop("storage_service", None)  # included in FileStorage
                f.pop("csc_project", None)  # included in FileStorage
                file = File(**f, system_creator_id=system_creator)
                file._original_data = original_data
                files.append(file)
            else:  # existing file
                file = existing_files_by_id[f["id"]]
                file._original_data = f.pop("original_data")
                if self.action in self.BULK_UPDATE_ACTIONS:
                    file = self.update_file_instance(file, f)
                if file:
                    files.append(file)

            if file and characteristics_data is not serializers.empty:
                self.assign_characteristics(file, characteristics_data)
        return files

    def do_create_or_update(self, files: List[File]) -> List[dict]:
        """Perform bulk create and update actions for files.

        Returns a success object for each file, containing the file
        and the name of the action ("insert" or "update") applied to it.
        """
        fields_to_update = {
            field.name
            for field in File._meta.get_fields()
            if field.concrete and field.name not in {"id", *FileSerializer.create_only_fields}
        }
        characteristics_fields = [
            field.name
            for field in FileCharacteristics._meta.get_fields()
            if field.concrete and field.name != "id"
        ]

        # Create or update files and characteristics
        changed_characteristics = [
            file.characteristics
            for file in files
            if file.characteristics and getattr(file.characteristics, "_changed", False)
        ]
        FileCharacteristics.objects.bulk_create(
            changed_characteristics,
            batch_size=5000,
            update_conflicts=True,  # Update characteristics that already exist
            unique_fields=["id"],
            update_fields=characteristics_fields,
        )

        # Collect ids of new files before saving, while _state.adding still tells them apart
        being_created = {f.id for f in files if f._state.adding}
        files = File.objects.bulk_create(
            files,
            batch_size=5000,
            update_conflicts=True,  # Update files that already exist
            unique_fields=["id"],
            update_fields=fields_to_update,
        )
        # Related objects need to be fetched again from DB after save
        prefetch_related_objects(files, "storage", "characteristics")

        # Cleanup any orphaned characteristics
        FileCharacteristics.objects.filter(file__isnull=True).delete()

        def file_action(file):
            if file.id in being_created:
                return BulkAction.INSERT
            else:
                return BulkAction.UPDATE

        return [
            {
                "object": f,
                "action": file_action(f).value,
            }
            for f in files
        ]

    def do_delete(self, files: List[File]) -> List[dict]:
        """Perform bulk delete on files.

        Files are marked as removed by setting their `removed` timestamp.
        Returns a success object for each file.
        """
        now = timezone.now()
        for f in files:
            f.removed = now  # include new value in response

        # Update all files in db at once
        file_ids = [f.id for f in files]
        files_to_delete = File.objects.filter(id__in=file_ids)
        pre_files_deleted.send(sender=File, queryset=files_to_delete)  # Deprecate datasets
        files_to_delete.update(removed=now)

        return [
            {
                "object": f,
                "action": BulkAction.DELETE.value,
            }
            for f in files
        ]

    def create(self, validated_data):
        """Perform bulk file action.

        Returns success objects which will be stored in self.instance by .save().

        Raises:
            ValueError: If the serializer action is not a supported bulk action.
        """
        if not validated_data:
            return []

        # Creating the instances may add new failed items,
        # so it has to be done before checking for failures
        files = self.get_file_instances(validated_data)

        # Return empty success list on error
        if not self.ignore_errors and self.failed:
            return []

        is_insert = self.action in self.BULK_INSERT_ACTIONS
        is_update = self.action in self.BULK_UPDATE_ACTIONS
        if is_insert or is_update:
            return self.do_create_or_update(files)
        if self.action in self.BULK_DELETE_ACTIONS:
            return self.do_delete(files)
        raise ValueError(f"Unsupported bulk action: {self.action!r}")

    def to_representation(self, instance):
        """Return success objects and failed objects in the bulk response format."""
        return FileBulkReturnValueSerializer(
            {"success": instance, "failed": self.failed_as_dicts}
        ).data

    @property
    def data(self):
        """Skip ListSerializer.data that tries to return list."""
        return super(serializers.ListSerializer, self).data

data property #

Skip ListSerializer.data that tries to return list.

assign_characteristics(file, characteristics_data) #

Assign characteristics data (not saved yet) to file.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def assign_characteristics(self, file: File, characteristics_data: Optional[dict]):
    """Attach characteristics values to a file without saving them.

    Empty characteristics data clears the characteristics of the file.
    Otherwise the existing characteristics object is updated in place, or
    a new one is created. The `_changed` attribute of the characteristics
    object tells whether it has values that differ from what was there before.
    """
    if not characteristics_data:
        file.characteristics = None
        return

    current = file.characteristics
    if not current:
        # No characteristics yet, everything in the input is new
        file.characteristics = FileCharacteristics(**characteristics_data)
        file.characteristics._changed = True
        return

    current._changed = False
    for name, new_value in characteristics_data.items():
        if getattr(current, name) != new_value:
            current._changed = True
            setattr(current, name, new_value)

assign_existing_storage_data(files) #

Assign storage and related fields to existing file data.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def assign_existing_storage_data(self, files: List[dict]) -> List[dict]:
    """Fill in storage_service and csc_project for existing files.

    The values are looked up from the database for files that have an id
    but are missing at least one of the two fields.
    """
    incomplete = [
        entry
        for entry in files
        if "id" in entry and not ("storage_service" in entry and "csc_project" in entry)
    ]
    rows = File.objects.filter(id__in=[entry["id"] for entry in incomplete]).values(
        "id",
        storage_service=F("storage__storage_service"),
        csc_project=F("storage__csc_project"),
    )
    rows_by_id = {row["id"]: row for row in rows}

    for entry in incomplete:
        row = rows_by_id.get(entry["id"])
        if not row:
            continue
        # Values given in the input are kept as they are, so the user
        # later gets an error if attempting to modify them
        for key in ("storage_service", "csc_project"):
            if entry.get(key) is None:
                entry[key] = row[key]
    return files

assign_storage_to_files(files) #

Assign FileStorage instances to files.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def assign_storage_to_files(self, files: List[dict]) -> List[dict]:
    """Assign FileStorage instances to files.

    Empty input is returned as is. New FileStorage instances may be
    created only when the action allows inserting files.
    """
    if files:
        return FileStorage.objects.assign_to_file_data(
            files,
            allow_create=self.action in self.BULK_INSERT_ACTIONS,
            raise_exception=False,
        )
    return files

check_changing_existing_allowed(files) #

Check if modifying or deleting existing files is allowed.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_changing_existing_allowed(self, files: List[dict]) -> List[dict]:
    """Check if modifying or deleting existing files is allowed.

    When the action allows neither updating nor deleting, all files should
    be new. Files that have an id (and no earlier id error) get an error.

    Returns the same list of files that was given as input.
    """
    update_allowed = self.action in self.BULK_UPDATE_ACTIONS
    deleting = self.action in self.BULK_DELETE_ACTIONS
    if update_allowed or deleting:
        return files

    # All files should be new and not have an existing id
    for file in files:
        if "id" in file and "id" not in file["errors"]:
            file["errors"]["id"] = _("File already exists.")
    return files

check_creating_new_allowed(files) #

Check if inserting new files is allowed.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_creating_new_allowed(self, files: List[dict]) -> List[dict]:
    """Check if inserting new files is allowed.

    When inserting is not allowed, all files should already exist. Files
    that have no id (and no earlier id error) get an error.
    """
    if self.action in self.BULK_INSERT_ACTIONS:
        return files

    for entry in files:
        if "id" in entry or "id" in entry["errors"]:
            continue
        entry["errors"]["id"] = "File not found."
    return files

check_duplicate_ids(files) #

Check that same files are not being modified multiple times.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_duplicate_ids(self, files: List[dict]) -> List[dict]:
    """Check that the same file does not appear in the request more than once.

    The first file with a given id is accepted. Later files with
    the same id get an error, unless they already have an id error.
    """
    seen_ids = set()
    for entry in files:
        if "id" not in entry:
            continue
        file_id = entry["id"]
        if file_id not in seen_ids:
            seen_ids.add(file_id)
            continue
        entry["errors"].setdefault("id", _("Duplicate file in request."))
    return files

check_files_allowed_actions(files) #

Check that only allowed actions are performed on files.

Files containing an id field are assumed to exist.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_files_allowed_actions(self, files: List[dict]) -> List[dict]:
    """Check that the action is allowed for each of the files.

    First checks whether creating new files is allowed, then whether
    changing existing files is allowed.
    Files containing an id field are assumed to exist."""
    return self.check_changing_existing_allowed(self.check_creating_new_allowed(files))

check_id_field_allowed(files) #

Check that id field is allowed.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_id_field_allowed(self, files: List[dict]) -> List[dict]:
    """Check that the id field is used only with actions that allow it.

    The id field is allowed when updating or deleting. With other
    actions, files that contain an id get an error.
    """
    if self.action in self.BULK_UPDATE_ACTIONS or self.action in self.BULK_DELETE_ACTIONS:
        return files

    for entry in files:
        if "id" in entry:
            entry["errors"].setdefault("id", _("Field not allowed for inserting files."))
    return files

check_ids_exist(files) #

Check that all file ids point to existing files.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def check_ids_exist(self, files: List[dict]) -> List[dict]:
    """Check that every id in the input points to an existing file.

    Files whose id is not found get an error, unless they already
    have an id error.
    """
    files_with_id = [entry for entry in files if "id" in entry]
    requested_ids = [entry["id"] for entry in files_with_id]
    found_ids = set(File.objects.filter(id__in=requested_ids).values_list("id", flat=True))

    for entry in files_with_id:
        if entry["id"] not in found_ids:
            entry["errors"].setdefault("id", _("File with id not found."))
    return files

create(validated_data) #

Perform bulk file action.

Returns success objects which will be stored in self.instance by .save().

Source code in src/apps/files/serializers/file_bulk_serializer.py
def create(self, validated_data):
    """Perform bulk file action.

    Returns success objects which will be stored in self.instance by .save().
    The returned list is empty when there is nothing to do, or when
    there are failed files and ignore_errors is not enabled.

    Raises:
        ValueError: If the serializer action is not a supported bulk action.
    """
    if not validated_data:
        return []

    # Creating the instances may add new failed items,
    # so it has to be done before checking for failures
    files = self.get_file_instances(validated_data)

    # Return empty success list on error
    if not self.ignore_errors and self.failed:
        return []

    is_insert = self.action in self.BULK_INSERT_ACTIONS
    is_update = self.action in self.BULK_UPDATE_ACTIONS
    if is_insert or is_update:
        return self.do_create_or_update(files)
    if self.action in self.BULK_DELETE_ACTIONS:
        return self.do_delete(files)
    # Previously an unknown action failed with an unbound local variable
    raise ValueError(f"Unsupported bulk action: {self.action!r}")

do_create_or_update(files) #

Perform bulk create and update actions for files.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def do_create_or_update(self, files: List[File]) -> List[dict]:
    """Save new and modified files and their characteristics in bulk.

    Returns a success object for each saved file, containing the file
    and the name of the action ("insert" or "update") applied to it.
    """
    non_updatable = {"id", *FileSerializer.create_only_fields}
    file_update_fields = {
        model_field.name
        for model_field in File._meta.get_fields()
        if model_field.concrete and model_field.name not in non_updatable
    }
    characteristics_update_fields = [
        model_field.name
        for model_field in FileCharacteristics._meta.get_fields()
        if model_field.concrete and model_field.name != "id"
    ]

    # Save characteristics first. Only objects flagged as changed are written.
    characteristics_to_save = [
        file.characteristics
        for file in files
        if file.characteristics and getattr(file.characteristics, "_changed", False)
    ]
    FileCharacteristics.objects.bulk_create(
        characteristics_to_save,
        batch_size=5000,
        update_conflicts=True,  # Existing characteristics are updated
        unique_fields=["id"],
        update_fields=characteristics_update_fields,
    )

    # Collect ids of new files before saving, while _state.adding still tells them apart
    being_created = {file.id for file in files if file._state.adding}
    saved_files = File.objects.bulk_create(
        files,
        batch_size=5000,
        update_conflicts=True,  # Existing files are updated
        unique_fields=["id"],
        update_fields=file_update_fields,
    )
    # Related objects need to be fetched again from DB after save
    prefetch_related_objects(saved_files, "storage", "characteristics")

    # Cleanup any orphaned characteristics
    FileCharacteristics.objects.filter(file__isnull=True).delete()

    results = []
    for saved in saved_files:
        action = BulkAction.INSERT if saved.id in being_created else BulkAction.UPDATE
        results.append({"object": saved, "action": action.value})
    return results

do_delete(files) #

Perform bulk delete on files.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def do_delete(self, files: List[File]) -> List[dict]:
    """Mark the given files as removed with a single bulk update.

    Every instance gets the same ``removed`` timestamp so the response
    reflects the new value. The ``pre_files_deleted`` signal is sent with
    the affected queryset before the rows are updated.

    Returns one dict per file with keys ``object`` and ``action``.
    """
    removal_time = timezone.now()
    file_ids = []
    for file in files:
        file.removed = removal_time  # include new value in response
        file_ids.append(file.id)

    # One query updates all rows; the signal fires first so receivers
    # still see the files before they are marked removed.
    queryset = File.objects.filter(id__in=file_ids)
    pre_files_deleted.send(sender=File, queryset=queryset)  # Deprecate datasets
    queryset.update(removed=removal_time)

    return [{"object": file, "action": "delete"} for file in files]

fail(object, errors) #

Add object to list of failed items.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def fail(self, object: dict, errors: dict):
    """Add object to list of failed items.

    Error values that are one-item lists are unwrapped from ``[error]``
    to ``error``. The unwrapping is done in place on ``errors``, and that
    same dict is stored in the ``BulkFileFail`` appended to ``self.failed``.
    """
    unwrapped = {
        key: error[0]
        for key, error in errors.items()
        if isinstance(error, list) and len(error) == 1
    }
    # Only existing keys are overwritten, so key order is unaffected.
    errors.update(unwrapped)
    self.failed.append(BulkFileFail(object=object, errors=errors))

flush_file_errors(files) #

Remove errors field and return only files that have no errors.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def flush_file_errors(self, files: List[dict]) -> List[dict]:
    """Split files into valid and failed ones based on their errors.

    The ``errors`` key is removed from every file dict. Files whose
    errors were non-empty are reported through ``self.fail`` using their
    ``original_data``; the remaining files are returned in input order.
    """
    valid_files = []
    for file_data in files:
        file_errors = file_data.pop("errors")
        if not file_errors:
            valid_files.append(file_data)
            continue
        # Invalid file: report it with the data originally supplied.
        self.fail(object=file_data["original_data"], errors=file_errors)
    return valid_files

get_file_instances(validated_data) #

Return not yet saved instances from validated data.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def get_file_instances(self, validated_data) -> List[File]:
    """Build unsaved File instances from validated input dicts.

    Items without an ``id`` become new ``File`` objects. Items with an
    ``id`` are looked up among the available files and, when the current
    action is one of ``BULK_UPDATE_ACTIONS``, updated through
    ``update_file_instance``; files for which that returns ``None`` are
    left out. Characteristics supplied in the input are assigned with
    ``assign_characteristics``.
    """
    system_creator = get_technical_metax_user()

    requested_ids = [item["id"] for item in validated_data if "id" in item]
    existing_files_by_id = (
        File.available_objects.prefetch_related("storage")
        .select_related("characteristics")
        .in_bulk(requested_ids)
    )

    instances = []
    for item in validated_data:
        characteristics_data = item.pop("characteristics", serializers.empty)
        if "id" in item:  # existing file
            file = existing_files_by_id[item["id"]]
            file._original_data = item.pop("original_data")
            if self.action in self.BULK_UPDATE_ACTIONS:
                file = self.update_file_instance(file, item)
            if file:
                instances.append(file)
        else:  # new file
            # Note: whether a File is already in the DB must be read from
            # instance._state.adding. With a UUID-style id the value is
            # assigned at instantiation rather than on save, so an
            # `id is None` check cannot tell new files apart.
            original_data = item.pop("original_data")
            for storage_key in ("storage_service", "csc_project"):
                item.pop(storage_key, None)  # included in FileStorage
            file = File(**item, system_creator_id=system_creator)
            file._original_data = original_data
            instances.append(file)

        characteristics_given = characteristics_data is not serializers.empty
        if file and characteristics_given:
            self.assign_characteristics(file, characteristics_data)
    return instances

group_files_by_storage_service(files) #

Return files grouped by storage service.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def group_files_by_storage_service(
    self, files: List[dict]
) -> Dict[Optional[str], List[dict]]:
    """Return files grouped by storage service.

    The result maps each ``storage_service`` value to the list of file
    dicts that carry it, in input order. Files without a
    ``storage_service`` key are collected under ``None``.
    """
    files_by_service: Dict[Optional[str], List[dict]] = {}
    for file_data in files:
        service = file_data.get("storage_service")
        files_by_service.setdefault(service, []).append(file_data)
    return files_by_service

populate_id_from_external_identifier(files) #

Add id value to files that already exist based on external id.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def populate_id_from_external_identifier(self, files):
    """Fill in ``id`` for input files that match an existing file.

    Files lacking an ``id`` are matched to available files by
    ``storage_service`` and ``storage_identifier``. When a match is found
    the file dict receives the existing ``id`` and, if it has no
    ``csc_project`` of its own, the existing file's project.

    When the current action is not one of ``BULK_INSERT_ACTIONS``, files
    that cannot be identified get a message stored in their ``errors``
    dict. The same ``files`` list is returned.
    """
    insert_allowed = self.action in self.BULK_INSERT_ACTIONS
    without_id = [item for item in files if "id" not in item]
    grouped = self.group_files_by_storage_service(without_id)

    for storage_service, service_files in grouped.items():
        if not storage_service:
            # Nothing to look up without a storage service.
            if not insert_allowed:
                for item in service_files:
                    item["errors"][
                        "storage_service"
                    ] = "Either storage_service or id is required."
            continue

        files_by_external_id = {}
        for item in service_files:
            external_id = item.get("storage_identifier")
            if external_id:
                files_by_external_id.setdefault(external_id, []).append(item)
            elif not insert_allowed:
                item["errors"][
                    "storage_identifier"
                ] = "Either storage_identifier or id is required."

        # Get all files with matching external id
        existing_files = File.available_objects.filter(
            storage__storage_service=storage_service,
            storage_identifier__in=files_by_external_id.keys(),
        ).values(
            "storage_identifier",
            "id",
            csc_project=F("storage__csc_project"),
        )

        for existing in existing_files:
            matching_inputs = files_by_external_id[existing["storage_identifier"]]
            for item in matching_inputs:
                item["id"] = existing["id"]
                # Project id can also be determined from external id when storage_service is known
                if not item.get("csc_project"):
                    item["csc_project"] = existing["csc_project"]

    return files

update_file_instance(instance, file_data) #

Update instance attributes of a File instance.

Returns the updated file, or None if there were errors.

Source code in src/apps/files/serializers/file_bulk_serializer.py
def update_file_instance(self, instance, file_data) -> Optional[File]:
    """Apply new field values from ``file_data`` to a File instance.

    Fields listed in ``FileSerializer.create_only_fields`` are not
    assigned; a differing value for one of them is recorded as an error.
    If any errors were recorded, the file is reported through
    ``self.fail`` with the instance's ``_original_data`` and ``None`` is
    returned. Otherwise ``record_modified`` is set to the current time
    and the instance is returned.
    """
    field_errors = {}
    for name, new_value in file_data.items():
        if name not in FileSerializer.create_only_fields:
            setattr(instance, name, new_value)  # assign new values
            continue

        current_value = getattr(instance, name, None)
        if name == "storage":
            # Storage is compared in the form of a dict holding its id.
            current_value = {"id": current_value.id}
        if new_value != current_value:
            field_errors[name] = _("Cannot change value after creation")

    if field_errors:
        self.fail(object=instance._original_data, errors=field_errors)
        return None

    instance.record_modified = timezone.now()
    return instance