Singleton abstracting access to all Variables expected to be defined in Prefect
Source code in backend/archiver/config/variables.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 | class Variables:
"""Singleton abstracting access to all Variables expected to be defined in Prefect"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(Variables, cls).__new__(cls)
return cls._instance
def __get(self, name: str) -> str:
c = None
# try for local overrides as env variables
c = os.environ.get(name.upper())
if c is not None:
return c
try:
c = Variable.get(name)
finally:
if c is None:
getLogger().warning(f"Value {name} not found in config, returning empty string")
return ""
return c
@property
def SCICAT_ENDPOINT(self) -> str:
return self.__get("scicat_endpoint")
@property
def SCICAT_API_PREFIX(self) -> str:
return self.__get("scicat_api_prefix") or ""
@property
def SCICAT_DATASETS_API_PREFIX(self) -> str:
return self.__get("scicat_datasets_api_prefix") or ""
@property
def SCICAT_JOBS_API_PREFIX(self) -> str:
return self.__get("scicat_jobs_api_prefix") or ""
@property
def MINIO_RETRIEVAL_BUCKET(self) -> str:
return self.__get("minio_retrieval_bucket")
@property
def MINIO_LANDINGZONE_BUCKET(self) -> str:
return self.__get("minio_landingzone_bucket")
@property
def MINIO_STAGING_BUCKET(self) -> str:
return self.__get("minio_staging_bucket")
@property
def MINIO_REGION(self) -> str:
return self.__get("minio_region")
@property
def MINIO_ENDPOINT(self) -> str:
return self.__get("minio_endpoint")
@property
def MINIO_URL_EXPIRATION_DAYS(self) -> int:
return int(self.__get("minio_url_expiration_days") or 7)
@property
def MINIO_EXTERNAL_ENDPOINT(self) -> str:
return self.__get("minio_external_endpoint")
@property
def ARCHIVER_SCRATCH_FOLDER(self) -> Path:
return Path(self.__get("archiver_scratch_folder"))
@property
def ARCHIVER_TARGET_SIZE_GB(self) -> int:
return int(self.__get("archiver_target_size_gb") or 200)
@property
def ARCHIVER_NUM_WORKERS(self) -> int:
return int(self.__get("archiver_num_workers") or 30)
|