Skip to content

Commit

Permalink
Merge pull request #83 from casework/support_strict_review_on_example_py
Browse files Browse the repository at this point in the history
Add type signatures in support of strict type-review on example.py
kchason authored Dec 2, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 1331422 + 9734853 commit 89649a3
Showing 8 changed files with 176 additions and 86 deletions.
44 changes: 22 additions & 22 deletions case.jsonld
Original file line number Diff line number Diff line change
@@ -16,25 +16,6 @@
"uco-core:createdBy": {
"@id": "kb:aef8e4c4-db83-59fd-8f71-b65cd7676c0a"
},
"@context": {
"@vocab": "http://caseontology.org/core#",
"case-investigation": "https://ontology.caseontology.org/case/investigation/",
"drafting": "http://example.org/ontology/drafting/",
"co": "http://purl.org/co/",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"uco-action": "https://ontology.unifiedcyberontology.org/uco/action/",
"uco-core": "https://ontology.unifiedcyberontology.org/uco/core/",
"uco-identity": "https://ontology.unifiedcyberontology.org/uco/identity/",
"uco-location": "https://ontology.unifiedcyberontology.org/uco/location/",
"uco-role": "https://ontology.unifiedcyberontology.org/uco/role/",
"uco-observable": "https://ontology.unifiedcyberontology.org/uco/observable/",
"uco-tool": "https://ontology.unifiedcyberontology.org/uco/tool/",
"uco-types": "https://ontology.unifiedcyberontology.org/uco/types/",
"uco-vocabulary": "https://ontology.unifiedcyberontology.org/uco/vocabulary/",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"kb": "http://example.org/kb/"
},
"uco-core:object": [
{
"@id": "kb:aef8e4c4-db83-59fd-8f71-b65cd7676c0a",
@@ -579,7 +560,6 @@
"@id": "kb:a19e1e1f-3953-5fb9-92b6-2b46f85752b2",
"@type": "case-investigation:Investigation",
"uco-core:name": "Crime A",
"case-investigation:focus": "Transfer of Illicit Materials",
"uco-core:description": "Inquiry into the transfer of illicit materials and the devices used to do so",
"uco-core:object": [
{
@@ -710,7 +690,8 @@
}
]
}
]
],
"case-investigation:focus": "Transfer of Illicit Materials"
},
{
"@id": "kb:6f79d4ae-d92c-5cad-bbe5-a0afde6f475a",
@@ -1853,5 +1834,24 @@
}
]
}
]
],
"@context": {
"@vocab": "http://caseontology.org/core#",
"case-investigation": "https://ontology.caseontology.org/case/investigation/",
"drafting": "http://example.org/ontology/drafting/",
"co": "http://purl.org/co/",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"uco-action": "https://ontology.unifiedcyberontology.org/uco/action/",
"uco-core": "https://ontology.unifiedcyberontology.org/uco/core/",
"uco-identity": "https://ontology.unifiedcyberontology.org/uco/identity/",
"uco-location": "https://ontology.unifiedcyberontology.org/uco/location/",
"uco-role": "https://ontology.unifiedcyberontology.org/uco/role/",
"uco-observable": "https://ontology.unifiedcyberontology.org/uco/observable/",
"uco-tool": "https://ontology.unifiedcyberontology.org/uco/tool/",
"uco-types": "https://ontology.unifiedcyberontology.org/uco/types/",
"uco-vocabulary": "https://ontology.unifiedcyberontology.org/uco/vocabulary/",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"kb": "http://example.org/kb/"
}
}
10 changes: 4 additions & 6 deletions case_mapping/case/investigation.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@

from ..base import Facet, UcoObject
from ..uco.action import Action
from ..uco.core import ContextualCompilation
from ..uco.location import Location


@@ -46,8 +47,8 @@ def __init__(
self["@type"] = "case-investigation:InvestigativeAction"


class CaseInvestigation(UcoObject):
def __init__(self, name=None, focus=None, description=None, core_objects=None):
class CaseInvestigation(ContextualCompilation):
def __init__(self, *args: Any, focus=None, **kwargs: Any) -> None:
"""
An investigative action is a CASE object that represents the who, where, when of investigation
:param name: The name of an investigation (e.g., Murder of Suspect B,.)
@@ -57,16 +58,13 @@ def __init__(self, name=None, focus=None, description=None, core_objects=None):
object e.g., Persons involved in investigation, Investigation into a Murder, object refrences a
case-object for a phone investigative action
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "case-investigation:Investigation"
self._str_vars(
**{
"uco-core:name": name,
"case-investigation:focus": focus,
"uco-core:description": description,
}
)
self.append_core_objects(core_objects)


class ProvenanceRecord(UcoObject):
8 changes: 6 additions & 2 deletions case_mapping/drafting/entities.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from ..base import Facet, UcoObject, unpack_args_array


@@ -175,6 +177,7 @@ def __init__(
class SocialMediaActivityFacet(Facet):
def __init__(
self,
*args: Any,
body=None,
page_title=None,
author_identifier=None,
@@ -187,7 +190,8 @@ def __init__(
created_time=None,
application=None,
url=None,
):
**kwargs: Any,
) -> None:
"""
Used to represent activity on social platfomrs
:param body: The text of the post/message
@@ -203,7 +207,7 @@ def __init__(
:param application: the application used for creating the post
:param application: the URL of the post
"""
super().__init__()
super().__init__(*args, **kwargs)

self["@type"] = ["drafting:SocialMediaActivityFacet", "uco-core:Facet"]

85 changes: 74 additions & 11 deletions case_mapping/uco/core.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,85 @@
from datetime import datetime
from typing import Any, List, Optional, Union
from typing import Any, Optional, Sequence, Union

from pytz import timezone

from ..base import UcoObject, unpack_args_array


class Bundle(UcoObject):
class Compilation(UcoObject):
def __init__(
self,
*args: Any,
core_objects: Optional[Sequence[UcoObject]] = None,
**kwargs: Any,
) -> None:
"""
A compilation is a grouping of things.
"""
super().__init__(*args, **kwargs)
self["@type"] = "uco-core:Compilation"
if core_objects is not None and len(core_objects) > 0:
self.append_core_objects(core_objects)

@unpack_args_array
def append_to_uco_object(self, *args) -> None:
"""
Add a single/tuple of result(s) to the list of outputs from an action
:param args: A CASE object, or objects, often an observable. (e.g., one of many devices from a search operation)
"""
self._append_observable_objects("uco-core:object", *args)


class ContextualCompilation(Compilation):
def __init__(
self,
*args: Any,
core_objects: Sequence[UcoObject],
**kwargs: Any,
) -> None:
"""
A contextual compilation is a grouping of things sharing some context (e.g., a set of network connections observed on a given day, all accounts associated with a given person).
Future implementation note: At and before CASE 1.3.0, at least one core:object must be supplied at instantiation time of a contextual compilation. At and after CASE 1.4.0, these objects will be optional.
"""
if len(core_objects) == 0:
raise ValueError(
"A ContextualCompilation is required to have at least one UcoObject to link at initiation time. This will become optional in CASE 1.4.0."
)
super().__init__(*args, **kwargs)
self["@type"] = "uco-core:ContextualCompilation"
self.append_core_objects(core_objects)


class EnclosingCompilation(Compilation):
def __init__(
self,
*args: Any,
core_objects: Sequence[UcoObject],
**kwargs: Any,
) -> None:
"""
An enclosing compilation is a container for a grouping of things.
"""
if len(core_objects) == 0:
raise ValueError(
"An EnclosingCompilation is required to have at least one UcoObject to link at initiation time."
)
super().__init__(*args, **kwargs)
self["@type"] = "uco-core:EnclosingCompilation"
self.append_core_objects(core_objects)


class Bundle(EnclosingCompilation):
def __init__(
self,
*args: Any,
**kwargs: Any,
) -> None:
"""
The main CASE Object for representing a case and its activities and objects.
Instantiating this class requires a starter sequence (set, list, or tuple) to be passed using the core_objects parameter. (See EnclosingCompilation.) To confirm conformant CASE will be generated, at least one UcoObject must be passed in this list. However, this does not initially need to be the complete sequence of objects that will be in this Bundle. Other UcoObjects can be added after initialization with bundle.append_to_uco_object.
"""
super().__init__(*args, **kwargs)
self.build = [] # type: ignore
@@ -39,6 +105,7 @@ def __init__(
# Assign caller-selectible prefix label and IRI, after checking
# for conflicts with hard-coded prefixes.
# https://www.w3.org/TR/turtle/#prefixed-name
assert isinstance(self["@context"], dict)
if self.prefix_label in self["@context"]:
raise ValueError(
"Requested prefix label already in use in hard-coded dictionary: '%s'. Please revise caller to use another label."
@@ -51,14 +118,6 @@ def __init__(
def append_to_case_graph(self, *args):
self._append_observable_objects("@graph", *args)

@unpack_args_array
def append_to_uco_object(self, *args):
"""
Add a single/tuple of result(s) to the list of outputs from an action
:param args: A CASE object, or objects, often an observable. (e.g., one of many devices from a search operation)
"""
self._append_observable_objects("uco-core:object", *args)

@unpack_args_array
def append_to_rdfs_comments(self, *args):
self._append_strings("rdfs:comment", *args)
@@ -119,4 +178,8 @@ def _addtime(self, _type: str) -> None:
}


directory = {"uco-core:Bundle": Bundle}
directory = {
"uco-core:Bundle": Bundle,
"uco-core:Compilation": Compilation,
"uco-core:ContextualCompilation": ContextualCompilation,
}
12 changes: 7 additions & 5 deletions case_mapping/uco/identity.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional

from ..base import Facet, IdentityAbstraction, UcoObject


class BirthInformationFacet(Facet):
def __init__(self, birthdate=None):
def __init__(self, *args: Any, birthdate=None, **kwargs: Any) -> None:
"""
:param birthdate: the date of birth of an identity
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-identity:BirthInformationFacet"
self._datetime_vars(**{"uco-identity:birthdate": birthdate})

@@ -31,12 +31,14 @@ def __init__(self, name: Optional[str] = None, facets=None):


class SimpleNameFacet(Facet):
def __init__(self, given_name=None, family_name=None):
def __init__(
self, *args: Any, given_name=None, family_name=None, **kwargs: Any
) -> None:
"""
:param given_name: Full name of the identity of person
:param family_name: Family name of identity of person
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-identity:SimpleNameFacet"
self._str_vars(
**{
7 changes: 3 additions & 4 deletions case_mapping/uco/location.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from typing import Optional
from typing import Any, Optional

from ..base import Facet, UcoObject


class Location(UcoObject):
def __init__(self, facets=None):
super().__init__()
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self["@type"] = "uco-location:Location"
self.append_facets(facets)


class LatLongCoordinatesFacet(Facet):
92 changes: 58 additions & 34 deletions case_mapping/uco/observable.py
Original file line number Diff line number Diff line change
@@ -209,15 +209,17 @@ def __init__(


class AccountFacet(Facet):
def __init__(self, identifier=None, is_active=True, issuer_id=None):
def __init__(
self, *args: Any, identifier=None, is_active=True, issuer_id=None, **kwargs: Any
) -> None:
"""
Used to represent user accounts
:param is_active: Active unless specified otherwise (False)
:param identifier: The idenitifier of the account (like a Skype username)
:param issuer_id: The id of issuing body for application
(e.g., kb:organization-skypeapp-cc44c2ae-bdd3-4df8-9ca3-1f58d682d62b)
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:AccountFacet"
self._bool_vars(**{"uco-observable:isActive": is_active})
self._str_vars(
@@ -292,6 +294,7 @@ def __init__(
class ContentDataFacet(Facet):
def __init__(
self,
*args: Any,
byte_order=None,
magic_number=None,
mime_type=None,
@@ -301,7 +304,8 @@ def __init__(
is_encrypted=None,
hash_method=None,
hash_value=None,
):
**kwargs: Any,
) -> None:
"""
The characteristics of a block of digital data.
:param byte_order: Byte order of data. Example - "BigEndian"
@@ -314,7 +318,7 @@ def __init__(
:param hash_method: The algorithm used to calculate the hash value
:param hash_value: The cryptographic hash of this content
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:ContentDataFacet"
self._str_vars(
**{
@@ -334,7 +338,7 @@ def __init__(
}

if hash_method is not None or hash_value is not None or hash_value != "-":
data = {
data: dict[str, Any] = {
"@id": self.prefix_label + ":" + str(local_uuid()),
"@type": "uco-types:Hash",
}
@@ -392,15 +396,23 @@ def __init__(self, range_offset=None, range_size=None):


class DeviceFacet(Facet):
def __init__(self, device_type=None, manufacturer=None, model=None, serial=None):
def __init__(
self,
*args: Any,
device_type=None,
manufacturer=None,
model=None,
serial=None,
**kwargs: Any,
) -> None:
"""
Characteristics of a piece of electronic equipment.
:param device_type: The type of device (e.g., "camera")
:param manufacturer: The producer of the device (e.g., "Canon")
:param model: The model of the device (e.g., "Powershot SX540")
:param serial: The serial phone_number of the device (e.g., "1296-3219-8792-CL918")
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:DeviceFacet"
self._node_reference_vars(**{"uco-observable:manufacturer": manufacturer})
self._str_vars(
@@ -519,7 +531,9 @@ def __init__(


class UrlHistoryFacet(Facet):
def __init__(self, browser=None, history_entries=None):
def __init__(
self, *args: Any, browser=None, history_entries=None, **kwargs: Any
) -> None:
"""
:param browser_info: An observable object containing a URLHistoryFacet
:param history_entries: A list of dictionaries, each dict has the
@@ -536,7 +550,7 @@ def __init__(self, browser=None, history_entries=None):
"uco-observable:url": url_object,
"uco-observable:visitCount": int,
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:URLHistoryFacet"
self._node_reference_vars(
**{
@@ -561,7 +575,7 @@ def __init__(self, browser=None, history_entries=None):

self["uco-observable:urlHistoryEntry"] = []
for entry in history_entries:
history_entry = {}
history_entry: dict[str, Any] = dict()
history_entry["@id"] = self.prefix_label + ":" + local_uuid()
history_entry["@type"] = "uco-observable:URLHistoryEntry"
for key, var in entry.items():
@@ -619,6 +633,7 @@ def __init__(self, browser=None, history_entries=None):
class UrlFacet(Facet):
def __init__(
self,
*args: Any,
url_address=None,
url_port=None,
url_host=None,
@@ -628,7 +643,8 @@ def __init__(
url_query=None,
url_scheme=None,
url_username=None,
):
**kwargs: Any,
) -> None:
"""
:param url_address: an address of a url (i.e. google.ie)
:param url_port: a tcp or udp port of a url for example 3000
@@ -640,7 +656,7 @@ def __init__(
:param url_scheme: Identifies the type of URL. (e.g. ssh://)
:param url_username: A username that may be required for authentication for a specific resource. (login)
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:URLFacet"
self._str_vars(
**{
@@ -701,13 +717,15 @@ def __init__(
class RasterPictureFacet(Facet):
def __init__(
self,
*args: Any,
camera_id=None,
bits_per_pixel=None,
picture_height=None,
picture_width=None,
image_compression_method=None,
picture_type=None,
):
**kwargs: Any,
) -> None:
"""
This CASEObject represents the contents of a file or device
:param camera_id: An observable cyberitem
@@ -717,7 +735,7 @@ def __init__(
:param image_compression_method: The compression method used
:param picture_type: The type of picture ("jpg", "png" etc.)
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:RasterPictureFacet"
self._str_vars(
**{
@@ -786,11 +804,11 @@ def __init__(


class PhoneAccountFacet(Facet):
def __init__(self, phone_number=None):
def __init__(self, *args: Any, phone_number=None, **kwargs: Any) -> None:
"""
:param phone_number: The number for this account (e.g., "+16503889249")
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:PhoneAccountFacet"
self._str_vars(
**{
@@ -800,22 +818,24 @@ def __init__(self, phone_number=None):


class EmailAccountFacet(Facet):
def __init__(self, email_address):
def __init__(self, *args: Any, email_address, **kwargs: Any) -> None:
"""
:param email_address: An ObservableObject (with EmailAdressFacet)
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:EmailAccountFacet"
self._node_reference_vars(**{"uco-observable:emailAddress": email_address})


class EmailAddressFacet(Facet):
def __init__(self, email_address_value=None, display_name=None):
def __init__(
self, *args: Any, email_address_value=None, display_name=None, **kwargs: Any
) -> None:
"""
Used to represent the value of an email address.
:param email_address_value: a single email address (e.g., "bob@example.com")
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:EmailAddressFacet"
self._str_vars(
**{
@@ -828,6 +848,7 @@ def __init__(self, email_address_value=None, display_name=None):
class EmailMessageFacet(Facet):
def __init__(
self,
*args: Any,
msg_to=None,
msg_from=None,
cc=None,
@@ -853,7 +874,8 @@ def __init__(
is_mime_encoded=None,
allocation_status=None,
is_multipart=None,
):
**kwargs: Any,
) -> None:
"""
An instance of an email message, corresponding to the internet message format described in RFC 5322 and related.
:param msg_to: A list of ObservableObjects (with EmailAccountFacet)
@@ -882,7 +904,7 @@ def __init__(
:param is_multipart: A boolean True/False
:param allocation_status:
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:EmailMessageFacet"
self._str_vars(
**{
@@ -928,20 +950,22 @@ def __init__(


class EXIFFacet(Facet):
def __init__(self, **kwargs):
def __init__(
self, *args: Any, exif_key_value_pairs: dict[str, str], **kwargs: Any
) -> None:
"""
Specifies exchangeable image file format (Exif) metadata tags for image and sound files recorded by digital cameras.
:param kwargs: The user provided key/value pairs of exif items (e.g., Make="Canon", etc.).
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:EXIFFacet"

self["uco-observable:exifData"] = {
"@id": self.prefix_label + ":" + str(local_uuid()),
"@type": "uco-types:ControlledDictionary",
"uco-types:entry": [],
}
for k, v in kwargs.items():
for k, v in exif_key_value_pairs.items():
if v not in ["", " "]:
item = {
"@id": self.prefix_label + ":" + str(local_uuid()),
@@ -1147,6 +1171,7 @@ def __init__(
class MessageFacet(Facet):
def __init__(
self,
*args: Any,
msg_to=None,
msg_from=None,
message_text=None,
@@ -1155,7 +1180,8 @@ def __init__(
message_type=None,
message_id=None,
session_id=None,
):
**kwargs: Any,
) -> None:
"""
Characteristics of an electronic message.
:param msg_to: A list of ObservableObjects
@@ -1167,7 +1193,7 @@ def __init__(
:param message_id: A unique identifier for the message.
:param session_id: The priority of the email.
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:MessageFacet"
self._str_vars(
**{
@@ -1434,13 +1460,13 @@ def __init__(


class ApplicationAccountFacet(Facet):
def __init__(self, application=None):
def __init__(self, *args: Any, application=None, **kwargs: Any) -> None:
"""
An application account facet is a grouping of characteristics unique to an account within a particular software
program designed for end users.
:param application: An Observable Object (containing an Application Facet)
"""
super().__init__()
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:ApplicationAccountFacet"
self._node_reference_vars(**{"uco-observable:application": application})

@@ -1620,12 +1646,10 @@ def append_participants(self, *args):
self._append_refs("uco-observable:participant", *args)


class MessageThread(UcoObject):
def __init__(self, name=None, facets=None):
super().__init__()
class MessageThread(ObservableObject):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self["@type"] = "uco-observable:MessageThread"
self._str_vars(**{"uco-core:name": name})
self.append_facets(facets)


class Message(ObservableObject):
4 changes: 2 additions & 2 deletions example.py
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ def _next_timestamp() -> datetime:
bundle_modified_time = datetime.strptime("2024-05-02T21:38:19", "%Y-%m-%dT%H:%M:%S")

bundle = uco.core.Bundle(
core_objects=[bundle_identity],
created_by=bundle_identity,
description="An Example Case File",
modified_time=bundle_modified_time,
@@ -47,7 +48,6 @@ def _next_timestamp() -> datetime:
spec_version="UCO/CASE 1.3",
tag="Artifacts extracted from a mobile phone",
)
bundle.append_to_uco_object(bundle_identity)

investigation_items: list[base.UcoObject] = []

@@ -117,7 +117,7 @@ def _next_timestamp() -> datetime:
)

exif = {"Make": "Canon", "Model": "Powershot"}
file_exif1 = uco.observable.EXIFFacet(**exif)
file_exif1 = uco.observable.EXIFFacet(exif_key_value_pairs=exif)
sd_card.append_facets(file1, file_content1, file_raster1, file_exif1)
bundle.append_to_uco_object(sd_card)

0 comments on commit 89649a3

Please sign in to comment.