API Client Reference

MinifluxClient

MinifluxClient is the async wrapper around the official Miniflux API library.

Wrapper around official Miniflux client for our app.

Source code in miniflux_tui/api/client.py
class MinifluxClient:
    """Wrapper around official Miniflux client for our app."""

    def __init__(
        self,
        base_url: str,
        api_key: str,
        allow_invalid_certs: bool = False,
        timeout: float = 30.0,
    ):
        """
        Initialize the Miniflux API client.

        Args:
            base_url: Base URL of the Miniflux server
            api_key: API key for authentication
            allow_invalid_certs: Whether to allow invalid SSL certificates (not supported by official client)
            timeout: Request timeout in seconds (not supported by official client)
        """
        self.base_url = base_url.rstrip("/")

        # Create official Miniflux client (synchronous)
        # The official client expects api_key as a keyword argument
        self.client = MinifluxClientBase(base_url, api_key=api_key)

        # Allow invalid certs
        self.allow_invalid_certs: bool = allow_invalid_certs

        # Timeout for network calls
        self.timeout: float = timeout

    async def close(self):
        """Close the HTTP client (no-op for official client)."""

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    @staticmethod
    async def _run_sync(func, *args, **kwargs):
        """Run a synchronous function in an executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    async def _call_with_retry(
        self,
        func: Callable[..., T],
        *args,
        max_retries: int = MAX_RETRIES,
        backoff_factor: float = BACKOFF_FACTOR,
        **kwargs,
    ) -> T:
        """Call function with exponential backoff retry logic.

        Automatically retries on network errors (ConnectionError, TimeoutError)
        with exponential backoff. Other exceptions are raised immediately.

        Backoff calculation (the delay after failed attempt n is
        backoff_factor**n; no delay follows the final attempt):
        - After attempt 0: wait backoff_factor**0 = 1 second
        - After attempt 1: wait backoff_factor**1 = 1 second (with factor=1.0)

        Example with backoff_factor=2.0 and max_retries=3:
        - After attempt 0: wait 1 second
        - After attempt 1: wait 2 seconds
        - After attempt 2: no wait; the last error is raised

        Args:
            func: Synchronous function to call
            *args: Positional arguments for func
            max_retries: Maximum total number of attempts, including the first call (default 3)
            backoff_factor: Multiplier for exponential backoff (default 1.0)
            **kwargs: Keyword arguments for func

        Returns:
            Result from func call

        Raises:
            ConnectionError/TimeoutError: Last network error if all retries fail
            Exception: Other exceptions are raised immediately without retry
        """
        last_exception = None

        for attempt in range(max_retries):
            try:
                # Try the function call
                return await self._run_sync(func, *args, **kwargs)
            except (ConnectionError, TimeoutError) as e:
                # Transient network errors - retry with backoff
                last_exception = e
                if attempt < max_retries - 1:
                    # Calculate exponential backoff delay
                    wait_time = backoff_factor**attempt
                    await asyncio.sleep(wait_time)
            except Exception:
                # Non-network errors - don't retry, raise immediately
                raise

        # All retries exhausted - raise last exception
        raise last_exception or Exception("Unknown error in retry logic")

    async def get_unread_entries(self, limit: int = 100, offset: int = 0) -> list[Entry]:
        """
        Get unread feed entries with retry logic and automatic pagination.

        When limit is left at its default of 100, all available entries are
        fetched in batches of 100. Any other limit issues a single request.

        Args:
            limit: Maximum number of entries for a single request (the default of 100 fetches all)
            offset: Offset for pagination

        Returns:
            List of unread Entry objects
        """
        # If limit is exactly 100 (default), fetch all entries
        if limit == 100:
            all_entries = []
            current_offset = offset
            batch_size = 100

            while True:
                response = await self._call_with_retry(
                    self.client.get_entries,
                    status=["unread"],
                    limit=batch_size,
                    offset=current_offset,
                    order="published_at",
                    direction="desc",
                )

                entries = [Entry.from_dict(entry) for entry in response.get("entries", [])]

                if not entries:
                    break

                all_entries.extend(entries)

                # If we got fewer entries than requested, we've reached the end
                if len(entries) < batch_size:
                    break

                current_offset += batch_size

            return all_entries

        # For explicit limits other than default, use single request
        response = await self._call_with_retry(
            self.client.get_entries, status=["unread"], limit=limit, offset=offset, order="published_at", direction="desc"
        )

        return [Entry.from_dict(entry) for entry in response.get("entries", [])]

    async def get_starred_entries(self, limit: int = 100, offset: int = 0) -> list[Entry]:
        """
        Get starred feed entries with retry logic and automatic pagination.

        When limit is left at its default of 100, all available entries are
        fetched in batches of 100. Any other limit issues a single request.

        Args:
            limit: Maximum number of entries for a single request (the default of 100 fetches all)
            offset: Offset for pagination

        Returns:
            List of starred Entry objects
        """
        # If limit is exactly 100 (default), fetch all entries
        if limit == 100:
            all_entries = []
            current_offset = offset
            batch_size = 100

            while True:
                response = await self._call_with_retry(
                    self.client.get_entries, starred=True, limit=batch_size, offset=current_offset, order="published_at", direction="desc"
                )

                entries = [Entry.from_dict(entry) for entry in response.get("entries", [])]

                if not entries:
                    break

                all_entries.extend(entries)

                # If we got fewer entries than requested, we've reached the end
                if len(entries) < batch_size:
                    break

                current_offset += batch_size

            return all_entries

        # For explicit limits other than default, use single request
        response = await self._call_with_retry(
            self.client.get_entries, starred=True, limit=limit, offset=offset, order="published_at", direction="desc"
        )

        return [Entry.from_dict(entry) for entry in response.get("entries", [])]

    async def get_read_entries(self, limit: int = 100, offset: int = 0) -> list[Entry]:
        """
        Get read feed entries (history) with retry logic.

        Args:
            limit: Maximum number of entries to retrieve
            offset: Offset for pagination

        Returns:
            List of read Entry objects ordered by most recently read first (changed_at)
        """
        response = await self._call_with_retry(
            self.client.get_entries, status=["read"], limit=limit, offset=offset, order="changed_at", direction="desc"
        )

        return [Entry.from_dict(entry) for entry in response.get("entries", [])]

    async def change_entry_status(self, entry_id: int, status: str) -> None:
        """
        Change the read status of an entry with retry logic.

        Args:
            entry_id: ID of the entry
            status: New status ("read" or "unread")
        """
        await self._call_with_retry(self.client.update_entries, entry_ids=[entry_id], status=status)

    async def mark_as_read(self, entry_id: int) -> None:
        """Mark an entry as read with retry logic."""
        await self.change_entry_status(entry_id, "read")

    async def mark_as_unread(self, entry_id: int) -> None:
        """Mark an entry as unread with retry logic."""
        await self.change_entry_status(entry_id, "unread")

    async def toggle_starred(self, entry_id: int) -> None:
        """
        Toggle the starred status of an entry with retry logic.

        Args:
            entry_id: ID of the entry
        """
        await self._call_with_retry(self.client.toggle_bookmark, entry_id)

    async def save_entry(self, entry_id: int) -> None:
        """
        Save an entry to third-party service (e.g., Wallabag, Shiori, Shaarli) with retry logic.

        Args:
            entry_id: ID of the entry
        """
        await self._call_with_retry(self.client.save_entry, entry_id)

    async def mark_all_as_read(self, entry_ids: list[int]) -> None:
        """
        Mark multiple entries as read with retry logic.

        Args:
            entry_ids: List of entry IDs to mark as read
        """
        await self._call_with_retry(self.client.update_entries, entry_ids=entry_ids, status="read")

    async def refresh_all_feeds(self) -> None:
        """Trigger a refresh of all feeds with retry logic."""
        await self._call_with_retry(self.client.refresh_all_feeds)

    async def refresh_feed(self, feed_id: int) -> None:
        """Refresh a specific feed with retry logic.

        Args:
            feed_id: ID of the feed to refresh
        """
        await self._call_with_retry(self.client.refresh_feed, feed_id)

    async def get_categories(self) -> list[Category]:
        """Get all categories with retry logic.

        Returns:
            List of Category objects
        """
        response = await self._call_with_retry(self.client.get_categories)
        # The official client returns a list directly, not wrapped in a dict
        if isinstance(response, list):
            return [Category.from_dict(cat) for cat in response]
        # Fallback for dict response with 'categories' key
        categories_data = response.get("categories", []) if isinstance(response, dict) else []
        return [Category.from_dict(cat) for cat in categories_data]

    async def create_category(self, title: str) -> Category:
        """Create a new category with retry logic.

        Args:
            title: Title of the new category

        Returns:
            The created Category object
        """
        response = await self._call_with_retry(self.client.create_category, title)
        return Category.from_dict(response)

    async def update_category(self, category_id: int, title: str) -> Category:
        """Update a category with retry logic.

        Args:
            category_id: ID of the category to update
            title: New title for the category

        Returns:
            The updated Category object
        """
        response = await self._call_with_retry(self.client.update_category, category_id, title)
        return Category.from_dict(response)

    async def delete_category(self, category_id: int) -> None:
        """Delete a category with retry logic.

        Args:
            category_id: ID of the category to delete
        """
        await self._call_with_retry(self.client.delete_category, category_id)

    async def create_feed(
        self,
        feed_url: str,
        category_id: int | None = None,
    ) -> Feed:
        """Create a new feed with retry logic (Issue #58 - Feed Management).

        Args:
            feed_url: URL of the feed to add
            category_id: Optional category ID to assign feed to

        Returns:
            The created Feed object
        """
        # Build kwargs for create_feed
        kwargs: dict = {}
        if category_id is not None:
            kwargs["category_id"] = category_id

        response = await self._call_with_retry(
            self.client.create_feed,
            feed_url,
            **kwargs,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def update_feed(
        self,
        feed_id: int,
        **kwargs,
    ) -> Feed:
        """Update feed settings with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to update
            **kwargs: Feed attributes to update (title, category_id, etc.)

        Returns:
            The updated Feed object
        """
        response = await self._call_with_retry(
            self.client.update_feed,
            feed_id,
            **kwargs,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def get_feed(self, feed_id: int) -> Feed:
        """Get feed details with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to retrieve

        Returns:
            The Feed object
        """
        response = await self._call_with_retry(
            self.client.get_feed,
            feed_id,
        )
        return Feed.from_dict(response)  # type: ignore[arg-type]

    async def delete_feed(self, feed_id: int) -> None:
        """Delete a feed with retry logic (Issue #58 - Feed Management).

        Args:
            feed_id: ID of the feed to delete
        """
        await self._call_with_retry(self.client.delete_feed, feed_id)

    async def fetch_original_content(self, entry_id: int) -> str:
        """
        Fetch the original content of an entry with retry logic.

        Args:
            entry_id: ID of the entry

        Returns:
            Original content HTML
        """
        response = await self._call_with_retry(self.client.fetch_entry_content, entry_id)
        return response.get("content", "")

    async def get_version(self) -> dict:
        """Get Miniflux server version information.

        Returns:
            Dictionary with version information from the server
        """
        return await self._call_with_retry(self.client.get_version)

    async def get_user_info(self) -> dict:
        """Get current user information.

        Returns:
            Dictionary with current user details (username, timezone, language, etc.)
        """
        return await self._call_with_retry(self.client.me)

    async def get_integrations_status(self) -> bool:
        """Get integrations status from the server.

        Returns:
            bool: True if at least one third-party integration is enabled
        """
        return await self._call_with_retry(self.client.get_integrations_status)

    async def update_user_settings(self, user_id: int, **settings) -> dict:
        """Update user settings.

        Args:
            user_id: User ID to update
            **settings: Settings to update (language, timezone, theme, entries_per_page, etc.)

        Returns:
            Dictionary with updated user information
        """
        return await self._call_with_retry(self.client.update_user, user_id, **settings)

    async def get_feeds(self) -> list[Feed]:
        """Get all feeds with retry logic.

        Returns:
            List of Feed objects with error/status information
        """
        response = await self._call_with_retry(self.client.get_feeds)
        # The official client returns a list directly
        if isinstance(response, list):
            return [Feed.from_dict(feed) for feed in response]
        # Fallback for dict response with 'feeds' key
        feeds_data = response.get("feeds", []) if isinstance(response, dict) else []
        return [Feed.from_dict(feed) for feed in feeds_data]

    async def get_category_entries(self, category_id: int, **kwargs) -> list[Entry]:
        """Get all entries for a specific category with retry logic.

        This is useful for building a category_id → entry mapping when the
        feeds endpoint doesn't include category_id information.

        Args:
            category_id: ID of the category to fetch entries for
            **kwargs: Additional parameters (limit, offset, status, etc.)

        Returns:
            List of Entry objects in the category
        """
        response = await self._call_with_retry(self.client.get_category_entries, category_id, **kwargs)
        # Handle both dict and list responses
        if isinstance(response, list):
            return [Entry.from_dict(entry) for entry in response]
        entries_data = response.get("entries", []) if isinstance(response, dict) else []
        return [Entry.from_dict(entry) for entry in entries_data]
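
The pagination loop used by get_unread_entries and get_starred_entries can be read in isolation. The following is a minimal sketch of that loop, not the project's code: fetch_all, FetchPage, and make_fake_server are hypothetical names, and the in-memory "server" stands in for the real get_entries call.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for one page request; the real client calls
# self.client.get_entries(limit=..., offset=...) through _call_with_retry.
FetchPage = Callable[[int, int], Awaitable[list[dict]]]


async def fetch_all(fetch_page: FetchPage, batch_size: int = 100, offset: int = 0) -> list[dict]:
    """Collect pages until an empty or short page signals the end."""
    collected: list[dict] = []
    while True:
        page = await fetch_page(batch_size, offset)
        if not page:
            break
        collected.extend(page)
        if len(page) < batch_size:
            break  # short page: nothing left to fetch
        offset += batch_size
    return collected


def make_fake_server(total: int) -> FetchPage:
    """Build an in-memory page source holding `total` fake entries."""
    data = [{"id": i} for i in range(total)]

    async def fetch_page(limit: int, offset: int) -> list[dict]:
        return data[offset : offset + limit]

    return fetch_page


if __name__ == "__main__":
    entries = asyncio.run(fetch_all(make_fake_server(250)))
    print(len(entries))
```

When the total is an exact multiple of the batch size, the loop issues one extra request that returns an empty page before it stops.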

__init__(base_url, api_key, allow_invalid_certs=False, timeout=30.0)

Initialize the Miniflux API client.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base_url | str | Base URL of the Miniflux server | required |
| api_key | str | API key for authentication | required |
| allow_invalid_certs | bool | Whether to allow invalid SSL certificates (not supported by official client) | False |
| timeout | float | Request timeout in seconds (not supported by official client) | 30.0 |

Source code in miniflux_tui/api/client.py
def __init__(
    self,
    base_url: str,
    api_key: str,
    allow_invalid_certs: bool = False,
    timeout: float = 30.0,
):
    """
    Initialize the Miniflux API client.

    Args:
        base_url: Base URL of the Miniflux server
        api_key: API key for authentication
        allow_invalid_certs: Whether to allow invalid SSL certificates (not supported by official client)
        timeout: Request timeout in seconds (not supported by official client)
    """
    self.base_url = base_url.rstrip("/")

    # Create official Miniflux client (synchronous)
    # The official client expects api_key as a keyword argument
    self.client = MinifluxClientBase(base_url, api_key=api_key)

    # Allow invalid certs
    self.allow_invalid_certs: bool = allow_invalid_certs

    # Timeout for network calls
    self.timeout: float = timeout

change_entry_status(entry_id, status) async

Change the read status of an entry with retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | int | ID of the entry | required |
| status | str | New status ("read" or "unread") | required |

Source code in miniflux_tui/api/client.py
async def change_entry_status(self, entry_id: int, status: str) -> None:
    """
    Change the read status of an entry with retry logic.

    Args:
        entry_id: ID of the entry
        status: New status ("read" or "unread")
    """
    await self._call_with_retry(self.client.update_entries, entry_ids=[entry_id], status=status)

toggle_starred(entry_id) async

Toggle the starred status of an entry with retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | int | ID of the entry | required |

Source code in miniflux_tui/api/client.py
async def toggle_starred(self, entry_id: int) -> None:
    """
    Toggle the starred status of an entry with retry logic.

    Args:
        entry_id: ID of the entry
    """
    await self._call_with_retry(self.client.toggle_bookmark, entry_id)

save_entry(entry_id) async

Save an entry to third-party service (e.g., Wallabag, Shiori, Shaarli) with retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entry_id | int | ID of the entry | required |

Source code in miniflux_tui/api/client.py
async def save_entry(self, entry_id: int) -> None:
    """
    Save an entry to third-party service (e.g., Wallabag, Shiori, Shaarli) with retry logic.

    Args:
        entry_id: ID of the entry
    """
    await self._call_with_retry(self.client.save_entry, entry_id)
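
Since get_integrations_status reports whether any third-party integration is enabled, a caller may want to check it before calling save_entry. The sketch below shows one way to do that. save_if_enabled, SupportsSave, and FakeClient are hypothetical names introduced for illustration; FakeClient only mimics the two MinifluxClient methods involved so the example runs without a server.

```python
import asyncio
from typing import Protocol


class SupportsSave(Protocol):
    """The two MinifluxClient methods this sketch relies on."""

    async def get_integrations_status(self) -> bool: ...
    async def save_entry(self, entry_id: int) -> None: ...


async def save_if_enabled(client: SupportsSave, entry_id: int) -> bool:
    """Save the entry only when the server reports an enabled integration."""
    if not await client.get_integrations_status():
        return False
    await client.save_entry(entry_id)
    return True


class FakeClient:
    """Hypothetical in-memory stand-in used only to make the sketch runnable."""

    def __init__(self, enabled: bool) -> None:
        self.enabled = enabled
        self.saved: list[int] = []

    async def get_integrations_status(self) -> bool:
        return self.enabled

    async def save_entry(self, entry_id: int) -> None:
        self.saved.append(entry_id)


if __name__ == "__main__":
    print(asyncio.run(save_if_enabled(FakeClient(enabled=True), 42)))
```

With a real MinifluxClient in place of FakeClient, the call would look the same: await save_if_enabled(client, entry_id).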

Connection

The client connects to your Miniflux server using:

  • Base URL: The server's URL (e.g., https://miniflux.example.com)
  • API Key: Your personal API token (retrieved via the password command in config.toml)
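  • Certificate Validation: The allow_invalid_certs flag is accepted for servers with self-signed certificates, but the constructor docstring notes that the official client does not support it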
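
As a rough illustration of obtaining the API key from a password command, the sketch below runs a command and uses its standard output as the token. It assumes the command is already available as an argument list; how it is read from config.toml is not shown here, and read_api_key is a hypothetical helper, not part of the documented API.

```python
import subprocess
import sys


def read_api_key(command: list[str]) -> str:
    """Run `command` and return its stripped stdout as the API token."""
    result = subprocess.run(command, capture_output=True, text=True, check=True)
    token = result.stdout.strip()
    if not token:
        raise ValueError("password command produced no output")
    return token


if __name__ == "__main__":
    # Stand-in for a real password-manager command: prints a fake token.
    print(read_api_key([sys.executable, "-c", "print('example-token')"]))
```

Passing the command as a list avoids shell interpretation, and check=True turns a failing command into a CalledProcessError, so a failed command never yields an empty key.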

Async Operations

All public methods are coroutines. The official client is synchronous, so each call runs in the default asyncio executor and does not block the event loop.

Example:

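import asyncio

from miniflux_tui.api.client import MinifluxClient


async def main() -> None:
    client = MinifluxClient(
        base_url="https://miniflux.example.com",
        api_key="your-api-key",
    )

    # Fetch unread entries (the default limit of 100 pages through all of them)
    entries = await client.get_unread_entries()

    # Mark an entry as read
    await client.change_entry_status(entry_id=123, status="read")


asyncio.run(main())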
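
The non-blocking behaviour comes from handing each synchronous call to a thread pool, as the _run_sync helper in the class source does. Here is a self-contained sketch of that pattern; blocking_fetch is a made-up stand-in for a synchronous HTTP call, and run_sync mirrors the helper rather than importing it.

```python
import asyncio
import time
from functools import partial


def blocking_fetch(entry_id: int, delay: float = 0.05) -> dict:
    """Stand-in for a synchronous HTTP call made by the official client."""
    time.sleep(delay)
    return {"id": entry_id}


async def run_sync(func, *args, **kwargs):
    """Run a blocking callable in the default thread pool executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, partial(func, *args, **kwargs))


async def main() -> list[dict]:
    # Both blocking calls run in worker threads, so they overlap
    # instead of stalling the event loop one after the other.
    return await asyncio.gather(run_sync(blocking_fetch, 1), run_sync(blocking_fetch, 2))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

functools.partial is needed because run_in_executor forwards positional arguments only, not keyword arguments.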

Error Handling

The client may raise exceptions for:

  • Network errors (connection failures)
  • Invalid credentials (wrong API token)
  • Server errors (5xx responses)
  • Invalid requests (malformed parameters)

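ConnectionError and TimeoutError are retried with exponential backoff and re-raised once all attempts are exhausted; every other exception propagates immediately. Wrap API calls in try-except blocks wherever the caller can recover from or report the failure.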
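
To make the retry behaviour concrete, the sketch below reimplements the backoff loop from _call_with_retry in simplified form and records the delays it would wait. It is an approximation under stated assumptions: call_with_retry and demo are illustrative names, the function is called directly rather than in an executor, and the sleep parameter is an addition that lets the example run without real waiting.

```python
import asyncio


async def call_with_retry(func, *, max_retries=3, backoff_factor=2.0, sleep=asyncio.sleep):
    """Retry `func` on ConnectionError/TimeoutError; raise anything else at once."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return func()
        except (ConnectionError, TimeoutError) as exc:
            last_error = exc
            if attempt < max_retries - 1:
                # Delay grows as backoff_factor**attempt: 1, 2, 4, ...
                await sleep(backoff_factor**attempt)
    raise last_error or RuntimeError("max_retries must be at least 1")


async def demo() -> list[float]:
    """Record the delays a permanently failing call would wait through."""
    delays: list[float] = []

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record instead of sleeping

    def always_down():
        raise ConnectionError("server unreachable")

    try:
        await call_with_retry(always_down, sleep=fake_sleep)
    except ConnectionError:
        pass  # raised only after the final attempt
    return delays


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With three attempts and a factor of 2.0, the recorded delays are 1.0 and 2.0 seconds; the third failure raises without waiting. An exception such as ValueError skips the loop entirely, so callers still need their own try-except for non-network failures.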