Skip to content

pid_ms_client#

_DummyPIDMSClient#

Source code in src/apps/core/services/pid_ms_client.py
class _DummyPIDMSClient:
    def __init__(self):
        # Empty constructor
        pass

    def createURN(self, dataset_id):
        dummy_pid = "urn:nbn:fi:fd-dummy-" + str(uuid.uuid4())
        return dummy_pid

_PIDMSClient#

Source code in src/apps/core/services/pid_ms_client.py
class _PIDMSClient:
    def __init__(self):
        self.pid_ms_url = f"https://{settings.PID_MS_BASEURL}"
        self.pid_ms_apikey = settings.PID_MS_APIKEY
        self.etsin_url = settings.ETSIN_URL

    def createURN(self, dataset_id):
        payload = {
            "url": f"https://{self.etsin_url}/dataset/{dataset_id}",
            "type": "URN",
            "persist": 0,
        }
        headers = {"apikey": self.pid_ms_apikey}
        try:
            response = requests.post(self.pid_ms_url + "/v1/pid", json=payload, headers=headers)
            response.raise_for_status()
            return response.text
        except Exception as e:
            _logger.error(f"Exception in PIDMSClient: {e}")
            raise ServiceUnavailableError

ServiceUnavailableError#

Bases: APIException

Source code in src/apps/core/services/pid_ms_client.py
class ServiceUnavailableError(APIException):
    status_code = 503
    default_detail = "Service temporarily unavailable, try again later."
    default_code = "service_unavailable"