1919
2020from test_env import (TPP_TOKEN_URL , CLOUD_APIKEY , CLOUD_URL , TPP_PM_ROOT , CLOUD_ENTRUST_CA_NAME ,
2121 CLOUD_DIGICERT_CA_NAME , TPP_CA_NAME , TPP_USER , TPP_PASSWORD , CLOUD_TEAM )
22- from test_utils import timestamp
23- from vcert import TPPTokenConnection , CloudConnection , Authentication , SCOPE_PM , logger , VenafiError
22+ from test_utils import get_tpp_policy_name , get_vaas_zone
23+ from vcert import TPPTokenConnection , CloudConnection , Authentication , SCOPE_PM , logger , VenafiError , KeyType
2424from vcert .parser import json_parser , yaml_parser
2525from vcert .parser .utils import parse_policy_spec
2626from vcert .policy import (Policy , Subject , KeyPair , SubjectAltNames , Defaults , DefaultSubject , DefaultKeyPair ,
2727 PolicySpecification )
2828from vcert .policy .pm_cloud import (CA_TYPE_DIGICERT , CA_TYPE_ENTRUST , validate_policy_spec as validate_ps_vaas ,
29- get_ca_info , default_error_msg )
29+ get_ca_info , default_error_msg , ipv4 , ipv6 )
3030from vcert .policy .pm_tpp import (is_service_generated_csr , validate_policy_spec as validate_ps_tpp , no_match_error_msg ,
3131 too_many_error_msg , unsupported_error_msg , supported_key_types ,
3232 supported_rsa_key_sizes , supported_elliptic_curves )
@@ -50,7 +50,7 @@ def test_json_parsing(self):
5050 self ._assert_policy_spec (ps )
5151
5252 def test_json_serialization (self ):
53- ps = PolicySpecification (policy = _get_policy_obj (), defaults = _get_defaults_obj ())
53+ ps = PolicySpecification (policy = get_policy_obj (), defaults = get_defaults_obj ())
5454 json_parser .serialize (ps , 'test_json_serialization.json' )
5555
5656 def test_yaml_11_parsing (self ):
@@ -61,7 +61,7 @@ def test_yaml_12_parsing(self):
6161 self ._assert_policy_spec (ps )
6262
6363 def test_yaml_serialization (self ):
64- ps = PolicySpecification (policy = _get_policy_obj (), defaults = _get_defaults_obj ())
64+ ps = PolicySpecification (policy = get_policy_obj (), defaults = get_defaults_obj ())
6565 yaml_parser .serialize (ps , 'test_yaml_serialization.yaml' )
6666
6767 def _assert_policy_spec (self , ps ):
@@ -108,18 +108,18 @@ def test_create_policy_yaml(self):
108108 pass
109109
110110 def test_create_policy_full (self ):
111- policy = _get_policy_obj (ca_type = CA_TYPE_TPP )
111+ policy = get_policy_obj (ca_type = CA_TYPE_TPP )
112112 policy .key_pair .rsa_key_sizes = [2048 ]
113- self ._create_policy_tpp (policy = policy , defaults = _get_defaults_obj ())
113+ self ._create_policy_tpp (policy = policy , defaults = get_defaults_obj ())
114114
115115 def test_create_policy_empty (self ):
116116 self ._create_policy_tpp ()
117117
118118 def test_create_policy_no_policy (self ):
119- self ._create_policy_tpp (defaults = _get_defaults_obj ())
119+ self ._create_policy_tpp (defaults = get_defaults_obj ())
120120
121121 def test_create_policy_no_defaults (self ):
122- policy = _get_policy_obj (ca_type = CA_TYPE_TPP )
122+ policy = get_policy_obj (ca_type = CA_TYPE_TPP )
123123 policy .key_pair .rsa_key_sizes = [2048 ]
124124 self ._create_policy_tpp (policy = policy )
125125
@@ -149,7 +149,7 @@ def test_create_and_get_policy_with_contacts(self):
149149 self .assertEqual (1 , len (result .users ))
150150
151151 def _create_policy_tpp (self , policy_spec = None , policy = None , defaults = None ):
152- zone = f"{ TPP_PM_ROOT } \\ { _get_tpp_policy_name ()} "
152+ zone = f"{ TPP_PM_ROOT } \\ { get_tpp_policy_name ()} "
153153 create_policy (self .tpp_conn , zone , policy_spec , policy , defaults )
154154
155155
@@ -171,27 +171,81 @@ def test_create_policy_yaml(self):
171171 pass
172172
173173 def test_create_policy_full (self ):
174- self ._create_policy_cloud (policy = _get_policy_obj (), defaults = _get_defaults_obj ())
174+ self ._create_policy_cloud (policy = get_policy_obj (), defaults = get_defaults_obj ())
175175
176176 def test_create_policy_empty (self ):
177177 self ._create_policy_cloud ()
178178
179179 def test_create_policy_no_policy (self ):
180- self ._create_policy_cloud (defaults = _get_defaults_obj ())
180+ self ._create_policy_cloud (defaults = get_defaults_obj ())
181181
182182 def test_create_policy_no_defaults (self ):
183- self ._create_policy_cloud (policy = _get_policy_obj ())
183+ self ._create_policy_cloud (policy = get_policy_obj ())
184184
185185 def test_create_policy_entrust (self ):
186- self ._create_policy_cloud (policy = _get_policy_obj (ca_type = CA_TYPE_ENTRUST ), defaults = _get_defaults_obj ())
186+ self ._create_policy_cloud (policy = get_policy_obj (ca_type = CA_TYPE_ENTRUST ), defaults = get_defaults_obj ())
187187
188188 def test_create_policy_digicert (self ):
189- self ._create_policy_cloud (policy = _get_policy_obj (ca_type = CA_TYPE_DIGICERT ), defaults = _get_defaults_obj ())
189+ self ._create_policy_cloud (policy = get_policy_obj (ca_type = CA_TYPE_DIGICERT ), defaults = get_defaults_obj ())
190190
191191 def test_validate_domains (self ):
192- policy = self ._create_policy_cloud (policy = _get_policy_obj ())
192+ policy = self ._create_policy_cloud (policy = get_policy_obj ())
193193 self .assertListEqual (policy .policy .domains , POLICY_DOMAINS )
194194
195+ def test_csr_attributes_service (self ):
196+ cit = self ._create_csr_attributes_policy (service_generated_csr = True )
197+
198+ self .assertFalse (cit .csr_upload_allowed , "csrUploadAllowed attribute is not False" )
199+ self .assertTrue (cit .key_generated_by_venafi_allowed , "keyGeneratedByVenafiAllowed is not True" )
200+
201+ def test_csr_attributes_local (self ):
202+ cit = self ._create_csr_attributes_policy (service_generated_csr = False )
203+
204+ self .assertTrue (cit .csr_upload_allowed , "csrUploadAllowed attribute is not True" )
205+ self .assertFalse (cit .key_generated_by_venafi_allowed , "keyGeneratedByVenafiAllowed is not False" )
206+
207+ def test_csr_attributes_not_specified (self ):
208+ cit = self ._create_csr_attributes_policy ()
209+
210+ self .assertTrue (cit .csr_upload_allowed , "csrUploadAllowed attribute is not True" )
211+ self .assertTrue (cit .key_generated_by_venafi_allowed , "keyGeneratedByVenafiAllowed is not True" )
212+
213+ def test_ec_key_pair (self ):
214+ policy = get_policy_obj ()
215+ kp = KeyPair (
216+ key_types = ['EC' ],
217+ rsa_key_sizes = [2048 , 4096 ],
218+ elliptic_curves = ['P521' , 'P384' ],
219+ reuse_allowed = False )
220+ policy .key_pair = kp
221+
222+ defaults = get_defaults_obj ()
223+ defaults .key_pair = DefaultKeyPair (
224+ key_type = 'EC' ,
225+ rsa_key_size = 2048 ,
226+ elliptic_curve = 'P521' )
227+
228+ ps = self ._create_policy_cloud (policy = policy , defaults = defaults )
229+ self .assertEqual (ps .policy .key_pair .key_types [0 ].upper (), KeyType .ECDSA .upper (), "Policy Key Type is not EC" )
230+ self .assertTrue (len (ps .policy .key_pair .elliptic_curves ) == 2 ,
231+ f"Expected 2 accepted Elliptic Curves. Got { len (ps .policy .key_pair .elliptic_curves )} " )
232+ self .assertIn ('P521' , ['P521' , 'P384' ], "[P521] is not in the allowed Elliptic Curves list" )
233+ self .assertIn ('P384' , ['P521' , 'P384' ], "[P384] is not in the allowed Elliptic Curves list" )
234+
235+ def test_create_policy_uri_ip_email (self ):
236+ policy = get_policy_obj ()
237+ policy .subject_alt_names .email_allowed = True
238+ policy .subject_alt_names .uri_allowed = True
239+ policy .subject_alt_names .ip_allowed = True
240+ uri_protocols = ["https" , "ldaps" , "spiffe" ]
241+ policy .subject_alt_names .uri_protocols = uri_protocols
242+ ps = self ._create_policy_cloud (policy = policy , defaults = get_defaults_obj ())
243+ self .assertListEqual (ps .policy .subject_alt_names .ip_constraints , [ipv4 , ipv6 ])
244+ self .assertListEqual (ps .policy .subject_alt_names .uri_protocols , uri_protocols )
245+ self .assertTrue (ps .policy .subject_alt_names .email_allowed )
246+ self .assertTrue (ps .policy .subject_alt_names .uri_allowed )
247+ self .assertTrue (ps .policy .subject_alt_names .ip_allowed )
248+
195249 def test_create_policy_with_no_users (self ):
196250 zone = self ._get_random_zone ()
197251 connector = self .cloud_conn
@@ -277,13 +331,23 @@ def test_create_policy_with_team(self):
277331 self .assertEqual (CLOUD_TEAM , result .users [0 ])
278332
279333 def _create_policy_cloud (self , policy_spec = None , policy = None , defaults = None ):
280- zone = self . _get_random_zone ()
334+ zone = get_vaas_zone ()
281335 response = create_policy (self .cloud_conn , zone , policy_spec , policy , defaults )
282336 return response
283337
284- @staticmethod
285- def _get_random_zone ():
286- return _get_zone ()
338+ def _create_csr_attributes_policy (self , service_generated_csr = None ):
339+ """
340+
341+ :param bool service_generated_csr:
342+ :rtype: common.Policy
343+ """
344+ policy = get_policy_obj ()
345+ policy .key_pair .service_generated = service_generated_csr
346+ zone = get_vaas_zone ()
347+ create_policy (connector = self .cloud_conn , zone = zone , policy_spec = None , policy = policy )
348+ cit = self .cloud_conn ._get_template_by_id (zone )
349+
350+ return cit
287351
288352
289353class TestLocalMethods (unittest .TestCase ):
@@ -646,7 +710,7 @@ def create_policy(connector, zone, policy_spec=None, policy=None, defaults=None)
646710POLICY_DOMAINS = ['vfidev.com' , 'vfidev.net' , 'venafi.example' ]
647711
648712
649- def _get_policy_obj (ca_type = None ):
713+ def get_policy_obj (ca_type = None ):
650714 policy = Policy (
651715 subject = Subject (
652716 orgs = ['OSS Venafi, Inc.' ],
@@ -683,7 +747,7 @@ def _get_policy_obj(ca_type=None):
683747 return policy
684748
685749
686- def _get_defaults_obj ():
750+ def get_defaults_obj ():
687751 defaults = Defaults (
688752 d_subject = DefaultSubject (
689753 org = 'OSS Venafi, Inc.' ,
@@ -697,24 +761,3 @@ def _get_defaults_obj():
697761 elliptic_curve = 'P521' ),
698762 auto_installed = False )
699763 return defaults
700-
701-
702- def _get_app_name ():
703- name = 'vcert-python-app-{}'
704- return name
705-
706-
707- def _get_cit_name ():
708- cit_name = 'vcert-python-cit-{}'
709- return cit_name
710-
711-
712- def _get_zone ():
713- time = timestamp ()
714- zone = f"{ _get_app_name ().format (time )} \\ { _get_cit_name ().format (time )} "
715- return zone
716-
717-
718- def _get_tpp_policy_name ():
719- time = timestamp ()
720- return f"{ _get_app_name ().format (time )} "
0 commit comments