Skip to content

organization_indexer#

OrganizationIndexer#

Load organizations and up to one level of suborganizations.

Source code in src/apps/actors/services/organization_indexer.py
class OrganizationIndexer:
    """Load organizations and up to one level of suborganizations.

    Source rows come either from the API at
    ``settings.ORGANIZATION_FETCH_API_URL`` or from the CSV file at
    ``settings.ORGANIZATION_DATA_FILE``. ``index()`` converts the rows and
    saves them as reference data ``Organization`` objects.
    """

    def fetch_orgs_from_api(self, timeout=60):
        """Fetch organizations from API and write to csv file.

        Each API hit produces one row for the organization itself and one row
        per entry in its ``subUnits``. Rows are sorted by Finnish organization
        name and then by unit name. The CSV file is written only when
        ``settings.ORGANIZATION_DATA_FILE`` is set.

        Args:
            timeout: Timeout in seconds passed to ``requests.get``.

        Returns:
            List of row dicts with keys ``org_name_fi``, ``org_name_en``,
            ``org_name_sv`` and ``org_code``; sub-unit rows additionally have
            ``unit_name`` and ``unit_sub_code``.

        Raises:
            requests.RequestException: If the request fails, times out or
                returns an HTTP error status.
        """

        def clean(value):
            # A missing or null name must become "" instead of the text
            # "None", which would otherwise be stored as a real label.
            return "" if value is None else str(value).strip()

        api_url = settings.ORGANIZATION_FETCH_API_URL
        _logger.info(f"Fetching organization data from {api_url}")
        res = requests.get(api_url, timeout=timeout)
        # Fail on an HTTP error here instead of on a confusing JSON/KeyError below.
        res.raise_for_status()
        data = res.json()

        orgs = []
        for hit in data["hits"]["hits"]:
            org_source = hit["_source"]

            organization = {
                "org_name_fi": clean(org_source["nameFi"]),
                "org_name_en": clean(org_source["nameEn"]),
                "org_name_sv": clean(org_source.get("nameSv")),
                "org_code": str(org_source["organizationId"]).strip(),
            }
            orgs.append(organization)

            for sub_unit in org_source.get("subUnits") or []:
                unit_sub_code = str(sub_unit["subUnitID"]).strip()
                unit_name = clean(sub_unit["subUnitName"])
                # A sub-unit row repeats the parent's columns and adds its own.
                orgs.append(
                    {
                        **organization,
                        "unit_name": unit_name,
                        "unit_sub_code": unit_sub_code,
                    }
                )

        # Main organization rows have no unit_name, so they sort before their units.
        orgs.sort(key=lambda org: (org["org_name_fi"], org.get("unit_name", "")))

        _logger.info(f"Retrieved {len(orgs)} organizations.")
        if settings.ORGANIZATION_DATA_FILE:
            # Explicit encoding keeps non-ASCII names independent of the locale;
            # newline="" lets the csv module control line endings itself.
            with open(settings.ORGANIZATION_DATA_FILE, "w", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(
                    f, delimiter=",", quotechar='"', lineterminator="\n", fieldnames=CSV_HEADERS
                )
                writer.writeheader()
                writer.writerows(orgs)
            _logger.info("CSV updated.")
        return orgs

    def row_to_dict(self, org: dict):
        """Convert organizations.csv style org to format closer to Metax.

        Args:
            org: Row dict with the ``org_name_*``, ``org_code``, ``unit_name``,
                ``unit_sub_code``, ``org_isni`` and ``org_csc`` columns. Missing
                or ``None`` values are treated as empty strings.

        Returns:
            Dict with keys ``url``, ``pref_label``, ``code``, ``parent``,
            ``isni``, ``csc`` and ``in_scheme``. For a row with a
            ``unit_sub_code`` the code is ``<org_code>-<unit_sub_code>``, the
            label is the unit name in every language and ``parent`` is the URL
            of the main organization; otherwise ``parent`` is ``None``.
        """

        def text(key):
            # csv.DictReader fills columns missing from a short row with None.
            return org.get(key) or ""

        org_name_fi = text("org_name_fi")
        org_name_en = text("org_name_en")
        org_name_sv = text("org_name_sv")
        main_org_code = text("org_code")
        # unit_main_code is unused
        unit_sub_code = text("unit_sub_code")
        unit_name = text("unit_name").rstrip()

        label = {
            "fi": org_name_fi,
            "und": org_name_fi,
        }
        if org_name_en:
            label["en"] = org_name_en
        if org_name_sv:
            label["sv"] = org_name_sv

        org_code = main_org_code
        parent_org_code = None
        if unit_sub_code:
            # Sub-units get a composite code and are labelled by unit name only.
            org_code = f"{main_org_code}-{unit_sub_code}"
            parent_org_code = main_org_code
            label = {
                "en": unit_name,
                "fi": unit_name,
                "sv": unit_name,
                "und": unit_name,
            }

        url = f"{settings.ORGANIZATION_BASE_URI}{quote(org_code)}"
        parent_url = (
            f"{settings.ORGANIZATION_BASE_URI}{quote(parent_org_code)}"
            if parent_org_code
            else None
        )
        return {
            "url": url,
            "pref_label": label,
            "code": org_code,
            "parent": parent_url,
            "isni": text("org_isni"),  # not used currently
            "csc": text("org_csc"),  # not used currently
            "in_scheme": settings.ORGANIZATION_SCHEME,
        }

    def get_orgs_from_csv(self):
        """Read organization rows from ``settings.ORGANIZATION_DATA_FILE``.

        Returns:
            List of row dicts keyed by the CSV header columns.
        """
        _logger.info("Reading organizations from csv")
        # Same encoding and newline handling as when the file is written.
        with open(settings.ORGANIZATION_DATA_FILE, encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f, delimiter=",", quotechar='"', lineterminator="\n")
            return list(reader)

    def orgs_list_to_dict(self, orgs: list):
        """Convert rows with ``row_to_dict`` and key the results by URL.

        When several rows produce the same URL, the first one is kept and a
        warning is logged for each later one.

        Returns:
            Dict mapping organization URL to the converted organization dict.
        """
        orgs_dict = {}
        for row in orgs:
            org = self.row_to_dict(row)
            if existing := orgs_dict.get(org["url"]):
                label = org.get("pref_label", {}).get("en")
                existing_label = existing.get("pref_label", {}).get("en")
                _logger.warning(
                    f"Duplicate organization URL, skipping: {org['url']} {label}, existing: {existing_label}"
                )
            else:
                orgs_dict[org["url"]] = org
        _logger.info(f"Loaded {len(orgs_dict)} organizations.")
        return orgs_dict

    def sort_parents_first(self, orgs_dict):
        """Sort organizations so that main organizations are listed first.

        The sort is stable, so the relative order within main organizations and
        within sub-organizations is unchanged.
        """
        return sorted(orgs_dict.values(), key=lambda x: x["parent"] is not None)

    @transaction.atomic
    def update_orgs(self, orgs_dict):
        """Create or update reference data organizations from ``orgs_dict``.

        Reference organizations in ``settings.ORGANIZATION_SCHEME`` whose URL is
        not in ``orgs_dict`` and that are not yet deprecated are marked
        deprecated. Every organization in ``orgs_dict`` is saved with
        ``deprecated`` cleared.

        Args:
            orgs_dict: Mapping of URL to organization dict, as returned by
                ``orgs_list_to_dict``.

        Raises:
            KeyError: If an organization refers to a parent URL that has not
                been loaded or created before it.
        """
        all_reference_orgs = Organization.all_objects.filter(
            is_reference_data=True, in_scheme=settings.ORGANIZATION_SCHEME
        )

        # Deprecate organizations that have been removed from source data
        new_deprecated = all_reference_orgs.filter(deprecated__isnull=True).exclude(
            url__in=orgs_dict.keys()
        )
        if count := new_deprecated.count():
            _logger.info(
                f"Reference data organizations in database but no longer in source data: {count}"
            )
            new_deprecated.update(deprecated=timezone.now())

        existing_orgs = all_reference_orgs.filter(url__in=orgs_dict.keys())
        orgs_by_url = {org.url: org for org in existing_orgs}

        # create parent organizations first so children can refer to them
        orgs_parents_first = self.sort_parents_first(orgs_dict)

        for org_dict in orgs_parents_first:
            url = org_dict["url"]
            org = orgs_by_url.get(url)
            if not org:
                org = Organization(url=url, is_reference_data=True)
                # Register new objects so later children can look them up as parents.
                orgs_by_url[url] = org

            org.in_scheme = org_dict.get("in_scheme")
            org.code = org_dict.get("code")
            org.same_as = []
            org.pref_label = org_dict["pref_label"]
            org.deprecated = None
            if parent := org_dict.get("parent"):
                org.parent = orgs_by_url[parent]
            else:
                org.parent = None

            org.save()
        _logger.info("Organizations updated")

    def index(self, cached=False):
        """Load organizations and update them in the database.

        Args:
            cached: When true, read rows from the CSV file instead of fetching
                them from the API.
        """
        orgs: list
        if cached:
            orgs = self.get_orgs_from_csv()
        else:
            orgs = self.fetch_orgs_from_api()

        orgs_dict = self.orgs_list_to_dict(orgs)
        with cachalot_disabled():
            self.update_orgs(orgs_dict)

fetch_orgs_from_api() #

Fetch organizations from the API and write them to a CSV file.

Source code in src/apps/actors/services/organization_indexer.py
def fetch_orgs_from_api(self, timeout=60):
    """Fetch organizations from API and write to csv file.

    Each API hit produces one row for the organization itself and one row
    per entry in its ``subUnits``. Rows are sorted by Finnish organization
    name and then by unit name. The CSV file is written only when
    ``settings.ORGANIZATION_DATA_FILE`` is set.

    Args:
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        List of row dicts with keys ``org_name_fi``, ``org_name_en``,
        ``org_name_sv`` and ``org_code``; sub-unit rows additionally have
        ``unit_name`` and ``unit_sub_code``.

    Raises:
        requests.RequestException: If the request fails, times out or
            returns an HTTP error status.
    """

    def clean(value):
        # A missing or null name must become "" instead of the text
        # "None", which would otherwise be stored as a real label.
        return "" if value is None else str(value).strip()

    api_url = settings.ORGANIZATION_FETCH_API_URL
    _logger.info(f"Fetching organization data from {api_url}")
    res = requests.get(api_url, timeout=timeout)
    # Fail on an HTTP error here instead of on a confusing JSON/KeyError below.
    res.raise_for_status()
    data = res.json()

    orgs = []
    for hit in data["hits"]["hits"]:
        org_source = hit["_source"]

        organization = {
            "org_name_fi": clean(org_source["nameFi"]),
            "org_name_en": clean(org_source["nameEn"]),
            "org_name_sv": clean(org_source.get("nameSv")),
            "org_code": str(org_source["organizationId"]).strip(),
        }
        orgs.append(organization)

        for sub_unit in org_source.get("subUnits") or []:
            unit_sub_code = str(sub_unit["subUnitID"]).strip()
            unit_name = clean(sub_unit["subUnitName"])
            # A sub-unit row repeats the parent's columns and adds its own.
            orgs.append(
                {
                    **organization,
                    "unit_name": unit_name,
                    "unit_sub_code": unit_sub_code,
                }
            )

    # Main organization rows have no unit_name, so they sort before their units.
    orgs.sort(key=lambda org: (org["org_name_fi"], org.get("unit_name", "")))

    _logger.info(f"Retrieved {len(orgs)} organizations.")
    if settings.ORGANIZATION_DATA_FILE:
        # Explicit encoding keeps non-ASCII names independent of the locale;
        # newline="" lets the csv module control line endings itself.
        with open(settings.ORGANIZATION_DATA_FILE, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(
                f, delimiter=",", quotechar='"', lineterminator="\n", fieldnames=CSV_HEADERS
            )
            writer.writeheader()
            writer.writerows(orgs)
        _logger.info("CSV updated.")
    return orgs

row_to_dict(org) #

Convert an organizations.csv-style row to a format closer to the one used by Metax.

Source code in src/apps/actors/services/organization_indexer.py
def row_to_dict(self, org: dict):
    """Convert organizations.csv style org to format closer to Metax.

    Args:
        org: Row dict with the ``org_name_*``, ``org_code``, ``unit_name``,
            ``unit_sub_code``, ``org_isni`` and ``org_csc`` columns. Missing
            or ``None`` values are treated as empty strings.

    Returns:
        Dict with keys ``url``, ``pref_label``, ``code``, ``parent``,
        ``isni``, ``csc`` and ``in_scheme``. For a row with a
        ``unit_sub_code`` the code is ``<org_code>-<unit_sub_code>``, the
        label is the unit name in every language and ``parent`` is the URL
        of the main organization; otherwise ``parent`` is ``None``.
    """

    def text(key):
        # csv.DictReader fills columns missing from a short row with None.
        return org.get(key) or ""

    org_name_fi = text("org_name_fi")
    org_name_en = text("org_name_en")
    org_name_sv = text("org_name_sv")
    main_org_code = text("org_code")
    # unit_main_code is unused
    unit_sub_code = text("unit_sub_code")
    unit_name = text("unit_name").rstrip()

    label = {
        "fi": org_name_fi,
        "und": org_name_fi,
    }
    if org_name_en:
        label["en"] = org_name_en
    if org_name_sv:
        label["sv"] = org_name_sv

    org_code = main_org_code
    parent_org_code = None
    if unit_sub_code:
        # Sub-units get a composite code and are labelled by unit name only.
        org_code = f"{main_org_code}-{unit_sub_code}"
        parent_org_code = main_org_code
        label = {
            "en": unit_name,
            "fi": unit_name,
            "sv": unit_name,
            "und": unit_name,
        }

    url = f"{settings.ORGANIZATION_BASE_URI}{quote(org_code)}"
    parent_url = (
        f"{settings.ORGANIZATION_BASE_URI}{quote(parent_org_code)}"
        if parent_org_code
        else None
    )
    return {
        "url": url,
        "pref_label": label,
        "code": org_code,
        "parent": parent_url,
        "isni": text("org_isni"),  # not used currently
        "csc": text("org_csc"),  # not used currently
        "in_scheme": settings.ORGANIZATION_SCHEME,
    }

sort_parents_first(orgs_dict) #

Sort organizations so that main organizations are listed first.

Source code in src/apps/actors/services/organization_indexer.py
def sort_parents_first(self, orgs_dict):
    """Return the organizations with main organizations ahead of sub-units.

    An entry counts as a main organization when its ``"parent"`` value is
    ``None``. Relative order inside each of the two groups is kept as it was
    in ``orgs_dict``.
    """
    entries = list(orgs_dict.values())
    main_orgs = [entry for entry in entries if entry["parent"] is None]
    sub_orgs = [entry for entry in entries if entry["parent"] is not None]
    return main_orgs + sub_orgs