Skip to content
43 changes: 36 additions & 7 deletions sdk/basyx/aas/backend/couchdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,17 @@ def get_identifiable_by_couchdb_id(self, couchdb_id: str) -> model.Identifiable:
raise KeyError("No Identifiable with couchdb-id {} found in CouchDB database".format(couchdb_id)) from e
raise

# Add CouchDB metadata (for later commits) to object
obj = data['data']
if not isinstance(obj, model.Identifiable):
raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object."
.format(couchdb_id))
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(couchdb_id, safe='')),
data["_rev"])

# If we still have a local replication of that object (since it is referenced from anywhere else), update that
# replication and return it.
with self._object_cache_lock:
if obj.id in self._object_cache:
old_obj = self._object_cache[obj.id]
old_obj.update_from(obj)
return old_obj
self._object_cache[obj.id] = obj
return self._object_cache[obj.id]
self._object_cache[obj.id] = obj
return obj

def get_item(self, identifier: model.Identifier) -> model.Identifiable:
Expand All @@ -186,6 +181,9 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable:
:raises CouchDBError: If error occur during the request to the CouchDB server
(see ``_do_request()`` for details)
"""
with self._object_cache_lock:
if identifier in self._object_cache:
return self._object_cache[identifier]
try:
return self.get_identifiable_by_couchdb_id(self._transform_id(identifier, False))
except KeyError as e:
Expand Down Expand Up @@ -220,6 +218,37 @@ def add(self, x: model.Identifiable) -> None:
with self._object_cache_lock:
self._object_cache[x.id] = x

def commit(self, x: model.Identifiable) -> None:
"""
Write the current in-memory state of a stored object back to the CouchDB.

:param x: The object to persist
:raises KeyError: If the object is not present in the store or no revision is known
:raises CouchDBConflictError: If the object was modified in the database since it was last fetched
:raises CouchDBError: If error occur during the request to the CouchDB server
(see ``_do_request()`` for details)
"""
doc_url = "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.id))
rev = get_couchdb_revision(doc_url)
if rev is None:
Comment thread
zrgt marked this conversation as resolved.
raise KeyError("No revision found for object with id {} — not fetched from this store".format(x.id))
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
try:
response = self._do_request(
"{}?rev={}".format(doc_url, rev),
'PUT',
{'Content-type': 'application/json'},
data.encode('utf-8'))
set_couchdb_revision(doc_url, response["rev"])
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.id)) from e
elif e.code == 409:
raise CouchDBConflictError(
"Object with id {} has been modified in the database since it was last fetched."
.format(x.id)) from e
raise

def discard(self, x: model.Identifiable, safe_delete=False) -> None:
"""
Delete an :class:`~basyx.aas.model.base.Identifiable` AAS object from the CouchDB database
Expand Down
57 changes: 46 additions & 11 deletions sdk/basyx/aas/backend/local_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import os
import hashlib
import tempfile
import threading
import warnings
import weakref
Expand All @@ -31,6 +32,13 @@ class LocalFileIdentifiableStore(model.AbstractObjectStore[model.Identifier, mod
"""
An ObjectStore implementation for :class:`~basyx.aas.model.base.Identifiable` BaSyx Python SDK objects backed
by a local file based local backend

.. warning::
This backend is intended for development and testing only. It provides no
concurrency control across processes: concurrent writes to the same object
(e.g. under a multi-worker WSGI server) will silently overwrite each other,
with the last writer winning and no error raised. Use a dedicated database
backend for any production deployment.
"""
def __init__(self, directory_path: str):
"""
Expand Down Expand Up @@ -68,21 +76,16 @@ def get_identifiable_by_hash(self, hash_: str) -> model.Identifiable:

:raises KeyError: If the respective file could not be found
"""
# Try to get the correct file
try:
with open("{}/{}.json".format(self.directory_path, hash_), "r") as file:
data = json.load(file, cls=json_deserialization.AASFromJsonDecoder)
obj = data["data"]
except FileNotFoundError as e:
raise KeyError("No Identifiable with hash {} found in local file database".format(hash_)) from e
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
# replication and return it.
with self._object_cache_lock:
if obj.id in self._object_cache:
old_obj = self._object_cache[obj.id]
old_obj.update_from(obj)
return old_obj
self._object_cache[obj.id] = obj
return self._object_cache[obj.id]
self._object_cache[obj.id] = obj
return obj

def get_item(self, identifier: model.Identifier) -> model.Identifiable:
Expand All @@ -91,11 +94,33 @@ def get_item(self, identifier: model.Identifier) -> model.Identifiable:

:raises KeyError: If the respective file could not be found
"""
with self._object_cache_lock:
if identifier in self._object_cache:
return self._object_cache[identifier]
try:
return self.get_identifiable_by_hash(self._transform_id(identifier))
except KeyError as e:
raise KeyError("No Identifiable with id {} found in local file database".format(identifier)) from e

def _write_atomic(self, x: model.Identifiable) -> None:
"""
Serialize x to a temp file in the store directory, then atomically replace the final file.

Using os.replace() (rename(2) on POSIX) ensures readers always see a complete file — never
a partially-written one from a crash or concurrent access mid-write.
"""
final_path = "{}/{}.json".format(self.directory_path, self._transform_id(x.id))
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.directory_path, suffix=".tmp")
try:
with os.fdopen(tmp_fd, "w") as tmp_file:
json.dump({"data": x}, tmp_file, cls=json_serialization.AASToJsonEncoder, indent=4)
os.replace(tmp_path, final_path)
# Catch all `Exception`s, as well as `KeyboardInterrupt` and `SystemExit` too, so the temp
# file is never left behind even if the process is being torn down:
except BaseException:
os.unlink(tmp_path)
raise

def add(self, x: model.Identifiable) -> None:
"""
Add an object to the store
Expand All @@ -105,10 +130,20 @@ def add(self, x: model.Identifiable) -> None:
logger.debug("Adding object %s to Local File Store ...", repr(x))
if os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
raise KeyError("Identifiable with id {} already exists in local file database".format(x.id))
with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file:
json.dump({"data": x}, file, cls=json_serialization.AASToJsonEncoder, indent=4)
with self._object_cache_lock:
self._object_cache[x.id] = x
self._write_atomic(x)
with self._object_cache_lock:
self._object_cache[x.id] = x

def commit(self, x: model.Identifiable) -> None:
Comment thread
s-heppner marked this conversation as resolved.
"""
Write the current in-memory state of a stored object back to its file.

:param x: The object to persist
:raises KeyError: If the object is not present in the store
"""
if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
raise KeyError("No AAS object with id {} exists in local file database".format(x.id))
self._write_atomic(x)

def discard(self, x: model.Identifiable) -> None:
"""
Expand Down
18 changes: 18 additions & 0 deletions sdk/basyx/aas/model/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ class AbstractObjectStore(AbstractObjectProvider[_KEY, _VALUE], MutableSet[_VALU
def __init__(self):
pass

def commit(self, x: _VALUE) -> None:
Comment thread
s-heppner marked this conversation as resolved.
"""
Persist an in-memory mutation of a stored object back to the underlying storage.

Persistent backends (e.g. file-based or database-backed stores) must override this to
write the updated object back to storage. In-memory stores should override this with an
explicit no-op to make the intent clear.

:param x: The object whose current in-memory state should be persisted
"""
raise NotImplementedError()

def update(self, other: Iterable[_VALUE]) -> None:
for x in other:
self.add(x)
Expand Down Expand Up @@ -146,6 +158,9 @@ def add(self, x: _IDENTIFIABLE) -> None:
.format(x.id))
self._backend[x.id] = x

def commit(self, x: _IDENTIFIABLE) -> None:
pass

def discard(self, x: _IDENTIFIABLE) -> None:
if self._backend.get(x.id) is x:
del self._backend[x.id]
Expand Down Expand Up @@ -223,6 +238,9 @@ def add(self, x: _IDENTIFIABLE) -> None:
else:
raise KeyError(f"Identifiable object with same id {x.id} is already stored in this store")

def commit(self, x: _IDENTIFIABLE) -> None:
pass

def discard(self, x: _IDENTIFIABLE) -> None:
self._backend.discard(x)

Expand Down
27 changes: 27 additions & 0 deletions sdk/test/backend/test_local_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# the LICENSE file of this project.
#
# SPDX-License-Identifier: MIT
import gc
import os.path
import shutil

Expand Down Expand Up @@ -134,6 +135,32 @@ def test_iter_ignores_non_json_files(self) -> None:
self.assertEqual(5, len(items))
os.remove(stray)

def test_mutation_persistence(self) -> None:
submodel = model.Submodel(
id_='https://example.org/MutationTest',
submodel_element={
model.Property(id_short='Prop', value_type=model.datatypes.String, value='before')
}
)
self.identifiable_store.add(submodel)

retrieved = self.identifiable_store.get_item('https://example.org/MutationTest')
assert isinstance(retrieved, model.Submodel)
prop = retrieved.get_referable(['Prop'])
assert isinstance(prop, model.Property)
prop.update_from(model.Property(id_short='Prop', value_type=model.datatypes.String, value='after'))
self.identifiable_store.commit(retrieved)

# Drop all strong references to evict the WeakValueDictionary cache
del submodel, retrieved, prop
gc.collect()

fresh = self.identifiable_store.get_item('https://example.org/MutationTest')
assert isinstance(fresh, model.Submodel)
fresh_prop = fresh.get_referable(['Prop'])
assert isinstance(fresh_prop, model.Property)
self.assertEqual('after', fresh_prop.value)

def test_reload_discard(self) -> None:
# Load example submodel
example_submodel = create_example_submodel()
Expand Down
26 changes: 19 additions & 7 deletions server/app/backend/local_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,15 @@ def get_descriptor_by_hash(self, hash_: str) -> _DESCRIPTOR_TYPE:

:raises KeyError: If the respective file could not be found
"""
# Try to get the correct file
try:
with open("{}/{}.json".format(self.directory_path, hash_), "r") as file:
obj = json.load(file, cls=jsonization.ServerAASFromJsonDecoder)
except FileNotFoundError as e:
raise KeyError("No Descriptor with hash {} found in local file database".format(hash_)) from e
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
# replication and return it.
with self._object_cache_lock:
if obj.id in self._object_cache:
old_obj = self._object_cache[obj.id]
old_obj.update_from(obj)
return old_obj
self._object_cache[obj.id] = obj
return self._object_cache[obj.id]
self._object_cache[obj.id] = obj
return obj

def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE:
Expand All @@ -89,6 +84,9 @@ def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE:

:raises KeyError: If the respective file could not be found
"""
with self._object_cache_lock:
if identifier in self._object_cache:
return self._object_cache[identifier]
try:
return self.get_descriptor_by_hash(self._transform_id(identifier))
except KeyError as e:
Expand All @@ -113,6 +111,20 @@ def add(self, x: _DESCRIPTOR_TYPE) -> None:
with self._object_cache_lock:
self._object_cache[x.id] = x

def commit(self, x: _DESCRIPTOR_TYPE) -> None:
"""
Write the current in-memory state of a stored descriptor back to its file.

:param x: The descriptor to persist
:raises KeyError: If the descriptor is not present in the store
"""
if not os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
raise KeyError("No AAS Descriptor object with id {} exists in local file database".format(x.id))
with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file:
serialized = json.loads(json.dumps(x, cls=jsonization.ServerAASToJsonEncoder))
serialized["modelType"] = DESCRIPTOR_TYPE_TO_STRING[type(x)]
json.dump(serialized, file, indent=4)

def discard(self, x: _DESCRIPTOR_TYPE) -> None:
"""
Delete an :class:`~app.model.descriptor.Descriptor` AAS object from the local file store
Expand Down
20 changes: 10 additions & 10 deletions server/app/interfaces/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def post_aas_descriptor(
self.object_store.add(descriptor)
except KeyError as e:
raise Conflict(f"AssetAdministrationShellDescriptor with Identifier {descriptor.id} already exists!") from e
descriptor.commit()
self.object_store.commit(descriptor)
created_resource_url = map_adapter.build(
self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True
)
Expand All @@ -202,12 +202,12 @@ def put_aas_descriptor_by_id(
request, server_model.AssetAdministrationShellDescriptor, is_stripped_request(request)
)
)
descriptor.commit()
self.object_store.commit(descriptor)
return response_t()
except NotFound:
descriptor = HTTPApiDecoder.request_body(request, server_model.AssetAdministrationShellDescriptor, False)
self.object_store.add(descriptor)
descriptor.commit()
self.object_store.commit(descriptor)
created_resource_url = map_adapter.build(
self.get_aas_descriptor_by_id, {"aas_id": descriptor.id}, force_external=True
)
Expand Down Expand Up @@ -247,7 +247,7 @@ def post_submodel_descriptor_through_superpath(
if any(sd.id == submodel_descriptor.id for sd in aas_descriptor.submodel_descriptors):
raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!")
aas_descriptor.submodel_descriptors.append(submodel_descriptor)
aas_descriptor.commit()
self.object_store.commit(aas_descriptor)
created_resource_url = map_adapter.build(
self.get_submodel_descriptor_by_id_through_superpath,
{"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id},
Expand All @@ -269,14 +269,14 @@ def put_submodel_descriptor_by_id_through_superpath(
submodel_descriptor.update_from(
HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request))
)
aas_descriptor.commit()
self.object_store.commit(aas_descriptor)
return response_t()
except NotFound:
submodel_descriptor = HTTPApiDecoder.request_body(
request, server_model.SubmodelDescriptor, is_stripped_request(request)
)
aas_descriptor.submodel_descriptors.append(submodel_descriptor)
aas_descriptor.commit()
self.object_store.commit(aas_descriptor)
created_resource_url = map_adapter.build(
self.get_submodel_descriptor_by_id_through_superpath,
{"aas_id": aas_descriptor.id, "submodel_id": submodel_descriptor.id},
Expand All @@ -293,7 +293,7 @@ def delete_submodel_descriptor_by_id_through_superpath(
if submodel_descriptor is None:
raise NotFound(f"Submodel Descriptor with Identifier {submodel_id} not found in AssetAdministrationShell!")
aas_descriptor.submodel_descriptors.remove(submodel_descriptor)
aas_descriptor.commit()
self.object_store.commit(aas_descriptor)
return response_t()

# ------ Submodel REGISTRY ROUTES -------
Expand Down Expand Up @@ -321,7 +321,7 @@ def post_submodel_descriptor(
self.object_store.add(submodel_descriptor)
except KeyError as e:
raise Conflict(f"Submodel Descriptor with Identifier {submodel_descriptor.id} already exists!") from e
submodel_descriptor.commit()
self.object_store.commit(submodel_descriptor)
created_resource_url = map_adapter.build(
self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True
)
Expand All @@ -335,14 +335,14 @@ def put_submodel_descriptor_by_id(
submodel_descriptor.update_from(
HTTPApiDecoder.request_body(request, server_model.SubmodelDescriptor, is_stripped_request(request))
)
submodel_descriptor.commit()
self.object_store.commit(submodel_descriptor)
return response_t()
except NotFound:
submodel_descriptor = HTTPApiDecoder.request_body(
request, server_model.SubmodelDescriptor, is_stripped_request(request)
)
self.object_store.add(submodel_descriptor)
submodel_descriptor.commit()
self.object_store.commit(submodel_descriptor)
created_resource_url = map_adapter.build(
self.get_submodel_descriptor_by_id, {"submodel_id": submodel_descriptor.id}, force_external=True
)
Expand Down
Loading
Loading