83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
import requests
|
|
import json
|
|
from keystone import domain
|
|
from keystone import project
|
|
|
|
def create_user(ip, token, user_name, password, email, domain_name, default_project_name, description):
    """Create a user via a POST to ``http://{ip}:5000/v3/users``.

    The domain and the default project are supplied by name and resolved to
    IDs with ``domain.get_domain_id`` and ``project.get_project_id`` before
    the request is sent.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.
        user_name: Name of the new user.
        password: Password of the new user.
        email: E-mail address stored on the user.
        domain_name: Name of the domain the user is created in.
        default_project_name: Name of the user's default project.
        description: Free-text description stored on the user.

    Returns:
        A dict with an ``"error"`` key when either lookup returned ``None``;
        otherwise the decoded JSON body of the POST response.
    """
    domain_id = domain.get_domain_id(ip, token, domain_name)
    default_project_id = project.get_project_id(ip, token, default_project_name)

    if domain_id is None:
        return {"error": "域不存在"}
    if default_project_id is None:
        # Must be a dict keyed by "error" like the domain branch; a bare
        # {"..."} literal is a set, which callers cannot index or serialise.
        return {"error": "项目不存在"}

    headers = {'X-Auth-Token': token}
    body = {
        "user": {
            "name": user_name,
            "password": password,
            "email": email,
            "domain_id": domain_id,
            "default_project_id": default_project_id,
            "description": description,
        }
    }
    resp = requests.post(f"http://{ip}:5000/v3/users", data=json.dumps(body), headers=headers)
    return resp.json()
|
|
|
|
|
|
def get_users(ip, token):
    """Fetch the user listing from ``http://{ip}:5000/v3/users``.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.

    Returns:
        The decoded JSON body of the GET response.
    """
    listing_url = f"http://{ip}:5000/v3/users"
    response = requests.get(listing_url, headers={'X-Auth-Token': token})
    payload = response.json()
    return payload
|
|
|
|
|
|
def get_user_id(ip, token, user_name):
    """Look up a user's ID by name in the ``/v3/users`` listing.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.
        user_name: Name to match against each entry's ``"name"`` field.

    Returns:
        The ``"id"`` of the first entry whose name equals *user_name*, or
        ``None`` when no entry in the listing matches.

    Raises:
        KeyError: If the response body has no ``"users"`` key.
    """
    headers = {'X-Auth-Token': token}
    result = requests.get(f"http://{ip}:5000/v3/users", headers=headers).json()

    # Check every entry before giving up: the "not found" result is returned
    # only after the loop, never from inside it on the first non-match.
    for item in result['users']:
        if item['name'] == user_name:
            return item['id']
    return None
|
|
|
|
|
|
def get_user(ip, token, user_name):
    """Fetch the details of the user named *user_name*.

    The name is resolved to an ID with ``get_user_id`` and the record is then
    requested from ``http://{ip}:5000/v3/users/{user_id}``.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.
        user_name: Name of the user to fetch.

    Returns:
        ``{"error": "用户不存在"}`` when the name cannot be resolved;
        otherwise the decoded JSON body of the GET response.
    """
    user_id = get_user_id(ip, token, user_name)
    # get_user_id signals "not found" with the None object, not the string
    # "NONE"; testing for the string would let a request to /users/None through.
    if user_id is None:
        return {"error": "用户不存在"}

    headers = {'X-Auth-Token': token}
    api_url = f"http://{ip}:5000/v3/users/{user_id}"
    resp = requests.get(api_url, headers=headers)
    return resp.json()
|
|
|
|
|
|
def delete_user(ip, token, user_name):
    """Delete the user named *user_name*.

    The name is resolved to an ID with ``get_user_id`` and a DELETE is sent
    to ``http://{ip}:5000/v3/users/{user_id}``.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.
        user_name: Name of the user to delete.

    Returns:
        ``{"error": "用户不存在"}`` when the name cannot be resolved;
        ``{"message": "用户删除成功"}`` when the response status is 204;
        otherwise an error dict that also carries the ``status_code``.
    """
    user_id = get_user_id(ip, token, user_name)
    # get_user_id signals "not found" with the None object, not the string
    # "NONE"; testing for the string would send a DELETE to /users/None.
    if user_id is None:
        return {"error": "用户不存在"}

    headers = {'X-Auth-Token': token}
    api_url = f"http://{ip}:5000/v3/users/{user_id}"
    resp = requests.delete(api_url, headers=headers)

    if resp.status_code == 204:
        return {"message":"用户删除成功"}
    return {"error": "用户删除失败", "status_code": resp.status_code}
|
|
def update_user_password(ip, token, user_name, new_password, original_password):
    """Change the password of the user named *user_name*.

    The name is resolved to an ID with ``get_user_id`` and the new and
    original passwords are POSTed to
    ``http://{ip}:5000/v3/users/{user_id}/password``.

    Args:
        ip: Host address of the identity service.
        token: Value sent in the ``X-Auth-Token`` header.
        user_name: Name of the user whose password is changed.
        new_password: Password to set.
        original_password: The user's current password, sent alongside the
            new one in the request body.

    Returns:
        ``{"error": "User not found"}`` when the name cannot be resolved;
        ``{"message": "密码更新成功"}`` when the response status is 204;
        otherwise an error dict that also carries the ``status_code``.
    """
    user_id = get_user_id(ip, token, user_name)
    # get_user_id signals "not found" with the None object, not the string
    # "NONE"; testing for the string would POST to /users/None/password.
    if user_id is None:
        return {"error": "User not found"}

    headers = {'X-Auth-Token': token}
    body = {
        "user": {
            "password": new_password,
            "original_password": original_password,
        }
    }
    api_url = f"http://{ip}:5000/v3/users/{user_id}/password"
    resp = requests.post(api_url, data=json.dumps(body), headers=headers)

    if resp.status_code == 204:
        return {"message": "密码更新成功"}
    return {"error": "密码更新失败", "status_code": resp.status_code}