Skip to content

API Client Reference

MinifluxClient

MinifluxClient is the async wrapper around the official Miniflux API library.

Wrapper around official Miniflux client for our app.

Source code in miniflux_tui/api/client.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
class MinifluxClient:
    """Async wrapper around the official (synchronous) Miniflux client.

    Each public coroutine delegates to a method of the wrapped client. The call
    is executed in the event loop's default executor through
    ``_call_with_retry`` so the synchronous call does not run on the event loop
    thread.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        allow_invalid_certs: bool = False,
        timeout: float = 30.0,
    ):
        """
        Initialize the Miniflux API client.

        Args:
            base_url: Base URL of the Miniflux server
            api_key: API key for authentication
            allow_invalid_certs: Whether to allow invalid SSL certificates (not supported by official client)
            timeout: Request timeout in seconds (not supported by official client)
        """
        # Normalised copy (no trailing slash) kept for this wrapper's own use.
        self.base_url = base_url.rstrip("/")

        # Create official Miniflux client (synchronous)
        # The official client expects api_key as a keyword argument
        self.client = MinifluxClientBase(base_url, api_key=api_key)

        # Stored only; not forwarded to the wrapped client in this class.
        self.allow_invalid_certs: bool = allow_invalid_certs

        # Stored only; not forwarded to the wrapped client in this class.
        self.timeout: float = timeout

    async def close(self):
        """Close the HTTP client (no-op for official client)."""

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    async def _run_sync(self, func, *args, **kwargs):
        """Run a synchronous function in the loop's default executor.

        Args:
            func: Synchronous callable to execute
            *args: Positional arguments for func
            **kwargs: Keyword arguments for func

        Returns:
            Whatever ``func(*args, **kwargs)`` returns
        """
        # We are always inside a coroutine here, so a running loop exists;
        # get_running_loop() is the recommended way to obtain it.
        loop = asyncio.get_running_loop()
        # run_in_executor accepts positional arguments only, hence partial().
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    async def _call_with_retry(
        self,
        func: Callable[..., T],
        *args,
        max_retries: int = MAX_RETRIES,
        backoff_factor: float = BACKOFF_FACTOR,
        **kwargs,
    ) -> T:
        """Call a synchronous function with exponential backoff retry logic.

        Retries on ``ConnectionError`` and ``TimeoutError``. Any other
        exception propagates immediately without a retry.

        Backoff calculation: after the failed attempt with 0-based index
        ``k`` the coroutine sleeps ``backoff_factor ** k`` seconds before the
        next attempt. No sleep happens after the final attempt.

        Example with backoff_factor=2.0 and max_retries=3:
        - Attempt 0 fails: wait 1 second
        - Attempt 1 fails: wait 2 seconds
        - Attempt 2 fails: the error is raised

        With backoff_factor=1.0 every wait is 1 second.

        Args:
            func: Synchronous function to call
            *args: Positional arguments for func
            max_retries: Total number of attempts (defaults to MAX_RETRIES).
                Values below 1 are treated as 1 so func is always tried once.
            backoff_factor: Base of the exponential delay (defaults to
                BACKOFF_FACTOR)
            **kwargs: Keyword arguments for func

        Returns:
            Result from func call

        Raises:
            ConnectionError/TimeoutError: Last network error if all attempts fail
            Exception: Other exceptions are raised immediately without retry
        """
        # NOTE(review): only the builtin ConnectionError/TimeoutError are
        # retried. If the wrapped client raises its HTTP library's own
        # connection/timeout exception types and those do not subclass the
        # builtins, they will bypass the retry loop - confirm.
        last_exception: Exception | None = None

        # Guarantee at least one attempt even for max_retries <= 0; otherwise
        # the loop body would never run and func would never be called.
        attempts = max(1, max_retries)

        for attempt in range(attempts):
            try:
                return await self._run_sync(func, *args, **kwargs)
            except (ConnectionError, TimeoutError) as e:
                # Transient network errors - retry with backoff
                last_exception = e
                if attempt < attempts - 1:
                    await asyncio.sleep(backoff_factor**attempt)

        # All attempts exhausted - raise last exception
        raise last_exception or Exception("Unknown error in retry logic")

    async def get_unread_entries(self, limit: int = 100, offset: int = 0) -> list[Entry]:
        """
        Get unread feed entries with retry logic.

        Entries are requested ordered by ``published_at``, descending.

        Args:
            limit: Maximum number of entries to retrieve
            offset: Offset for pagination

        Returns:
            List of unread Entry objects
        """
        response = await self._call_with_retry(
            self.client.get_entries, status=["unread"], limit=limit, offset=offset, order="published_at", direction="desc"
        )

        return [Entry.from_dict(entry) for entry in response.get("entries", [])]

    async def get_starred_entries(self, limit: int = 100, offset: int = 0) -> list[Entry]:
        """
        Get starred feed entries with retry logic.

        Entries are requested ordered by ``published_at``, descending.

        Args:
            limit: Maximum number of entries to retrieve
            offset: Offset for pagination

        Returns:
            List of starred Entry objects
        """
        response = await self._call_with_retry(
            self.client.get_entries, starred=True, limit=limit, offset=offset, order="published_at", direction="desc"
        )

        return [Entry.from_dict(entry) for entry in response.get("entries", [])]

    async def change_entry_status(self, entry_id: int, status: str) -> None:
        """
        Change the read status of an entry with retry logic.

        Args:
            entry_id: ID of the entry
            status: New status ("read" or "unread")
        """
        await self._call_with_retry(self.client.update_entries, entry_ids=[entry_id], status=status)

    async def mark_as_read(self, entry_id: int) -> None:
        """Mark an entry as read with retry logic."""
        await self.change_entry_status(entry_id, "read")

    async def mark_as_unread(self, entry_id: int) -> None:
        """Mark an entry as unread with retry logic."""
        await self.change_entry_status(entry_id, "unread")

    async def toggle_starred(self, entry_id: int) -> None:
        """
        Toggle the starred status of an entry with retry logic.

        Args:
            entry_id: ID of the entry
        """
        await self._call_with_retry(self.client.toggle_bookmark, entry_id)

    async def save_entry(self, entry_id: int) -> None:
        """
        Save an entry to third-party service (e.g., Wallabag, Shiori, Shaarli) with retry logic.

        Args:
            entry_id: ID of the entry
        """
        await self._call_with_retry(self.client.save_entry, entry_id)

    async def mark_all_as_read(self, entry_ids: list[int]) -> None:
        """
        Mark multiple entries as read with retry logic.

        Args:
            entry_ids: List of entry IDs to mark as read
        """
        await self._call_with_retry(self.client.update_entries, entry_ids=entry_ids, status="read")

    async def refresh_all_feeds(self) -> None:
        """Trigger a refresh of all feeds with retry logic."""
        await self._call_with_retry(self.client.refresh_all_feeds)

    async def refresh_feed(self, feed_id: int) -> None:
        """Refresh a specific feed with retry logic.

        Args:
            feed_id: ID of the feed to refresh
        """
        await self._call_with_retry(self.client.refresh_feed, feed_id)

    async def get_categories(self) -> list[Category]:
        """Get all categories with retry logic.

        Returns:
            List of Category objects (empty if the response is neither a list
            nor a dict)
        """
        response = await self._call_with_retry(self.client.get_categories)
        # The official client returns a list directly; a dict wrapped under a
        # 'categories' key is accepted as a fallback, anything else yields [].
        if isinstance(response, dict):
            response = response.get("categories", [])
        elif not isinstance(response, list):
            response = []
        return [Category.from_dict(cat) for cat in response]

    async def create_category(self, title: str) -> Category:
        """Create a new category with retry logic.

        Args:
            title: Title of the new category

        Returns:
            The created Category object
        """
        response = await self._call_with_retry(self.client.create_category, title)
        return Category.from_dict(response)

    async def update_category(self, category_id: int, title: str) -> Category:
        """Update a category with retry logic.

        Args:
            category_id: ID of the category to update
            title: New title for the category

        Returns:
            The updated Category object
        """
        response = await self._call_with_retry(self.client.update_category, category_id, title)
        return Category.from_dict(response)

    async def delete_category(self, category_id: int) -> None:
        """Delete a category with retry logic.

        Args:
            category_id: ID of the category to delete
        """
        await self._call_with_retry(self.client.delete_category, category_id)

    async def create_feed(
        self,
        feed_url: str,
        category_id: int | None = None,
    ) -> Feed:
        """Create a new feed with retry logic (Issue #58 - Feed Management).

        Args:
            feed_url: URL of the feed to add
            category_id: Optional category ID to assign feed to

        Returns:
            The created Feed object
        """
        # Only forward category_id when the caller supplied one, so the
        # wrapped client's own default applies otherwise.
        kwargs: dict = {}
        if category_id is not None:
            kwargs["category_id"] = category_id

        response = await self._call_with_retry(
            self.client.create_feed,
            feed_url,
            **kwargs,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def update_feed(
        self,
        feed_id: int,
        **kwargs,
    ) -> Feed:
        """Update feed settings with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to update
            **kwargs: Feed attributes to update (title, category_id, etc.)

        Returns:
            The updated Feed object
        """
        response = await self._call_with_retry(
            self.client.update_feed,
            feed_id,
            **kwargs,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def get_feed(self, feed_id: int) -> Feed:
        """Get feed details with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to retrieve

        Returns:
            The Feed object
        """
        response = await self._call_with_retry(
            self.client.get_feed,
            feed_id,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def delete_feed(self, feed_id: int) -> None:
        """Delete a feed with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to delete
        """
        await self._call_with_retry(self.client.delete_feed, feed_id)

    async def fetch_original_content(self, entry_id: int) -> str:
        """
        Fetch the original content of an entry with retry logic.

        Args:
            entry_id: ID of the entry

        Returns:
            Original content HTML, or an empty string when the response has no
            "content" key
        """
        response = await self._call_with_retry(self.client.fetch_entry_content, entry_id)
        return response.get("content", "")

    async def get_version(self) -> dict:
        """Get Miniflux server version information.

        Returns:
            Dictionary with version information from the server
        """
        return await self._call_with_retry(self.client.get_version)

    async def get_user_info(self) -> dict:
        """Get current user information.

        Returns:
            Dictionary with current user details (username, timezone, language, etc.)
        """
        return await self._call_with_retry(self.client.me)

    async def get_feeds(self) -> list[Feed]:
        """Get all feeds with retry logic.

        Returns:
            List of Feed objects with error/status information (empty if the
            response is neither a list nor a dict)
        """
        response = await self._call_with_retry(self.client.get_feeds)
        # The official client returns a list directly; a dict wrapped under a
        # 'feeds' key is accepted as a fallback, anything else yields [].
        if isinstance(response, dict):
            response = response.get("feeds", [])
        elif not isinstance(response, list):
            response = []
        return [Feed.from_dict(feed) for feed in response]

__init__(base_url, api_key, allow_invalid_certs=False, timeout=30.0)

Initialize the Miniflux API client.

Parameters:

Name Type Description Default
base_url str

Base URL of the Miniflux server

required
api_key str

API key for authentication

required
allow_invalid_certs bool

Whether to allow invalid SSL certificates (not supported by official client)

False
timeout float

Request timeout in seconds (not supported by official client)

30.0
Source code in miniflux_tui/api/client.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    base_url: str,
    api_key: str,
    allow_invalid_certs: bool = False,
    timeout: float = 30.0,
):
    """
    Set up the wrapper and the underlying official Miniflux client.

    Args:
        base_url: Base URL of the Miniflux server
        api_key: API key for authentication
        allow_invalid_certs: Whether to allow invalid SSL certificates (not supported by official client)
        timeout: Request timeout in seconds (not supported by official client)
    """
    # Keep a normalised copy of the URL without trailing slashes.
    self.base_url = base_url.rstrip("/")

    # Remember the caller's options; they are only stored here.
    self.allow_invalid_certs: bool = allow_invalid_certs
    self.timeout: float = timeout

    # The official client is synchronous and takes api_key as a keyword.
    self.client = MinifluxClientBase(base_url, api_key=api_key)

change_entry_status(entry_id, status) async

Change the read status of an entry with retry logic.

Parameters:

Name Type Description Default
entry_id int

ID of the entry

required
status str

New status ("read" or "unread")

required
Source code in miniflux_tui/api/client.py
155
156
157
158
159
160
161
162
163
async def change_entry_status(self, entry_id: int, status: str) -> None:
    """
    Set the read state of a single entry via the retry helper.

    Args:
        entry_id: ID of the entry
        status: New status ("read" or "unread")
    """
    # The update endpoint takes a list of IDs, so wrap the single ID.
    update_entries = self.client.update_entries
    await self._call_with_retry(update_entries, entry_ids=[entry_id], status=status)

toggle_starred(entry_id) async

Toggle the starred status of an entry with retry logic.

Parameters:

Name Type Description Default
entry_id int

ID of the entry

required
Source code in miniflux_tui/api/client.py
173
174
175
176
177
178
179
180
async def toggle_starred(self, entry_id: int) -> None:
    """
    Flip the starred (bookmark) flag of an entry via the retry helper.

    Args:
        entry_id: ID of the entry
    """
    toggle_bookmark = self.client.toggle_bookmark
    await self._call_with_retry(toggle_bookmark, entry_id)

save_entry(entry_id) async

Save an entry to third-party service (e.g., Wallabag, Shiori, Shaarli) with retry logic.

Parameters:

Name Type Description Default
entry_id int

ID of the entry

required
Source code in miniflux_tui/api/client.py
182
183
184
185
186
187
188
189
async def save_entry(self, entry_id: int) -> None:
    """
    Send an entry to the configured third-party service (e.g., Wallabag, Shiori, Shaarli) via the retry helper.

    Args:
        entry_id: ID of the entry
    """
    save = self.client.save_entry
    await self._call_with_retry(save, entry_id)

Connection

The client connects to your Miniflux server using:

  • Base URL: The server's URL (e.g., https://miniflux.example.com)
  • API Key: Your personal API token (from Miniflux Settings)
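  • Certificate Validation: An allow_invalid_certs flag is accepted for self-signed certificates; note that its docstring marks it as not supported by the official client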

Async Operations

All API calls are asynchronous and use asyncio for non-blocking operations.

Example:

client = MinifluxClient(
    base_url="https://miniflux.example.com",
    api_key="your-api-key"
)

# Fetch unread entries
entries = await client.get_unread_entries()

# Mark an entry as read
await client.change_entry_status(entry_id=123, status="read")

Error Handling

The client may raise exceptions for:

  • Network errors (connection failures)
  • Invalid credentials (wrong API key)
  • Server errors (5xx responses)
  • Invalid requests (malformed parameters)

Always wrap API calls in try-except blocks when appropriate.