|
14 | 14 | # See the License for the specific language governing permissions and |
15 | 15 | # limitations under the License. |
16 | 16 | # |
17 | | -import os |
18 | 17 | import unittest |
19 | 18 | from pprint import pformat |
20 | 19 |
|
21 | | -from test_env import (TPP_TOKEN_URL, CLOUD_APIKEY, CLOUD_URL, TPP_PM_ROOT, CLOUD_ENTRUST_CA_NAME,CLOUD_DIGICERT_CA_NAME, |
22 | | - TPP_CA_NAME, TPP_USER, TPP_PASSWORD) |
| 20 | +from test_env import (TPP_TOKEN_URL, CLOUD_APIKEY, CLOUD_URL, TPP_PM_ROOT, CLOUD_ENTRUST_CA_NAME, |
| 21 | + CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD) |
23 | 22 | from test_utils import timestamp |
24 | | -from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM, logger |
| 23 | +from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM, logger, VenafiError |
25 | 24 | from vcert.parser import json_parser, yaml_parser |
26 | 25 | from vcert.parser.utils import parse_policy_spec |
27 | 26 | from vcert.policy import (Policy, Subject, KeyPair, SubjectAltNames, Defaults, DefaultSubject, DefaultKeyPair, |
28 | 27 | PolicySpecification) |
29 | | -from vcert.policy.pm_cloud import CA_TYPE_DIGICERT, CA_TYPE_ENTRUST |
| 28 | +from vcert.policy.pm_cloud import (CA_TYPE_DIGICERT, CA_TYPE_ENTRUST, validate_policy_spec as validate_ps_vaas, |
| 29 | + get_ca_info, default_error_msg) |
| 30 | +from vcert.policy.pm_tpp import (is_service_generated_csr, validate_policy_spec as validate_ps_tpp, no_match_error_msg, |
| 31 | + too_many_error_msg, unsupported_error_msg, supported_key_types, |
| 32 | + supported_rsa_key_sizes, supported_elliptic_curves) |
30 | 33 |
|
31 | 34 | # This values are loaded from the project root which is vcert-python, not tests folder |
32 | 35 | POLICY_SPEC_JSON = './tests/resources/policy_specification.json' |
@@ -171,6 +174,347 @@ def _get_random_zone(): |
171 | 174 | return _get_zone() |
172 | 175 |
|
173 | 176 |
|
| 177 | +class TestLocalMethods(unittest.TestCase): |
| 178 | + def test_exceptions_tpp(self): |
| 179 | + try: |
| 180 | + is_service_generated_csr("") |
| 181 | + except VenafiError as err: |
| 182 | + self.assertEqual(err.args[0], "csr generation value cannot be empty") |
| 183 | + |
| 184 | + ps = PolicySpecification(policy=Policy(), defaults=Defaults()) |
| 185 | + ps.policy.auto_installed = True |
| 186 | + ps.defaults.auto_installed = False |
| 187 | + |
| 188 | + # Testing Subject structure |
| 189 | + try: |
| 190 | + validate_ps_tpp(ps) |
| 191 | + except VenafiError as err: |
| 192 | + self.assertEqual(err.args[0], "Subject structure is empty") |
| 193 | + |
| 194 | + s = Subject(orgs=["foo", "bar"], |
| 195 | + org_units=["QA Venafi"], |
| 196 | + localities=["foo", "bar"], |
| 197 | + states=["foo", "bar"], |
| 198 | + countries=["foo", "bar"]) |
| 199 | + ps.policy.subject = s |
| 200 | + |
| 201 | + try: |
| 202 | + validate_ps_tpp(ps) |
| 203 | + except VenafiError as err: |
| 204 | + self.assertEqual(err.args[0], too_many_error_msg.format('organizations')) |
| 205 | + ps.policy.subject.orgs = ["Venafi"] |
| 206 | + try: |
| 207 | + validate_ps_tpp(ps) |
| 208 | + except VenafiError as err: |
| 209 | + self.assertEqual(err.args[0], too_many_error_msg.format('localities')) |
| 210 | + ps.policy.subject.localities = ["Salt Lake City"] |
| 211 | + try: |
| 212 | + validate_ps_tpp(ps) |
| 213 | + except VenafiError as err: |
| 214 | + self.assertEqual(err.args[0], too_many_error_msg.format('states')) |
| 215 | + ps.policy.subject.states = ["Utah"] |
| 216 | + try: |
| 217 | + validate_ps_tpp(ps) |
| 218 | + except VenafiError as err: |
| 219 | + self.assertEqual(err.args[0], too_many_error_msg.format('countries')) |
| 220 | + ps.policy.subject.countries = ["USA"] |
| 221 | + try: |
| 222 | + validate_ps_tpp(ps) |
| 223 | + except VenafiError as err: |
| 224 | + self.assertEqual(err.args[0], "country code [USA] does not match ISO Alpha-2 specification") |
| 225 | + ps.policy.subject.countries = ["US"] |
| 226 | + |
| 227 | + # Testing KeyPair object |
| 228 | + try: |
| 229 | + validate_ps_tpp(ps) |
| 230 | + except VenafiError as err: |
| 231 | + self.assertEqual(err.args[0], "Key Pair structure is empty") |
| 232 | + |
| 233 | + kp = KeyPair(key_types=["foo", "bar"], |
| 234 | + rsa_key_sizes=[123, 456], |
| 235 | + elliptic_curves=["foo", "bar"], |
| 236 | + service_generated=False) |
| 237 | + ps.policy.key_pair = kp |
| 238 | + |
| 239 | + try: |
| 240 | + validate_ps_tpp(ps) |
| 241 | + except VenafiError as err: |
| 242 | + self.assertEqual(err.args[0], too_many_error_msg.format('key types')) |
| 243 | + ps.policy.key_pair.key_types = ["foo"] |
| 244 | + try: |
| 245 | + validate_ps_tpp(ps) |
| 246 | + except VenafiError as err: |
| 247 | + msg = unsupported_error_msg.format('key types', supported_key_types, kp.key_types) |
| 248 | + self.assertEqual(err.args[0], msg) |
| 249 | + ps.policy.key_pair.key_types = ["RSA"] |
| 250 | + try: |
| 251 | + validate_ps_tpp(ps) |
| 252 | + except VenafiError as err: |
| 253 | + self.assertEqual(err.args[0], too_many_error_msg.format('key bit strength')) |
| 254 | + ps.policy.key_pair.rsa_key_sizes = [256] |
| 255 | + try: |
| 256 | + validate_ps_tpp(ps) |
| 257 | + except VenafiError as err: |
| 258 | + msg = unsupported_error_msg.format('key bit strength', supported_rsa_key_sizes, kp.rsa_key_sizes) |
| 259 | + self.assertEqual(err.args[0], msg) |
| 260 | + ps.policy.key_pair.rsa_key_sizes = [4096] |
| 261 | + try: |
| 262 | + validate_ps_tpp(ps) |
| 263 | + except VenafiError as err: |
| 264 | + self.assertEqual(err.args[0], too_many_error_msg.format('elliptic curve')) |
| 265 | + ps.policy.key_pair.elliptic_curves = ["foo"] |
| 266 | + try: |
| 267 | + validate_ps_tpp(ps) |
| 268 | + except VenafiError as err: |
| 269 | + msg = unsupported_error_msg.format('elliptic curve', supported_elliptic_curves, kp.elliptic_curves) |
| 270 | + self.assertEqual(err.args[0], msg) |
| 271 | + ps.policy.key_pair.elliptic_curves = ["P521"] |
| 272 | + |
| 273 | + # Testing DefaultSubject structure |
| 274 | + ds = DefaultSubject(org="Foo", |
| 275 | + org_units=["foo", "bar"], |
| 276 | + locality="foo", |
| 277 | + state="foo", |
| 278 | + country="foo") |
| 279 | + ps.defaults.subject = ds |
| 280 | + |
| 281 | + try: |
| 282 | + validate_ps_tpp(ps) |
| 283 | + except VenafiError as err: |
| 284 | + msg = no_match_error_msg.format('organizations', ds.org, s.orgs[0]) |
| 285 | + self.assertEqual(err.args[0], msg) |
| 286 | + ps.defaults.subject.org = s.orgs[0] |
| 287 | + try: |
| 288 | + validate_ps_tpp(ps) |
| 289 | + except VenafiError as err: |
| 290 | + msg = no_match_error_msg.format('orgUnits', ds.org_units[0], s.org_units[0]) |
| 291 | + self.assertEqual(err.args[0], msg) |
| 292 | + ps.defaults.subject.org_units = s.org_units |
| 293 | + try: |
| 294 | + validate_ps_tpp(ps) |
| 295 | + except VenafiError as err: |
| 296 | + msg = no_match_error_msg.format('localities', ds.locality, s.localities[0]) |
| 297 | + self.assertEqual(err.args[0], msg) |
| 298 | + ps.defaults.subject.locality = s.localities[0] |
| 299 | + try: |
| 300 | + validate_ps_tpp(ps) |
| 301 | + except VenafiError as err: |
| 302 | + msg = no_match_error_msg.format('states', ds.state, s.states[0]) |
| 303 | + self.assertEqual(err.args[0], msg) |
| 304 | + ps.defaults.subject.state = s.states[0] |
| 305 | + try: |
| 306 | + validate_ps_tpp(ps) |
| 307 | + except VenafiError as err: |
| 308 | + msg = no_match_error_msg.format('countries', ds.country, s.countries[0]) |
| 309 | + self.assertEqual(err.args[0], msg) |
| 310 | + ps.defaults.subject.country = s.countries[0] |
| 311 | + |
| 312 | + # Testing DefaultKeyPair against Policy KeyPair |
| 313 | + dkp = DefaultKeyPair(key_type="foo", |
| 314 | + rsa_key_size=256, |
| 315 | + elliptic_curve="foo", |
| 316 | + service_generated=True) |
| 317 | + ps.defaults.key_pair = dkp |
| 318 | + |
| 319 | + try: |
| 320 | + validate_ps_tpp(ps) |
| 321 | + except VenafiError as err: |
| 322 | + msg = no_match_error_msg.format('key types', dkp.key_type, kp.key_types[0]) |
| 323 | + self.assertEqual(err.args[0], msg) |
| 324 | + ps.defaults.key_pair.key_type = kp.key_types[0] |
| 325 | + try: |
| 326 | + validate_ps_tpp(ps) |
| 327 | + except VenafiError as err: |
| 328 | + msg = no_match_error_msg.format('rsa key sizes', dkp.rsa_key_size, kp.rsa_key_sizes[0]) |
| 329 | + self.assertEqual(err.args[0], msg) |
| 330 | + ps.defaults.key_pair.rsa_key_size = kp.rsa_key_sizes[0] |
| 331 | + try: |
| 332 | + validate_ps_tpp(ps) |
| 333 | + except VenafiError as err: |
| 334 | + msg = no_match_error_msg.format('elliptic curves', dkp.elliptic_curve, kp.elliptic_curves[0]) |
| 335 | + self.assertEqual(err.args[0], msg) |
| 336 | + ps.defaults.key_pair.elliptic_curve = kp.elliptic_curves[0] |
| 337 | + try: |
| 338 | + validate_ps_tpp(ps) |
| 339 | + except VenafiError as err: |
| 340 | + msg = no_match_error_msg.format('generation type', dkp.service_generated, kp.service_generated) |
| 341 | + self.assertEqual(err.args[0], msg) |
| 342 | + ps.defaults.key_pair.service_generated = kp.service_generated |
| 343 | + |
| 344 | + # Testing DefaultKeyPair |
| 345 | + dkp2 = DefaultKeyPair(key_type="foo", |
| 346 | + rsa_key_size=256, |
| 347 | + elliptic_curve="foo", |
| 348 | + service_generated=False) |
| 349 | + ps.policy = None |
| 350 | + ps.defaults.key_pair = dkp2 |
| 351 | + |
| 352 | + try: |
| 353 | + validate_ps_tpp(ps) |
| 354 | + except VenafiError as err: |
| 355 | + msg = unsupported_error_msg.format('key type', supported_key_types, dkp2.key_type) |
| 356 | + self.assertEqual(err.args[0], msg) |
| 357 | + ps.defaults.key_pair.key_type = "RSA" |
| 358 | + try: |
| 359 | + validate_ps_tpp(ps) |
| 360 | + except VenafiError as err: |
| 361 | + msg = unsupported_error_msg.format('rsa key size', supported_rsa_key_sizes, dkp2.rsa_key_size) |
| 362 | + self.assertEqual(err.args[0], msg) |
| 363 | + ps.defaults.key_pair.rsa_key_size = 4096 |
| 364 | + try: |
| 365 | + validate_ps_tpp(ps) |
| 366 | + except VenafiError as err: |
| 367 | + msg = unsupported_error_msg.format('elliptic curve', supported_elliptic_curves, dkp2.elliptic_curve) |
| 368 | + self.assertEqual(err.args[0], msg) |
| 369 | + ps.defaults.key_pair.elliptic_curve = "P521" |
| 370 | + |
| 371 | + # Testing autoinstalled option |
| 372 | + ps.policy = Policy(subject=s, key_pair=kp, auto_installed=True) |
| 373 | + try: |
| 374 | + validate_ps_tpp(ps) |
| 375 | + except VenafiError as err: |
| 376 | + self.assertEqual(err.args[0], no_match_error_msg.format('autoinstalled', False, True)) |
| 377 | + ps.defaults.auto_installed = True |
| 378 | + |
| 379 | + def test_exceptions_vaas(self): |
| 380 | + try: |
| 381 | + get_ca_info("foo\\bar") |
| 382 | + except VenafiError as err: |
| 383 | + self.assertEqual(err.args[0], f"Certificate Authority name invalid [foo\\bar]") |
| 384 | + |
| 385 | + ps = PolicySpecification(policy=Policy(), defaults=Defaults()) |
| 386 | + kp = KeyPair(key_types=["foo", "bar"], |
| 387 | + rsa_key_sizes=[256], |
| 388 | + elliptic_curves=["asd"], |
| 389 | + service_generated=True) |
| 390 | + ps.policy.key_pair = kp |
| 391 | + ps.policy.subject_alt_names = SubjectAltNames(dns_allowed=True, email_allowed=True) |
| 392 | + s = Subject(orgs=["Venafi"], |
| 393 | + org_units=["QA Venafi"], |
| 394 | + localities=["Salt Lake City"], |
| 395 | + states=["Utah"], |
| 396 | + countries=["US"]) |
| 397 | + ps.policy.subject = s |
| 398 | + ds = DefaultSubject(org="Foo", |
| 399 | + org_units=["Bar"], |
| 400 | + locality="Kwan", |
| 401 | + state="Merida", |
| 402 | + country="MX") |
| 403 | + ps.defaults.subject = ds |
| 404 | + dkp = DefaultKeyPair(key_type="foo", |
| 405 | + rsa_key_size=256, |
| 406 | + elliptic_curve="bar", |
| 407 | + service_generated=False) |
| 408 | + ps.defaults.key_pair = dkp |
| 409 | + |
| 410 | + # validate key pair values |
| 411 | + try: |
| 412 | + validate_ps_vaas(ps) |
| 413 | + except VenafiError as err: |
| 414 | + msg = "Key Type values exceeded. Only one Key Type is allowed by VaaS" |
| 415 | + self.assertEqual(err.args[0], msg) |
| 416 | + ps.policy.key_pair.key_types = ["foo"] |
| 417 | + try: |
| 418 | + validate_ps_vaas(ps) |
| 419 | + except VenafiError as err: |
| 420 | + msg = f"Key Type [{ps.policy.key_pair.key_types[0]}] is not supported by VaaS" |
| 421 | + self.assertEqual(err.args[0], msg) |
| 422 | + ps.policy.key_pair.key_types = ["RSA"] |
| 423 | + try: |
| 424 | + validate_ps_vaas(ps) |
| 425 | + except VenafiError as err: |
| 426 | + msg = f"The Key Size [{256}] is not supported by VaaS" |
| 427 | + self.assertEqual(err.args[0], msg) |
| 428 | + ps.policy.key_pair.rsa_key_sizes = [4096] |
| 429 | + |
| 430 | + # validate subject CN and SAN regexes |
| 431 | + try: |
| 432 | + validate_ps_vaas(ps) |
| 433 | + except VenafiError as err: |
| 434 | + msg = "Subject Alt name [SubjAltNameEmailAllowed] is not allowed by VaaS" |
| 435 | + self.assertEqual(err.args[0], msg) |
| 436 | + ps.policy.subject_alt_names.email_allowed = False |
| 437 | + |
| 438 | + # validate default subject values against policy values |
| 439 | + try: |
| 440 | + validate_ps_vaas(ps) |
| 441 | + except VenafiError as err: |
| 442 | + msg = default_error_msg.format('Organization', ds.org, s.orgs) |
| 443 | + self.assertEqual(err.args[0], msg) |
| 444 | + ps.defaults.subject.org = s.orgs[0] |
| 445 | + try: |
| 446 | + validate_ps_vaas(ps) |
| 447 | + except VenafiError as err: |
| 448 | + msg = default_error_msg.format('Org Units', ds.org_units, s.org_units) |
| 449 | + self.assertEqual(err.args[0], msg) |
| 450 | + ps.defaults.subject.org_units = s.org_units |
| 451 | + try: |
| 452 | + validate_ps_vaas(ps) |
| 453 | + except VenafiError as err: |
| 454 | + msg = default_error_msg.format('Localities', ds.locality, s.localities) |
| 455 | + self.assertEqual(err.args[0], msg) |
| 456 | + ps.defaults.subject.locality = s.localities[0] |
| 457 | + try: |
| 458 | + validate_ps_vaas(ps) |
| 459 | + except VenafiError as err: |
| 460 | + msg = default_error_msg.format('States', ds.state, s.states) |
| 461 | + self.assertEqual(err.args[0], msg) |
| 462 | + ps.defaults.subject.state = s.states[0] |
| 463 | + try: |
| 464 | + validate_ps_vaas(ps) |
| 465 | + except VenafiError as err: |
| 466 | + msg = default_error_msg.format('Countries', ds.country, s.countries) |
| 467 | + self.assertEqual(err.args[0], msg) |
| 468 | + ps.defaults.subject.country = s.countries[0] |
| 469 | + |
| 470 | + # validate default key pair values against policy values |
| 471 | + try: |
| 472 | + validate_ps_vaas(ps) |
| 473 | + except VenafiError as err: |
| 474 | + msg = default_error_msg.format('Key Types', dkp.key_type, kp.key_types) |
| 475 | + self.assertEqual(err.args[0], msg) |
| 476 | + ps.defaults.key_pair.key_type = kp.key_types[0] |
| 477 | + try: |
| 478 | + validate_ps_vaas(ps) |
| 479 | + except VenafiError as err: |
| 480 | + msg = default_error_msg.format('RSA Key Sizes', dkp.rsa_key_size, kp.rsa_key_sizes) |
| 481 | + self.assertEqual(err.args[0], msg) |
| 482 | + ps.defaults.key_pair.rsa_key_size = kp.rsa_key_sizes[0] |
| 483 | + try: |
| 484 | + validate_ps_vaas(ps) |
| 485 | + except VenafiError as err: |
| 486 | + msg = default_error_msg.format('Elliptic Curves', dkp.elliptic_curve, kp.elliptic_curves) |
| 487 | + self.assertEqual(err.args[0], msg) |
| 488 | + ps.defaults.key_pair.elliptic_curve = kp.elliptic_curves[0] |
| 489 | + try: |
| 490 | + validate_ps_vaas(ps) |
| 491 | + except VenafiError as err: |
| 492 | + msg = default_error_msg.format('Service Generated', dkp.service_generated, kp.service_generated) |
| 493 | + self.assertEqual(err.args[0], msg) |
| 494 | + ps.defaults.key_pair.service_generated = kp.service_generated |
| 495 | + |
| 496 | + # validate default values when policy is not defined |
| 497 | + ps.policy = None |
| 498 | + dkp2 = DefaultKeyPair(key_type="foo", |
| 499 | + rsa_key_size=256, |
| 500 | + elliptic_curve="bar", |
| 501 | + service_generated=False) |
| 502 | + ps.defaults.key_pair = dkp2 |
| 503 | + |
| 504 | + try: |
| 505 | + validate_ps_vaas(ps) |
| 506 | + except VenafiError as err: |
| 507 | + msg = f"Default Key Type [{dkp2.key_type}] is not supported by VaaS" |
| 508 | + self.assertEqual(err.args[0], msg) |
| 509 | + ps.defaults.key_pair.key_type = "RSA" |
| 510 | + try: |
| 511 | + validate_ps_vaas(ps) |
| 512 | + except VenafiError as err: |
| 513 | + msg = f"Default Key Size [{256}] is not supported by VaaS" |
| 514 | + self.assertEqual(err.args[0], msg) |
| 515 | + ps.defaults.key_pair.rsa_key_size = 4096 |
| 516 | + |
| 517 | + |
174 | 518 | def create_policy(connector, zone, policy_spec=None, policy=None, defaults=None): |
175 | 519 | if not policy_spec: |
176 | 520 | policy_spec = PolicySpecification() |
@@ -262,13 +606,3 @@ def _get_zone(): |
262 | 606 | def _get_tpp_policy_name(): |
263 | 607 | time = timestamp() |
264 | 608 | return f"{_get_app_name().format(time)}" |
265 | | - |
266 | | -# def _resolve_resources_path(path): |
267 | | -# resources_dir = os.path.dirname(__file__) |
268 | | -# log.debug(f"Testing root folder: [{resources_dir}]") |
269 | | -# if resources_dir.endswith('tests'): |
270 | | -# resolved_path = f"./{path}" |
271 | | -# else: |
272 | | -# resolved_path = f"./tests/{path}" |
273 | | -# log.debug(f"resolved path: [{resolved_path}]") |
274 | | -# return resolved_path |
|
0 commit comments