Add AsyncPickleDB class. Update README docs for the new class.

Commit f307063 · patx · 2025-02-06T21:04:37-05:00

Changeset
f30706303af90d97177e04518dc84fa079491704
Parents
59245eb38ac4edbc974e077034471a456542b78e


Diff

diff --git a/README.md b/README.md
index ddfe095..cd1043e 100644
--- a/README.md
+++ b/README.md
@@ -436,42 +436,94 @@ except KeyboardInterrupt:
     pass
 ```
 
-### ***Async For Web Frameworks***
-For frameworks like FastAPI, use async wrappers to handle requests without blocking the server:
+## **Using AsyncPickleDB**
 
+In addition to the standard `PickleDB` class, pickleDB now includes an asynchronous version called `AsyncPickleDB`. It provides non-blocking operations, making it well suited to applications that need concurrent access to a key-value store without blocking the event loop. Async file operations require `aiofiles`, so make sure it is installed (e.g. `pip install aiofiles`; version 1.2 also lists it in `install_requires`).
+
+### **Initializing an Async Database**
 ```python
-from fastapi import FastAPI
 import asyncio
-from pickledb import PickleDB
+from pickledb import AsyncPickleDB
 
-app = FastAPI()
-db = PickleDB('web_db.db')
+async def main():
+    # Initialize an async database
+    db = AsyncPickleDB("async_db.json", batch_size=5, cleanup_interval=10)
 
[email protected]("/get/{key}")
-async def get_key(key: str):
-    loop = asyncio.get_event_loop()
-    value = await loop.run_in_executor(None, db.get, key)
-    return {"key": key, "value": value}
+    # Set a value
+    await db.set("username", "admin")
 
[email protected]("/set/")
-async def set_key(key: str, value: str):
-    loop = asyncio.get_event_loop()
-    await loop.run_in_executor(None, db.set, key, value)
-    db.save()
-    return {"message": "Key-value pair saved!"}
+    # Retrieve a value
+    value = await db.get("username")
+    print(value)  # Output: admin
+
+    # Save the database asynchronously
+    await db._save_batch()
+
+asyncio.run(main())
+```
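+
+The source also defines async `__getitem__`, so dictionary-style reads can be
+awaited directly. A plain `db[key] = value` statement, however, would create a
+coroutine that is never awaited, so prefer `await db.set(...)` for writes. A
+short fragment (run inside a coroutine, as above):
+
+```python
+value = await db["username"]   # awaits __getitem__ under the hood
+await db.set("theme", "dark")  # preferred over db["theme"] = "dark"
+```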
+
+### **Key Features of AsyncPickleDB**
+- **Asynchronous Reads and Writes**: No blocking operations, ensuring smooth performance in async applications.
+- **Batch Writes**: Entries are written in batches to optimize disk I/O (see the sketch after this list).
+- **Automatic Cleanup**: The database compacts itself periodically to remove stale entries.
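+
+A minimal sketch of the batching behavior (file name and values here are
+illustrative): with `batch_size=3`, the first two writes only buffer in
+memory, and the third flushes all three to disk in one append.
+
+```python
+import asyncio
+from pickledb import AsyncPickleDB
+
+async def demo():
+    db = AsyncPickleDB("batch_demo.json", batch_size=3)
+    await db.set("a", 1)    # buffered in memory only
+    await db.set("b", 2)    # still buffered
+    await db.set("c", 3)    # batch full: flushed to disk in one append
+    await db.set("d", 4)    # starts a new batch
+    await db._save_batch()  # force-flush the leftover entry
+
+asyncio.run(demo())
+```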
+
+### **Async CRUD Operations**
+
+#### **`set(key, value)` - Store a Key-Value Pair**
+```python
+await db.set("language", "Python")
+```
+
+#### **`get(key)` - Retrieve a Value**
+```python
+value = await db.get("language")
+print(value)  # Output: Python
+```
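+
+Per the source, `get()` returns `None` for a missing key, so callers can
+branch on `None` rather than catching an exception (inside a coroutine, as in
+the snippets above):
+
+```python
+value = await db.get("missing_key")
+if value is None:
+    print("Key not found")
+```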
+
+#### **`remove(key)` - Delete a Key**
+```python
+await db.remove("language")
+```
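+
+Under the hood, `remove()` appends a `{"__remove__": key}` tombstone to the
+JSON Lines log; the periodic cleanup pass later rewrites the file without the
+removed entries.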
+
+#### **`purge()` - Clear All Data**
+```python
+await db.purge()
+```
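+
+Per the source, `purge()` truncates the file and clears both the pending write
+batch and the in-memory cache, so the wipe takes effect immediately.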
+
+#### **`all()` - Get All Keys**
+```python
+keys = await db.all()
+print(keys)  # Output: [list of stored keys]
+```
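+
+`all()` merges keys found in the on-disk log with keys from writes still
+buffered in memory, returning a deduplicated list.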
+
+### **Example: Using AsyncPickleDB in an ASGI Application**
+```python
+from MicroPie import App
+from pickledb import AsyncPickleDB
+
+db = AsyncPickleDB("asgi_db.json", batch_size=500)
+
+class Root(App):
+
+    async def get(self, key):
+        value = await db.get(key)
+        return f'<b>{key}:</b> {value}'
+
+    async def set(self, key, value):
+        await db.set(key, value)
+        return self._redirect(f'/get/{key}')
+
+app = Root()
 ```
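+
+Because all handlers share one event loop and the class guards file access
+with an internal `asyncio.Lock`, concurrent coroutines can interleave safely.
+A minimal sketch of concurrent access (names like `worker` are illustrative):
+
+```python
+import asyncio
+from pickledb import AsyncPickleDB
+
+db = AsyncPickleDB("concurrent_db.json", batch_size=100)
+
+async def worker(n):
+    await db.set(f"task:{n}", n * n)
+    return await db.get(f"task:{n}")
+
+async def main():
+    results = await asyncio.gather(*(worker(i) for i in range(10)))
+    print(results)
+    await db._save_batch()  # flush writes still sitting in the batch
+
+asyncio.run(main())
+```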
 
-### **Asynchronous Operations**
-Want non-blocking saves? Thread-saftey? What about async execution? You can implement an async wrappers to handle saves in the background and more. This is particularly useful for applications that need high responsiveness without delaying due to disk operations, like small web applications. Check out examples [here](https://gist.github.com/patx/5c12d495ff142f3262325eeae81eb000).
 
 ## **Limitations**
 
 While pickleDB is powerful, it’s important to understand its limitations:
 
 - **Memory Usage**: The entire dataset is loaded into memory, which might be a constraint on systems with limited RAM for extremely large datasets.
-- **Single-Threaded**: The program is not thread-safe. For concurrent access, use external synchronization like Python's `RLock()`.
-- **Blocking Saves**: Saves are blocking by default. To achieve non-blocking saves, use [asynchronous wrappers](https://gist.github.com/patx/5c12d495ff142f3262325eeae81eb000).
 - **Lack of Advanced Features**: pickleDB is designed for simplicity, so it may not meet the needs of applications requiring advanced database features.
+- **Async support in alpha stages**: `AsyncPickleDB` is not yet stable; expect breaking changes and avoid using it in production for now.
 
 For projects requiring more robust solutions, consider alternatives like **[kenobiDB](https://github.com/patx/kenobi)**, [Redis](http://redis.io/), [SQLite](https://www.sqlite.org/), or [MongoDB](https://www.mongodb.com/).
 
diff --git a/pickledb-async.py b/pickledb-async.py
deleted file mode 100644
index c91a75b..0000000
--- a/pickledb-async.py
+++ /dev/null
@@ -1,196 +0,0 @@
-import os
-import asyncio
-import aiofiles
-import orjson
-
-
-class PickleDB:
-    """
-    A barebones, async, orjson-based key-value store.
-    """
-
-    def __init__(self, location, batch_size=1000, cleanup_interval=1000):
-        """
-        Initialize the PickleDB object.
-
-        Args:
-            location (str): Path to the JSON file.
-            batch_size (int): Number of operations to batch together.
-            cleanup_interval (int): Number of operations between cleanups.
-        """
-        self.location = os.path.expanduser(location)
-        self._lock = asyncio.Lock()
-        self.batch_size = batch_size
-        self.cleanup_interval = cleanup_interval
-        self._batch = []
-        self._operation_count = 0
-        self._cache = {}
-
-    async def __setitem__(self, key, value):
-        """
-        Allow the syntax db[key] = value.
-        """
-        return await self.set(key, value)
-
-    async def __getitem__(self, key):
-        """
-        Allow the syntax value = db[key].
-        """
-        return await self.get(key)
-
-    async def _save_batch(self):
-        """
-        Save a batch of entries to the file using JSON Lines format,
-        then trigger file compaction to remove stale entries.
-
-        Returns:
-            bool: True if save was successful, False otherwise.
-        """
-        try:
-            async with self._lock:
-                async with aiofiles.open(self.location, "ab") as f:
-                    data = b''.join([orjson.dumps(entry) + b'\n' for entry in self._batch])
-                    await f.write(data)
-                self._batch.clear()
-                self._operation_count += len(self._batch)
-            # Perform cleanup periodically
-            if self._operation_count >= self.cleanup_interval:
-                await self._cleanup_removed_keys()
-                self._operation_count = 0
-            return True
-        except Exception as e:
-            print(f"Failed to save batch: {e}")
-            return False
-
-    async def _cleanup_removed_keys(self):
-        """
-        Compact the file by reading all the operations (sets and removals),
-        applying them to derive the current state of the database,
-        and rewriting the file with only the latest state.
-        """
-        async with self._lock:
-            if not os.path.exists(self.location):
-                return
-            try:
-                state = {}
-                async with aiofiles.open(self.location, "rb") as infile:
-                    async for line in infile:
-                        try:
-                            entry = orjson.loads(line)
-                        except Exception as e:
-                            print(f"Error reading a line in cleanup: {e}")
-                            continue
-                        if "__remove__" in entry:
-                            rm_key = entry["__remove__"]
-                            if rm_key in state:
-                                del state[rm_key]
-                        else:
-                            state.update(entry)
-                async with aiofiles.open(self.location, "wb") as outfile:
-                    for key, value in state.items():
-                        await outfile.write(orjson.dumps({key: value}) + b'\n')
-                self._cache = state  # Update in-memory cache
-            except Exception as e:
-                print(f"Failed to cleanup removed keys: {e}")
-
-    async def set(self, key, value):
-        """
-        Set or update the value of a key.
-
-        Args:
-            key (any): The key to set (converted to string if needed).
-            value (any): The value to associate.
-
-        Returns:
-            bool: True if successful.
-        """
-        if not isinstance(key, str):
-            key = str(key)
-        self._batch.append({key: value})
-        self._cache[key] = value  # Update in-memory cache
-        if len(self._batch) >= self.batch_size:
-            return await self._save_batch()
-        return True
-
-    async def remove(self, key):
-        """
-        Mark a key for removal.
-
-        Args:
-            key (any): The key to remove (converted to string if needed).
-
-        Returns:
-            bool: True if the removal operation succeeds.
-        """
-        if not isinstance(key, str):
-            key = str(key)
-        self._batch.append({"__remove__": key})
-        if key in self._cache:
-            del self._cache[key]  # Update in-memory cache
-        if len(self._batch) >= self.batch_size:
-            return await self._save_batch()
-        return True
-
-    async def purge(self):
-        """
-        Clear the entire database by emptying the file.
-
-        Returns:
-            bool: True if purge is successful.
-        """
-        async with self._lock:
-            try:
-                async with aiofiles.open(self.location, "w") as f:
-                    await f.write("")
-                self._batch.clear()
-                self._cache.clear()
-                return True
-            except Exception as e:
-                print(f"Failed to purge database: {e}")
-                return False
-
-    async def get(self, key):
-        """
-        Retrieve the value associated with a given key.
-
-        Args:
-            key (any): The key to search for (converted to string if needed).
-
-        Returns:
-            any: The value if the key exists, or None otherwise.
-        """
-        if not isinstance(key, str):
-            key = str(key)
-        # Check in-memory cache first
-        if key in self._cache:
-            return self._cache[key]
-        async with self._lock:
-            if os.path.exists(self.location) and os.path.getsize(self.location) > 0:
-                try:
-                    async with aiofiles.open(self.location, "rb") as f:
-                        async for line in f:
-                            entry = orjson.loads(line)
-                            if key in entry:
-                                return entry[key]
-                except Exception as e:
-                    print(f"Failed to get key: {e}")
-            return None
-
-    async def all(self):
-        """
-        Retrieve a list of all keys present in the database.
-
-        Returns:
-            list: A list of keys with duplicates removed.
-        """
-        async with self._lock:
-            keys = set(self._cache.keys())
-            if os.path.exists(self.location) and os.path.getsize(self.location) > 0:
-                try:
-                    async with aiofiles.open(self.location, "rb") as f:
-                        async for line in f:
-                            entry = orjson.loads(line)
-                            keys.update(entry.keys())
-                except Exception as e:
-                    print(f"Failed to get all keys: {e}")
-        return list(keys)
diff --git a/pickledb.py b/pickledb.py
index 3535fb3..2b759b5 100644
--- a/pickledb.py
+++ b/pickledb.py
@@ -28,9 +28,10 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
-
+import asyncio
 import os
 
+import aiofiles
 import orjson
 
 
@@ -178,3 +179,195 @@ class PickleDB:
         """
         return list(self.db.keys())
 
+
+class AsyncPickleDB:
+    """
+    Async version of pickleDB backed by an append-only JSON Lines log
+    with batched writes and periodic compaction.
+    """
+
+    def __init__(self, location, batch_size=1, cleanup_interval=5):
+        """
+        Initialize the PickleDB object.
+
+        Args:
+            location (str): Path to the JSON file.
+            batch_size (int): Number of operations to batch together.
+            cleanup_interval (int): Number of operations between cleanups.
+        """
+        self.location = os.path.expanduser(location)
+        self._lock = asyncio.Lock()
+        self.batch_size = batch_size
+        self.cleanup_interval = cleanup_interval
+        self._batch = []
+        self._operation_count = 0
+        self._cache = {}
+
+    async def __setitem__(self, key, value):
+        """
+        Allow the syntax db[key] = value.
+
+        Note: plain assignment cannot await the coroutine this returns,
+        so prefer `await db.set(key, value)` in async code.
+        """
+        return await self.set(key, value)
+
+    async def __getitem__(self, key):
+        """
+        Allow the syntax value = await db[key].
+        """
+        return await self.get(key)
+
+    async def _save_batch(self):
+        """
+        Save a batch of entries to the file using JSON Lines format,
+        then trigger file compaction to remove stale entries.
+
+        Returns:
+            bool: True if save was successful, False otherwise.
+        """
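+        # On disk the database is an append-only JSON Lines log, e.g.:
+        #   {"username": "admin"}
+        #   {"__remove__": "username"}
+        # Later entries supersede earlier ones until compaction rewrites
+        # the file with only the latest state.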
+        try:
+            async with self._lock:
+                async with aiofiles.open(self.location, "ab") as f:
+                    data = b''.join([orjson.dumps(entry) + b'\n' for entry in self._batch])
+                    await f.write(data)
+                # Count before clearing, or the increment is always zero
+                # and cleanup never triggers
+                self._operation_count += len(self._batch)
+                self._batch.clear()
+            # Perform cleanup periodically
+            if self._operation_count >= self.cleanup_interval:
+                await self._cleanup_removed_keys()
+                self._operation_count = 0
+            return True
+        except Exception as e:
+            print(f"Failed to save batch: {e}")
+            return False
+
+    async def _cleanup_removed_keys(self):
+        """
+        Compact the file by reading all the operations (sets and removals),
+        applying them to derive the current state of the database,
+        and rewriting the file with only the latest state.
+        """
+        async with self._lock:
+            if not os.path.exists(self.location):
+                return
+            try:
+                state = {}
+                async with aiofiles.open(self.location, "rb") as infile:
+                    async for line in infile:
+                        try:
+                            entry = orjson.loads(line)
+                        except Exception as e:
+                            print(f"Error reading a line in cleanup: {e}")
+                            continue
+                        if "__remove__" in entry:
+                            rm_key = entry["__remove__"]
+                            if rm_key in state:
+                                del state[rm_key]
+                        else:
+                            state.update(entry)
+                async with aiofiles.open(self.location, "wb") as outfile:
+                    for key, value in state.items():
+                        await outfile.write(orjson.dumps({key: value}) + b'\n')
+                self._cache = state  # Update in-memory cache
+            except Exception as e:
+                print(f"Failed to cleanup removed keys: {e}")
+
+    async def set(self, key, value):
+        """
+        Set or update the value of a key.
+
+        Args:
+            key (any): The key to set (converted to string if needed).
+            value (any): The value to associate.
+
+        Returns:
+            bool: True if successful.
+        """
+        if not isinstance(key, str):
+            key = str(key)
+        self._batch.append({key: value})
+        self._cache[key] = value  # Update in-memory cache
+        if len(self._batch) >= self.batch_size:
+            return await self._save_batch()
+        return True
+
+    async def remove(self, key):
+        """
+        Mark a key for removal.
+
+        Args:
+            key (any): The key to remove (converted to string if needed).
+
+        Returns:
+            bool: True if the removal operation succeeds.
+        """
+        if not isinstance(key, str):
+            key = str(key)
+        self._batch.append({"__remove__": key})
+        if key in self._cache:
+            del self._cache[key]  # Update in-memory cache
+        if len(self._batch) >= self.batch_size:
+            return await self._save_batch()
+        return True
+
+    async def purge(self):
+        """
+        Clear the entire database by emptying the file.
+
+        Returns:
+            bool: True if purge is successful.
+        """
+        async with self._lock:
+            try:
+                async with aiofiles.open(self.location, "w") as f:
+                    await f.write("")
+                self._batch.clear()
+                self._cache.clear()
+                return True
+            except Exception as e:
+                print(f"Failed to purge database: {e}")
+                return False
+
+    async def get(self, key):
+        """
+        Retrieve the value associated with a given key.
+
+        Args:
+            key (any): The key to search for (converted to string if needed).
+
+        Returns:
+            any: The value if the key exists, or None otherwise.
+        """
+        if not isinstance(key, str):
+            key = str(key)
+        # Check in-memory cache first
+        if key in self._cache:
+            return self._cache[key]
+        async with self._lock:
+            if os.path.exists(self.location) and os.path.getsize(self.location) > 0:
+                try:
+                    value, found = None, False
+                    async with aiofiles.open(self.location, "rb") as f:
+                        async for line in f:
+                            entry = orjson.loads(line)
+                            # Append-only log: later entries supersede
+                            # earlier ones, so scan the whole file
+                            if entry.get("__remove__") == key:
+                                value, found = None, False
+                            elif key in entry:
+                                value, found = entry[key], True
+                    if found:
+                        return value
+                except Exception as e:
+                    print(f"Failed to get key: {e}")
+            return None
+
+    async def all(self):
+        """
+        Retrieve a list of all keys present in the database.
+
+        Returns:
+            list: A list of keys with duplicates removed.
+        """
+        async with self._lock:
+            keys = set()
+            if os.path.exists(self.location) and os.path.getsize(self.location) > 0:
+                try:
+                    async with aiofiles.open(self.location, "rb") as f:
+                        async for line in f:
+                            entry = orjson.loads(line)
+                            # Honor removal tombstones instead of listing
+                            # "__remove__" itself as a key
+                            if "__remove__" in entry:
+                                keys.discard(entry["__remove__"])
+                            else:
+                                keys.update(entry.keys())
+                except Exception as e:
+                    print(f"Failed to get all keys: {e}")
+            # Include keys from writes still buffered in memory
+            keys.update(self._cache.keys())
+        return list(keys)
+
diff --git a/setup.py b/setup.py
index 00572a0..1525dfa 100644
--- a/setup.py
+++ b/setup.py
@@ -38,8 +38,8 @@ Links
 * `Github Repo <https://github.com/patx/pickledb>`_
 
 
-Key Improvements in Version 1.0
-```````````````````````````````
+Key Improvements in Version 1.0+
+````````````````````````````````
 
 * pickleDB 1.0 is a reimagined version designed for speed, simplicity, and reliability. This version is NOT backwards compatible. Key changes include:
 * Atomic Saves: Ensures data integrity during writes, eliminating potential corruption issues.
@@ -47,14 +47,14 @@ Key Improvements in Version 1.0
 * Streamlined API: Removed legacy methods (e.g., `ladd`, `dmerge`) in favor of native Python operations.
 * Unified Handling of Data Types: Treats all Python-native types (lists, dicts, etc.) as first-class citizens.
 * Explicit Saves: The `auto_save` feature was removed to provide users greater control and optimize performance.
-
+* Added a fully built-in async class (`AsyncPickleDB`) for use with event-based applications.
 
 """
 
 from distutils.core import setup
 
 setup(name="pickleDB",
-    version="1.1.1",
+    version="1.2",
     description="A lightweight and simple database using json.",
     long_description=__doc__,
     author="Harrison Erd",
@@ -67,6 +67,6 @@ setup(name="pickleDB",
         "Intended Audience :: Developers",
         "Topic :: Database" ],
     py_modules=['pickledb'],
-    install_requires=['orjson'],
+    install_requires=['orjson', 'aiofiles'],
 )