Skip to content

API reference

Reference for the technical implementation of the arkiverse project code.

assets

This package provides utilities for managing archive assets as they are written into a graph database, or converted into arkiverse objects for downstream processing.

Modules:

Name Description
database

Interface to interact with a Neo4J database using the Neo4J Python driver.

ml

Defines wrapper classes for datasets and AI models.

objects

Arkiverse object definitions: File, Document, Ontology, etc.

assets.database

This module provides an interface to interact with a Neo4J database using the Neo4J Python driver. It includes a Driver class for managing database connections and executing queries, as well as several utility functions for running queries and managing nodes.

Class

Driver: A class to manage Neo4J database connections and execute queries.

Driver

This class interacts with the Neo4J database. Written specifically to avoid SQL injection attacks.

Source code in arkiverse\assets\database.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class Driver:
    """
    This class interacts with the Neo4J database. Written specifically to avoid SQL injection attacks.
    """

    from neo4j import GraphDatabase
    from neo4j.exceptions import ServiceUnavailable


    def __init__(self, uri: str, user: str, password: str) -> None:
        """
        Initialize the Driver instance.

        Args:
            uri: The URI for the Neo4J database.
            user: The username for the Neo4J database.
            password: The password for the Neo4J database.
        """

        self.driver = GraphDatabase.driver(uri, auth=(user, password))

        self.database_name = "neo4j"


    def close(self) -> None:
        """
        Closes the driver connection.

        This method should be called when you are finished with the driver to ensure
        that the connection is properly closed and resources are released.
        """

        self.driver.close()


    @staticmethod
    def enable_log(level: int, output_stream: object) -> None:
        """
        Enable logging for the Neo4j driver.

        This method sets up a logging handler for the Neo4j driver,
        allowing log messages to be output to the specified stream at the given log level.

        Args:
            level: The logging level (e.g., logging.DEBUG, logging.INFO).
            output_stream: The stream to which log messages should be written (e.g., sys.stdout, sys.stderr).

        Returns:
            None
        """

        handler = logging.StreamHandler(output_stream)
        handler.setLevel(level)
        logging.getLogger("neo4j").addHandler(handler)
        logging.getLogger("neo4j").setLevel(level)


    # this class method is a subset of query_write, but with more structured attributes and fields
    def generic_action(
        self,
        nodeID: str,
        label: str, 
        parentID: str, 
        relationship: str, 
        attributes: dict, 
        database: str = None
    ) -> Any:
        """
        Perform a generic action on the database, creating or updating a node and its relationship.

        Args:
            nodeID: The ID of the node to be created or updated.
            label: The label of the node.
            parentID: The ID of the parent node.
            relationship: The type of relationship between the node and the parent node.
            attributes: A dictionary of attributes to be set on the node.
            database: The name of the database to use. Defaults to None.

        Returns:
            Any: The result of the transaction.
        """

        if database is None:
            with self.driver.session() as session:
                result = session.write_transaction(
                    self._generic_action, nodeID, label, parentID, relationship, attributes
                )

                return result
        else:
            with self.driver.session(database=database) as session:
                result = session.write_transaction(
                    self._generic_action, nodeID, label, parentID, relationship, attributes
                )

                return result


    def update_metadata(self, nodeID: str, attributes: dict, database: str = None) -> Any:
        """
        Update the metadata for an existing node in the database.

        Args:
            nodeID: The ID of the node to be updated.
            attributes: A dictionary of attributes to be set on the node.
            database: The name of the database to use. Defaults to None.

        Returns:
            Any: The result of the transaction.
        """

        if database is None:
            with self.driver.session() as session:
                result = session.write_transaction(self._update_metadata, nodeID, attributes)

                return result
        else:
            with self.driver.session(database=database) as session:
                result = session.write_transaction(self._update_metadata, nodeID, attributes)

                return result


    # writes any cypher transaction for the neo4j database
    def query_write(self, query: str, database: str = None) -> dict:
        """
        Executes a write Cypher query on the specified database or the default database if none is provided.

        Args:
            query: The Cypher query to be executed.
            database: The name of the database to run the query against. Defaults to None.

        Returns:
            dict: The result of the query as a dictionary.
        """

        if database is None:
            with self.driver.session() as session:
                result = session.run(query)
                record = result.single()
                return record
        else:
            with self.driver.session(database=database) as session:
                result = session.run(query)
                record = result.single()
                return record


    # asks the neo4j database to return any query
    def query(self, query: str, database: str = None) -> Any:
        """
        Executes a read Cypher query on the specified database or the default database if none is provided.

        Args:
            query: The Cypher query to be executed.
            database: The name of the database to run the query against. Defaults to None.

        Returns:
            Any: The result of the query.
        """

        if database is None:
            with self.driver.session() as session:
                result = session.read_transaction(self._query, query)
        else:
            with self.driver.session(database=database) as session:
                result = session.read_transaction(self._query, query)
        return result


    @staticmethod
    def _generic_action(
        tx: Any,
        nodeID: str,
        label: str,
        parentID: str,
        relationship: str,
        attributes: Optional[Dict[str, Any]]
        ) -> List[Dict[str, Any]]:
        """
        Perform a generic action to merge a node and create a relationship in the database.
        This method merges a node with the given `nodeID` and `label`, sets its attributes if provided,
        and creates a relationship of the specified type with a parent node identified by `parentID`.

        Args:
            tx: The transaction context to run the query.
            nodeID: The unique identifier of the node to be merged.
            label: The label to be assigned to the node.
            parentID: The unique identifier of the parent node.
            relationship: The type of relationship to be created between the nodes.
            attributes: A dictionary of attributes to set on the node. Can be None.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries representing the result rows of the query.

        Raises:
            ServiceUnavailable: If the query fails to execute due to a service unavailability error.
        """

        relationship = relationship.upper()
        query = "MERGE (p1 {nodeID: $nodeID}) "

        if attributes is not None:
            query += f"SET p1:{label} " "SET p1 += $attributes "

        query += (
            "WITH p1 "
            "MATCH (p2) "
            "WHERE p2.nodeID = $parentID "
            "CALL apoc.create.relationship(p2, $relationship, NULL, p1) YIELD rel "
            "RETURN p2, p1"
        )

        result = tx.run(query, nodeID=nodeID, parentID=parentID, relationship=relationship, attributes=attributes)

        try:
            return [row for row in result]
        # Capture any errors along with the query and data for traceability
        except ServiceUnavailable as exception:
            logging.exception(f"{query} raised an error: \n {exception}")
            raise


    @staticmethod
    def _update_metadata(tx: Any, nodeID: str, attributes: dict) -> list:
        """
        Update the metadata of a node in the database.
        This method executes a Cypher query to match a node by its nodeID and update its attributes.
        It returns the updated node.

        Args:
            tx: The transaction object to run the query.
            nodeID: The ID of the node to be updated.
            attributes: A dictionary of attributes to update the node with.

        Returns:
            list: A list of rows returned by the query, each representing the updated node.

        Raises:
            ServiceUnavailable: If the query execution fails, an exception is logged and re-raised.
        """

        query = "MATCH (p1 {nodeID: $nodeID}) "

        query += "SET p1 += $attributes "

        query += "RETURN p1"
        result = tx.run(query, nodeID=nodeID, attributes=attributes)

        try:
            return [row for row in result]
        # Capture any errors along with the query and data for traceability
        except ServiceUnavailable as exception:
            logging.exception(f"{query} raised an error: \n {exception}")
            raise


    @staticmethod
    def _query_write(tx: Any, query: str) -> None:
        """
        Executes a write query within a transaction.

        Args:
            tx: The transaction object to run the query on.
            query: The query string to be executed.

        Raises:
            ServiceUnavailable: If the query execution fails due to a service unavailability.
        """

        try:
            result = tx.run(query)
        except ServiceUnavailable as exception:
            logging.exception(f"{query} raised an error: \n {exception}")
            raise


    @staticmethod
    def _query(tx: Any, query: str) -> List[Dict[str, Any]]:
        """
        Executes a given query within a transaction and returns the results.

        Args:
            tx: The transaction object used to run the query.
            query: The query string to be executed.

        Returns:
            List[Dict[str, Any]]: A list of dictionaries containing the query results.

        Raises:
            ServiceUnavailable: If the query execution fails due to a service unavailability.
        """

        try:
            results = []
            result = tx.run(query)

            for i in result:
                results.append(i.data())

            return results

        except ServiceUnavailable as exception:
            logging.exception(f"{query} raised an error: \n {exception}")
            raise

__init__(uri, user, password)

Initialize the Driver instance.

Parameters:

Name Type Description Default
uri str

The URI for the Neo4J database.

required
user str

The username for the Neo4J database.

required
password str

The password for the Neo4J database.

required
Source code in arkiverse\assets\database.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, uri: str, user: str, password: str) -> None:
    """
    Initialize the Driver instance.

    Args:
        uri: The URI for the Neo4J database.
        user: The username for the Neo4J database.
        password: The password for the Neo4J database.
    """

    self.driver = GraphDatabase.driver(uri, auth=(user, password))

    self.database_name = "neo4j"

close()

Closes the driver connection.

This method should be called when you are finished with the driver to ensure that the connection is properly closed and resources are released.

Source code in arkiverse\assets\database.py
58
59
60
61
62
63
64
65
66
def close(self) -> None:
    """
    Closes the driver connection.

    This method should be called when you are finished with the driver to ensure
    that the connection is properly closed and resources are released.
    """

    self.driver.close()

enable_log(level, output_stream) staticmethod

Enable logging for the Neo4j driver.

This method sets up a logging handler for the Neo4j driver, allowing log messages to be output to the specified stream at the given log level.

Parameters:

Name Type Description Default
level int

The logging level (e.g., logging.DEBUG, logging.INFO).

required
output_stream object

The stream to which log messages should be written (e.g., sys.stdout, sys.stderr).

required

Returns:

Type Description
None

None

Source code in arkiverse\assets\database.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@staticmethod
def enable_log(level: int, output_stream: object) -> None:
    """
    Enable logging for the Neo4j driver.

    This method sets up a logging handler for the Neo4j driver,
    allowing log messages to be output to the specified stream at the given log level.

    Args:
        level: The logging level (e.g., logging.DEBUG, logging.INFO).
        output_stream: The stream to which log messages should be written (e.g., sys.stdout, sys.stderr).

    Returns:
        None
    """

    handler = logging.StreamHandler(output_stream)
    handler.setLevel(level)
    logging.getLogger("neo4j").addHandler(handler)
    logging.getLogger("neo4j").setLevel(level)

generic_action(nodeID, label, parentID, relationship, attributes, database=None)

Perform a generic action on the database, creating or updating a node and its relationship.

Parameters:

Name Type Description Default
nodeID str

The ID of the node to be created or updated.

required
label str

The label of the node.

required
parentID str

The ID of the parent node.

required
relationship str

The type of relationship between the node and the parent node.

required
attributes dict

A dictionary of attributes to be set on the node.

required
database str

The name of the database to use. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the transaction.

Source code in arkiverse\assets\database.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def generic_action(
    self,
    nodeID: str,
    label: str, 
    parentID: str, 
    relationship: str, 
    attributes: dict, 
    database: str = None
) -> Any:
    """
    Perform a generic action on the database, creating or updating a node and its relationship.

    Args:
        nodeID: The ID of the node to be created or updated.
        label: The label of the node.
        parentID: The ID of the parent node.
        relationship: The type of relationship between the node and the parent node.
        attributes: A dictionary of attributes to be set on the node.
        database: The name of the database to use. Defaults to None.

    Returns:
        Any: The result of the transaction.
    """

    if database is None:
        with self.driver.session() as session:
            result = session.write_transaction(
                self._generic_action, nodeID, label, parentID, relationship, attributes
            )

            return result
    else:
        with self.driver.session(database=database) as session:
            result = session.write_transaction(
                self._generic_action, nodeID, label, parentID, relationship, attributes
            )

            return result

query(query, database=None)

Executes a read Cypher query on the specified database or the default database if none is provided.

Parameters:

Name Type Description Default
query str

The Cypher query to be executed.

required
database str

The name of the database to run the query against. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the query.

Source code in arkiverse\assets\database.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def query(self, query: str, database: str = None) -> Any:
    """
    Executes a read Cypher query on the specified database or the default database if none is provided.

    Args:
        query: The Cypher query to be executed.
        database: The name of the database to run the query against. Defaults to None.

    Returns:
        Any: The result of the query.
    """

    if database is None:
        with self.driver.session() as session:
            result = session.read_transaction(self._query, query)
    else:
        with self.driver.session(database=database) as session:
            result = session.read_transaction(self._query, query)
    return result

query_write(query, database=None)

Executes a write Cypher query on the specified database or the default database if none is provided.

Parameters:

Name Type Description Default
query str

The Cypher query to be executed.

required
database str

The name of the database to run the query against. Defaults to None.

None

Returns:

Name Type Description
dict dict

The result of the query as a dictionary.

Source code in arkiverse\assets\database.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def query_write(self, query: str, database: str = None) -> dict:
    """
    Executes a write Cypher query on the specified database or the default database if none is provided.

    Args:
        query: The Cypher query to be executed.
        database: The name of the database to run the query against. Defaults to None.

    Returns:
        dict: The result of the query as a dictionary.
    """

    if database is None:
        with self.driver.session() as session:
            result = session.run(query)
            record = result.single()
            return record
    else:
        with self.driver.session(database=database) as session:
            result = session.run(query)
            record = result.single()
            return record

update_metadata(nodeID, attributes, database=None)

Update the metadata for an existing node in the database.

Parameters:

Name Type Description Default
nodeID str

The ID of the node to be updated.

required
attributes dict

A dictionary of attributes to be set on the node.

required
database str

The name of the database to use. Defaults to None.

None

Returns:

Name Type Description
Any Any

The result of the transaction.

Source code in arkiverse\assets\database.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def update_metadata(self, nodeID: str, attributes: dict, database: str = None) -> Any:
    """
    Update the metadata for an existing node in the database.

    Args:
        nodeID: The ID of the node to be updated.
        attributes: A dictionary of attributes to be set on the node.
        database: The name of the database to use. Defaults to None.

    Returns:
        Any: The result of the transaction.
    """

    if database is None:
        with self.driver.session() as session:
            result = session.write_transaction(self._update_metadata, nodeID, attributes)

            return result
    else:
        with self.driver.session(database=database) as session:
            result = session.write_transaction(self._update_metadata, nodeID, attributes)

            return result

assets.ml

This module provides general classes for handling datasets, data loaders, and models using PyTorch and Hugging Face's Transformers library.

Classes:

Name Description
BaseDataset

General dataset class for preprocessing and tokenizing data samples to be inherited by specific datasets.

BaseDataLoader

General data loader class to handle data batching.

BaseModel

General model class for both Hugging Face and PyTorch models.

BaseDataLoader

General DataLoader class to handle data batching.

Source code in arkiverse\assets\ml.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class BaseDataLoader:
    """
    General DataLoader class to handle data batching.
    """

    def __init__(self, dataset: Dataset, batch_size: int, shuffle: bool = True):
        """
        Args:
            dataset: Dataset instance.
            batch_size: Number of samples per batch.
            shuffle: Whether to shuffle the dataset each epoch.
        """
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    def get_loader(self) -> DataLoader:
        """
        Returns the PyTorch DataLoader instance.
        """

        return self.loader

__init__(dataset, batch_size, shuffle=True)

Parameters:

Name Type Description Default
dataset Dataset

Dataset instance.

required
batch_size int

Number of samples per batch.

required
shuffle bool

Whether to shuffle the dataset each epoch.

True
Source code in arkiverse\assets\ml.py
74
75
76
77
78
79
80
81
82
83
84
def __init__(self, dataset: Dataset, batch_size: int, shuffle: bool = True):
    """
    Args:
        dataset: Dataset instance.
        batch_size: Number of samples per batch.
        shuffle: Whether to shuffle the dataset each epoch.
    """
    self.dataset = dataset
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

get_loader()

Returns the PyTorch DataLoader instance.

Source code in arkiverse\assets\ml.py
86
87
88
89
90
91
def get_loader(self) -> DataLoader:
    """
    Returns the PyTorch DataLoader instance.
    """

    return self.loader

BaseDataset

Bases: Dataset

General dataset class to be inherited by specific datasets.

Source code in arkiverse\assets\ml.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class BaseDataset(Dataset):
    """
    General dataset class to be inherited by specific datasets.
    """

    def __init__(self, data: list, tokenizer: PreTrainedTokenizer = None, max_length: int = None, transform: callable = None):
        """
        Args:
            data: List of data samples (e.g., texts, features, etc.).
            tokenizer: Hugging Face tokenizer for preprocessing.
            max_length: Maximum sequence length for tokenization.
            transform: Transformations for general PyTorch models (e.g., image augmentations).
        """

        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.transform = transform


    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieve and preprocess the data sample.
        """

        sample = self.data[idx]
        if self.tokenizer:
            encoded = self.tokenizer(
                sample, padding="max_length", truncation=True, max_length=self.max_length, return_tensors="pt"
            )
            return {key: val.squeeze(0) for key, val in encoded.items()}
        elif self.transform:
            return self.transform(sample)
        else:
            return sample

__getitem__(idx)

Retrieve and preprocess the data sample.

Source code in arkiverse\assets\ml.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __getitem__(self, idx):
    """
    Retrieve and preprocess the data sample.
    """

    sample = self.data[idx]
    if self.tokenizer:
        encoded = self.tokenizer(
            sample, padding="max_length", truncation=True, max_length=self.max_length, return_tensors="pt"
        )
        return {key: val.squeeze(0) for key, val in encoded.items()}
    elif self.transform:
        return self.transform(sample)
    else:
        return sample

__init__(data, tokenizer=None, max_length=None, transform=None)

Parameters:

Name Type Description Default
data list

List of data samples (e.g., texts, features, etc.).

required
tokenizer PreTrainedTokenizer

Hugging Face tokenizer for preprocessing.

None
max_length int

Maximum sequence length for tokenization.

None
transform callable

Transformations for general PyTorch models (e.g., image augmentations).

None
Source code in arkiverse\assets\ml.py
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, data: list, tokenizer: PreTrainedTokenizer = None, max_length: int = None, transform: callable = None):
    """
    Args:
        data: List of data samples (e.g., texts, features, etc.).
        tokenizer: Hugging Face tokenizer for preprocessing.
        max_length: Maximum sequence length for tokenization.
        transform: Transformations for general PyTorch models (e.g., image augmentations).
    """

    self.data = data
    self.tokenizer = tokenizer
    self.max_length = max_length
    self.transform = transform

BaseModel

General model class for both Hugging Face and PyTorch models.

Source code in arkiverse\assets\ml.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class BaseModel:
    """
    General model class for both Hugging Face and PyTorch models.
    """

    def __init__(self, model, device: str = "cpu"):
        """
        Args:
            model: Pretrained Hugging Face model or general PyTorch model.
            device: Device for computation ('cpu' or 'cuda').
        """

        self.model = model.to(device)
        self.device = device


    def predict(self, inputs):
        """
        Perform inference on the inputs.

        Args:
            inputs (dict or torch.Tensor): Tokenized inputs (for Hugging Face) or batched tensor inputs (for PyTorch).
        Returns:
            Outputs from the model.
        """

        self.model.eval()
        with torch.no_grad():
            if isinstance(inputs, dict):
                inputs = {key: val.to(self.device) for key, val in inputs.items()}
                outputs = self.model(**inputs)
            else:
                inputs = inputs.to(self.device)
                outputs = self.model(inputs)

        return outputs


    def save(self, path: str):
        """
        Save the model to the specified path.
        """

        if isinstance(self.model, PreTrainedModel):
            self.model.save_pretrained(path)
        else:
            torch.save(self.model.state_dict(), path)


    @classmethod
    def load(cls, path: str, model_class: type, device: str = "cpu") -> 'BaseModel':
        """
        Load the model from a specified path.

        Args:
            path (str): Path to the saved model.
            model_class: Class of the model to be loaded.
            device (str): Device for computation ('cpu' or 'cuda').

        Returns:
            BaseModel: An instance of the BaseModel class with the loaded model.
        """

        if issubclass(model_class, PreTrainedModel):
            model = model_class.from_pretrained(path)
        else:
            model = model_class()
            model.load_state_dict(torch.load(path))

        return cls(model, device=device)

__init__(model, device='cpu')

Parameters:

Name Type Description Default
model

Pretrained Hugging Face model or general PyTorch model.

required
device str

Device for computation ('cpu' or 'cuda').

'cpu'
Source code in arkiverse\assets\ml.py
 99
100
101
102
103
104
105
106
107
def __init__(self, model, device: str = "cpu"):
    """
    Args:
        model: Pretrained Hugging Face model or general PyTorch model.
        device: Device for computation ('cpu' or 'cuda').
    """

    self.model = model.to(device)
    self.device = device

load(path, model_class, device='cpu') classmethod

Load the model from a specified path.

Parameters:

Name Type Description Default
path str

Path to the saved model.

required
model_class type

Class of the model to be loaded.

required
device str

Device for computation ('cpu' or 'cuda').

'cpu'

Returns:

Name Type Description
BaseModel BaseModel

An instance of the BaseModel class with the loaded model.

Source code in arkiverse\assets\ml.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
def load(cls, path: str, model_class: type, device: str = "cpu") -> 'BaseModel':
    """
    Load the model from a specified path.

    Args:
        path (str): Path to the saved model.
        model_class: Class of the model to be loaded.
        device (str): Device for computation ('cpu' or 'cuda').

    Returns:
        BaseModel: An instance of the BaseModel class with the loaded model.
    """

    if issubclass(model_class, PreTrainedModel):
        model = model_class.from_pretrained(path)
    else:
        model = model_class()
        model.load_state_dict(torch.load(path))

    return cls(model, device=device)

predict(inputs)

Perform inference on the inputs.

Parameters:

Name Type Description Default
inputs dict or Tensor

Tokenized inputs (for Hugging Face) or batched tensor inputs (for PyTorch).

required

Returns: Outputs from the model.

Source code in arkiverse\assets\ml.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def predict(self, inputs):
    """
    Perform inference on the inputs.

    Args:
        inputs (dict or torch.Tensor): Tokenized inputs (for Hugging Face) or batched tensor inputs (for PyTorch).
    Returns:
        Outputs from the model.
    """

    self.model.eval()
    with torch.no_grad():
        if isinstance(inputs, dict):
            inputs = {key: val.to(self.device) for key, val in inputs.items()}
            outputs = self.model(**inputs)
        else:
            inputs = inputs.to(self.device)
            outputs = self.model(inputs)

    return outputs

save(path)

Save the model to the specified path.

Source code in arkiverse\assets\ml.py
132
133
134
135
136
137
138
139
140
def save(self, path: str):
    """
    Save the model to the specified path.
    """

    if isinstance(self.model, PreTrainedModel):
        self.model.save_pretrained(path)
    else:
        torch.save(self.model.state_dict(), path)

assets.objects

This module defines the core objects used in the Arkiverse framework.

Classes:

Name Description
Parser

Enum for different types of parsers.

Object

Base object in Arkiverse framework.

File

Represents a file in the Arkiverse framework.

Ontology

Describes the ontology chosen for mapping.

Document

Represents a document in the Arkiverse framework.

Document

Bases: Object

Document object in Arkiverse framework.

Attributes:

Name Type Description
metadata dict

A dictionary containing metadata information about the document.

ontology Ontology

An instance of the Ontology class representing the document's ontology.

Source code in arkiverse\assets\objects.py
88
89
90
91
92
93
94
95
96
97
98
99
@dataclass(kw_only=True)
class Document(Object):
    """
    Document object in Arkiverse framework.

    Attributes:
        metadata (dict): A dictionary containing metadata information about the document.
        ontology (Ontology): An instance of the Ontology class representing the document's ontology.
    """

    metadata: dict
    ontology: Ontology

File

Bases: Object

File object in Arkiverse framework.

Attributes:

Name Type Description
fileType str

Type of the file (e.g., pdf, docx).

path str

Path to the file in the filesystem.

Source code in arkiverse\assets\objects.py
57
58
59
60
61
62
63
64
65
66
67
68
69
@dataclass(kw_only=True)   
class File(Object):
    """
    File object in Arkiverse framework.

    Attributes:
        fileType (str): Type of the file (e.g., pdf, docx).
        path (str): Path to the file in the filesystem.
    """

    fileType: str
    path: str 
    meta: field(default_factory=dict)

Object

Base object in Arkiverse framework.

Attributes:

Name Type Description
nodeID str

Unique identifier for the object.

origin str

Origin of the object.

Source code in arkiverse\assets\objects.py
44
45
46
47
48
49
50
51
52
53
54
55
@dataclass(kw_only=True)
class Object:
    """
    Base object in Arkiverse framework.

    Attributes:
        nodeID (str): Unique identifier for the object.
        origin (str): Origin of the object.
    """

    nodeID: str
    origin: str

Ontology

Describes the ontology chosen for mapping.

Attributes:

Name Type Description
name str

Name of the ontology.

ID str

Unique identifier for the ontology.

namespace str

Namespace of the ontology.

Source code in arkiverse\assets\objects.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@dataclass(kw_only=True)
class Ontology:
    """
    Describes the ontology chosen for mapping.

    Attributes:
        name (str): Name of the ontology.
        ID (str): Unique identifier for the ontology.
        namespace (str): Namespace of the ontology.
    """

    name: str
    ID: str
    namespace: str

Parser

Bases: Enum

Parser is an enumeration that defines the available parsers for processing documents.

Attributes:

Name Type Description
TIKA str

Represents the Tika parser.

TESSERACT str

Represents the Tesseract parser.

Source code in arkiverse\assets\objects.py
29
30
31
32
33
34
35
36
37
38
39
class Parser(Enum):
    """
    Parser is an enumeration that defines the available parsers for processing documents.

    Attributes:
        TIKA (str): Represents the Tika parser.
        TESSERACT (str): Represents the Tesseract parser.
    """

    TIKA = "tika"
    TESSERACT = "tesseract"

file

This package provides utilities for file conversion and metadata extraction.

Modules:

Name Description
_convert

Functions for converting file formats.

convert

Contains a function that orchestrates functions from _convert module.

extract_metadata

Functions for extracting metadata from files.

file.convert

Module for converting various document types to different formats. This module provides a function convert that allows converting documents to specified formats such as image, text, pdf, and pacer. The conversion functions are imported from a private module _convert.

convert(doc_object, convert_to, output_dir=None)

Convert a document to a specified format.

Parameters:

Name Type Description Default
doc_object File

The document object to be converted.

required
convert_to str

The format to convert the document to. Options include "image", "text", "pdf", "pacer".

required
output_dir Optional[Union[str, Path]]

The directory to save the converted file. Defaults to None.

None

Returns:

Type Description
Union[str, Path]

Union[str, Path]: The path to the converted file or the converted content as a string.

Raises:

Type Description
NotImplementedError

If the conversion type or file type is not supported.

Source code in arkiverse\file\convert.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def convert(doc_object: File, convert_to: str, output_dir: Optional[Union[str, Path]] = None) -> Union[str, Path]:
    """
    Convert a document to a specified format.

    Args:
        doc_object (File): The document object to be converted.
        convert_to (str): The format to convert the document to.
                          Options include "image", "text", "pdf", "pacer".
        output_dir (Optional[Union[str, Path]]): The directory to save the converted file.
                                                 Defaults to None.

    Returns:
        Union[str, Path]: The path to the converted file or the converted content as a string.

    Raises:
        NotImplementedError: If the conversion type or file type is not supported.
    """

    conversion_map = {
        "pdf": {
            "image": _convert_pdf_to_image,
            "text": _convert_pdf_to_text,
        },
        "word": {
            "pdf": _convert_ms_to_pdf,
            "text": _convert_ms_to_text,
        },
        "powerpoint": {
            "pdf": _convert_ms_to_pdf,
            "text": _convert_ms_to_text,
        },
        "excel": {
            "pdf": _convert_ms_to_pdf,
            "text": _convert_ms_to_text,
        },
        "html": {
            "pacer": _convert_pacer_to_json,
            "text": _convert_html_to_text,
        },
        "markdown": {
            "text": _convert_markdown_to_text,
        },
    }

    try:
        convert_function = conversion_map[doc_object.fileType][convert_to]
    except KeyError as exc:
        raise NotImplementedError(f"Conversion from {doc_object.fileType} to {convert_to} is not supported.") from exc

    return convert_function(doc_object, output_dir)