diff --git a/src/pyhuntress/endpoints/siem/AgentsEndpoint.py b/src/pyhuntress/endpoints/siem/AgentsEndpoint.py index da7fcab..09ff0b4 100644 --- a/src/pyhuntress/endpoints/siem/AgentsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/AgentsEndpoint.py @@ -3,7 +3,7 @@ from pyhuntress.interfaces import ( IGettable, IPaginateable, ) -from pyhuntress.models.siem import SIEMAgentsResponse, SIEMAgents +from pyhuntress.models.siem import SIEMAgents from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, @@ -18,8 +18,8 @@ class AgentsEndpoint( ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "agents", parent_endpoint=parent_endpoint) - IGettable.__init__(self, SIEMAgentsResponse) - IPaginateable.__init__(self, SIEMAgentsResponse) + IGettable.__init__(self, SIEMAgents) + IPaginateable.__init__(self, SIEMAgents) def id(self, id: int) -> HuntressEndpoint: """ @@ -48,17 +48,18 @@ class AgentsEndpoint( limit (int): The number of results to return per page. params (dict[str, int | str]): The parameters to send in the request query string. Returns: - PaginatedResponse[SIEMAgentsResponse]: The initialized PaginatedResponse object. + PaginatedResponse[SIEMAgents]: The initialized PaginatedResponse object. """ if params: params["page"] = page - params["pageSize"] = limit + params["limit"] = limit else: - params = {"page": page, "pageSize": limit} + params = {"page": page, "limit": limit} return PaginatedResponse( super()._make_request("GET", params=params), SIEMAgents, self, + "agents", page, limit, params, diff --git a/src/pyhuntress/endpoints/siem/BillingreportsEndpoint.py b/src/pyhuntress/endpoints/siem/BillingreportsEndpoint.py index 0003d97..8e4a15b 100644 --- a/src/pyhuntress/endpoints/siem/BillingreportsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/BillingreportsEndpoint.py @@ -1,8 +1,10 @@ from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.interfaces import ( IGettable, + IPaginateable, ) from pyhuntress.models.siem import SIEMBillingReports +from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, HuntressSIEMRequestParams, @@ -12,10 +14,56 @@ from pyhuntress.types import ( class BillingreportsEndpoint( HuntressEndpoint, IGettable[SIEMBillingReports, HuntressSIEMRequestParams], + IPaginateable[SIEMBillingReports, HuntressSIEMRequestParams], ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "billing_reports", parent_endpoint=parent_endpoint) IGettable.__init__(self, SIEMBillingReports) + IPaginateable.__init__(self, SIEMBillingReports) + + def id(self, id: int) -> HuntressEndpoint: + """ + Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain. + + Parameters: + id (int): The ID to set. + Returns: + HuntressEndpoint: The initialized HuntressEndpoint object. + """ + child = HuntressEndpoint(self.client, parent_endpoint=self) + child._id = id + return child + + def paginated( + self, + page: int, + limit: int, + params: HuntressSIEMRequestParams | None = None, + ) -> PaginatedResponse[SIEMBillingReports]: + """ + Performs a GET request against the /billing_reports endpoint and returns an initialized PaginatedResponse object. + + Parameters: + page (int): The page number to request. + limit (int): The number of results to return per page. + params (dict[str, int | str]): The parameters to send in the request query string. + Returns: + PaginatedResponse[SIEMBillingReports]: The initialized PaginatedResponse object. + """ + if params: + params["page"] = page + params["limit"] = limit + else: + params = {"page": page, "limit": limit} + return PaginatedResponse( + super()._make_request("GET", params=params), + SIEMBillingReports, + self, + "billing_reports", + page, + limit, + params, + ) def get( self, @@ -23,7 +71,7 @@ class BillingreportsEndpoint( params: HuntressSIEMRequestParams | None = None, ) -> SIEMBillingReports: """ - Performs a GET request against the /Billing_reports endpoint. + Performs a GET request against the /billing_reports endpoint. Parameters: data (dict[str, Any]): The data to send in the request body. diff --git a/src/pyhuntress/endpoints/siem/IncidentreportsEndpoint.py b/src/pyhuntress/endpoints/siem/IncidentreportsEndpoint.py index 13cfb1b..7f2b735 100644 --- a/src/pyhuntress/endpoints/siem/IncidentreportsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/IncidentreportsEndpoint.py @@ -1,8 +1,10 @@ from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.interfaces import ( IGettable, + IPaginateable, ) from pyhuntress.models.siem import SIEMIncidentReports +from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, HuntressSIEMRequestParams, @@ -12,10 +14,56 @@ from pyhuntress.types import ( class IncidentreportsEndpoint( HuntressEndpoint, IGettable[SIEMIncidentReports, HuntressSIEMRequestParams], + IPaginateable[SIEMIncidentReports, HuntressSIEMRequestParams], ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "incident_reports", parent_endpoint=parent_endpoint) IGettable.__init__(self, SIEMIncidentReports) + IPaginateable.__init__(self, SIEMIncidentReports) + + def id(self, id: int) -> HuntressEndpoint: + """ + Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain. + + Parameters: + id (int): The ID to set. + Returns: + HuntressEndpoint: The initialized HuntressEndpoint object. + """ + child = HuntressEndpoint(self.client, parent_endpoint=self) + child._id = id + return child + + def paginated( + self, + page: int, + limit: int, + params: HuntressSIEMRequestParams | None = None, + ) -> PaginatedResponse[SIEMIncidentReports]: + """ + Performs a GET request against the /incident_reports endpoint and returns an initialized PaginatedResponse object. + + Parameters: + page (int): The page number to request. + limit (int): The number of results to return per page. + params (dict[str, int | str]): The parameters to send in the request query string. + Returns: + PaginatedResponse[SIEMIncidentReports]: The initialized PaginatedResponse object. + """ + if params: + params["page"] = page + params["limit"] = limit + else: + params = {"page": page, "limit": limit} + return PaginatedResponse( + super()._make_request("GET", params=params), + SIEMIncidentReports, + self, + "incident_reports", + page, + limit, + params, + ) def get( self, @@ -23,7 +71,7 @@ class IncidentreportsEndpoint( params: HuntressSIEMRequestParams | None = None, ) -> SIEMIncidentReports: """ - Performs a GET request against the /Incident_reports endpoint. + Performs a GET request against the /incident_reports endpoint. Parameters: data (dict[str, Any]): The data to send in the request body. diff --git a/src/pyhuntress/endpoints/siem/OrganizationsEndpoint.py b/src/pyhuntress/endpoints/siem/OrganizationsEndpoint.py index 6eb75e6..a452bf8 100644 --- a/src/pyhuntress/endpoints/siem/OrganizationsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/OrganizationsEndpoint.py @@ -1,8 +1,10 @@ from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.interfaces import ( IGettable, + IPaginateable, ) from pyhuntress.models.siem import SIEMOrganizations +from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, HuntressSIEMRequestParams, @@ -12,10 +14,56 @@ from pyhuntress.types import ( class OrganizationsEndpoint( HuntressEndpoint, IGettable[SIEMOrganizations, HuntressSIEMRequestParams], + IPaginateable[SIEMOrganizations, HuntressSIEMRequestParams], ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "organizations", parent_endpoint=parent_endpoint) IGettable.__init__(self, SIEMOrganizations) + IPaginateable.__init__(self, SIEMOrganizations) + + def id(self, id: int) -> HuntressEndpoint: + """ + Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain. + + Parameters: + id (int): The ID to set. + Returns: + HuntressEndpoint: The initialized HuntressEndpoint object. + """ + child = HuntressEndpoint(self.client, parent_endpoint=self) + child._id = id + return child + + def paginated( + self, + page: int, + limit: int, + params: HuntressSIEMRequestParams | None = None, + ) -> PaginatedResponse[SIEMOrganizations]: + """ + Performs a GET request against the /organizations endpoint and returns an initialized PaginatedResponse object. + + Parameters: + page (int): The page number to request. + limit (int): The number of results to return per page. + params (dict[str, int | str]): The parameters to send in the request query string. + Returns: + PaginatedResponse[SIEMOrganizations]: The initialized PaginatedResponse object. + """ + if params: + params["page"] = page + params["limit"] = limit + else: + params = {"page": page, "limit": limit} + return PaginatedResponse( + super()._make_request("GET", params=params), + SIEMOrganizations, + self, + "organizations", + page, + limit, + params, + ) def get( self, diff --git a/src/pyhuntress/endpoints/siem/ReportsEndpoint.py b/src/pyhuntress/endpoints/siem/ReportsEndpoint.py index 350d2b0..95dc5b6 100644 --- a/src/pyhuntress/endpoints/siem/ReportsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/ReportsEndpoint.py @@ -1,8 +1,10 @@ from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.interfaces import ( IGettable, + IPaginateable, ) from pyhuntress.models.siem import SIEMReports +from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, HuntressSIEMRequestParams, @@ -12,10 +14,56 @@ from pyhuntress.types import ( class ReportsEndpoint( HuntressEndpoint, IGettable[SIEMReports, HuntressSIEMRequestParams], + IPaginateable[SIEMReports, HuntressSIEMRequestParams], ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "reports", parent_endpoint=parent_endpoint) IGettable.__init__(self, SIEMReports) + IPaginateable.__init__(self, SIEMReports) + + def id(self, id: int) -> HuntressEndpoint: + """ + Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain. + + Parameters: + id (int): The ID to set. + Returns: + HuntressEndpoint: The initialized HuntressEndpoint object. + """ + child = HuntressEndpoint(self.client, parent_endpoint=self) + child._id = id + return child + + def paginated( + self, + page: int, + limit: int, + params: HuntressSIEMRequestParams | None = None, + ) -> PaginatedResponse[SIEMReports]: + """ + Performs a GET request against the /reports endpoint and returns an initialized PaginatedResponse object. + + Parameters: + page (int): The page number to request. + limit (int): The number of results to return per page. + params (dict[str, int | str]): The parameters to send in the request query string. + Returns: + PaginatedResponse[SIEMReports]: The initialized PaginatedResponse object. + """ + if params: + params["page"] = page + params["limit"] = limit + else: + params = {"page": page, "limit": limit} + return PaginatedResponse( + super()._make_request("GET", params=params), + SIEMReports, + self, + "reports", + page, + limit, + params, + ) def get( self, @@ -23,7 +71,7 @@ class ReportsEndpoint( params: HuntressSIEMRequestParams | None = None, ) -> SIEMReports: """ - Performs a GET request against the /Reports endpoint. + Performs a GET request against the /reports endpoint. Parameters: data (dict[str, Any]): The data to send in the request body. diff --git a/src/pyhuntress/endpoints/siem/SignalsEndpoint.py b/src/pyhuntress/endpoints/siem/SignalsEndpoint.py index dfc6b71..fdd4876 100644 --- a/src/pyhuntress/endpoints/siem/SignalsEndpoint.py +++ b/src/pyhuntress/endpoints/siem/SignalsEndpoint.py @@ -1,8 +1,10 @@ from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.interfaces import ( IGettable, + IPaginateable, ) from pyhuntress.models.siem import SIEMSignals +from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.types import ( JSON, HuntressSIEMRequestParams, @@ -12,10 +14,56 @@ from pyhuntress.types import ( class SignalsEndpoint( HuntressEndpoint, IGettable[SIEMSignals, HuntressSIEMRequestParams], + IPaginateable[SIEMSignals, HuntressSIEMRequestParams], ): def __init__(self, client, parent_endpoint=None) -> None: HuntressEndpoint.__init__(self, client, "signals", parent_endpoint=parent_endpoint) IGettable.__init__(self, SIEMSignals) + IPaginateable.__init__(self, SIEMSignals) + + def id(self, id: int) -> HuntressEndpoint: + """ + Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain. + + Parameters: + id (int): The ID to set. + Returns: + HuntressEndpoint: The initialized HuntressEndpoint object. + """ + child = HuntressEndpoint(self.client, parent_endpoint=self) + child._id = id + return child + + def paginated( + self, + page: int, + limit: int, + params: HuntressSIEMRequestParams | None = None, + ) -> PaginatedResponse[SIEMSignals]: + """ + Performs a GET request against the /signals endpoint and returns an initialized PaginatedResponse object. + + Parameters: + page (int): The page number to request. + limit (int): The number of results to return per page. + params (dict[str, int | str]): The parameters to send in the request query string. + Returns: + PaginatedResponse[SIEMSignals]: The initialized PaginatedResponse object. + """ + if params: + params["page"] = page + params["limit"] = limit + else: + params = {"page": page, "limit": limit} + return PaginatedResponse( + super()._make_request("GET", params=params), + SIEMSignals, + self, + "signals", + page, + limit, + params, + ) def get( self, @@ -23,7 +71,7 @@ class SignalsEndpoint( params: HuntressSIEMRequestParams | None = None, ) -> SIEMSignals: """ - Performs a GET request against the /Signals endpoint. + Performs a GET request against the /signals endpoint. Parameters: data (dict[str, Any]): The data to send in the request body. diff --git a/src/pyhuntress/responses/paginated_response.py b/src/pyhuntress/responses/paginated_response.py index d6a3e01..0f529c9 100644 --- a/src/pyhuntress/responses/paginated_response.py +++ b/src/pyhuntress/responses/paginated_response.py @@ -1,8 +1,9 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING, Generic, TypeVar -from pyhuntress.utils.helpers import parse_link_headers +from pyhuntress.utils.helpers import parse_link_headers, parse_response_body if TYPE_CHECKING: from collections.abc import Iterable @@ -38,9 +39,10 @@ class PaginatedResponse(Generic[TModel]): self, response: Response, response_model: type[TModel], - endpoint: IPaginateable, + endpointmodel: IPaginateable, + endpoint: str, page: int, - page_size: int, + limit: int, params: RequestParams | None = None, ) -> None: """ @@ -56,40 +58,44 @@ class PaginatedResponse(Generic[TModel]): expected model type for the response data. This allows for type-safe handling of model instances throughout the class. """ - self._initialize(response, response_model, endpoint, page, page_size, params) + self._initialize(response, response_model, endpointmodel, endpoint, page, limit, params) - def _initialize( # noqa: ANN202 + def _initialize( self, response: Response, response_model: type[TModel], - endpoint: IPaginateable, + endpointmodel: IPaginateable, + endpoint: str, page: int, - page_size: int, + limit: int, params: RequestParams | None = None, ): """ - Initialize the instance variables using the provided response, endpoint, and page size. + Initialize the instance variables using the provided response, endpointmodel, and page size. Args: response: The raw response object from the API. - endpoint (HuntressEndpoint[TModel]): The endpoint associated with the response. - page_size (int): The number of items per page. + endpointmodel (HuntressEndpoint[TModel]): The endpointmodel associated with the response. + endpoint: The endpoint url to extract the data + limit (int): The number of items per page. """ self.response = response self.response_model = response_model + self.endpointmodel = endpointmodel self.endpoint = endpoint - self.page_size = page_size + self.limit = limit + print(self.endpoint) # The following for SIEM is in the response body, not the headers - self.parsed_pagination_response = None #parse_link_headers(response.headers) + self.parsed_pagination_response = parse_response_body(json.loads(response.content.decode('utf-8')).get('pagination', {})) self.params = params if self.parsed_pagination_response is not None: # Huntress SIEM API gives us a handy response to parse for Pagination - self.has_next_page: bool = self.parsed_link_headers.get("has_next_page", False) - self.has_prev_page: bool = self.parsed_link_headers.get("has_prev_page", False) - self.first_page: int = self.parsed_link_headers.get("first_page", None) - self.prev_page: int = self.parsed_link_headers.get("prev_page", None) - self.next_page: int = self.parsed_link_headers.get("next_page", None) - self.last_page: int = self.parsed_link_headers.get("last_page", None) + self.has_next_page: bool = self.parsed_pagination_response.get("has_next_page", False) + self.has_prev_page: bool = self.parsed_pagination_response.get("has_prev_page", False) + self.first_page: int = self.parsed_pagination_response.get("first_page", None) + self.prev_page: int = self.parsed_pagination_response.get("prev_page", None) + self.next_page: int = self.parsed_pagination_response.get("next_page", None) + self.last_page: int = self.parsed_pagination_response.get("last_page", None) else: # Huntress Managed SAT might, haven't worked on this yet self.has_next_page: bool = True @@ -98,7 +104,7 @@ class PaginatedResponse(Generic[TModel]): self.prev_page = page - 1 if page > 1 else 1 self.next_page = page + 1 self.last_page = 999999 - self.data: list[TModel] = [response_model.model_validate(d) for d in response.json()] + self.data: list[TModel] = [response_model.model_validate(d) for d in response.json().get(endpoint, {})] self.has_data = self.data and len(self.data) > 0 self.index = 0 @@ -114,13 +120,13 @@ class PaginatedResponse(Generic[TModel]): self.has_data = False return self - next_response = self.endpoint.paginated(self.next_page, self.page_size, self.params) + next_response = self.endpointmodel.paginated(self.next_page, self.limit, self.params) self._initialize( next_response.response, next_response.response_model, - next_response.endpoint, + next_response.endpointmodel, self.next_page, - next_response.page_size, + next_response.limit, self.params, ) return self @@ -137,18 +143,18 @@ class PaginatedResponse(Generic[TModel]): self.has_data = False return self - prev_response = self.endpoint.paginated(self.prev_page, self.page_size, self.params) + prev_response = self.endpointmodel.paginated(self.prev_page, self.limit, self.params) self._initialize( prev_response.response, prev_response.response_model, - prev_response.endpoint, + prev_response.endpointmodel, self.prev_page, - prev_response.page_size, + prev_response.limit, self.params, ) return self - def all(self) -> Iterable[TModel]: # noqa: A003 + def all(self) -> Iterable[TModel]: """ Iterate through all items in the paginated response, across all pages. @@ -159,7 +165,7 @@ class PaginatedResponse(Generic[TModel]): yield from self.data self.get_next_page() - def __iter__(self): # noqa: ANN204 + def __iter__(self): """ Implement the iterator protocol for the PaginatedResponse class. @@ -168,7 +174,7 @@ class PaginatedResponse(Generic[TModel]): """ return self - def __dict__(self): # noqa: ANN204 + def __dict__(self): """ Implement the iterator protocol for the PaginatedResponse class. @@ -177,7 +183,7 @@ class PaginatedResponse(Generic[TModel]): """ return self.data - def __next__(self): # noqa: ANN204 + def __next__(self): """ Implement the iterator protocol by getting the next item in the data. @@ -191,5 +197,5 @@ class PaginatedResponse(Generic[TModel]): result = self.data[self.index] self.index += 1 return result - else: # noqa: RET505 + else: raise StopIteration diff --git a/src/pyhuntress/utils/helpers.py b/src/pyhuntress/utils/helpers.py index 2c49a49..827a8e0 100644 --- a/src/pyhuntress/utils/helpers.py +++ b/src/pyhuntress/utils/helpers.py @@ -1,4 +1,5 @@ import re +import math from datetime import datetime from typing import Any @@ -23,8 +24,84 @@ def cw_format_datetime(dt: datetime) -> str: """ return dt.strftime("%Y-%m-%dT%H:%M:%SZ") +def parse_response_body( + body: CaseInsensitiveDict, +) -> dict[str, Any] | None: + """ + Parses response body to extract pagination information. -def parse_link_headers( # noqa: C901 + Arguments: + - body: content.json().get('pagination', {}) A dictionary containing the headers of an HTTP response. + + Returns: + - A dictionary containing the extracted pagination information. The keys in the dictionary include: + - "first_page": An optional integer representing the number of the first page. + - "prev_page": An optional integer representing the number of the previous page. + - "next_page": An optional integer representing the number of the next page. + - "last_page": An optional integer representing the number of the last page. + - "has_next_page": A boolean indicating whether there is a next page. + - "has_prev_page": A boolean indicating whether there is a previous page. + + If the "Link" header is not present in the headers dictionary, None is returned. + + Example Usage: + headers = { + "Link": '; rel="first", ; rel="next"' + } + pagination_info = parse_link_headers(headers) + print(pagination_info) + # Output: {'first_page': 1, 'next_page': 2, 'has_next_page': True} + """ + if body.get("current_page") is None: + return None + has_next_page: bool = False + has_prev_page: bool = False + first_page: int | None = None + prev_page: int | None = None + current_page: int | None = None + current_page_count: int | None = None + limit: int | None = None + total_count: int | None = None + next_page: int | None = None + next_page_url: str | None = None + next_page_token: str | None = None + last_page: int | None = None + + result = {} + + if first_page is not None: + result["first_page"] = first_page + + if prev_page is not None: + result["prev_page"] = prev_page + elif current_page is not None: + if current_page > 1: + result["prev_page"] = current_page - 1 + + if next_page is not None: + result["next_page"] = next_page + + if last_page is not None: + result["last_page"] = last_page + elif last_page is None and current_page is not None: + result["last_page"] = math.ceil(total_count/limit) + + if has_next_page: + result["has_next_page"] = has_next_page + elif current_page is not None and next_page is not None: + result["has_next_page"] = True + elif current_page is not None and next_page is None: + result["has_next_page"] = False + + if has_prev_page: + result["has_prev_page"] = has_prev_page + elif current_page is not None: + if current_page > 1: + result["has_prev_page"] = True + + return result + +def parse_link_headers( headers: CaseInsensitiveDict, ) -> dict[str, Any] | None: """ diff --git a/src/scratchpad.py b/src/scratchpad.py index 6745054..8fd793d 100644 --- a/src/scratchpad.py +++ b/src/scratchpad.py @@ -39,5 +39,17 @@ siem_api_client = HuntressSIEMAPIClient( #signals = siem_api_client.signals.get() #print(signals) -#paginated_agents = siem_api_client.agents.paginated(1, 10) -#print(paginated_agents) \ No newline at end of file +paginated_billingreports = siem_api_client.billing_reports.paginated(1, 10) +print(paginated_billingreports.data) + +paginated_incidentreports = siem_api_client.incident_reports.paginated(1, 10) +print(paginated_incidentreports.data) + +paginated_organizations = siem_api_client.organizations.paginated(1, 10) +print(paginated_organizations.data) + +paginated_reports = siem_api_client.reports.paginated(1, 10) +print(paginated_reports.data) + +paginated_signals = siem_api_client.signals.paginated(1, 10) +print(paginated_signals.data)