mirror of
				https://opendev.org/x/pyghmi
				synced 2025-10-26 08:55:20 +00:00 
			
		
		
		
	Implement redfish user management
Provide a close approximation of the ipmi module api with redfish backing. Change-Id: I60e448f67175959f381b4d9c6eacb9b5b4e22931
This commit is contained in:
		| @@ -18,6 +18,7 @@ | ||||
| # The command module for redfish systems.  Provides https-only support | ||||
| # for redfish compliant endpoints | ||||
|  | ||||
| import base64 | ||||
| from datetime import datetime, timedelta | ||||
| from fnmatch import fnmatch | ||||
| import json | ||||
| @@ -263,6 +264,168 @@ class Command(object): | ||||
|         self.powerurl = self.sysinfo.get('Actions', {}).get( | ||||
|             '#ComputerSystem.Reset', {}).get('target', None) | ||||
|  | ||||
|     @property | ||||
|     def _accountserviceurl(self): | ||||
|         sroot = self._do_web_request('/redfish/v1/') | ||||
|         return sroot.get('AccountService', {}).get('@odata.id', None) | ||||
|  | ||||
|     def get_users(self): | ||||
|         """get list of users and channel access information (helper) | ||||
|  | ||||
|         :param channel: number [1:7] | ||||
|  | ||||
|         :return: | ||||
|             name: (str) | ||||
|             uid: (int) | ||||
|             channel: (int) | ||||
|             access: | ||||
|                 callback (bool) | ||||
|                 link_auth (bool) | ||||
|                 ipmi_msg (bool) | ||||
|                 privilege_level: (str)[callback, user, operatorm administrator, | ||||
|                                        proprietary, no_access] | ||||
|         """ | ||||
|         srvurl = self._accountserviceurl | ||||
|         names = {} | ||||
|         if srvurl: | ||||
|             srvinfo = self._do_web_request(srvurl) | ||||
|             srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None) | ||||
|             if srvurl: | ||||
|                 srvinfo = self._do_web_request(srvurl) | ||||
|                 accounts = srvinfo.get('Members', []) | ||||
|                 for account in accounts: | ||||
|                     accinfo = self._do_web_request(account['@odata.id']) | ||||
|                     currname = accinfo.get('UserName', '') | ||||
|                     currid = accinfo.get('Id', None) | ||||
|                     if currname: | ||||
|                         names[currid] = {'name': currname, 'uid': currid, | ||||
|                             'access': { | ||||
|                                 'privilege_level': accinfo.get( | ||||
|                                     'RoleId', 'Unknown')}} | ||||
|         return names | ||||
|  | ||||
|     def _account_url_info_by_id(self, uid): | ||||
|         srvurl = self._accountserviceurl | ||||
|         if srvurl: | ||||
|             srvinfo = self._do_web_request(srvurl) | ||||
|             srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None) | ||||
|             if srvurl: | ||||
|                 srvinfo = self._do_web_request(srvurl) | ||||
|                 accounts = srvinfo.get('Members', []) | ||||
|                 for account in accounts: | ||||
|                     accinfo = self._do_web_request(account['@odata.id']) | ||||
|                     currid = accinfo.get('Id', None) | ||||
|                     if str(currid) == str(uid): | ||||
|                         return account['@odata.id'], accinfo | ||||
|  | ||||
|     def get_user(self, uid): | ||||
|         srvurl = self._accountserviceurl | ||||
|         if srvurl: | ||||
|             srvinfo = self._do_web_request(srvurl) | ||||
|             srvurl = srvinfo.get('Accounts', {}).get('@odata.id', None) | ||||
|             if srvurl: | ||||
|                 srvinfo = self._do_web_request(srvurl) | ||||
|                 accounts = srvinfo.get('Members', []) | ||||
|                 for account in accounts: | ||||
|                     accinfo = self._do_web_request(account['@odata.id']) | ||||
|                     currname = accinfo.get('UserName', '') | ||||
|                     currid = accinfo.get('Id', None) | ||||
|                     if str(currid) == str(uid): | ||||
|                         return {'name': currname, 'uid': uid, | ||||
|                                 'access': { | ||||
|                                     'privilege_level': accinfo.get( | ||||
|                                         'RoleId', 'Unknown')}} | ||||
|  | ||||
|     def set_user_password(self, uid, mode='set_password', password=None): | ||||
|         """Set user password and (modes) | ||||
|  | ||||
|         :param uid: id number of user.  see: get_names_uid()['name'] | ||||
|  | ||||
|         :param mode: | ||||
|             disable       = disable user connections | ||||
|             enable        = enable user connections | ||||
|             set_password  = set or ensure password | ||||
|  | ||||
|         :param password: Password | ||||
|             (optional when mode is [disable or enable]) | ||||
|  | ||||
|         :return: | ||||
|             True on success | ||||
|         """ | ||||
|  | ||||
|         accinfo = self._account_url_info_by_id(uid) | ||||
|         if not accinfo: | ||||
|             raise Exception("No such account found") | ||||
|         etag = accinfo[1].get('@odata.etag', None) | ||||
|         if mode == 'set_password': | ||||
|             self._do_web_request(accinfo[0], {'Password': password}, method='PATCH', etag=etag) | ||||
|         elif mode=='disable': | ||||
|             self._do_web_request(accinfo[0], {'Enabled': False}, method='PATCH', etag=etag) | ||||
|         elif mode == 'enable': | ||||
|             self._do_web_request(accinfo[0], {'Enabled': True}, method='PATCH', etag=etag) | ||||
|         return True | ||||
|  | ||||
|     def disable_user(self, uid, mode): | ||||
|         """Disable User | ||||
|  | ||||
|         Just disable the User. | ||||
|         This will not disable the password or revoke privileges. | ||||
|  | ||||
|         :param uid: user id | ||||
|         :param mode: | ||||
|             disable       = disable user connections | ||||
|             enable        = enable user connections | ||||
|         """ | ||||
|         self.set_user_password(uid, mode) | ||||
|         return True | ||||
|  | ||||
|     def set_user_access(self, uid, privilege_level='ReadOnly'): | ||||
|         accinfo = self._account_url_info_by_id(uid) | ||||
|         if not accinfo: | ||||
|             raise Exception("Unable to find indicated uid") | ||||
|         etag = accinfo[1].get('@odata.etag', None) | ||||
|         self._do_web_request(accinfo[0], {'RoleId': privilege_level}, method='PATCH', etag=etag) | ||||
|  | ||||
|     def create_user(self, uid, name, password, privilege_level='ReadOnly'): | ||||
|         """create/ensure a user is created with provided settings | ||||
|  | ||||
|         :param privilege_level: | ||||
|             User Privilege level.  Redfish role, commonly Administrator, Operator, and ReadOnly | ||||
|         """ | ||||
|         accinfo = self._account_url_info_by_id(uid) | ||||
|         if not accinfo: | ||||
|             raise Exception("Unable to find indicated uid") | ||||
|         etag = accinfo[1].get('@odata.etag', None) | ||||
|         userinfo = { | ||||
|             "UserName": name, | ||||
|             "Password": password, | ||||
|             "RoleId": privilege_level, | ||||
|         } | ||||
|         self._do_web_request(accinfo[0], userinfo, method='PATCH', etag=etag) | ||||
|         return True | ||||
|  | ||||
|     def user_delete(self, uid): | ||||
|         # Redfish doesn't do so well with Deleting users either...  Blanking the username seems to be the | ||||
|         # convention | ||||
|         # First, set a bogus password in case the implementation does honor blank user, at least render such | ||||
|         # an account harmless | ||||
|         self.set_user_password(uid, base64.b64encode(os.urandom(15))) | ||||
|         self.set_user_name(uid, '') | ||||
|         return True | ||||
|  | ||||
|     def set_user_name(self, uid, name): | ||||
|         """Set user name | ||||
|  | ||||
|         :param uid: user id | ||||
|         :param name: username | ||||
|         """ | ||||
|         accinfo = self._account_url_info_by_id(uid) | ||||
|         if not accinfo: | ||||
|             raise Exception("No such account found") | ||||
|         etag = accinfo[1].get('@odata.etag', None) | ||||
|         self._do_web_request(accinfo[0], {'UserName': name}, method='PATCH', etag=etag) | ||||
|         return True | ||||
|  | ||||
|     @property | ||||
|     def _updateservice(self): | ||||
|         if not self._varupdateservice: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user