Persisting logs in DB for each task in Celery (+ FastAPI)

At FereAI.xyz, we use Celery at scale. Whenever the trading agent wants to perform a trade, a manual sell is executed, or emails need to be sent for scheduled jobs, the work happens via Celery.

Celery is great, but I've always missed the Airflow feature that lets you view the logs of each individual task run and use them to diagnose or debug problems. So I decided to build something around it.

Requirements

I wanted a solution where:

  • All logs from Celery jobs are stored in the DB along with their task ID, status, and a few other parameters
  • Logging works out of the box for Celery, FastAPI, and standalone usage (Jupyter notebooks)

Here’s what I did

A database table for storing logs

from sqlalchemy.ext.declarative import declarative_base

# Declarative base class shared by the ORM models (imported elsewhere as
# `from .base import Base`).
# NOTE(review): sqlalchemy.ext.declarative.declarative_base is deprecated in
# SQLAlchemy 2.0 (these models already use 2.0's Mapped/mapped_column);
# consider sqlalchemy.orm.declarative_base or DeclarativeBase instead.
Base = declarative_base()

from sqlalchemy import Column, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from sqlalchemy.sql.expression import func
from sqlalchemy import DateTime
from .base import Base


class BatchedLog(Base):
  """A batch of log records captured for one task run.

  Each row carries the task's id, name and status next to the log records,
  which are stored as a JSON-encoded string.
  """

  __tablename__ = "batched_log"

  # Nullability is spelled out explicitly on every column so the schema does
  # not depend on the Mapped[...] annotation.
  id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
  task_id: Mapped[str] = mapped_column(String(255), nullable=True, index=True)
  task_name: Mapped[str] = mapped_column(String(255), nullable=True, index=True)
  status: Mapped[str] = mapped_column(String(255), nullable=True, index=True)
  # JSON-encoded list of log-record dicts.
  logs: Mapped[str] = mapped_column(Text, nullable=True)
  created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    nullable=False,
    default=func.now(),
    index=True,
  )

Logger config

import contextvars
import logging
import os

from celery.app.log import TaskFormatter

# Create and configure the shared "trader" logger used outside Celery tasks
# and FastAPI requests (see get_logger()).
logger = logging.getLogger("trader")
logger.setLevel(logging.DEBUG)

# Remove handlers left over from a previous import/reload to avoid duplicate
# output. Checks this logger's own handler list; hasHandlers() would also
# consult ancestor loggers.
if logger.handlers:
  logger.handlers.clear()

# Stream handler; logging.StreamHandler() with no argument writes to sys.stderr.
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)

# Formatter for the log messages. TaskFormatter fills in %(task_id)s and
# %(task_name)s from the current Celery task.
formatter = TaskFormatter(
  "%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(threadName)s - %(levelname)s - %(message)s"
)
# Attach the formatter; without this the handler falls back to the default
# "%(message)s" format.
stream_handler.setFormatter(formatter)

logger.addHandler(stream_handler)

# Prevent propagation to the root logger
logger.propagate = False

# Per-context buffer of log-record dicts for the current task.
# NOTE: the default list is ONE object shared by every context that never
# called task_logs.set(); anything appended outside a task accumulates there.
# Celery's task_prerun handler gives each task its own fresh list.
task_logs = contextvars.ContextVar("task_logs", default=[])

# Logger bound to the current context (Celery task or FastAPI request);
# None means "use the module-level logger".
logger_context = contextvars.ContextVar("logger_context", default=None)


def get_logger():
  """Return the logger bound to the current context, or the module logger.

  The Celery task_prerun handler and the FastAPI middleware bind a logger via
  ``logger_context``. When nothing is bound, the shared "trader" logger is
  returned after being renamed to "Jupyter" (inside an IPython/Jupyter
  kernel) or "Standalone".
  """
  import sys

  lc = logger_context.get()
  if lc is not None:
    return lc
  # Jupyter kernels import ipykernel but do not normally export an
  # "ipykernel" environment variable, so the os.environ test alone does not
  # detect them; it is kept only as a backward-compatible fallback.
  in_jupyter = "ipykernel" in sys.modules or "ipykernel" in os.environ
  # NOTE: this renames the shared logger object in place, so every holder of
  # it sees the new name.
  logger.name = "Jupyter" if in_jupyter else "Standalone"
  return logger

Celery app

import json
import logging
import traceback
from uuid import UUID

from celery.app import Celery
from celery.app.log import TaskFormatter
from celery.schedules import crontab
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from celery.signals import task_prerun, task_postrun, task_failure
from celery.result import AsyncResult

from .logger_config import task_logs, logger_context, get_logger

app = Celery("trader", broker=get_redis_url(), backend=get_redis_url())
# NOTE(review): "task_logging_level" does not appear to be a documented Celery
# setting, so this assignment may have no effect — confirm, or set the level
# through the worker's --loglevel option instead.
app.conf.task_logging_level = logging.DEBUG

# Task logger referenced as ``celery_task_logger`` by the signal handlers and
# tasks below. get_task_logger() loggers are children of Celery's task logger,
# which is the logger setup_task_logger() attaches TaskLogHandler to.
celery_task_logger = get_task_logger(__name__)

# Signal to initialize logs
@task_prerun.connect
def initialize_logs(task_id=None, task=None, args=None, kwargs=None, **extras):
  """Prepare per-task logging right before a Celery task body runs.

  Binds the Celery task logger into ``logger_context`` (so get_logger()
  returns it) and starts an empty ``task_logs`` buffer for this run.
  """
  bound_logger = celery_task_logger
  task_logs.set([])
  logger_context.set(bound_logger)
  bound_logger.info(f"Logger set for Celery task: {task_id}")


# Signal to persist logs on task completion
@task_postrun.connect
def persist_logs_on_completion(
  task_id=None, task=None, args=None, kwargs=None, retval=None, **extras
):
  """Write the log records buffered during a task run to ``batched_log``.

  Connected to Celery's ``task_postrun`` signal. Stores one BatchedLog row
  with the task id, task name, lower-cased final state and the buffered
  records as JSON, then unbinds the per-task logger and buffer.
  """
  try:
    # task_postrun passes the final state as ``state``; use it directly and
    # only fall back to querying the result backend if it is missing.
    status = extras.get("state")
    if status is None:
      status = AsyncResult(task_id).status

    task_name = task.name if task else "unknown"

    logs = task_logs.get()
    print(f"Persisting logs for task_id={task_id}")

    # Save logs to the database
    with get_trade_db_session() as session:
      session.merge(
        BatchedLog(
          task_id=task_id,
          status=status.lower() if status else None,
          task_name=task_name,
          logs=json.dumps(logs),
        )
      )
      session.commit()
  finally:
    # Always unbind, even if the DB write fails, so later work in this
    # context does not keep using the finished task's logger and buffer.
    logger_context.set(None)
    task_logs.set([])

# Signal to handle task failure
@task_failure.connect
def handle_failure(
  task_id=None,
  task=None,
  args=None,
  kwargs=None,
  exc=None,
  traceback=None,
  **extras,
):
  """Log a failed task's exception and traceback into its log buffer.

  Connected to Celery's ``task_failure`` signal. Celery sends the exception
  under the ``exception`` keyword (it arrives in ``extras``); ``exc`` is kept
  as a backward-compatible fallback. The ``traceback`` parameter receives the
  traceback object Celery sends and shadows the stdlib ``traceback`` module
  inside this function, so the module is imported under another name.

  The lines are logged through get_logger(), i.e. into the same ``task_logs``
  buffer that persist_logs_on_completion writes to the DB on ``task_postrun``
  (which Celery also dispatches for failed tasks). Writing a row here as well
  would store every failure twice.

  NOTE(review): a failure reported without a matching ``task_postrun`` (e.g.
  if the worker process dies) is therefore not persisted — confirm that is
  acceptable.
  """
  import traceback as traceback_module

  error = extras.get("exception", exc)
  logger = get_logger()

  logger.error(f"Task failed: {error}")
  # Prefer the traceback attached to the exception; otherwise use the one
  # Celery passed in.
  tb = getattr(error, "__traceback__", None) or traceback
  logger.error(
    "".join(traceback_module.format_exception(type(error), error, tb))
  )

  print(f"Task failed. Logs for task_id={task_id} are persisted on task_postrun")

class TaskLogHandler(logging.Handler):
  """Logging handler that appends each record to the current task's buffer.

  Every record becomes a plain dict (level, message, time, source location,
  thread name) appended in place to the list held by ``task_logs``.
  """

  def emit(self, record):
    try:
      entry = {
        "level": record.levelname,
        "message": record.getMessage(),
        "time": record.created,
        "filename": record.filename,
        "lineno": record.lineno,
        "funcName": record.funcName,
        "thread": record.threadName,
      }
      # The list is mutated in place, so no task_logs.set() is needed.
      task_logs.get().append(entry)
    except RecursionError:
      raise
    except Exception:
      # Standard logging.Handler contract: a bad record (e.g. %-args that do
      # not match the message in getMessage()) must not propagate into the
      # code that was logging.
      self.handleError(record)


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
  """Attach TaskLogHandler to Celery's task logger and set the task format.

  Connected to ``after_setup_task_logger``; ``logger`` is the logger Celery
  passes with that signal.
  """
  # If this handler runs more than once for the same logger, a second
  # TaskLogHandler would record every line twice; add it only once.
  if not any(isinstance(h, TaskLogHandler) for h in logger.handlers):
    logger.addHandler(TaskLogHandler())

  task_format = TaskFormatter(
    "%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s"
  )
  for handler in logger.handlers:
    handler.setFormatter(task_format)

@app.task
def test_task():
  """Example Celery task that reports the agents it started.

  Errors are logged (message plus formatted traceback) and then re-raised so
  Celery marks the task as failed.

  NOTE(review): ``agents_started`` is not defined anywhere in this snippet.
  Unless it exists at module level elsewhere, the task logs a NameError and
  re-raises; presumably the code that starts the agents was elided.
  """
  log = celery_task_logger
  log.info("Starting main task")
  try:
    started = agents_started
    log.info(f"Started {len(started)} agents with UUIDs: {started}")
    return started
  except Exception as err:
    log.error(f"Task failed: {err}")
    log.error(traceback.format_exc())
    raise

Any other custom classes or functions should also obtain their logger via get_logger():

from .logger_config import get_logger

class MyClass:
  """Example class that logs through get_logger().

  The logger is looked up on every access rather than captured once in
  ``__init__``. get_logger() returns whatever logger is bound to the
  *current* context: the Celery task logger inside a task, or the request
  logger inside a FastAPI request. An instance created outside a task and
  used inside one therefore still logs through that task's logger.
  """

  def __init__(self):
    # Explicitly assigned logger, if any (set via the ``logger`` setter).
    self._logger_override = None

  @property
  def logger(self):
    """The explicitly assigned logger, else get_logger() for the current context."""
    if self._logger_override is not None:
      return self._logger_override
    return get_logger()

  @logger.setter
  def logger(self, value):
    self._logger_override = value

  def foo(self):
    self.logger.info("Inside foo")

FastAPI app

# NOTE(review): "..." stands for the app's FastAPI(...) keyword arguments.
# Taken literally, it passes Python's Ellipsis object as a positional argument.
app = FastAPI(
...
)

@app.middleware("http")
async def set_fastapi_logger(request: Request, call_next):
  """Bind a request-scoped logger into ``logger_context`` for one request.

  The logger is cached on ``request.state.logger``, so code running for this
  request gets it from get_logger(). The previous context value is restored
  afterwards, even if the downstream handler raises.
  """
  if not hasattr(request.state, "logger"):
    request.state.logger = get_logger()
  token = logger_context.set(request.state.logger)
  try:
    return await call_next(request)
  finally:
    # Token-based reset restores exactly the value seen before this request.
    logger_context.reset(token)

Using websockets with Autogen

Autogen provides a default WebSocket implementation, available at https://microsoft.github.io/autogen/docs/notebooks/agentchat_websockets. However, it has some limitations, notably:

  1. It isn’t compatible with ASGI servers running multiple instances of a FastAPI server.

For a production-grade deployment, you want many instances of FastAPI or Django served by an ASGI server such as uvicorn.

So this notebook demonstrates an alternative approach that works seamlessly and scales as demand grows.

Requirements

Some extra dependencies are needed for this notebook, which can be installed via pip:

# Quote the extras spec: unquoted [...] is a glob pattern in shells such as zsh.
pip install "pyautogen[websockets]" fastapi uvicorn

Define your Agents

def _ends_with_terminate(message):
  """Termination predicate for user_proxy.

  Returns a truthy value when the message content, with trailing whitespace
  stripped, ends with "TERMINATE". Falsy content (a missing key, "" or None)
  is returned unchanged.
  """
  content = message.get("content", "")
  return content and content.rstrip().endswith("TERMINATE")


_chatbot_llm_config = {
  "config_list": autogen.config_list_from_json(
    env_or_file="OAI_CONFIG_LIST",
    filter_dict={"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"]},
  ),
  "stream": True,
}

agent = autogen.ConversableAgent(
  name="chatbot",
  system_message="Complete a task given to you and reply TERMINATE when the task is done. If asked about the weather, use tool 'weather_forecast(city)' to get the weather forecast for a city.",
  llm_config=_chatbot_llm_config,
)

user_proxy = autogen.UserProxyAgent(
  name="user_proxy",
  system_message="A proxy for the user.",
  is_termination_msg=_ends_with_terminate,
  human_input_mode="NEVER",
  max_consecutive_auto_reply=10,
  code_execution_config=False,
)

def weather_forecast(city: str) -> str:
  """Return a canned, always-sunny forecast for *city*, stamped with the current time."""
  now = datetime.now()
  return "The weather forecast for {} at {} is sunny.".format(city, now)

# Register weather_forecast as a tool, with `agent` as caller and
# `user_proxy` as executor.
autogen.register_function(
  weather_forecast,
  caller=agent,
  executor=user_proxy,
  description="Weather forecast for a city",
)

Create a custom IOStream that handles websocket connections

import asyncio
import json
import threading
from typing import Any

from autogen.io.base import IOStream
from fastapi import WebSocket

class CustomIOWebsockets(IOStream):
  r"""Autogen IOStream that talks to a client over a FastAPI/Starlette WebSocket.

  Attributes:
      _websocket (WebSocket): The accepted client connection.
  """

  def __init__(self, websocket: WebSocket) -> None:
    """Wrap an accepted websocket connection.

    Args:
        websocket (WebSocket): The client connection used for input and output.
    """
    self._websocket = websocket

  @staticmethod
  async def handler(websocket: WebSocket, on_connect, *args, **kwargs) -> None:
    """Run ``on_connect`` with a stream for ``websocket`` installed as default.

    Args:
        websocket (WebSocket): The accepted client connection.
        on_connect: Coroutine function awaited as
            ``on_connect(iostream, *args, **kwargs)``.
    """
    logger.debug(
      f" - CustomIOWebsockets._handler(): Client connected on {websocket}"
    )
    iowebsocket = CustomIOWebsockets(websocket)
    # IOStream.set_default makes this stream the default for the duration of
    # the with-block, i.e. while on_connect runs.
    with CustomIOWebsockets.set_default(iowebsocket):
      await on_connect(iowebsocket, *args, **kwargs)

  @property
  def websocket(self) -> "WebSocket":
    """The wrapped websocket connection."""
    return self._websocket

  def print(
    self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False
  ) -> None:
    r"""Send ``objects`` to the client as a single websocket text message.

    When the first object is a dict, every object is JSON-encoded; otherwise
    each is converted with ``str()``. Objects are joined with ``sep`` and
    followed by ``end``, like the builtin ``print``.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects. Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Accepted for compatibility with ``print``;
            not used. Defaults to False.
    """
    # ``objects`` is always a tuple; check it is non-empty before peeking at
    # the first element (print() with no arguments).
    if objects and isinstance(objects[0], dict):
      text = sep.join(map(json.dumps, objects)) + end
    else:
      text = sep.join(map(str, objects)) + end
    if not text:
      return
    message = {"type": "websocket.send", "text": text}

    def send_async():
      # print() is synchronous and cannot await, so the send coroutine runs
      # to completion on a private event loop in a helper thread; join()
      # below blocks until it is done. Exceptions raised here are not
      # propagated to the caller of print().
      # NOTE(review): this drives the websocket from an event loop other than
      # the one that accepted it — confirm this is safe with the ASGI server
      # in use.
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)
      try:
        loop.run_until_complete(self._websocket.send(message))
      finally:
        loop.close()

    thread = threading.Thread(target=send_async)
    thread.start()
    thread.join()

  async def input(self, prompt: str = "", *, password: bool = False) -> str:
    """Read one text message from the client.

    Args:
        prompt (str, optional): If non-empty, sent to the client as a text
            message first. Defaults to "".
        password (bool, optional): Accepted for interface compatibility; not
            used. Defaults to False.

    Returns:
        str: The message received from the client.
    """
    if prompt != "":
      # WebSocket.send() expects an ASGI message dict; send_text() wraps a
      # plain string correctly.
      await self._websocket.send_text(prompt)

    msg = await self._websocket.receive_text()

    return msg.decode("utf-8") if isinstance(msg, bytes) else msg

Define your on_connect handler

async def chat_on_connect(
  iostream: CustomIOWebsockets,
) -> None:
  """Run one Autogen chat for a connected websocket client.

  Reads a single JSON message from the client, takes its ``"query"`` field
  and starts a chat from ``user_proxy`` to ``agent`` with that query.

  Raises:
      json.JSONDecodeError: If the client's first message is not valid JSON.
      KeyError: If that message has no ``"query"`` key.
  """
  logger.debug(
    " - on_connect(): Connected to client using CustomIOWebsockets "
    f"{iostream}"
  )

  logger.debug(" - on_connect(): Receiving message from client.")

  msg = json.loads(await iostream.input())
  query = msg["query"]
  # initiate_chat is synchronous. Calling it directly here would block the
  # event loop, and every other connection served by this worker, for the
  # whole chat. asyncio.to_thread runs it in a worker thread with a copy of
  # the current contextvars context.
  # NOTE(review): confirm the default IOStream installed by
  # CustomIOWebsockets.handler is visible in that thread (it is if Autogen
  # keeps it in a contextvar).
  await asyncio.to_thread(user_proxy.initiate_chat, agent, message=query)
  

Wiring this into FastAPI

from fastapi.websockets import WebSocketState
from fastapi import WebSocket
from starlette.websockets import WebSocketDisconnect

app = FastAPI(
  title="WebSockets with Autogen",
  description="""A better websocket with Autogen""",
  version="1.0.1"
)

@app.websocket("/chat")
async def websocket_endpoint_v2(
  websocket: WebSocket,
):
  """Accept a client on /chat and run one Autogen chat session over it.

  A client disconnect is logged at INFO. Any other error is logged with its
  traceback and reported to the client with a generic message. The socket is
  closed on the way out if it is still connected.
  """
  await websocket.accept()
  try:
    await CustomIOWebsockets.handler(
      websocket,
      chat_on_connect,
    )
  except WebSocketDisconnect:
    logger.info("Client disconnected")
  except Exception as e:
    # logger.exception already attaches the active exception's traceback.
    logger.exception(f"An error occurred: {e}")
    # send the error message to the client
    if websocket.client_state == WebSocketState.CONNECTED:
      try:
        await websocket.send_text("An internal server error occurred. Closing.")
      except RuntimeError:
        # Best effort: sending can fail if the socket is no longer usable.
        pass
  finally:
    if websocket.client_state == WebSocketState.CONNECTED:
      try:
        await websocket.close()
      except RuntimeError:
        # Best effort: the socket may already have been closed.
        pass

Start your ASGI server

# Serve server:app with 3 uvicorn worker processes.
uvicorn server:app  --workers 3 --ws auto

The Doors of Perception – Extracts

By its very nature every embodied spirit is doomed to suffer and enjoy in solitude. Sensations, feelings, insights, fancies – all these are private and, except through symbols and at second hand, incommunicable. We can pool information about experiences, but never the experiences themselves. From family to nation, every human group is a society of island universes.

Neither agreeable nor disagreeable, it just is.

The suggestion is that the function of the brain and nervous system and sense organs is in the main eliminative and not productive. Each person is at each moment capable of remembering all that has ever happened to him and of perceiving everything that is happening everywhere in the universe. The function of the brain and nervous system is to protect us from being overwhelmed and confused by this mass of largely useless and irrelevant knowledge, by shutting out most of what we should otherwise perceive or remember at any moment, and leaving only that very small and special selection which is likely to be practically useful. 

According to such a theory, each one of us is potentially Mind at Large. But in so far as we are animals, our business is at all costs to survive. To make biological survival possible, Mind at Large has to be funneled through the reducing valve of the brain and nervous system. What comes out at the other end is a measly trickle of the kind of consciousness which will help us to stay alive on the surface of this particular planet.

Most men and women lead lives at the worst so painful, at the best so monotonous, poor and limited that the urge to escape, the longing to transcend themselves if only for a few moments, is and has always been one of the principal appetites of the soul.

Solr gem and XML Ruby gem: how it matters

If you use the solr-ruby gem, you should be aware that it builds an XML document from the provided document hash. It first tries to use ‘xml/libxml’ and, if that isn’t available, falls back to REXML. Using libxml is recommended.

All you need to do is:


# libxml-ruby builds a native extension during installation.
gem install libxml-ruby

If you’re lucky, that’s all there is to it. However, you may often run into errors like:


Building native extensions. This could take a while...
ERROR: Error installing libxml-ruby:
ERROR: Failed to build gem native extension.

/usr/bin/ruby extconf.rb
checking for socket() in -lsocket... no
checking for gethostbyname() in -lnsl... yes
checking for atan() in -lm... no
checking for atan() in -lm... yes
checking for inflate() in -lz... no
checking for inflate() in -lzlib... no
checking for inflate() in -lzlib1... no
checking for inflate() in -llibz... no
*** extconf.rb failed ***
Could not create Makefile due to some reason, probably lack of
necessary libraries and/or headers. Check the mkmf.log file for more
details. You may need configuration options.

If this is the case, install the zlib-devel and libxml2-devel packages:


# Install the zlib and libxml2 development packages needed to build the
# native extension, then retry the gem install.
yum -y install zlib-devel
yum -y install libxml2-devel
gem install libxml-ruby

That sorts it all out. You are good to go.

ruby fetch_hash ArgumentError: NULL pointer given

Recently, one of our servers started throwing the following error:

irb(main):001:0> require 'rubygems'
=> true
irb(main):002:0> require 'mysql'
=> true
irb(main):003:0> conn = Mysql.connect('db01', 'xxxxxx', 'xxxxxx', 'employee')
=> #
irb(main):004:0> a = conn.query("SELECT * FROM employees WHERE id >= 8500 AND id < 9000")
=> #
irb(main):005:0> a.fetch_hash
ArgumentError: NULL pointer given
    from (irb):5:in `fetch_hash'
	from (irb):5
irb(main):006:0> exit

fetch_hash is a standard method and normally works. Here are the versions of MySQL and Ruby that I used:

-bash-3.2$ mysql --version
mysql  Ver 14.12 Distrib 5.0.89, for unknown-linux-gnu (x86_64) using readline 5.1

-bash-3.2$ ruby --version
ruby 1.8.7 (2009-06-12 patchlevel 174) [x86_64-linux]

If you face this issue on your servers or anywhere else, try the following workaround. It worked for me, and it should work for you as well.

  1. Uninstall the MySQL-shared-compat package.
  2. Reinstall the mysql gem.

Control activity feeds on facebook

Controlling activity feeds on facebook

Facebook has a setting that lets you choose, among your friends and pages, whose updates you want to see and whose you want to skip. Here is how:

Next to Most Recent, there is a drop-down button. Click it to access the control options. The drop-down menu appears only when you are viewing the Most Recent news feed, so if you don’t see the option, click Most Recent first. The page will refresh and the menu options will appear.

image

Select Edit Options in the drop-down menu.

image

Now you can conveniently choose to show news from:

  1. Those friends and pages you interact with most or
  2. All friends and pages.

The default setting is “friends and pages” you interact with most. You can also edit friends whose feeds have been blocked in the