pyhuntress/src/pyhuntress/clients/managedsat_client.py
2025-07-24 18:26:17 -05:00

172 lines
6.5 KiB
Python

import typing
from datetime import datetime, timedelta, timezone
from pyhuntress.clients.huntress_client import HuntressClient
from pyhuntress.config import Config
if typing.TYPE_CHECKING:
from pyhuntress.endpoints.managedsat.AccountsEndpoint import AccountsEndpoint
from pyhuntress.endpoints.managedsat.UsersEndpoint import UsersEndpoint
from pyhuntress.endpoints.managedsat.AssignmentsEndpoint import AssignmentsEndpoint
from pyhuntress.endpoints.managedsat.EpisodesEndpoint import EpisodesEndpoint
from pyhuntress.endpoints.managedsat.DepartmentsEndpoint import DepartmentsEndpoint
from pyhuntress.endpoints.managedsat.GroupsEndpoint import GroupsEndpoint
from pyhuntress.endpoints.managedsat.LearnersEndpoint import LearnersEndpoint
from pyhuntress.endpoints.managedsat.PhishingCampaignsEndpoint import PhishingCampaignsEndpoint
from pyhuntress.endpoints.managedsat.PhishingCampaignScenariosEndpoint import PhishingCampaignScenariosEndpoint
from pyhuntress.endpoints.managedsat.PhishingAttemptsEndpoint import PhishingAttemptsActionsEndpoint
from pyhuntress.endpoints.managedsat.PhishingScenariosEndpoint import PhishingScenariosEndpoint
class ManagedSATCodebaseError(Exception):
def __init__(self) -> None:
super().__init__("Could not retrieve codebase from API.")
class HuntressSATAPIClient(HuntressClient):
"""
Huntress Managed SAT API client. Handles the connection to the Huntress Managed SAT API
and the configuration of all the available endpoints.
"""
def __init__(
self,
client_id: str,
client_secret: str,
api_scopes: str,
) -> None:
"""
Initializes the client with the given credentials.
Parameters:
client_id (str): URL of the Huntress Managed SAT client id.
client_secret (str): Your Huntress Managed SAT API client secret.
api_scopes (str): Your Huntress Managed SAT API scope.
"""
self.client_id: str = client_id
self.client_secret: str = client_secret
self.api_scopes: str = api_scopes
self.token_expiry_time: datetime = datetime.now(tz=timezone.utc)
# Grab first access token
self.access_token: str = self._get_access_token()
# Initializing endpoints
@property
def accounts(self) -> "AccountsEndpoint":
from pyhuntress.endpoints.managedsat.AccountsEndpoint import AccountsEndpoint
return AccountsEndpoint(self)
@property
def users(self) -> "UsersEndpoint":
from pyhuntress.endpoints.managedsat.UsersEndpoint import UsersEndpoint
return UsersEndpoint(self)
@property
def assignments(self) -> "AssignmentsEndpoint":
from pyhuntress.endpoints.managedsat.AssignmentsEndpoint import AssignmentsEndpoint
return AssignmentsEndpoint(self)
@property
def episodes(self) -> "EpisodesEndpoint":
from pyhuntress.endpoints.managedsat.EpisodesEndpoint import EpisodesEndpoint
return EpisodesEndpoint(self)
@property
def departments(self) -> "DepartmentsEndpoint":
from pyhuntress.endpoints.managedsat.DepartmentsEndpoint import DepartmentsEndpoint
return DepartmentsEndpoint(self)
@property
def groups(self) -> "GroupsEndpoint":
from pyhuntress.endpoints.managedsat.GroupsEndpoint import GroupsEndpoint
return GroupsEndpoint(self)
@property
def learners(self) -> "LearnersEndpoint":
from pyhuntress.endpoints.managedsat.LearnersEndpoint import LearnersEndpoint
return LearnersEndpoint(self)
@property
def phishing_campaigns(self) -> "PhishingCampaignsEndpoint":
from pyhuntress.endpoints.managedsat.PhishingCampaignsEndpoint import PhishingCampaignsEndpoint
return PhishingCampaignsEndpoint(self)
@property
def phishing_campaign_scenarios(self) -> "PhishingCampaignScenariosEndpoint":
from pyhuntress.endpoints.managedsat.PhishingCampaignScenariosEndpoint import PhishingCampaignScenariosEndpoint
return PhishingCampaignScenariosEndpoint(self)
@property
def phishing_attempts(self) -> "PhishingAttemptsActionsEndpoint":
from pyhuntress.endpoints.managedsat.PhishingAttemptsActionsEndpoint import PhishingAttemptsActionsEndpoint
return PhishingAttemptsActionsEndpoint(self)
@property
def phishing_scenarios(self) -> "PhishingScenariosEndpoint":
from pyhuntress.endpoints.managedsat.PhishingScenariosEndpoint import PhishingScenariosEndpoint
return PhishingScenariosEndpoint(self)
def _get_url(self) -> str:
"""
Generates and returns the URL for the Huntress Managed SAT API endpoints based on the company url and codebase.
This only still exists incase Huntress eventually moves to some client specific URL format
Returns:
str: API URL.
"""
return f"https://mycurricula.com/api/v1"
def _get_access_token(self) -> str:
"""
Performs a request to the ConnectWise Automate API to obtain an access token.
"""
token_url = self._get_url().replace("api/v1", "oauth/token") #strip the API endpoints to use the oauth token url
auth_response = self._make_request(
"POST",
token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": self.api_scopes
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json"
},
)
auth_resp_json = auth_response.json()
token = auth_resp_json["access_token"]
expires_in_sec = auth_resp_json["expires_in"]
self.token_expiry_time = datetime.now(tz=timezone.utc) + timedelta(seconds=expires_in_sec)
return token
def _refresh_access_token_if_necessary(self):
if datetime.now(tz=timezone.utc) > self.token_expiry_time:
self.access_token = self._get_access_token()
def _get_headers(self) -> dict[str, str]:
"""
Generates and returns the headers required for making API requests.
Returns:
dict[str, str]: Dictionary of headers including Content-Type, Client ID, and Authorization.
"""
self._refresh_access_token_if_necessary()
return {
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
"Authorization": f"Bearer {self.access_token}",
}