Skip to content

Commit 3bc52fb

Browse files
Merge pull request #108 from Venafi/contacts-support
Contacts support
2 parents 613ebcd + d6345fe commit 3bc52fb

7 files changed

Lines changed: 424 additions & 43 deletions

File tree

tests/test_env.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
CLOUD_URL = environ.get('CLOUD_URL')
3131
CLOUD_APIKEY = environ.get('CLOUD_APIKEY')
3232
CLOUD_ZONE = environ.get('CLOUD_ZONE')
33+
CLOUD_TEAM = environ.get('CLOUD_TEAM')
3334

3435
TPP_PM_ROOT = environ.get('TPP_PM_ROOT')
3536
TPP_CA_NAME = environ.get('TPP_CA_NAME')

tests/test_pm.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from pprint import pformat
1919

2020
from test_env import (TPP_TOKEN_URL, CLOUD_APIKEY, CLOUD_URL, TPP_PM_ROOT, CLOUD_ENTRUST_CA_NAME,
21-
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD)
21+
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD, CLOUD_TEAM)
2222
from test_utils import timestamp
2323
from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM, logger, VenafiError
2424
from vcert.parser import json_parser, yaml_parser
@@ -94,6 +94,9 @@ def __init__(self, *args, **kwargs):
9494
self.yaml_file = POLICY_SPEC_YAML
9595
super(TestTPPPolicyManagement, self).__init__(*args, **kwargs)
9696

97+
prefixed_universal = None
98+
username = "osstestuser"
99+
97100
def test_create_policy_from_json(self):
98101
# ps = json_parser.parse_file(self.json_file)
99102
# self._create_policy_tpp(policy_spec=ps)
@@ -120,6 +123,31 @@ def test_create_policy_no_defaults(self):
120123
policy.key_pair.rsa_key_sizes = [2048]
121124
self._create_policy_tpp(policy=policy)
122125

126+
def test_browse_identities(self):
127+
connector = self.tpp_conn
128+
identity = connector.get_tpp_identity(self.username)
129+
self.assertEqual(self.username, identity.name)
130+
self.prefixed_universal = identity.prefixed_universal
131+
132+
def test_validate_identity(self):
133+
connector = self.tpp_conn
134+
if not self.prefixed_universal:
135+
self.test_browse_identities()
136+
response = connector.validate_identity(self.prefixed_universal)
137+
self.assertIsNotNone(response)
138+
self.assertEqual(self.username, response.name)
139+
140+
def test_create_and_get_policy_with_contacts(self):
141+
connector = self.tpp_conn
142+
zone = f"{TPP_PM_ROOT}\\{_get_tpp_policy_name()}"
143+
policy_specification = PolicySpecification()
144+
policy_specification.policy = _get_policy_obj(ca_type=CA_TYPE_TPP)
145+
policy_specification.defaults = _get_defaults_obj()
146+
policy_specification.policy.key_pair.rsa_key_sizes = [2048]
147+
connector.set_policy(zone, policy_specification)
148+
result = connector.get_policy(zone)
149+
self.assertEqual(1, len(result.users))
150+
123151
def _create_policy_tpp(self, policy_spec=None, policy=None, defaults=None):
124152
zone = f"{TPP_PM_ROOT}\\{_get_tpp_policy_name()}"
125153
create_policy(self.tpp_conn, zone, policy_spec, policy, defaults)
@@ -164,6 +192,90 @@ def test_validate_domains(self):
164192
policy = self._create_policy_cloud(policy=_get_policy_obj())
165193
self.assertListEqual(policy.policy.domains, POLICY_DOMAINS)
166194

195+
def test_create_policy_with_no_users(self):
196+
zone = self._get_random_zone()
197+
connector = self.cloud_conn
198+
policy_specification = PolicySpecification()
199+
policy_specification.policy = _get_policy_obj()
200+
policy_specification.defaults = _get_defaults_obj()
201+
connector.set_policy(zone, policy_specification)
202+
result = connector.get_policy(zone)
203+
self.assertEqual(1, len(result.users))
204+
self.assertEqual("jenkins@opensource.qa.venafi.io", result.users[0])
205+
206+
def test_create_policy_with_users(self):
207+
zone = self._get_random_zone()
208+
connector = self.cloud_conn
209+
policy_specification = PolicySpecification()
210+
policy_specification.policy = _get_policy_obj()
211+
policy_specification.defaults = _get_defaults_obj()
212+
policy_specification.users = ["pki-admin@opensource.qa.venafi.io", "resource-owner@opensource.qa.venafi.io"]
213+
connector.set_policy(zone, policy_specification)
214+
result = connector.get_policy(zone)
215+
self.assertEqual(2, len(result.users))
216+
self.assertIn("pki-admin@opensource.qa.venafi.io", result.users)
217+
self.assertIn("resource-owner@opensource.qa.venafi.io", result.users)
218+
219+
def test_update_policy_with_no_users(self):
220+
zone = self._get_random_zone()
221+
connector = self.cloud_conn
222+
policy_specification = PolicySpecification()
223+
policy_specification.policy = _get_policy_obj()
224+
policy_specification.defaults = _get_defaults_obj()
225+
policy_specification.users = ["pki-admin@opensource.qa.venafi.io",
226+
"resource-owner@opensource.qa.venafi.io"]
227+
connector.set_policy(zone, policy_specification)
228+
result = connector.get_policy(zone)
229+
self.assertEqual(2, len(result.users))
230+
self.assertIn("pki-admin@opensource.qa.venafi.io", result.users)
231+
self.assertIn("resource-owner@opensource.qa.venafi.io", result.users)
232+
233+
# Update Policy Specification with no users
234+
policy_specification2 = PolicySpecification()
235+
policy_specification2.policy = _get_policy_obj()
236+
policy_specification2.defaults = _get_defaults_obj()
237+
connector.set_policy(zone, policy_specification2)
238+
result2 = connector.get_policy(zone)
239+
self.assertEqual(2, len(result2.users))
240+
self.assertIn("pki-admin@opensource.qa.venafi.io", result2.users)
241+
self.assertIn("resource-owner@opensource.qa.venafi.io", result2.users)
242+
243+
def test_update_policy_with_users(self):
244+
zone = self._get_random_zone()
245+
connector = self.cloud_conn
246+
policy_specification = PolicySpecification()
247+
policy_specification.policy = _get_policy_obj()
248+
policy_specification.defaults = _get_defaults_obj()
249+
policy_specification.users = ["jenkins@opensource.qa.venafi.io"]
250+
connector.set_policy(zone, policy_specification)
251+
result = connector.get_policy(zone)
252+
self.assertEqual(1, len(result.users))
253+
self.assertEqual("jenkins@opensource.qa.venafi.io", result.users[0])
254+
255+
# Update Policy Specification with users
256+
policy_specification2 = PolicySpecification()
257+
policy_specification2.policy = _get_policy_obj()
258+
policy_specification2.defaults = _get_defaults_obj()
259+
policy_specification2.users = ["pki-admin@opensource.qa.venafi.io",
260+
"resource-owner@opensource.qa.venafi.io"]
261+
connector.set_policy(zone, policy_specification2)
262+
result2 = connector.get_policy(zone)
263+
self.assertEqual(2, len(result2.users))
264+
self.assertIn("pki-admin@opensource.qa.venafi.io", result2.users)
265+
self.assertIn("resource-owner@opensource.qa.venafi.io", result2.users)
266+
267+
def test_create_policy_with_team(self):
268+
zone = self._get_random_zone()
269+
connector = self.cloud_conn
270+
policy_specification = PolicySpecification()
271+
policy_specification.policy = _get_policy_obj()
272+
policy_specification.defaults = _get_defaults_obj()
273+
policy_specification.users = [CLOUD_TEAM]
274+
connector.set_policy(zone, policy_specification)
275+
result = connector.get_policy(zone)
276+
self.assertEqual(1, len(result.users))
277+
self.assertEqual(CLOUD_TEAM, result.users[0])
278+
167279
def _create_policy_cloud(self, policy_spec=None, policy=None, defaults=None):
168280
zone = self._get_random_zone()
169281
response = create_policy(self.cloud_conn, zone, policy_spec, policy, defaults)

vcert/connection_cloud.py

Lines changed: 123 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
from .policy.pm_cloud import (build_policy_spec, validate_policy_spec, AccountDetails, build_cit_request, build_user,
3535
UserDetails, build_company, build_apikey, build_app_update_request, get_ca_info,
3636
CertificateAuthorityDetails, CertificateAuthorityInfo, build_account_details,
37-
build_app_create_request)
38-
from .vaas_utils import AppDetails, RecommendedSettings, EdgeEncryptionKey, zip_to_pem, value_matches_regex
37+
build_app_create_request, build_team, get_cit_data_from_response, build_owner_json)
38+
from .vaas_utils import AppDetails, OwnerIdsAndTypes, RecommendedSettings, EdgeEncryptionKey, zip_to_pem, value_matches_regex
3939

4040
TOKEN_HEADER_NAME = "tppl-api-key" # nosec
4141
APPLICATION_SERVER_TYPE_ID = "784938d1-ef0d-11eb-9461-7bb533ba575b"
@@ -50,6 +50,8 @@
5050
CSR_ATTR_COUNTRY = 'country'
5151
CSR_ATTR_SANS_BY_TYPE = 'subjectAlternativeNamesByType'
5252
CSR_ATTR_SANS_DNS = 'dnsNames'
53+
OWNER_TYPE_USER = "USER"
54+
OWNER_TYPE_TEAM = "TEAM"
5355

5456
log = get_child("connection-vaas")
5557

@@ -87,6 +89,9 @@ def __init__(self):
8789
ISSUING_TEMPLATES_UPDATE = ISSUING_TEMPLATES + "/{}"
8890
USER_ACCOUNTS = API_VERSION + "useraccounts"
8991
DEK_PUBLIC_KEY = API_VERSION + "edgeencryptionkeys/{}"
92+
USERS_USERNAME = API_VERSION + "users/username/{}"
93+
USERS_ID = API_VERSION + "users/{}"
94+
TEAMS_ID = API_VERSION + "teams"
9095

9196

9297
class CondorChainOptions:
@@ -119,6 +124,16 @@ def _parse_zone(zone):
119124
return app_name, cit_alias
120125

121126

127+
def create_owner(owner_type, owner_id):
128+
owner = OwnerIdsAndTypes(owner_type, owner_id)
129+
return owner
130+
131+
132+
def resolve_apikey_owner(user_details):
133+
owner = create_owner(OWNER_TYPE_USER, user_details.user.user_id)
134+
return owner
135+
136+
122137
class CloudConnection(CommonConnection):
123138
def __init__(self, token, url=None, http_request_kwargs=None):
124139
super().__init__()
@@ -607,22 +622,118 @@ def set_policy(self, zone, policy_spec):
607622
raise VenafiError('User Details not found')
608623

609624
app_details = self._get_app_details_by_name(app_name)
625+
for_update, owners_list = self.resolve_owners(policy_spec.users, user_details)
610626
if app_details:
611-
# Application exists. Update with cit
612-
if not self._policy_exists(zone):
627+
# Application exists. Update with cit and owners
628+
if for_update and len(policy_spec.users) == 0:
629+
log.debug("No users provided in the policy specification")
630+
else:
631+
owner_list = build_owner_json(owners_list)
632+
app_details.owner_ids_and_types = owner_list
633+
if app_details.cit_alias_id_map:
613634
# Only link cit with Application when cit is not already associated with Application
614-
app_req = build_app_update_request(app_details, resp_cit_data)
615-
status, data = self._put(URLS.APP_BY_ID.format(app_details.app_id), app_req)
616-
if status != HTTPStatus.OK:
617-
raise VenafiError(f"Could not update Application [{app_name}] with cit [{resp_cit_data}]")
635+
cit_map = app_details.cit_alias_id_map
636+
cit_id, cit_name = get_cit_data_from_response(cit_data)
637+
cit_map[cit_name] = cit_id
638+
app_req = build_app_update_request(app_details, cit_map)
639+
status, data = self._put(URLS.APP_BY_ID.format(app_details.app_id), app_req)
640+
if status != HTTPStatus.OK:
641+
raise VenafiError(f"Could not update Application [{app_name}] with cit [{resp_cit_data}]")
618642
else:
619643
# Application does not exist. Create one
620-
app_req = build_app_create_request(app_name, user_details, resp_cit_data)
644+
app_req = build_app_create_request(app_name, owners_list, resp_cit_data)
621645
status, data = self._post(URLS.APPLICATIONS, app_req)
622646
if status != HTTPStatus.CREATED:
623647
raise VenafiError(f"Could not create application [{app_name}]")
624648
return
625649

650+
def resolve_owners(self, users_list, user_details):
651+
owners_list = list()
652+
for_update = False
653+
if not users_list:
654+
# When no users are provided on the list, adds apikey user as owner
655+
api_owner = resolve_apikey_owner(user_details)
656+
owners_list.append(api_owner)
657+
for_update = True
658+
else:
659+
# Resolving the usernames list
660+
# Creating a higher level Teams object to cache the response
661+
teams_list = list()
662+
users_list = list(set(users_list))
663+
for username in users_list:
664+
cloud_owner = self.resolve_user_to_cloud_owner(username)
665+
if cloud_owner:
666+
owners_list.append(cloud_owner)
667+
else:
668+
team_owner = self.resolve_user_to_cloud_team(username)
669+
if not team_owner:
670+
raise VenafiError(f"Unable to find identity [{username}]")
671+
owners_list.append(team_owner)
672+
return for_update, owners_list
673+
674+
def resolve_user_to_cloud_owner(self, username):
675+
user = self.get_vaas_identity(username)
676+
if not user:
677+
return None
678+
owner = create_owner(OWNER_TYPE_USER, user.user_id)
679+
return owner
680+
681+
def resolve_user_to_cloud_team(self, username):
682+
teams = list()
683+
data_teams = None
684+
try:
685+
status, response = self._get(URLS.TEAMS_ID)
686+
data_teams = response['teams']
687+
except VenafiError as err:
688+
log.debug(f"Error while getting team [{username}]: {err.args[0]}")
689+
# date_teams will never be empty, we are always expecting at least one team
690+
for data in data_teams:
691+
team = build_team(data)
692+
teams.append(team)
693+
if len(teams) > 0:
694+
for team in teams:
695+
if team.name == username:
696+
owner = create_owner(OWNER_TYPE_TEAM, team.team_id)
697+
return owner
698+
return None
699+
700+
def get_vaas_identity(self, username):
701+
if not username or username == "":
702+
raise VenafiError("Username cannot be empty")
703+
url = URLS.USERS_USERNAME.format(username)
704+
try:
705+
status, response = self._get(url)
706+
if status == HTTPStatus.NOT_FOUND:
707+
return None
708+
identities = response['users'][0]
709+
identity = build_user(identities)
710+
return identity
711+
except VenafiError as err:
712+
log.debug(f"Unable to find identity [{username}] of type USER: {err.args[0]}")
713+
714+
def resolve_cloud_owners_names(self, zone):
715+
app_name, cit_alias = _parse_zone(zone)
716+
app_details = self._get_app_details_by_name(app_name)
717+
users_list = list()
718+
teams_response = list()
719+
for owner in app_details.owner_ids_and_types:
720+
if owner['ownerType'] == OWNER_TYPE_USER:
721+
status, data = self._get(URLS.USERS_ID.format(owner['ownerId']))
722+
user = build_user(data)
723+
users_list.append(user.username)
724+
elif owner['ownerType'] == OWNER_TYPE_TEAM:
725+
if not teams_response:
726+
status, data = self._get(URLS.TEAMS_ID)
727+
data_team = data['teams']
728+
for t in data_team:
729+
team = build_team(t)
730+
teams_response.append(team)
731+
if teams_response:
732+
for team in teams_response:
733+
if team.team_id == owner['ownerId']:
734+
users_list.append(team.name)
735+
return users_list
736+
626737
def _get_ca_details(self, ca_name):
627738
"""
628739
:param str ca_name:
@@ -825,6 +936,8 @@ def _get_policy(self, zone, subject_cn_to_str):
825936
raise VenafiError("Certificate Authority info not found")
826937

827938
ps = build_policy_spec(cit, info, subject_cn_to_str)
939+
users_list = self.resolve_cloud_owners_names(zone)
940+
ps.users = users_list
828941
return ps
829942

830943
def _get_dek_hash(self, cert_id):
@@ -875,3 +988,4 @@ def _retrieve_service_generated_cert(self, request, dek_info):
875988

876989
cert, chain, private_key = zip_to_pem(data, request.chain_option)
877990
return Certificate(cert=cert, chain=chain, key=private_key)
991+

0 commit comments

Comments
 (0)