���� JFIF fdasasfas213sdaf
Server IP : 84.32.84.196 / Your IP : 216.73.216.199 Web Server : LiteSpeed System : Linux in-mum-web669.main-hosting.eu 5.14.0-503.23.2.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Feb 12 05:52:18 EST 2025 x86_64 User : u479334040 ( 479334040) PHP Version : 8.2.27 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : OFF | Python : OFF | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/opt/alt/python27/lib/python2.7/site-packages/fluent/ |
Upload File : |
# -*- coding: utf-8 -*- from __future__ import print_function import threading try: from queue import Queue, Full, Empty except ImportError: from Queue import Queue, Full, Empty from fluent import sender from fluent.sender import EventTime __all__ = ["EventTime", "FluentSender"] DEFAULT_QUEUE_MAXSIZE = 100 DEFAULT_QUEUE_CIRCULAR = False _TOMBSTONE = object() _global_sender = None def _set_global_sender(sender): # pragma: no cover """ [For testing] Function to set global sender directly """ global _global_sender _global_sender = sender def setup(tag, **kwargs): # pragma: no cover global _global_sender _global_sender = FluentSender(tag, **kwargs) def get_global_sender(): # pragma: no cover return _global_sender def close(): # pragma: no cover get_global_sender().close() class FluentSender(sender.FluentSender): def __init__(self, tag, host='localhost', port=24224, bufmax=1 * 1024 * 1024, timeout=3.0, verbose=False, buffer_overflow_handler=None, nanosecond_precision=False, msgpack_kwargs=None, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. """ super(FluentSender, self).__init__(tag=tag, host=host, port=port, bufmax=bufmax, timeout=timeout, verbose=verbose, buffer_overflow_handler=buffer_overflow_handler, nanosecond_precision=nanosecond_precision, msgpack_kwargs=msgpack_kwargs, **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False self._queue = Queue(maxsize=queue_maxsize) self._send_thread = threading.Thread(target=self._send_loop, name="AsyncFluentSender %d" % id(self)) self._send_thread.daemon = True self._send_thread.start() def close(self, flush=True): with self.lock: if self._closed: return self._closed = True if not flush: while True: try: self._queue.get(block=False) except Empty: break self._queue.put(_TOMBSTONE) self._send_thread.join() @property def queue_maxsize(self): return self._queue_maxsize @property def queue_blocking(self): return not self._queue_circular @property def queue_circular(self): return self._queue_circular def _send(self, bytes_): with self.lock: if self._closed: return False if self._queue_circular and self._queue.full(): # discard oldest try: self._queue.get(block=False) except Empty: # pragma: no cover pass try: self._queue.put(bytes_, block=(not self._queue_circular)) except Full: # pragma: no cover return False # this actually can't happen return True def _send_loop(self): send_internal = super(FluentSender, self)._send_internal try: while True: bytes_ = self._queue.get(block=True) if bytes_ is _TOMBSTONE: break send_internal(bytes_) finally: self._close() def __exit__(self, exc_type, exc_val, exc_tb): self.close()