🔥 Load Test LiteLLM
How to run a locust load test on LiteLLM Proxy
1. Add `fake-openai-endpoint` to your proxy config.yaml and start your LiteLLM proxy (a sample start command follows the config below). LiteLLM provides a free hosted `fake-openai-endpoint` you can load test against:
```yaml
model_list:
  - model_name: fake-openai-endpoint
    litellm_params:
      model: openai/fake
      api_key: fake-key
      api_base: https://exampleopenaiendpoint-production.up.railway.app/
```
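A quick way to start the proxy against this config, assuming it is saved as `config.yaml` (adjust the path to your own file):

```shell
litellm --config config.yaml
```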
2. `pip install locust`

3. Create a file called `locustfile.py` on your local machine. Copy the contents from the LiteLLM load test located here (a minimal sketch is also shown after these steps).
4. Start locust: run `locust` in the same directory as the `locustfile.py` from the previous step. Output on terminal:
```shell
[2024-03-15 07:19:58,893] Starting web interface at http://0.0.0.0:8089
[2024-03-15 07:19:58,898] Starting Locust 2.24.0
```

5. Run the load test on Locust:
- Head to the Locust UI on http://0.0.0.0:8089
- Set Users=100, Ramp Up Users=10, Host=Base URL of your LiteLLM Proxy
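For reference, a minimal `locustfile.py` in the spirit of the linked load test might look like the sketch below; the class name, pacing, and request body are illustrative assumptions, so prefer the linked file for the exact test:

```python
from locust import HttpUser, task, between


class LiteLLMProxyUser(HttpUser):
    # Pause 0.5-1s between requests per simulated user (assumed pacing)
    wait_time = between(0.5, 1)

    @task
    def chat_completion(self):
        # Call the proxy's OpenAI-compatible chat endpoint using the
        # fake-openai-endpoint model configured in step 1
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer sk-1234",  # your proxy key (assumption)
        }
        payload = {
            "model": "fake-openai-endpoint",
            "messages": [{"role": "user", "content": "This is a load test"}],
        }
        self.client.post("/chat/completions", json=payload, headers=headers)
```

If you prefer to skip the web UI, Locust can also run headless with the same settings, e.g. `locust --headless -u 100 -r 10 -H http://0.0.0.0:4000` (the host is an assumption, point it at your own proxy).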
Expected Results
Expect to see the following response times for `/health/readiness`:

- Median → 150ms
- Avg → 219ms
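You can also hit the readiness endpoint directly to sanity-check it before a run; the URL below assumes the proxy is on the default http://0.0.0.0:4000, substitute your own base URL:

```shell
curl http://0.0.0.0:4000/health/readiness
```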
Load Test LiteLLM Proxy - 1500+ req/s

1500+ concurrent requests/s
LiteLLM proxy has been load tested to handle 1500+ concurrent req/s
```python
import time, asyncio
from openai import AsyncOpenAI, AsyncAzureOpenAI
import uuid
import traceback
# base_url - litellm proxy endpoint
# api_key - litellm proxy api key, created when the proxy is started with auth
litellm_client = AsyncOpenAI(base_url="http://0.0.0.0:4000", api_key="sk-1234")
async def litellm_completion():
    # Send a single chat completion request through the LiteLLM proxy
    try:
        response = await litellm_client.chat.completions.create(
            model="azure-gpt-3.5",
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
        )
        print(response)
        return response
    except Exception as e:
        # Return the error message so main() can write it to error_log.txt
        return f"Error during completion: {str(e)}\n{traceback.format_exc()}"
async def main():
    for i in range(1):
        start = time.time()
        n = 1500  # Number of concurrent tasks
        tasks = [litellm_completion() for _ in range(n)]
        chat_completions = await asyncio.gather(*tasks)
        successful_completions = [c for c in chat_completions if not isinstance(c, str)]
        # Write errors to error_log.txt
        with open("error_log.txt", "a") as error_log:
            for completion in chat_completions:
                if isinstance(completion, str):
                    error_log.write(completion + "\n")
        print(n, time.time() - start, len(successful_completions))
        await asyncio.sleep(10)  # pause between rounds without blocking the event loop
if __name__ == "__main__":
    # Blank out contents of error_log.txt
    open("error_log.txt", "w").close()
    asyncio.run(main())
```
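Before running this script, make sure the proxy is reachable at the hard-coded `base_url` (`http://0.0.0.0:4000`) with the key `sk-1234`, and that it exposes a model named `azure-gpt-3.5`; otherwise edit those values to match your deployment.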
Throughput - 30% Increase

LiteLLM proxy + Load Balancer gives a 30% increase in throughput compared to the raw OpenAI API.
Latency Added - 0.00325 seconds

LiteLLM proxy adds 0.00325 seconds (~3ms) of latency compared to using the raw OpenAI API.
Testing LiteLLM Proxy with Locust

- 1 LiteLLM container can handle ~140 requests/second with 0.4 failures/second
 
Load Test LiteLLM SDK vs OpenAI

Here is a script to load test LiteLLM vs OpenAI:
```python
from openai import AsyncOpenAI, AsyncAzureOpenAI
import random, uuid
import time, asyncio, litellm
# import logging
# logging.basicConfig(level=logging.DEBUG)
#### LITELLM PROXY #### 
litellm_client = AsyncOpenAI(
    api_key="sk-1234", # [CHANGE THIS]
    base_url="http://0.0.0.0:4000"
)
#### AZURE OPENAI CLIENT #### 
client = AsyncAzureOpenAI(
    api_key="my-api-key", # [CHANGE THIS]
    azure_endpoint="my-api-base", # [CHANGE THIS]
    api_version="2023-07-01-preview" 
)
#### LITELLM ROUTER #### 
model_list = [
  {
    "model_name": "azure-canada",
    "litellm_params": {
      "model": "azure/my-azure-deployment-name", # [CHANGE THIS]
      "api_key": "my-api-key", # [CHANGE THIS]
      "api_base": "my-api-base", # [CHANGE THIS]
      "api_version": "2023-07-01-preview"
    }
  }
]
router = litellm.Router(model_list=model_list)
async def openai_completion():
    try:
        response = await client.chat.completions.create(
            model="gpt-35-turbo",
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
            stream=True
        )
        return response
    except Exception as e:
        print(e)
        return None


async def router_completion():
    try:
        response = await router.acompletion(
            model="azure-canada", # [CHANGE THIS]
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
            stream=True
        )
        return response
    except Exception as e:
        print(e)
        return None


async def proxy_completion_non_streaming():
    try:
        response = await litellm_client.chat.completions.create(
            model="sagemaker-models", # [CHANGE THIS] (if you call it something else on your proxy)
            messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}],
        )
        return response
    except Exception as e:
        print(e)
        return None

async def loadtest_fn():
    start = time.time()
    n = 500  # Number of concurrent tasks
    tasks = [proxy_completion_non_streaming() for _ in range(n)]
    chat_completions = await asyncio.gather(*tasks)
    successful_completions = [c for c in chat_completions if c is not None]
    print(n, time.time() - start, len(successful_completions))
# Run the event loop to execute the async function
asyncio.run(loadtest_fn())
```
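As written, `loadtest_fn` only exercises `proxy_completion_non_streaming`. To compare the other paths, swap `openai_completion` or `router_completion` into the `tasks` list inside `loadtest_fn` and re-run the script.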