Added pagination for SIEM

This commit is contained in:
Peter Annabel 2025-07-23 15:10:41 -05:00
parent 06a73785bb
commit ca5d0b40c7
9 changed files with 379 additions and 43 deletions

View File

@ -3,7 +3,7 @@ from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable, IPaginateable,
) )
from pyhuntress.models.siem import SIEMAgentsResponse, SIEMAgents from pyhuntress.models.siem import SIEMAgents
from pyhuntress.responses.paginated_response import PaginatedResponse from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
@ -18,8 +18,8 @@ class AgentsEndpoint(
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "agents", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "agents", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMAgentsResponse) IGettable.__init__(self, SIEMAgents)
IPaginateable.__init__(self, SIEMAgentsResponse) IPaginateable.__init__(self, SIEMAgents)
def id(self, id: int) -> HuntressEndpoint: def id(self, id: int) -> HuntressEndpoint:
""" """
@ -48,17 +48,18 @@ class AgentsEndpoint(
limit (int): The number of results to return per page. limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string. params (dict[str, int | str]): The parameters to send in the request query string.
Returns: Returns:
PaginatedResponse[SIEMAgentsResponse]: The initialized PaginatedResponse object. PaginatedResponse[SIEMAgents]: The initialized PaginatedResponse object.
""" """
if params: if params:
params["page"] = page params["page"] = page
params["pageSize"] = limit params["limit"] = limit
else: else:
params = {"page": page, "pageSize": limit} params = {"page": page, "limit": limit}
return PaginatedResponse( return PaginatedResponse(
super()._make_request("GET", params=params), super()._make_request("GET", params=params),
SIEMAgents, SIEMAgents,
self, self,
"agents",
page, page,
limit, limit,
params, params,

View File

@ -1,8 +1,10 @@
from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint
from pyhuntress.interfaces import ( from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable,
) )
from pyhuntress.models.siem import SIEMBillingReports from pyhuntress.models.siem import SIEMBillingReports
from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
HuntressSIEMRequestParams, HuntressSIEMRequestParams,
@ -12,10 +14,56 @@ from pyhuntress.types import (
class BillingreportsEndpoint( class BillingreportsEndpoint(
HuntressEndpoint, HuntressEndpoint,
IGettable[SIEMBillingReports, HuntressSIEMRequestParams], IGettable[SIEMBillingReports, HuntressSIEMRequestParams],
IPaginateable[SIEMBillingReports, HuntressSIEMRequestParams],
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "billing_reports", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "billing_reports", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMBillingReports) IGettable.__init__(self, SIEMBillingReports)
IPaginateable.__init__(self, SIEMBillingReports)
def id(self, id: int) -> HuntressEndpoint:
"""
Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain.
Parameters:
id (int): The ID to set.
Returns:
HuntressEndpoint: The initialized HuntressEndpoint object.
"""
child = HuntressEndpoint(self.client, parent_endpoint=self)
child._id = id
return child
def paginated(
self,
page: int,
limit: int,
params: HuntressSIEMRequestParams | None = None,
) -> PaginatedResponse[SIEMBillingReports]:
"""
Performs a GET request against the /billing_reports endpoint and returns an initialized PaginatedResponse object.
Parameters:
page (int): The page number to request.
limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string.
Returns:
PaginatedResponse[SIEMBillingReports]: The initialized PaginatedResponse object.
"""
if params:
params["page"] = page
params["limit"] = limit
else:
params = {"page": page, "limit": limit}
return PaginatedResponse(
super()._make_request("GET", params=params),
SIEMBillingReports,
self,
"billing_reports",
page,
limit,
params,
)
def get( def get(
self, self,
@ -23,7 +71,7 @@ class BillingreportsEndpoint(
params: HuntressSIEMRequestParams | None = None, params: HuntressSIEMRequestParams | None = None,
) -> SIEMBillingReports: ) -> SIEMBillingReports:
""" """
Performs a GET request against the /Billing_reports endpoint. Performs a GET request against the /billing_reports endpoint.
Parameters: Parameters:
data (dict[str, Any]): The data to send in the request body. data (dict[str, Any]): The data to send in the request body.

View File

@ -1,8 +1,10 @@
from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint
from pyhuntress.interfaces import ( from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable,
) )
from pyhuntress.models.siem import SIEMIncidentReports from pyhuntress.models.siem import SIEMIncidentReports
from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
HuntressSIEMRequestParams, HuntressSIEMRequestParams,
@ -12,10 +14,56 @@ from pyhuntress.types import (
class IncidentreportsEndpoint( class IncidentreportsEndpoint(
HuntressEndpoint, HuntressEndpoint,
IGettable[SIEMIncidentReports, HuntressSIEMRequestParams], IGettable[SIEMIncidentReports, HuntressSIEMRequestParams],
IPaginateable[SIEMIncidentReports, HuntressSIEMRequestParams],
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "incident_reports", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "incident_reports", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMIncidentReports) IGettable.__init__(self, SIEMIncidentReports)
IPaginateable.__init__(self, SIEMIncidentReports)
def id(self, id: int) -> HuntressEndpoint:
"""
Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain.
Parameters:
id (int): The ID to set.
Returns:
HuntressEndpoint: The initialized HuntressEndpoint object.
"""
child = HuntressEndpoint(self.client, parent_endpoint=self)
child._id = id
return child
def paginated(
self,
page: int,
limit: int,
params: HuntressSIEMRequestParams | None = None,
) -> PaginatedResponse[SIEMIncidentReports]:
"""
Performs a GET request against the /incident_reports endpoint and returns an initialized PaginatedResponse object.
Parameters:
page (int): The page number to request.
limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string.
Returns:
PaginatedResponse[SIEMIncidentReports]: The initialized PaginatedResponse object.
"""
if params:
params["page"] = page
params["limit"] = limit
else:
params = {"page": page, "limit": limit}
return PaginatedResponse(
super()._make_request("GET", params=params),
SIEMIncidentReports,
self,
"incident_reports",
page,
limit,
params,
)
def get( def get(
self, self,
@ -23,7 +71,7 @@ class IncidentreportsEndpoint(
params: HuntressSIEMRequestParams | None = None, params: HuntressSIEMRequestParams | None = None,
) -> SIEMIncidentReports: ) -> SIEMIncidentReports:
""" """
Performs a GET request against the /Incident_reports endpoint. Performs a GET request against the /incident_reports endpoint.
Parameters: Parameters:
data (dict[str, Any]): The data to send in the request body. data (dict[str, Any]): The data to send in the request body.

View File

@ -1,8 +1,10 @@
from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint
from pyhuntress.interfaces import ( from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable,
) )
from pyhuntress.models.siem import SIEMOrganizations from pyhuntress.models.siem import SIEMOrganizations
from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
HuntressSIEMRequestParams, HuntressSIEMRequestParams,
@ -12,10 +14,56 @@ from pyhuntress.types import (
class OrganizationsEndpoint( class OrganizationsEndpoint(
HuntressEndpoint, HuntressEndpoint,
IGettable[SIEMOrganizations, HuntressSIEMRequestParams], IGettable[SIEMOrganizations, HuntressSIEMRequestParams],
IPaginateable[SIEMOrganizations, HuntressSIEMRequestParams],
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "organizations", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "organizations", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMOrganizations) IGettable.__init__(self, SIEMOrganizations)
IPaginateable.__init__(self, SIEMOrganizations)
def id(self, id: int) -> HuntressEndpoint:
"""
Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain.
Parameters:
id (int): The ID to set.
Returns:
HuntressEndpoint: The initialized HuntressEndpoint object.
"""
child = HuntressEndpoint(self.client, parent_endpoint=self)
child._id = id
return child
def paginated(
self,
page: int,
limit: int,
params: HuntressSIEMRequestParams | None = None,
) -> PaginatedResponse[SIEMOrganizations]:
"""
Performs a GET request against the /organizations endpoint and returns an initialized PaginatedResponse object.
Parameters:
page (int): The page number to request.
limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string.
Returns:
PaginatedResponse[SIEMOrganizations]: The initialized PaginatedResponse object.
"""
if params:
params["page"] = page
params["limit"] = limit
else:
params = {"page": page, "limit": limit}
return PaginatedResponse(
super()._make_request("GET", params=params),
SIEMOrganizations,
self,
"organizations",
page,
limit,
params,
)
def get( def get(
self, self,

View File

@ -1,8 +1,10 @@
from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint
from pyhuntress.interfaces import ( from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable,
) )
from pyhuntress.models.siem import SIEMReports from pyhuntress.models.siem import SIEMReports
from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
HuntressSIEMRequestParams, HuntressSIEMRequestParams,
@ -12,10 +14,56 @@ from pyhuntress.types import (
class ReportsEndpoint( class ReportsEndpoint(
HuntressEndpoint, HuntressEndpoint,
IGettable[SIEMReports, HuntressSIEMRequestParams], IGettable[SIEMReports, HuntressSIEMRequestParams],
IPaginateable[SIEMReports, HuntressSIEMRequestParams],
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "reports", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "reports", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMReports) IGettable.__init__(self, SIEMReports)
IPaginateable.__init__(self, SIEMReports)
def id(self, id: int) -> HuntressEndpoint:
"""
Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain.
Parameters:
id (int): The ID to set.
Returns:
HuntressEndpoint: The initialized HuntressEndpoint object.
"""
child = HuntressEndpoint(self.client, parent_endpoint=self)
child._id = id
return child
def paginated(
self,
page: int,
limit: int,
params: HuntressSIEMRequestParams | None = None,
) -> PaginatedResponse[SIEMReports]:
"""
Performs a GET request against the /reports endpoint and returns an initialized PaginatedResponse object.
Parameters:
page (int): The page number to request.
limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string.
Returns:
PaginatedResponse[SIEMReports]: The initialized PaginatedResponse object.
"""
if params:
params["page"] = page
params["limit"] = limit
else:
params = {"page": page, "limit": limit}
return PaginatedResponse(
super()._make_request("GET", params=params),
SIEMReports,
self,
"reports",
page,
limit,
params,
)
def get( def get(
self, self,
@ -23,7 +71,7 @@ class ReportsEndpoint(
params: HuntressSIEMRequestParams | None = None, params: HuntressSIEMRequestParams | None = None,
) -> SIEMReports: ) -> SIEMReports:
""" """
Performs a GET request against the /Reports endpoint. Performs a GET request against the /reports endpoint.
Parameters: Parameters:
data (dict[str, Any]): The data to send in the request body. data (dict[str, Any]): The data to send in the request body.

View File

@ -1,8 +1,10 @@
from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint from pyhuntress.endpoints.base.huntress_endpoint import HuntressEndpoint
from pyhuntress.interfaces import ( from pyhuntress.interfaces import (
IGettable, IGettable,
IPaginateable,
) )
from pyhuntress.models.siem import SIEMSignals from pyhuntress.models.siem import SIEMSignals
from pyhuntress.responses.paginated_response import PaginatedResponse
from pyhuntress.types import ( from pyhuntress.types import (
JSON, JSON,
HuntressSIEMRequestParams, HuntressSIEMRequestParams,
@ -12,10 +14,56 @@ from pyhuntress.types import (
class SignalsEndpoint( class SignalsEndpoint(
HuntressEndpoint, HuntressEndpoint,
IGettable[SIEMSignals, HuntressSIEMRequestParams], IGettable[SIEMSignals, HuntressSIEMRequestParams],
IPaginateable[SIEMSignals, HuntressSIEMRequestParams],
): ):
def __init__(self, client, parent_endpoint=None) -> None: def __init__(self, client, parent_endpoint=None) -> None:
HuntressEndpoint.__init__(self, client, "signals", parent_endpoint=parent_endpoint) HuntressEndpoint.__init__(self, client, "signals", parent_endpoint=parent_endpoint)
IGettable.__init__(self, SIEMSignals) IGettable.__init__(self, SIEMSignals)
IPaginateable.__init__(self, SIEMSignals)
def id(self, id: int) -> HuntressEndpoint:
"""
Sets the ID for this endpoint and returns an initialized HuntressEndpoint object to move down the chain.
Parameters:
id (int): The ID to set.
Returns:
HuntressEndpoint: The initialized HuntressEndpoint object.
"""
child = HuntressEndpoint(self.client, parent_endpoint=self)
child._id = id
return child
def paginated(
self,
page: int,
limit: int,
params: HuntressSIEMRequestParams | None = None,
) -> PaginatedResponse[SIEMSignals]:
"""
Performs a GET request against the /signals endpoint and returns an initialized PaginatedResponse object.
Parameters:
page (int): The page number to request.
limit (int): The number of results to return per page.
params (dict[str, int | str]): The parameters to send in the request query string.
Returns:
PaginatedResponse[SIEMSignals]: The initialized PaginatedResponse object.
"""
if params:
params["page"] = page
params["limit"] = limit
else:
params = {"page": page, "limit": limit}
return PaginatedResponse(
super()._make_request("GET", params=params),
SIEMSignals,
self,
"signals",
page,
limit,
params,
)
def get( def get(
self, self,
@ -23,7 +71,7 @@ class SignalsEndpoint(
params: HuntressSIEMRequestParams | None = None, params: HuntressSIEMRequestParams | None = None,
) -> SIEMSignals: ) -> SIEMSignals:
""" """
Performs a GET request against the /Signals endpoint. Performs a GET request against the /signals endpoint.
Parameters: Parameters:
data (dict[str, Any]): The data to send in the request body. data (dict[str, Any]): The data to send in the request body.

View File

@ -1,8 +1,9 @@
from __future__ import annotations from __future__ import annotations
import json
from typing import TYPE_CHECKING, Generic, TypeVar from typing import TYPE_CHECKING, Generic, TypeVar
from pyhuntress.utils.helpers import parse_link_headers from pyhuntress.utils.helpers import parse_link_headers, parse_response_body
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Iterable from collections.abc import Iterable
@ -38,9 +39,10 @@ class PaginatedResponse(Generic[TModel]):
self, self,
response: Response, response: Response,
response_model: type[TModel], response_model: type[TModel],
endpoint: IPaginateable, endpointmodel: IPaginateable,
endpoint: str,
page: int, page: int,
page_size: int, limit: int,
params: RequestParams | None = None, params: RequestParams | None = None,
) -> None: ) -> None:
""" """
@ -56,40 +58,44 @@ class PaginatedResponse(Generic[TModel]):
expected model type for the response data. This allows for type-safe handling expected model type for the response data. This allows for type-safe handling
of model instances throughout the class. of model instances throughout the class.
""" """
self._initialize(response, response_model, endpoint, page, page_size, params) self._initialize(response, response_model, endpointmodel, endpoint, page, limit, params)
def _initialize( # noqa: ANN202 def _initialize(
self, self,
response: Response, response: Response,
response_model: type[TModel], response_model: type[TModel],
endpoint: IPaginateable, endpointmodel: IPaginateable,
endpoint: str,
page: int, page: int,
page_size: int, limit: int,
params: RequestParams | None = None, params: RequestParams | None = None,
): ):
""" """
Initialize the instance variables using the provided response, endpoint, and page size. Initialize the instance variables using the provided response, endpointmodel, and page size.
Args: Args:
response: The raw response object from the API. response: The raw response object from the API.
endpoint (HuntressEndpoint[TModel]): The endpoint associated with the response. endpointmodel (HuntressEndpoint[TModel]): The endpointmodel associated with the response.
page_size (int): The number of items per page. endpoint: The endpoint url to extract the data
limit (int): The number of items per page.
""" """
self.response = response self.response = response
self.response_model = response_model self.response_model = response_model
self.endpointmodel = endpointmodel
self.endpoint = endpoint self.endpoint = endpoint
self.page_size = page_size self.limit = limit
print(self.endpoint)
# The following for SIEM is in the response body, not the headers # The following for SIEM is in the response body, not the headers
self.parsed_pagination_response = None #parse_link_headers(response.headers) self.parsed_pagination_response = parse_response_body(json.loads(response.content.decode('utf-8')).get('pagination', {}))
self.params = params self.params = params
if self.parsed_pagination_response is not None: if self.parsed_pagination_response is not None:
# Huntress SIEM API gives us a handy response to parse for Pagination # Huntress SIEM API gives us a handy response to parse for Pagination
self.has_next_page: bool = self.parsed_link_headers.get("has_next_page", False) self.has_next_page: bool = self.parsed_pagination_response.get("has_next_page", False)
self.has_prev_page: bool = self.parsed_link_headers.get("has_prev_page", False) self.has_prev_page: bool = self.parsed_pagination_response.get("has_prev_page", False)
self.first_page: int = self.parsed_link_headers.get("first_page", None) self.first_page: int = self.parsed_pagination_response.get("first_page", None)
self.prev_page: int = self.parsed_link_headers.get("prev_page", None) self.prev_page: int = self.parsed_pagination_response.get("prev_page", None)
self.next_page: int = self.parsed_link_headers.get("next_page", None) self.next_page: int = self.parsed_pagination_response.get("next_page", None)
self.last_page: int = self.parsed_link_headers.get("last_page", None) self.last_page: int = self.parsed_pagination_response.get("last_page", None)
else: else:
# Huntress Managed SAT might, haven't worked on this yet # Huntress Managed SAT might, haven't worked on this yet
self.has_next_page: bool = True self.has_next_page: bool = True
@ -98,7 +104,7 @@ class PaginatedResponse(Generic[TModel]):
self.prev_page = page - 1 if page > 1 else 1 self.prev_page = page - 1 if page > 1 else 1
self.next_page = page + 1 self.next_page = page + 1
self.last_page = 999999 self.last_page = 999999
self.data: list[TModel] = [response_model.model_validate(d) for d in response.json()] self.data: list[TModel] = [response_model.model_validate(d) for d in response.json().get(endpoint, {})]
self.has_data = self.data and len(self.data) > 0 self.has_data = self.data and len(self.data) > 0
self.index = 0 self.index = 0
@ -114,13 +120,13 @@ class PaginatedResponse(Generic[TModel]):
self.has_data = False self.has_data = False
return self return self
next_response = self.endpoint.paginated(self.next_page, self.page_size, self.params) next_response = self.endpointmodel.paginated(self.next_page, self.limit, self.params)
self._initialize( self._initialize(
next_response.response, next_response.response,
next_response.response_model, next_response.response_model,
next_response.endpoint, next_response.endpointmodel,
self.next_page, self.next_page,
next_response.page_size, next_response.limit,
self.params, self.params,
) )
return self return self
@ -137,18 +143,18 @@ class PaginatedResponse(Generic[TModel]):
self.has_data = False self.has_data = False
return self return self
prev_response = self.endpoint.paginated(self.prev_page, self.page_size, self.params) prev_response = self.endpointmodel.paginated(self.prev_page, self.limit, self.params)
self._initialize( self._initialize(
prev_response.response, prev_response.response,
prev_response.response_model, prev_response.response_model,
prev_response.endpoint, prev_response.endpointmodel,
self.prev_page, self.prev_page,
prev_response.page_size, prev_response.limit,
self.params, self.params,
) )
return self return self
def all(self) -> Iterable[TModel]: # noqa: A003 def all(self) -> Iterable[TModel]:
""" """
Iterate through all items in the paginated response, across all pages. Iterate through all items in the paginated response, across all pages.
@ -159,7 +165,7 @@ class PaginatedResponse(Generic[TModel]):
yield from self.data yield from self.data
self.get_next_page() self.get_next_page()
def __iter__(self): # noqa: ANN204 def __iter__(self):
""" """
Implement the iterator protocol for the PaginatedResponse class. Implement the iterator protocol for the PaginatedResponse class.
@ -168,7 +174,7 @@ class PaginatedResponse(Generic[TModel]):
""" """
return self return self
def __dict__(self): # noqa: ANN204 def __dict__(self):
""" """
Implement the iterator protocol for the PaginatedResponse class. Implement the iterator protocol for the PaginatedResponse class.
@ -177,7 +183,7 @@ class PaginatedResponse(Generic[TModel]):
""" """
return self.data return self.data
def __next__(self): # noqa: ANN204 def __next__(self):
""" """
Implement the iterator protocol by getting the next item in the data. Implement the iterator protocol by getting the next item in the data.
@ -191,5 +197,5 @@ class PaginatedResponse(Generic[TModel]):
result = self.data[self.index] result = self.data[self.index]
self.index += 1 self.index += 1
return result return result
else: # noqa: RET505 else:
raise StopIteration raise StopIteration

View File

@ -1,4 +1,5 @@
import re import re
import math
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
@ -23,8 +24,84 @@ def cw_format_datetime(dt: datetime) -> str:
""" """
return dt.strftime("%Y-%m-%dT%H:%M:%SZ") return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def parse_response_body(
body: CaseInsensitiveDict,
) -> dict[str, Any] | None:
"""
Parses response body to extract pagination information.
def parse_link_headers( # noqa: C901 Arguments:
- body: content.json().get('pagination', {}) A dictionary containing the headers of an HTTP response.
Returns:
- A dictionary containing the extracted pagination information. The keys in the dictionary include:
- "first_page": An optional integer representing the number of the first page.
- "prev_page": An optional integer representing the number of the previous page.
- "next_page": An optional integer representing the number of the next page.
- "last_page": An optional integer representing the number of the last page.
- "has_next_page": A boolean indicating whether there is a next page.
- "has_prev_page": A boolean indicating whether there is a previous page.
If the "Link" header is not present in the headers dictionary, None is returned.
Example Usage:
headers = {
"Link": '<https://example.com/api?page=1>; rel="first", <https://example.com/api?page=2>; rel="next"'
}
pagination_info = parse_link_headers(headers)
print(pagination_info)
# Output: {'first_page': 1, 'next_page': 2, 'has_next_page': True}
"""
if body.get("current_page") is None:
return None
has_next_page: bool = False
has_prev_page: bool = False
first_page: int | None = None
prev_page: int | None = None
current_page: int | None = None
current_page_count: int | None = None
limit: int | None = None
total_count: int | None = None
next_page: int | None = None
next_page_url: str | None = None
next_page_token: str | None = None
last_page: int | None = None
result = {}
if first_page is not None:
result["first_page"] = first_page
if prev_page is not None:
result["prev_page"] = prev_page
elif current_page is not None:
if current_page > 1:
result["prev_page"] = current_page - 1
if next_page is not None:
result["next_page"] = next_page
if last_page is not None:
result["last_page"] = last_page
elif last_page is None and current_page is not None:
result["last_page"] = math.ceil(total_count/limit)
if has_next_page:
result["has_next_page"] = has_next_page
elif current_page is not None and next_page is not None:
result["has_next_page"] = True
elif current_page is not None and next_page is None:
result["has_next_page"] = False
if has_prev_page:
result["has_prev_page"] = has_prev_page
elif current_page is not None:
if current_page > 1:
result["has_prev_page"] = True
return result
def parse_link_headers(
headers: CaseInsensitiveDict, headers: CaseInsensitiveDict,
) -> dict[str, Any] | None: ) -> dict[str, Any] | None:
""" """

View File

@ -39,5 +39,17 @@ siem_api_client = HuntressSIEMAPIClient(
#signals = siem_api_client.signals.get() #signals = siem_api_client.signals.get()
#print(signals) #print(signals)
#paginated_agents = siem_api_client.agents.paginated(1, 10) paginated_billingreports = siem_api_client.billing_reports.paginated(1, 10)
#print(paginated_agents) print(paginated_billingreports.data)
paginated_incidentreports = siem_api_client.incident_reports.paginated(1, 10)
print(paginated_incidentreports.data)
paginated_organizations = siem_api_client.organizations.paginated(1, 10)
print(paginated_organizations.data)
paginated_reports = siem_api_client.reports.paginated(1, 10)
print(paginated_reports.data)
paginated_signals = siem_api_client.signals.paginated(1, 10)
print(paginated_signals.data)