58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def validate_schema(
    *,
    schema: JSONdictType,
    path: str,
    anyof_parent_schema: JSONdictType | None = None,
    root_schema: JSONdictType | None = None,
    verbose: bool = False,
) -> None:
    """
    Recursive function that checks some patterns on a JSON schema.

    Nested schemas (the entries under `_DEFS`, `_PROPERTIES`, `_ITEMS` and
    `_ANYOF`) are validated first; the checks for the current level run
    afterwards.

    Args:
        schema: The JSON Schema to validate.
        path:
            Current path in the JSON Schema traversal, expanded at each
            recursive call. Only used in log and error messages.
        anyof_parent_schema:
            Additional context for validation of `anyOf` items: the schema
            that holds the `anyOf` list the current `schema` belongs to.
        root_schema:
            Schema whose `_DEFS` section is used to resolve references found
            in nullable `anyOf` items. It is forwarded unchanged to every
            recursive call.
        verbose:
            Whether to print additional logs. When set, the level of
            `logger` is switched to `logging.INFO`.

    Raises:
        ValueError: If one of the `[E..]` checks fails.
        RuntimeError:
            For the `[I9x]` internal errors: `root_schema` missing when a
            reference must be resolved, malformed reference string, or
            reference key not found in the `_DEFS` of `root_schema`.
    """
    if verbose:
        logger.setLevel(logging.INFO)
        # NOTE(review): indentation was reconstructed from a flattened
        # source; confirm that this message is meant to be gated by `verbose`.
        logger.info(f"START validating {path}")

    # Recursive schema exploration
    for def_key in schema.get(_DEFS, []):
        validate_schema(
            schema=schema[_DEFS][def_key],
            path=f"{path}/{_DEFS}/{def_key}",
            verbose=verbose,
            root_schema=root_schema,
        )
    for prop_key in schema.get(_PROPERTIES, []):
        validate_schema(
            schema=schema[_PROPERTIES][prop_key],
            path=f"{path}/{_PROPERTIES}/{prop_key}",
            verbose=verbose,
            root_schema=root_schema,
        )
    if _ITEMS in schema:
        validate_schema(
            schema=schema[_ITEMS],
            path=f"{path}/{_ITEMS}",
            verbose=verbose,
            root_schema=root_schema,
        )
    for ind, item in enumerate(schema.get(_ANYOF, [])):
        # `anyOf` members receive their parent, so that checks such as E05
        # can look at keys defined next to the `anyOf` list.
        validate_schema(
            schema=item,
            path=f"{path}/{_ANYOF}/{ind}",
            verbose=verbose,
            anyof_parent_schema=schema,
            root_schema=root_schema,
        )

    # Validation
    if verbose:
        logger.info(f"Now validate {json.dumps(schema)}")

    # E0x: general errors
    if schema.get(_NAME) in _FORBIDDEN_NAMES:
        raise ValueError(f"[E01] Forbidden {_NAME} at {path}")
    if _DEFINITIONS in schema:
        raise ValueError(f'[E02] Unsupported keyword "{_DEFINITIONS}" at {path}')
    if _ENUM in schema:
        # All enum members must share the same Python type.
        if len({type(item) for item in schema[_ENUM]}) > 1:
            raise ValueError(f"[E03] Non-homogeneous {_ENUM} at {path}")
    if (
        _TYPE not in schema
        and _ANYOF not in schema
        and _ONEOF not in schema
        and _ITEMS not in schema
        and _REF not in schema
    ):
        raise ValueError(f"[E04] Unsupported schema at {path}")
    # A boolean needs a default, either on itself or on the schema holding
    # the `anyOf` list it is a member of.
    if schema.get(_TYPE) == _BOOLEAN and not (
        _DEFAULT in schema
        or (
            anyof_parent_schema is not None
            and _ANYOF in anyof_parent_schema
            and _DEFAULT in anyof_parent_schema
        )
    ):
        raise ValueError(f"[E05] Boolean with no {_DEFAULT} at {path}")
    if (
        schema.get(_TYPE) == _OBJECT
        and schema.get(_ADDITIONAL_PROPERTIES) == _BOOLEAN_TYPE
    ):
        raise ValueError(f"[E06] Object of booleans at {path}")
    if _DEFAULT in schema:
        default_value = schema[_DEFAULT]
        # Containers are inspected one level deep: list items, or dict keys
        # and values.
        if isinstance(default_value, list):
            for item in default_value:
                _raise_E07_if_empty_string(item, path=path)
        elif isinstance(default_value, dict):
            for key, value in default_value.items():
                _raise_E07_if_empty_string(key, path=path)
                _raise_E07_if_empty_string(value, path=path)
        else:
            _raise_E07_if_empty_string(default_value, path=path)
    if _BOOLEAN_TYPE in schema.get(_PREFIX_ITEMS, []):
        raise ValueError(f"[E08] Unsupported boolean in 'tuple' at {path}")

    # E1x: anyOf-related errors
    if _ANYOF in schema:
        anyof_items = schema[_ANYOF]
        if anyof_items in _CASES_NULLABLE_BOOLEAN_ANYOF:
            raise ValueError(f"[E10] Nullable boolean at {path}")
        if _NULL_TYPE in anyof_items:
            # Nullable enum, with the enum defined inline.
            if any(
                _ENUM in item for item in anyof_items if isinstance(item, dict)
            ):
                raise ValueError(f"[E11] Nullable {_ENUM} at {path}")
            # Nullable enum, with the enum defined through a reference.
            for internal_schema in anyof_items:
                if internal_schema == _NULL_TYPE:
                    continue
                if _REF in internal_schema:
                    if root_schema is None:
                        raise RuntimeError(
                            f"[I90] Internal error at {path}: `root_schema` not set."
                        )
                    ref_value = internal_schema.get(_REF)
                    # Check the number of segments before unpacking, so that
                    # a malformed reference is reported as I91 rather than as
                    # a bare tuple-unpacking error.
                    ref_parts = ref_value.split("/")
                    if (
                        len(ref_parts) != 3
                        or ref_parts[0] != "#"
                        or ref_parts[1] != _DEFS
                    ):
                        raise RuntimeError(
                            f"[I91] Internal error at {path}: "
                            f"Invalid {_REF} string {ref_value}"
                        )
                    ref_key = ref_parts[2]
                    try:
                        _internal_def = root_schema[_DEFS][ref_key]
                    except KeyError as e:
                        raise RuntimeError(
                            f"[I92] Internal error at {path}: KeyError {str(e)}"
                        ) from e
                    if _ENUM in _internal_def:
                        raise ValueError(f"[E12] Nullable {_ENUM} at {path}")
        num_primitive_items = sum(
            1
            for item in anyof_items
            if isinstance(item, dict)
            and item.get("type") in _NON_NULL_PRIMITIVE_TYPES
        )
        if num_primitive_items > 1:
            raise ValueError(f"[E13] Unsupported {_ANYOF} of primitive types at {path}")
        num_ref_items = sum(
            1 for item in anyof_items if isinstance(item, dict) and _REF in item
        )
        if num_ref_items > 1:
            raise ValueError(
                f"[E14] Unsupported {_ANYOF} with more than one {_REF} at {path}"
            )
        if len(anyof_items) > 2:
            # Note: this branch is likely unreachable, but we keep it as an
            # additional safety measure
            raise ValueError(
                f"[E15] Unsupported {_ANYOF} with more than two items at {path}"
            )

    # E2x: oneOf-related errors
    if _ONEOF in schema:
        # The discriminator is looked up in the `_ITEMS` sub-schema when one
        # is present, and in the current schema otherwise.
        if _ITEMS in schema:
            if _DISCRIMINATOR not in schema[_ITEMS]:
                raise ValueError(f"[E20] {_ONEOF} with no {_DISCRIMINATOR} at {path}")
        else:
            if _DISCRIMINATOR not in schema:
                raise ValueError(f"[E21] {_ONEOF} with no {_DISCRIMINATOR} at {path}")
        if not all(_REF in item for item in schema[_ONEOF]):
            raise ValueError(f"[E22] Unsupported non-{_REF} item in {_ONEOF} at {path}")