diff --git a/res/ab.py b/res/ab.py index 338bd3c64..c2ba59d2b 100644 --- a/res/ab.py +++ b/res/ab.py @@ -39,7 +39,14 @@ def view_shared_abs(url, token, name=None): while True: filtered_params["current"] = current response = requests.get(f"{url}/api/ab/shared/profiles", headers=headers, params=filtered_params) + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) data = response_json.get("data", []) abs.extend(data) @@ -84,7 +91,14 @@ def view_ab_peers(url, token, ab_guid, peer_id=None, alias=None): while True: filtered_params["current"] = current response = requests.get(f"{url}/api/ab/peers", headers=headers, params=filtered_params) + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) data = response_json.get("data", []) peers.extend(data) @@ -103,11 +117,6 @@ def view_ab_tags(url, token, ab_guid): response = requests.get(f"{url}/api/ab/tags/{ab_guid}", headers=headers) response_json = check_response(response) - # Handle error responses - if isinstance(response_json, tuple) and response_json[0] == "Failed": - print(f"Error: {response_json[1]} - {response_json[2]}") - return [] - # Format color values as hex if response_json: for tag in response_json: @@ -122,14 +131,18 @@ def view_ab_tags(url, token, ab_guid): def check_response(response): """Check API response and return result""" - if response.status_code == 200: - try: - response_json = response.json() - return response_json - except ValueError: - return response.text or "Success" - else: - return "Failed", response.status_code, response.text + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + + try: + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) + return response_json + except ValueError: + return response.text or "Success" def add_peer(url, token, ab_guid, peer_id, alias=None, note=None, tags=None, password=None): @@ -395,7 +408,14 @@ def view_ab_rules(url, token, ab_guid): while True: params["current"] = current response = requests.get(f"{url}/api/ab/rules", headers=headers, params=params) + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) data = response_json.get("data", []) rules.extend(data) diff --git a/res/audits.py b/res/audits.py index b5cf28504..d843233da 100644 --- a/res/audits.py +++ b/res/audits.py @@ -149,14 +149,18 @@ def enhance_audit_data(data, audit_type): def check_response(response): """Check API response and return result""" - if response.status_code == 200: - try: - response_json = response.json() - return response_json - except ValueError: - return response.text or "Success" - else: - return "Failed", response.status_code, response.text + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + + try: + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) + return response_json + except ValueError: + return response.text or "Success" def view_audits_common(url, token, endpoint, filters=None, page_size=None, current=None, @@ -216,7 +220,7 @@ def view_audits_common(url, token, endpoint, filters=None, page_size=None, curre string_params[k] = v response = requests.get(f"{url}/api/audits/{endpoint}", headers=headers, params=string_params) - response_json = response.json() + response_json = check_response(response) # Enhance the data with readable formats data = enhance_audit_data(response_json.get("data", []), endpoint) diff --git a/res/device_group.py b/res/device_group.py new file mode 100755 index 000000000..ec98de15b --- /dev/null +++ b/res/device_group.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +import requests +import argparse +import json + + +def check_response(response): + """ + Check API response and handle errors. + + Two error cases: + 1. Status code is not 200 -> exit with error + 2. Response contains {"error": "xxx"} -> exit with error + """ + if response.status_code != 200: + print(f"Error: HTTP {response.status_code}: {response.text}") + exit(1) + + # Check for {"error": "xxx"} in response + if response.text and response.text.strip(): + try: + json_data = response.json() + if isinstance(json_data, dict) and "error" in json_data: + print(f"Error: {json_data['error']}") + exit(1) + return json_data + except ValueError: + return response.text + + return None + + +def headers_with(token): + return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + +# ---------- Device Group APIs ---------- + +def list_groups(url, token, name=None, page_size=50): + headers = headers_with(token) + params = {"pageSize": page_size} + if name: + params["name"] = name + data, current = [], 1 + while True: + params["current"] = current + r = requests.get(f"{url}/api/device-groups", headers=headers, params=params) + if r.status_code != 200: + print(f"Error: HTTP {r.status_code} - {r.text}") + exit(1) + res = r.json() + if "error" in res: + print(f"Error: {res['error']}") + exit(1) + rows = res.get("data", []) + data.extend(rows) + total = res.get("total", 0) + current += page_size + if len(rows) < page_size or current > total: + break + return data + + +def get_group_by_name(url, token, name): + groups = list_groups(url, token, name) + for g in groups: + if str(g.get("name")) == name: + return g + return None + + +def create_group(url, token, name, note=None, accessed_from=None): + headers = headers_with(token) + payload = {"name": name} + if note: + payload["note"] = note + if accessed_from: + payload["allowed_incomings"] = accessed_from + r = requests.post(f"{url}/api/device-groups", headers=headers, json=payload) + return check_response(r) + + +def update_group(url, token, name, new_name=None, note=None, accessed_from=None): + headers = headers_with(token) + g = get_group_by_name(url, token, name) + if not g: + print(f"Error: Group '{name}' not found") + exit(1) + guid = g.get("guid") + payload = {} + if new_name is not None: + payload["name"] = new_name + if note is not None: + payload["note"] = note + if accessed_from is not None: + payload["allowed_incomings"] = accessed_from + r = requests.patch(f"{url}/api/device-groups/{guid}", headers=headers, json=payload) + check_response(r) + return "Success" + + +def delete_groups(url, token, names): + headers = headers_with(token) + if isinstance(names, str): + names = [names] + for n in names: + g = get_group_by_name(url, token, n) + if not g: + print(f"Error: Group '{n}' not found") + exit(1) + guid = g.get("guid") + r = requests.delete(f"{url}/api/device-groups/{guid}", headers=headers) + check_response(r) + return "Success" + + +# ---------- Device group assign APIs (name -> guid) ---------- + +def view_devices(url, token, group_name=None, id=None, device_name=None, + user_name=None, device_username=None, page_size=50): + """View devices in a device group with filters""" + headers = headers_with(token) + + # Separate exact match and fuzzy match params + params = {} + fuzzy_params = { + "id": id, + "device_name": device_name, + "user_name": user_name, + "device_username": device_username, + } + + # Add device_group_name without wildcard (exact match) + if group_name: + params["device_group_name"] = group_name + + # Add wildcard for fuzzy search to other params + for k, v in fuzzy_params.items(): + if v is not None: + params[k] = "%" + v + "%" if (v != "-" and "%" not in v) else v + + params["pageSize"] = page_size + + data, current = [], 1 + while True: + params["current"] = current + r = requests.get(f"{url}/api/devices", headers=headers, params=params) + if r.status_code != 200: + return check_response(r) + res = r.json() + rows = res.get("data", []) + data.extend(rows) + total = res.get("total", 0) + current += page_size + if len(rows) < page_size or current > total: + break + return data + + +def add_devices(url, token, group_name, device_ids): + headers = headers_with(token) + g = get_group_by_name(url, token, group_name) + if not g: + return f"Group '{group_name}' not found" + guid = g.get("guid") + payload = device_ids if isinstance(device_ids, list) else [device_ids] + r = requests.post(f"{url}/api/device-groups/{guid}", headers=headers, json=payload) + return check_response(r) + + +def remove_devices(url, token, group_name, device_ids): + headers = headers_with(token) + g = get_group_by_name(url, token, group_name) + if not g: + return f"Group '{group_name}' not found" + guid = g.get("guid") + payload = device_ids if isinstance(device_ids, list) else [device_ids] + r = requests.delete(f"{url}/api/device-groups/{guid}/devices", headers=headers, json=payload) + return check_response(r) + + +def parse_rules(s): + if not s: + return None + try: + v = json.loads(s) + if isinstance(v, list): + # expect list of {"type": number, "name": string} + return v + except Exception: + pass + return None + + +def main(): + parser = argparse.ArgumentParser(description="Device Group manager") + parser.add_argument("command", choices=[ + "view", "add", "update", "delete", + "view-devices", "add-devices", "remove-devices" + ], help=( + "Command to execute. " + "[view/add/update/delete/add-devices/remove-devices: require Device Group Permission] " + "[view-devices: require Device Permission]" + )) + parser.add_argument("--url", required=True) + parser.add_argument("--token", required=True) + + parser.add_argument("--name", help="Device group name (exact match)") + parser.add_argument("--new-name", help="New device group name (for update)") + parser.add_argument("--note", help="Note") + + parser.add_argument("--accessed-from", help="JSON array: '[{\"type\":0|2,\"name\":\"...\"}]' (0=User Group, 2=User)") + + parser.add_argument("--ids", help="Comma separated device IDs for add-devices/remove-devices") + + # Filters for view-devices command + parser.add_argument("--id", help="Device ID filter (for view-devices)") + parser.add_argument("--device-name", help="Device name filter (for view-devices)") + parser.add_argument("--user-name", help="User name filter (owner of device, for view-devices)") + parser.add_argument("--device-username", help="Device username filter (logged in user on device, for view-devices)") + + args = parser.parse_args() + while args.url.endswith("/"): args.url = args.url[:-1] + + if args.command == "view": + res = list_groups(args.url, args.token, args.name) + print(json.dumps(res, indent=2)) + elif args.command == "add": + if not args.name: + print("Error: --name is required") + exit(1) + print(create_group( + args.url, args.token, args.name, args.note, + parse_rules(args.accessed_from) + )) + elif args.command == "update": + if not args.name: + print("Error: --name is required") + exit(1) + print(update_group( + args.url, args.token, args.name, args.new_name, args.note, + parse_rules(args.accessed_from) + )) + elif args.command == "delete": + if not args.name: + print("Error: --name is required (supports comma separated)") + exit(1) + names = [x.strip() for x in args.name.split(",") if x.strip()] + print(delete_groups(args.url, args.token, names)) + elif args.command == "view-devices": + res = view_devices( + args.url, + args.token, + group_name=args.name, + id=args.id, + device_name=args.device_name, + user_name=args.user_name, + device_username=args.device_username + ) + print(json.dumps(res, indent=2)) + elif args.command in ("add-devices", "remove-devices"): + if not args.name or not args.ids: + print("Error: --name and --ids are required for add/remove devices") + exit(1) + ids = [x.strip() for x in args.ids.split(",") if x.strip()] + if args.command == "add-devices": + print(add_devices(args.url, args.token, args.name, ids)) + else: + print(remove_devices(args.url, args.token, args.name, ids)) + + +if __name__ == "__main__": + main() diff --git a/res/devices.py b/res/devices.py index fce68ad8f..ba11866e5 100755 --- a/res/devices.py +++ b/res/devices.py @@ -39,7 +39,14 @@ def view( while True: params["current"] = current response = requests.get(f"{url}/api/devices", headers=headers, params=params) + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) data = response_json.get("data", []) @@ -62,14 +69,18 @@ def view( def check(response): - if response.status_code == 200: - try: - response_json = response.json() - return response_json - except ValueError: - return response.text or "Success" - else: - return "Failed", response.status_code, response.text + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + + try: + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) + return response_json + except ValueError: + return response.text or "Success" def disable(url, token, guid, id): diff --git a/res/strategies.py b/res/strategies.py new file mode 100755 index 000000000..178d8d9e7 --- /dev/null +++ b/res/strategies.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 + +import requests +import argparse +import json + + +def check_response(response): + """ + Check API response and handle errors. + + Two error cases: + 1. Status code is not 200 -> exit with error + 2. Response contains {"error": "xxx"} -> exit with error + """ + if response.status_code != 200: + print(f"Error: HTTP {response.status_code}: {response.text}") + exit(1) + + # Check for {"error": "xxx"} in response + if response.text and response.text.strip(): + try: + json_data = response.json() + if isinstance(json_data, dict) and "error" in json_data: + print(f"Error: {json_data['error']}") + exit(1) + return json_data + except ValueError: + return response.text + + return None + + +def headers_with(token): + return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + +# ---------- Strategies APIs ---------- + +def list_strategies(url, token): + """List all strategies""" + headers = headers_with(token) + r = requests.get(f"{url}/api/strategies", headers=headers) + return check_response(r) + + +def get_strategy_by_guid(url, token, guid): + """Get strategy by GUID""" + headers = headers_with(token) + r = requests.get(f"{url}/api/strategies/{guid}", headers=headers) + return check_response(r) + + +def get_strategy_by_name(url, token, name): + """Get strategy by name""" + strategies = list_strategies(url, token) + if not strategies: + return None + for s in strategies: + if str(s.get("name")) == name: + return s + return None + + +def enable_strategy(url, token, name): + """Enable a strategy""" + headers = headers_with(token) + strategy = get_strategy_by_name(url, token, name) + if not strategy: + print(f"Error: Strategy '{name}' not found") + exit(1) + guid = strategy.get("guid") + r = requests.put(f"{url}/api/strategies/{guid}/status", headers=headers, json=True) + check_response(r) + return "Success" + + +def disable_strategy(url, token, name): + """Disable a strategy""" + headers = headers_with(token) + strategy = get_strategy_by_name(url, token, name) + if not strategy: + print(f"Error: Strategy '{name}' not found") + exit(1) + guid = strategy.get("guid") + r = requests.put(f"{url}/api/strategies/{guid}/status", headers=headers, json=False) + check_response(r) + return "Success" + + +def get_device_guid_by_id(url, token, device_id): + """Get device GUID by device ID (exact match)""" + headers = headers_with(token) + params = {"id": device_id, "pageSize": 50} + r = requests.get(f"{url}/api/devices", headers=headers, params=params) + res = check_response(r) + if not res: + return None + + devices_data = res.get("data", []) if isinstance(res, dict) else res + for d in devices_data: + if d.get("id") == device_id: + return d.get("guid") + return None + + +def get_user_guid_by_name(url, token, name): + """Get user GUID by exact name match""" + headers = headers_with(token) + params = {"name": name, "pageSize": 50} + r = requests.get(f"{url}/api/users", headers=headers, params=params) + res = check_response(r) + if not res: + return None + + users_data = res.get("data", []) if isinstance(res, dict) else res + for u in users_data: + if u.get("name") == name: + return u.get("guid") + return None + + +def get_device_group_guid_by_name(url, token, name): + """Get device group GUID by exact name match""" + headers = headers_with(token) + params = {"pageSize": 50, "name": name} + r = requests.get(f"{url}/api/device-groups", headers=headers, params=params) + res = check_response(r) + if not res: + return None + + groups_data = res.get("data", []) if isinstance(res, dict) else res + for g in groups_data: + if g.get("name") == name: + return g.get("guid") + return None + + +def assign_strategy(url, token, strategy_name, peers=None, users=None, device_groups=None): + """ + Assign strategy to peers, users, or device groups + + Args: + strategy_name: Name of the strategy (or None to unassign) + peers: List of device IDs or GUIDs + users: List of user names or GUIDs + device_groups: List of device group names or GUIDs + """ + headers = headers_with(token) + + # Get strategy GUID if strategy_name is provided + strategy_guid = None + if strategy_name: + strategy = get_strategy_by_name(url, token, strategy_name) + if not strategy: + print(f"Error: Strategy '{strategy_name}' not found") + exit(1) + strategy_guid = strategy.get("guid") + + # Convert device IDs to GUIDs + peer_guids = [] + if peers: + for peer in peers: + # Check if it's already a GUID format + if len(peer) == 36 and peer.count('-') == 4: + peer_guids.append(peer) + else: + # Treat as device ID, look it up + guid = get_device_guid_by_id(url, token, peer) + if not guid: + print(f"Error: Device '{peer}' not found") + exit(1) + peer_guids.append(guid) + + # Convert user names to GUIDs + user_guids = [] + if users: + for user in users: + # Check if it's already a GUID format + if len(user) == 36 and user.count('-') == 4: + user_guids.append(user) + else: + # Treat as username, look it up + guid = get_user_guid_by_name(url, token, user) + if not guid: + print(f"Error: User '{user}' not found") + exit(1) + user_guids.append(guid) + + # Convert device group names to GUIDs + device_group_guids = [] + if device_groups: + for dg in device_groups: + # Check if it's already a GUID format + if len(dg) == 36 and dg.count('-') == 4: + device_group_guids.append(dg) + else: + # Treat as device group name, look it up + guid = get_device_group_guid_by_name(url, token, dg) + if not guid: + print(f"Error: Device group '{dg}' not found") + exit(1) + device_group_guids.append(guid) + + # Build payload + payload = {} + if strategy_guid: + payload["strategy"] = strategy_guid + + payload["peers"] = peer_guids + payload["users"] = user_guids + payload["groups"] = device_group_guids + + r = requests.post(f"{url}/api/strategies/assign", headers=headers, json=payload) + check_response(r) + + +def main(): + parser = argparse.ArgumentParser(description="Strategy manager") + parser.add_argument("command", choices=[ + "list", "view", "enable", "disable", "assign", "unassign" + ]) + parser.add_argument("--url", required=True, help="Server URL") + parser.add_argument("--token", required=True, help="API token") + + parser.add_argument("--name", help="Strategy name (for view/enable/disable/assign commands)") + parser.add_argument("--guid", help="Strategy GUID (for view command, alternative to --name)") + + # For assign/unassign commands + parser.add_argument("--peers", help="Comma separated device IDs or GUIDs (requires Device Permission:r)") + parser.add_argument("--users", help="Comma separated user names or GUIDs (requires User Permission:r)") + parser.add_argument("--device-groups", help="Comma separated device group names or GUIDs (requires Device Group Permission:r)") + + args = parser.parse_args() + while args.url.endswith("/"): args.url = args.url[:-1] + + if args.command == "list": + res = list_strategies(args.url, args.token) + print(json.dumps(res, indent=2)) + + elif args.command == "view": + if args.guid: + res = get_strategy_by_guid(args.url, args.token, args.guid) + print(json.dumps(res, indent=2)) + elif args.name: + strategy = get_strategy_by_name(args.url, args.token, args.name) + if not strategy: + print(f"Error: Strategy '{args.name}' not found") + exit(1) + # Get full details by GUID + guid = strategy.get("guid") + res = get_strategy_by_guid(args.url, args.token, guid) + print(json.dumps(res, indent=2)) + else: + print("Error: --name or --guid is required for view command") + exit(1) + + elif args.command == "enable": + if not args.name: + print("Error: --name is required") + exit(1) + print(enable_strategy(args.url, args.token, args.name)) + + elif args.command == "disable": + if not args.name: + print("Error: --name is required") + exit(1) + print(disable_strategy(args.url, args.token, args.name)) + + elif args.command == "assign": + if not args.name: + print("Error: --name is required") + exit(1) + if not args.peers and not args.users and not args.device_groups: + print("Error: at least one of --peers, --users, or --device-groups is required") + exit(1) + + peers = [x.strip() for x in args.peers.split(",") if x.strip()] if args.peers else None + users = [x.strip() for x in args.users.split(",") if x.strip()] if args.users else None + device_groups = [x.strip() for x in args.device_groups.split(",") if x.strip()] if args.device_groups else None + + assign_strategy(args.url, args.token, args.name, peers=peers, users=users, device_groups=device_groups) + count = (len(peers) if peers else 0) + (len(users) if users else 0) + (len(device_groups) if device_groups else 0) + print(f"Success: Assigned strategy '{args.name}' to {count} target(s)") + + elif args.command == "unassign": + if not args.peers and not args.users and not args.device_groups: + print("Error: at least one of --peers, --users, or --device-groups is required") + exit(1) + + peers = [x.strip() for x in args.peers.split(",") if x.strip()] if args.peers else None + users = [x.strip() for x in args.users.split(",") if x.strip()] if args.users else None + device_groups = [x.strip() for x in args.device_groups.split(",") if x.strip()] if args.device_groups else None + + assign_strategy(args.url, args.token, None, peers=peers, users=users, device_groups=device_groups) + count = (len(peers) if peers else 0) + (len(users) if users else 0) + (len(device_groups) if device_groups else 0) + print(f"Success: Unassigned strategy from {count} target(s)") + + +if __name__ == "__main__": + main() diff --git a/res/user_group.py b/res/user_group.py new file mode 100755 index 000000000..909123e4e --- /dev/null +++ b/res/user_group.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 + +import requests +import argparse +import json + + +def check_response(response): + """ + Check API response and handle errors. + + Two error cases: + 1. Status code is not 200 -> exit with error + 2. Response contains {"error": "xxx"} -> exit with error + """ + if response.status_code != 200: + print(f"Error: HTTP {response.status_code}: {response.text}") + exit(1) + + # Check for {"error": "xxx"} in response + if response.text and response.text.strip(): + try: + json_data = response.json() + if isinstance(json_data, dict) and "error" in json_data: + print(f"Error: {json_data['error']}") + exit(1) + return json_data + except ValueError: + return response.text + + return None + + +def headers_with(token): + return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + +# ---------- User Group APIs ---------- + +def list_groups(url, token, name=None, page_size=50): + headers = headers_with(token) + params = {"pageSize": page_size} + if name: + params["name"] = name + data, current = [], 1 + while True: + params["current"] = current + r = requests.get(f"{url}/api/user-groups", headers=headers, params=params) + if r.status_code != 200: + print(f"Error: HTTP {r.status_code} - {r.text}") + exit(1) + res = r.json() + if "error" in res: + print(f"Error: {res['error']}") + exit(1) + rows = res.get("data", []) + data.extend(rows) + total = res.get("total", 0) + current += page_size + if len(rows) < page_size or current > total: + break + return data + + +def get_group_by_name(url, token, name): + groups = list_groups(url, token, name) + for g in groups: + if str(g.get("name")) == name: + return g + return None + + +def create_group(url, token, name, note=None, accessed_from=None, access_to=None): + headers = headers_with(token) + payload = {"name": name} + if note: + payload["note"] = note + if accessed_from: + payload["allowed_incomings"] = accessed_from + if access_to: + payload["allowed_outgoings"] = access_to + r = requests.post(f"{url}/api/user-groups", headers=headers, json=payload) + return check_response(r) + + +def update_group(url, token, name, new_name=None, note=None, accessed_from=None, access_to=None): + headers = headers_with(token) + g = get_group_by_name(url, token, name) + if not g: + print(f"Error: Group '{name}' not found") + exit(1) + guid = g.get("guid") + payload = {} + if new_name is not None: + payload["name"] = new_name + if note is not None: + payload["note"] = note + if accessed_from is not None: + payload["allowed_incomings"] = accessed_from + if access_to is not None: + payload["allowed_outgoings"] = access_to + r = requests.patch(f"{url}/api/user-groups/{guid}", headers=headers, json=payload) + check_response(r) + return "Success" + + +def delete_groups(url, token, names): + headers = headers_with(token) + if isinstance(names, str): + names = [names] + for n in names: + g = get_group_by_name(url, token, n) + if not g: + print(f"Error: Group '{n}' not found") + exit(1) + guid = g.get("guid") + r = requests.delete(f"{url}/api/user-groups/{guid}", headers=headers) + check_response(r) + return "Success" + + +# ---------- User management in group ---------- + +def view_users(url, token, group_name=None, name=None, page_size=50): + """View users in a user group with filters""" + headers = headers_with(token) + + # Separate exact match and fuzzy match params + params = {} + fuzzy_params = { + "name": name, + } + + # Add group_name without wildcard (exact match) + if group_name: + params["group_name"] = group_name + + # Add wildcard for fuzzy search to other params + for k, v in fuzzy_params.items(): + if v is not None: + params[k] = "%" + v + "%" if (v != "-" and "%" not in v) else v + + params["pageSize"] = page_size + + data, current = [], 1 + while True: + params["current"] = current + r = requests.get(f"{url}/api/users", headers=headers, params=params) + if r.status_code != 200: + return check_response(r) + res = r.json() + rows = res.get("data", []) + data.extend(rows) + total = res.get("total", 0) + current += page_size + if len(rows) < page_size or current > total: + break + return data + + +def add_users(url, token, group_name, user_names): + """Add users to a user group""" + headers = headers_with(token) + if isinstance(user_names, str): + user_names = [user_names] + + # Get the user group guid + g = get_group_by_name(url, token, group_name) + if not g: + print(f"Error: Group '{group_name}' not found") + exit(1) + guid = g.get("guid") + + # Get user GUIDs + user_guids = [] + errors = [] + + for user_name in user_names: + # Get user by exact name match + params = {"name": user_name, "pageSize": 50} + r = requests.get(f"{url}/api/users", headers=headers, params=params) + if r.status_code != 200: + errors.append(f"{user_name}: HTTP {r.status_code}") + continue + + users_data = r.json() + users_list = users_data.get("data", []) + user = None + for u in users_list: + if u.get("name") == user_name: + user = u + break + + if not user: + errors.append(f"{user_name}: User not found") + continue + + user_guids.append(user["guid"]) + + if not user_guids: + msg = "Error: No valid users found" + if errors: + msg += ". " + "; ".join(errors) + print(msg) + exit(1) + + # Add users to group using POST /api/user-groups/:guid + r = requests.post(f"{url}/api/user-groups/{guid}", headers=headers, json=user_guids) + check_response(r) + + success_msg = f"Success: Added {len(user_guids)} user(s) to group '{group_name}'" + if errors: + return success_msg + " (with errors: " + "; ".join(errors) + ")" + return success_msg + + +def parse_rules(s): + if not s: + return None + try: + v = json.loads(s) + if isinstance(v, list): + # expect list of {"type": number, "name": string} + return v + except Exception: + pass + return None + + +def main(): + parser = argparse.ArgumentParser(description="User Group manager") + parser.add_argument("command", choices=[ + "view", "add", "update", "delete", + "view-users", "add-users" + ], help=( + "Command to execute. " + "[view/add/update/delete/add-users: require User Group Permission] " + "[view-users: require User Permission]" + )) + parser.add_argument("--url", required=True) + parser.add_argument("--token", required=True) + + parser.add_argument("--name", help="User group name (exact match)") + parser.add_argument("--new-name", help="New user group name (for update)") + parser.add_argument("--note", help="Note") + + parser.add_argument("--accessed-from", help="JSON array: '[{\"type\":0|2,\"name\":\"...\"}]' (0=User Group, 2=User)") + parser.add_argument("--access-to", help="JSON array: '[{\"type\":0|1,\"name\":\"...\"}]' (0=User Group, 1=Device Group)") + + parser.add_argument("--users", help="Comma separated usernames for add-users") + + # Filters for view-users command + parser.add_argument("--user-name", help="User name filter (for view-users, supports fuzzy search)") + + args = parser.parse_args() + while args.url.endswith("/"): args.url = args.url[:-1] + + if args.command == "view": + res = list_groups(args.url, args.token, args.name) + print(json.dumps(res, indent=2)) + elif args.command == "add": + if not args.name: + print("Error: --name is required") + exit(1) + print(create_group( + args.url, args.token, args.name, args.note, + parse_rules(args.accessed_from), + parse_rules(args.access_to) + )) + elif args.command == "update": + if not args.name: + print("Error: --name is required") + exit(1) + print(update_group( + args.url, args.token, args.name, args.new_name, args.note, + parse_rules(args.accessed_from), + parse_rules(args.access_to) + )) + elif args.command == "delete": + if not args.name: + print("Error: --name is required (supports comma separated)") + exit(1) + names = [x.strip() for x in args.name.split(",") if x.strip()] + print(delete_groups(args.url, args.token, names)) + elif args.command == "view-users": + res = view_users( + args.url, + args.token, + group_name=args.name, + name=args.user_name + ) + print(json.dumps(res, indent=2)) + elif args.command == "add-users": + if not args.name or not args.users: + print("Error: --name and --users are required") + exit(1) + users = [x.strip() for x in args.users.split(",") if x.strip()] + print(add_users(args.url, args.token, args.name, users)) + + +if __name__ == "__main__": + main() diff --git a/res/users.py b/res/users.py index 54297f06a..86e562afd 100755 --- a/res/users.py +++ b/res/users.py @@ -5,6 +5,28 @@ import argparse from datetime import datetime, timedelta +def check_response(response): + """ + Check API response and handle errors properly. + Exit with code 1 if there's an error. + """ + if response.status_code != 200: + print(f"Error: HTTP {response.status_code}: {response.text}") + exit(1) + + if response.text and response.text.strip(): + try: + json_data = response.json() + if isinstance(json_data, dict) and "error" in json_data: + print(f"Error: {json_data['error']}") + exit(1) + return json_data + except ValueError: + return response.text + + return None + + def view( url, token, @@ -32,7 +54,14 @@ def view( while True: params["current"] = current response = requests.get(f"{url}/api/users", headers=headers, params=params) + if response.status_code != 200: + print(f"Error: HTTP {response.status_code} - {response.text}") + exit(1) + response_json = response.json() + if "error" in response_json: + print(f"Error: {response_json['error']}") + exit(1) data = response_json.get("data", []) users.extend(data) @@ -45,43 +74,122 @@ def view( return users -def check(response): - if response.status_code == 200: - try: - response_json = response.json() - return response_json - except ValueError: - return response.text or "Success" - else: - return "Failed", response.status_code, response.text - - def disable(url, token, guid, name): print("Disable", name) headers = {"Authorization": f"Bearer {token}"} response = requests.post(f"{url}/api/users/{guid}/disable", headers=headers) - return check(response) + check_response(response) def enable(url, token, guid, name): print("Enable", name) headers = {"Authorization": f"Bearer {token}"} response = requests.post(f"{url}/api/users/{guid}/enable", headers=headers) - return check(response) + check_response(response) -def delete(url, token, guid, name): +def delete_user(url, token, guid, name): print("Delete", name) headers = {"Authorization": f"Bearer {token}"} response = requests.delete(f"{url}/api/users/{guid}", headers=headers) - return check(response) + check_response(response) + + +def new_user(url, token, name, password, group_name=None, email=None, note=None): + """Create a new user""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "name": name, + "password": password, + } + if group_name: + payload["group_name"] = group_name + if email: + payload["email"] = email + if note: + payload["note"] = note + response = requests.post(f"{url}/api/users", headers=headers, json=payload) + check_response(response) + + +def invite_user(url, token, email, name, group_name=None, note=None): + """Invite a user by email""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "email": email, + "name": name, + } + if group_name: + payload["group_name"] = group_name + if note: + payload["note"] = note + response = requests.post(f"{url}/api/users/invite", headers=headers, json=payload) + check_response(response) + + +def enable_2fa_enforce(url, token, user_guids, base_url): + """Enable 2FA enforcement for users""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "user_guids": user_guids if isinstance(user_guids, list) else [user_guids], + "enforce": True, + "url": base_url + } + response = requests.put(f"{url}/api/users/tfa/totp/enforce", headers=headers, json=payload) + check_response(response) + + +def disable_2fa_enforce(url, token, user_guids, base_url=""): + """Disable 2FA enforcement for users""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "user_guids": user_guids if isinstance(user_guids, list) else [user_guids], + "enforce": False, + "url": base_url + } + response = requests.put(f"{url}/api/users/tfa/totp/enforce", headers=headers, json=payload) + check_response(response) + + +def disable_email_verification(url, token, user_guids): + """Disable email login verification for users""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "user_guids": user_guids if isinstance(user_guids, list) else [user_guids], + "type": "email" + } + response = requests.put(f"{url}/api/users/disable_login_verification", headers=headers, json=payload) + check_response(response) + + +def reset_2fa(url, token, user_guids): + """Reset 2FA for users""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "user_guids": user_guids if isinstance(user_guids, list) else [user_guids], + "type": "2fa" + } + response = requests.put(f"{url}/api/users/disable_login_verification", headers=headers, json=payload) + check_response(response) + + +def force_logout(url, token, user_guids): + """Force logout users""" + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + payload = { + "user_guids": user_guids if isinstance(user_guids, list) else [user_guids], + } + response = requests.post(f"{url}/api/users/force-logout", headers=headers, json=payload) + check_response(response) def main(): parser = argparse.ArgumentParser(description="User manager") parser.add_argument( "command", - choices=["view", "disable", "enable", "delete"], + choices=["view", "disable", "enable", "delete", "new", "invite", + "enable-2fa-enforce", "disable-2fa-enforce", + "disable-email-verification", "reset-2fa", "force-logout"], help="Command to execute", ) parser.add_argument("--url", required=True, help="URL of the API") @@ -89,12 +197,32 @@ def main(): "--token", required=True, help="Bearer token for authentication" ) parser.add_argument("--name", help="User name") - parser.add_argument("--group_name", help="Group name") + parser.add_argument("--group_name", help="Group name (for filtering in view, or for new/invite command)") + parser.add_argument("--password", help="User password (for new command)") + parser.add_argument("--email", help="User email (for invite command)") + parser.add_argument("--note", help="User note (for new/invite command)") + parser.add_argument("--web-console-url", help="Web console URL (for 2FA enforce commands)") args = parser.parse_args() while args.url.endswith("/"): args.url = args.url[:-1] + if args.command == "new": + if not args.name or not args.password or not args.group_name: + print("Error: --name and --password and --group_name are required for new command") + exit(1) + new_user(args.url, args.token, args.name, args.password, args.group_name, args.email, args.note) + print("Success: User created") + return + + if args.command == "invite": + if not args.email or not args.name or not args.group_name: + print("Error: --email and --name and --group_name are required for invite command") + exit(1) + invite_user(args.url, args.token, args.email, args.name, args.group_name, args.note) + print("Success: Invitation sent") + return + users = view( args.url, args.token, @@ -103,20 +231,61 @@ def main(): ) if args.command == "view": - for user in users: - print(user) - elif args.command == "disable": - for user in users: - response = disable(args.url, args.token, user["guid"], user["name"]) - print(response) - elif args.command == "enable": - for user in users: - response = enable(args.url, args.token, user["guid"], user["name"]) - print(response) - elif args.command == "delete": - for user in users: - response = delete(args.url, args.token, user["guid"], user["name"]) - print(response) + if len(users) == 0: + print("Found 0 users") + else: + for user in users: + print(user) + elif args.command in ["disable", "enable", "delete", "enable-2fa-enforce", + "disable-2fa-enforce", "disable-email-verification", "reset-2fa", "force-logout"]: + if len(users) == 0: + print("Found 0 users") + return + + # Check if we need user confirmation for multiple users + if len(users) > 1: + print(f"Found {len(users)} users. Do you want to proceed with {args.command} operation on the users? (Y/N)") + confirmation = input("Type 'Y' to confirm: ").strip() + if confirmation.upper() != 'Y': + print("Operation cancelled.") + return + + if args.command == "disable": + for user in users: + disable(args.url, args.token, user["guid"], user["name"]) + print("Success") + elif args.command == "enable": + for user in users: + enable(args.url, args.token, user["guid"], user["name"]) + print("Success") + elif args.command == "delete": + for user in users: + delete_user(args.url, args.token, user["guid"], user["name"]) + print("Success") + elif args.command == "enable-2fa-enforce": + if not args.web_console_url: + print("Error: --web-console-url is required for enable-2fa-enforce") + exit(1) + user_guids = [user["guid"] for user in users] + enable_2fa_enforce(args.url, args.token, user_guids, args.web_console_url) + print(f"Success: Enabled 2FA enforcement for {len(users)} user(s)") + elif args.command == "disable-2fa-enforce": + user_guids = [user["guid"] for user in users] + web_url = args.web_console_url or "" + disable_2fa_enforce(args.url, args.token, user_guids, web_url) + print(f"Success: Disabled 2FA enforcement for {len(users)} user(s)") + elif args.command == "disable-email-verification": + user_guids = [user["guid"] for user in users] + disable_email_verification(args.url, args.token, user_guids) + print(f"Success: Disabled email verification for {len(users)} user(s)") + elif args.command == "reset-2fa": + user_guids = [user["guid"] for user in users] + reset_2fa(args.url, args.token, user_guids) + print(f"Success: Reset 2FA for {len(users)} user(s)") + elif args.command == "force-logout": + user_guids = [user["guid"] for user in users] + force_logout(args.url, args.token, user_guids) + print(f"Success: Force logout for {len(users)} user(s)") if __name__ == "__main__":