title: Strawberry docs
Channels
Strawberry integrates with Channels by providing Consumers that serve GraphQL over WebSockets and HTTP.
Introduction
While Channels does require Django to be installed as a dependency, you can actually run this integration without using Django's request handler. However, the most common use case will be to run a normal Django project with GraphQL subscriptions support, typically taking advantage of the Channel Layers functionality which is exposed through the Strawberry integration.
🍓
Getting Started
Pre-requisites
Make sure you have read the following Channels documentation:
- Introduction
- Tutorial
- Consumers
- And our Subscriptions documentation.
If you have read the Channels documentation, you should know by now that:
- An ASGI application is a callable that can handle multiple send / receive operations without the need for a new application instance.
- Channels is all about making ASGI application instances (whether in other processes or on other machines) talk to each other seamlessly.
- A scope is a single connection represented by a dict, whether it is a WebSocket, an HTTP request or another protocol.
- A Consumer is an ASGI application abstraction that helps to handle a single scope.
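To make the points about ASGI applications and scopes concrete, here is a minimal sketch of a raw ASGI application written without Channels or Strawberry. The names echo_app and drive are illustrative assumptions, and drive only stands in for a real ASGI server such as Daphne or Uvicorn, which would normally supply the scope, receive and send arguments.

```python
import asyncio


async def echo_app(scope, receive, send):
    """A bare ASGI callable: one call handles one connection (scope)."""
    # The scope is a plain dict describing the connection.
    if scope["type"] != "websocket":
        raise ValueError(f"unsupported scope type: {scope['type']}")

    # The same call handles many receive / send operations.
    while True:
        event = await receive()
        if event["type"] == "websocket.connect":
            await send({"type": "websocket.accept"})
        elif event["type"] == "websocket.receive":
            # Echo the text frame back to the client.
            await send({"type": "websocket.send", "text": event["text"]})
        elif event["type"] == "websocket.disconnect":
            break


async def drive(app, events):
    """Hypothetical driver standing in for an ASGI server."""
    incoming = asyncio.Queue()
    for event in events:
        incoming.put_nowait(event)
    sent = []

    async def send(message):
        sent.append(message)

    await app({"type": "websocket", "path": "/ws/"}, incoming.get, send)
    return sent


sent = asyncio.run(
    drive(
        echo_app,
        [
            {"type": "websocket.connect"},
            {"type": "websocket.receive", "text": "hi"},
            {"type": "websocket.disconnect", "code": 1000},
        ],
    )
)
print(sent)
```

A Consumer wraps this kind of event loop in a class, so that each event type is dispatched to a method instead of an if / elif chain.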
Installation
Before using Strawberry's Channels support, make sure you install all the required dependencies by running:
pip install 'strawberry-graphql[channels]'
🍓
Tutorial
The following example picks up where the Channels tutorial left off.
By the end of this tutorial, you will have a GraphQL chat subscription that can talk to the Channels chat consumer from that tutorial.
Types setup
First, let's create some Strawberry-types for the chat.
# mysite/gqlchat/subscription.py
import strawberry


@strawberry.input
class ChatRoom:
    room_name: str


@strawberry.type
class ChatRoomMessage:
    room_name: str
    current_user: str
    message: str
Channel Layers
The Context for Channels integration includes the consumer, which has an instance of the channel layer and the consumer's channel name. This tutorial is an example of how this can be used in the schema to provide subscriptions to events generated by background tasks or the web server. Even if these are executed in other threads, processes, or even other servers, if you are using a Layer backend like Redis or RabbitMQ you should receive the events.
To set this up, you'll need to make sure Channel Layers is configured as per the documentation.
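As a rough sketch of what that configuration can look like, the excerpt below shows two variants of the Channels setting CHANNEL_LAYERS. The file path, the name REDIS_CHANNEL_LAYERS (used here only to show both variants side by side) and the Redis host and port are assumptions; the Redis variant also assumes the channels_redis package is installed.

```python
# mysite/settings.py (excerpt)

# Development only: the in-memory layer does not cross process boundaries.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}

# Multi-process variant. Assign this dict to CHANNEL_LAYERS to use it.
# Assumes `channels_redis` is installed and Redis listens on 127.0.0.1:6379.
REDIS_CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
    },
}
```

Only a backend that is shared between processes lets a subscription see events produced by other processes or servers.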
Then you'll want to add a subscription that accesses the channel layer and joins one or more broadcast groups.
Since listening for events and passing them along to the client is a common use case, the base consumer provides a high level API for that using a generator pattern, as we will see below.
The chat subscription
Now we will create the chat subscription.
# mysite/gqlchat/subscription.py
import os
import threading
from typing import AsyncGenerator, List


@strawberry.type
class Subscription:
    @strawberry.subscription
    async def join_chat_rooms(
        self,
        info: strawberry.Info,
        rooms: List[ChatRoom],
        user: str,
    ) -> AsyncGenerator[ChatRoomMessage, None]:
        """Join and subscribe to messages sent to the given rooms."""
        ws = info.context["ws"]
        channel_layer = ws.channel_layer

        room_ids = [f"chat_{room.room_name}" for room in rooms]

        for room in room_ids:
            # Join room group
            await channel_layer.group_add(room, ws.channel_name)

        for room in room_ids:
            await channel_layer.group_send(
                room,
                {
                    "type": "chat.message",
                    "room_id": room,
                    "message": f"process: {os.getpid()} thread: {threading.current_thread().name}"
                    f" -> Hello my name is {user}!",
                },
            )

        async with ws.listen_to_channel("chat.message", groups=room_ids) as cm:
            async for message in cm:
                if message["room_id"] in room_ids:
                    yield ChatRoomMessage(
                        room_name=message["room_id"],
                        message=message["message"],
                        current_user=user,
                    )
Explanation: info.context["ws"] or info.context["request"] is a pointer to the ChannelsConsumer instance. Here we first send a message to all the channel_layer groups (specified in the subscription argument rooms) to announce that we have joined the chat.
The ChannelsConsumer instance is shared between all subscriptions created in a single WebSocket connection. The ws.listen_to_channel context manager returns an async generator that yields all messages sent with the given message type (chat.message in the above example), but it does not ensure that a message was sent to the same group or groups that it was called with: if another subscription using the same ChannelsConsumer also uses ws.listen_to_channel with some other group names, messages for those groups will be yielded as well.
In the example we check message["room_id"] in room_ids before passing messages on to the subscription client, so that each subscription only receives messages for the chat rooms requested in that subscription.
We do not need to call await channel_layer.group_add(room, ws.channel_name) if we don't want to send an initial message while instantiating the subscription. Joining the groups is handled by ws.listen_to_channel.
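The need for the room_id check can be illustrated with a small simulation that uses only the standard library. Here shared_listener is a hypothetical stand-in for ws.listen_to_channel on a consumer shared by two subscriptions; it is not Strawberry's implementation, and the message contents are made up for the sketch.

```python
import asyncio
from typing import AsyncGenerator, Dict, List


async def shared_listener(
    messages: List[Dict[str, str]],
) -> AsyncGenerator[Dict[str, str], None]:
    """Stand-in listener: yields every message of the given type that
    reaches the shared consumer, whatever group it was sent to."""
    for message in messages:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield message


async def subscription(
    room_ids: List[str], messages: List[Dict[str, str]]
) -> List[str]:
    """Collect only the messages that belong to this subscription's rooms."""
    received = []
    async for message in shared_listener(messages):
        if message["room_id"] in room_ids:  # the guard from the tutorial
            received.append(message["message"])
    return received


# Messages arriving on one WebSocket connection that has joined two groups.
incoming = [
    {"type": "chat.message", "room_id": "chat_room1", "message": "for room1"},
    {"type": "chat.message", "room_id": "chat_room2", "message": "for room2"},
]

only_room1 = asyncio.run(subscription(["chat_room1"], incoming))
both_rooms = asyncio.run(subscription(["chat_room1", "chat_room2"], incoming))
print(only_room1)
print(both_rooms)
```

Without the guard, the subscription for room1 alone would also forward the message meant for room2.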
Chat message mutation
As you may have noticed, the subscription client cannot send messages itself. You will
have to create a mutation for sending messages via the channel_layer:
# mysite/gqlchat/subscription.py
from channels.layers import get_channel_layer


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def send_chat_message(
        self,
        info: strawberry.Info,
        room: ChatRoom,
        message: str,
    ) -> None:
        channel_layer = get_channel_layer()

        await channel_layer.group_send(
            f"chat_{room.room_name}",
            {
                "type": "chat.message",
                "room_id": f"chat_{room.room_name}",
                "message": message,
            },
        )
Creating the consumers
Everything we have done so far is useless without an ASGI consumer for our schema.
The easiest way to create one is to use the GraphQLProtocolTypeRouter, which wraps your
Django application and routes HTTP and WebSocket requests for "/graphql" to
Strawberry, while sending all other requests to Django. You'll need to modify
the myproject.asgi.py file from the Channels instructions to look something
like this:
import os

from django.core.asgi import get_asgi_application
from strawberry.channels import GraphQLProtocolTypeRouter

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django_asgi_app = get_asgi_application()

# Import your Strawberry schema after creating the django ASGI application
# This ensures django.setup() has been called before any ORM models are imported
# for the schema.
from mysite.graphql import schema

application = GraphQLProtocolTypeRouter(
    schema,
    django_application=django_asgi_app,
)
This approach is not very flexible and takes away some useful capabilities of
Channels. For more complex deployments, e.g. when you want to integrate several ASGI
applications on different URLs and protocols as described in the
Channels documentation, you will probably craft your own ProtocolTypeRouter.
An example of this (continuing from the Channels tutorial) would be:
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from django.urls import re_path
from strawberry.channels import GraphQLHTTPConsumer, GraphQLWSConsumer

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "berry.settings")
django_asgi_app = get_asgi_application()

# Import your Strawberry schema after creating the django ASGI application
# This ensures django.setup() has been called before any ORM models are imported
# for the schema.

from chat import routing
from mysite.graphql import schema

gql_http_consumer = AuthMiddlewareStack(GraphQLHTTPConsumer.as_asgi(schema=schema))
gql_ws_consumer = GraphQLWSConsumer.as_asgi(schema=schema)

websocket_urlpatterns = routing.websocket_urlpatterns + [
    re_path(r"graphql", gql_ws_consumer),
]

application = ProtocolTypeRouter(
    {
        "http": URLRouter(
            [
                re_path("^graphql", gql_http_consumer),
                re_path(
                    "^", django_asgi_app
                ),  # This might be another endpoint in your app
            ]
        ),
        "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
)
This example demonstrates some ways that Channels can be set up to handle
routing. A very common scenario is that you want user and session
information inside the GraphQL context, which the AuthMiddlewareStack wrapper
above provides. As may be apparent by now, there is no reason why
you couldn't run a Channels server without any Django ASGI application at all.
However, if you need the Django ORM or other Django features in Strawberry,
take care to run django.setup() instead of get_asgi_application().
Running our example
First run your ASGI application (the ProtocolTypeRouter) using your ASGI server. If you are coming from the Channels tutorial, there is no difference. Then open three different tabs in your browser and go to the following URLs:
localhost:8000/graphql
localhost:8000/graphql
localhost:8000/chat
If you want, you can run three different instances of your application on different ports; it should work the same.
On tab #1 start the subscription:
subscription SubscribeToChatRooms {
  joinChatRooms(
    rooms: [{ roomName: "room1" }, { roomName: "room2" }]
    user: "foo"
  ) {
    roomName
    message
    currentUser
  }
}
On tab #2 we will run the sendChatMessage mutation:
mutation echo {
  sendChatMessage(message: "hello room 1", room: { roomName: "room1" })
}
On tab #3 we will join the room you subscribed to ("room1") and start chatting.
Before we do that, there is a slight change we need to make in the ChatConsumer
you created with Channels in order to make it compatible with our
ChatRoomMessage type.
# Send message to room group
await self.channel_layer.group_send(
    self.room_group_name,
    {
        "type": "chat.message",
        "room_id": self.room_group_name,  # <<< here is the change
        "message": f"process is {os.getpid()}, Thread is {threading.current_thread().name}"
        f" -> {message}",
    },
)
Look here for some more complete examples:
- The Strawberry Examples repo contains a basic example app demonstrating subscriptions with Channels.
🍓
Confirming GraphQL Subscriptions
By default, no confirmation message is sent to the GraphQL client once the subscription has started. Such a confirmation is useful, however, for synchronizing actions and detecting communication errors. The code below shows how the above example can be adapted to send a null from the server to the client to confirm that the subscription has successfully started. This includes confirming that the Channels layer subscription has started.
# mysite/gqlchat/subscription.py
@strawberry.type
class Subscription:
    @strawberry.subscription
    async def join_chat_rooms(
        self,
        info: strawberry.Info,
        rooms: List[ChatRoom],
        user: str,
    ) -> AsyncGenerator[ChatRoomMessage | None, None]:
        ...
        async with ws.listen_to_channel("chat.message", groups=room_ids) as cm:
            yield None
            async for message in cm:
                if message["room_id"] in room_ids:
                    yield ChatRoomMessage(
                        room_name=message["room_id"],
                        message=message["message"],
                        current_user=user,
                    )
Note the change in the return signature of join_chat_rooms and the yield None
after entering the listen_to_channel context manager.
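The pattern can be tried out in isolation with a plain async generator. This is a minimal sketch using only the standard library: join_chat and client are hypothetical stand-ins for the subscription resolver and a GraphQL client, not part of Strawberry.

```python
import asyncio
from typing import AsyncGenerator, List, Optional


async def join_chat(messages: List[str]) -> AsyncGenerator[Optional[str], None]:
    """Stand-in subscription: confirm first, then forward messages."""
    # In the real resolver this point is reached only after
    # `listen_to_channel` has been entered, so the groups are joined.
    yield None
    for message in messages:
        yield message


async def client() -> List[str]:
    stream = join_chat(["hello room 1"])
    # The first result is the confirmation, not a chat message.
    confirmation = await stream.__anext__()
    assert confirmation is None
    # From here on it is safe to trigger actions that publish messages.
    return [message async for message in stream]


received = asyncio.run(client())
print(received)
```

Because the first result is null, the field has to be nullable in the schema, which is why the return annotation changes to ChatRoomMessage | None.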
Testing
We provide a minimal application communicator (GraphQLWebsocketCommunicator)
for subscribing. Here is an example based on the tutorial above. Make sure you
have pytest-asyncio installed.
import pytest
from channels.testing import WebsocketCommunicator
from myapp.asgi import application  # your channels asgi
from strawberry.channels.testing import GraphQLWebsocketCommunicator


@pytest.fixture
async def gql_communicator() -> GraphQLWebsocketCommunicator:
    client = GraphQLWebsocketCommunicator(application=application, path="/graphql")
    await client.gql_init()
    yield client
    await client.disconnect()


chat_subscription_query = """
    subscription fooChat {
        joinChatRooms(
            rooms: [{ roomName: "room1" }, { roomName: "room2" }]
            user: "foo"
        ) {
            roomName
            message
            currentUser
        }
    }
"""


@pytest.mark.asyncio
async def test_joinChatRooms_sends_welcome_message(gql_communicator):
    async for result in gql_communicator.subscribe(query=chat_subscription_query):
        # The payload is keyed by the subscription field name.
        data = result.data["joinChatRooms"]
        assert data["currentUser"] == "foo"
        assert "room1" in data["roomName"]
        # The welcome message starts with a capital "Hello".
        assert "hello" in data["message"].lower()
        break  # the subscription never ends; stop after the first message
In order to test a real server connection we can use the Python gql client and
the Channels ChannelsLiveServerTestCase.
This example is based on the extended ChannelsLiveServerTestCase class from
the Channels tutorial, part 4. You cannot run this test with a pytest session.
Add this test to your ChannelsLiveServerTestCase extended class:
from gql import Client, gql
from gql.transport.websockets import WebsocketsTransport


def test_send_message_via_channels_chat_joinChatRooms_recieves(self):
    transport = WebsocketsTransport(url=self.live_server_ws_url + "/graphql")

    client = Client(
        transport=transport,
        fetch_schema_from_transport=False,
    )

    query = gql(chat_subscription_query)
    for index, result in enumerate(client.subscribe(query)):
        # `index == 0 or 1` would always be true, so test membership instead.
        if index in (0, 1):
            print(result)
            # because we subscribed to 2 rooms we received two welcome messages.
        elif index == 2:
            print(result)
            assert "hello from web browser" in result["joinChatRooms"]["message"]
            break

        try:
            self._enter_chat_room("room1")
            self._post_message("hello from web browser")
        finally:
            self._close_all_new_windows()
🍓
The HTTP and WebSocket protocols are handled by different base classes. HTTP
uses GraphQLHTTPConsumer and WebSockets uses GraphQLWSConsumer. Both of them
can be extended, as described in the sections for each consumer below.
Passing connection params
Connection parameters can be passed using the connection_params parameter of
the GraphQLWebsocketCommunicator class. This is particularly useful for testing
WebSocket authentication.
GraphQLWebsocketCommunicator(
    application=application,
    path="/graphql",
    connection_params={"token": "strawberry"},
)
GraphQLHTTPConsumer (HTTP)
Options
GraphQLHTTPConsumer supports the same options as all other integrations:
- schema: mandatory, the schema created by strawberry.Schema.
- graphql_ide: optional, defaults to "graphiql", allows you to choose the GraphQL IDE interface (one of graphiql, apollo-sandbox or pathfinder) or to disable it by passing None.
- allow_queries_via_get: optional, defaults to True, whether to enable queries via GET requests.
- multipart_uploads_enabled: optional, defaults to False, controls whether to enable multipart uploads. Please make sure to consider the security implications mentioned in the GraphQL Multipart Request Specification when enabling this feature.
Extending the consumer
The base GraphQLHTTPConsumer class can be extended by overriding any of the
following methods:
async def get_context(self, request: ChannelsRequest, response: TemporalResponse) -> Context
async def get_root_value(self, request: ChannelsRequest) -> Optional[RootValue]
async def process_result(self, request: Request, result: ExecutionResult) -> GraphQLHTTPResponse
def decode_json(self, data: Union[str, bytes]) -> object
def encode_json(self, data: object) -> str
async def render_graphql_ide(self, request: ChannelsRequest) -> ChannelsResponse
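As an illustration of the decode_json and encode_json hooks, the sketch below defines them on a small mixin that uses only the standard library. CompactJSONMixin is a hypothetical name and not part of Strawberry; the idea, under that assumption, is to list it before the consumer base class in your own subclass so that its methods take precedence.

```python
import json
from typing import Union


class CompactJSONMixin:
    """Hypothetical mixin; list it before the consumer base class."""

    def decode_json(self, data: Union[str, bytes]) -> object:
        # json.loads accepts both str and bytes input.
        return json.loads(data)

    def encode_json(self, data: object) -> str:
        # Compact separators drop the spaces after "," and ":".
        return json.dumps(data, separators=(",", ":"))


codec = CompactJSONMixin()
encoded = codec.encode_json({"data": {"ok": True}})
decoded = codec.decode_json(b'{"query": "{ __typename }"}')
print(encoded)
print(decoded)
```

The same two methods are the place to plug in a faster third-party JSON library, as long as the signatures above are kept.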
Context
The default context returned by get_context() is a dict that includes the
following keys by default:
- request: A ChannelsRequest object with the following fields and methods:
  - consumer: The GraphQLHTTPConsumer instance for this connection
  - body: The request body
  - headers: A dict containing the headers of the request
  - method: The HTTP method of the request
  - content_type: The content type of the request
- response: A TemporalResponse object that can be used to influence the HTTP response:
  - status_code: The status code of the response, if there are no execution errors (defaults to 200)
  - headers: Any additional headers that should be sent with the response
render_graphql_ide
In case you need more control over the rendering of the GraphQL IDE than the
graphql_ide option provides, you can override the render_graphql_ide method.
from strawberry.channels import GraphQLHTTPConsumer, ChannelsRequest, ChannelsResponse


class MyGraphQLHTTPConsumer(GraphQLHTTPConsumer):
    async def render_graphql_ide(self, request: ChannelsRequest) -> ChannelsResponse:
        custom_html = """<html><body><h1>Custom GraphQL IDE</h1></body></html>"""

        return ChannelsResponse(content=custom_html, content_type="text/html")
GraphQLWSConsumer (WebSockets / Subscriptions)
Options
- schema: mandatory, the schema created by strawberry.Schema.
- debug: optional, defaults to False, whether to enable debug mode.
- keep_alive: optional, defaults to False, whether to enable keep alive mode for websockets.
- keep_alive_interval: optional, defaults to 1, the interval in seconds for keep alive messages.
Extending the consumer
The base GraphQLWSConsumer class can be extended by overriding any of the
following methods:
async def get_context(self, request: GraphQLWSConsumer, response: GraphQLWSConsumer) -> Context
async def get_root_value(self, request: GraphQLWSConsumer) -> Optional[RootValue]
def decode_json(self, data: Union[str, bytes]) -> object
def encode_json(self, data: object) -> str
async def on_ws_connect(self, context: Context) -> Union[UnsetType, None, Dict[str, object]]
on_ws_connect
By overriding on_ws_connect you can customize the behavior when a graphql-ws
or graphql-transport-ws connection is established. This is particularly useful
for authentication and authorization. By default, all connections are accepted.
To manually accept a connection, return strawberry.UNSET or a connection
acknowledgment payload. The acknowledgment payload will be sent to the client.
Note that the legacy protocol does not support None/null acknowledgment
payloads, while the new protocol does. Our implementation will treat
None/null payloads the same as strawberry.UNSET in the context of the
legacy protocol.
To reject a connection, raise a ConnectionRejectionError. You can optionally
provide a custom error payload that will be sent to the client when the legacy
GraphQL over WebSocket protocol is used.
from typing import Dict

from strawberry.exceptions import ConnectionRejectionError
from strawberry.channels import GraphQLWSConsumer


class MyGraphQLWSConsumer(GraphQLWSConsumer):
    async def on_ws_connect(self, context: Dict[str, object]):
        connection_params = context["connection_params"]

        if not isinstance(connection_params, dict):
            # Reject without a custom graphql-ws error payload
            raise ConnectionRejectionError()

        if connection_params.get("password") != "secret":
            # Reject with a custom graphql-ws error payload
            raise ConnectionRejectionError({"reason": "Invalid password"})

        if username := connection_params.get("username"):
            # Accept with a custom acknowledgment payload
            return {"message": f"Hello, {username}!"}

        # Accept without an acknowledgment payload
        return await super().on_ws_connect(context)
Context
The default context returned by get_context() is a dict that includes the
following keys by default:
- request: The GraphQLWSConsumer instance of the current connection. It can be used to access the connection scope, e.g. info.context["ws"].headers allows access to any headers.
- ws: The same as request
- connection_params: Any connection_params, see Authenticating Subscriptions
Example for defining a custom context
Here is an example of extending the base classes to offer a different context
object in your resolvers. For the HTTP integration, you can also have properties
to access the current user and the session. Both properties depend on the
AuthMiddlewareStack wrapper.
from dataclasses import dataclass
from typing import Any, Dict, Optional

from django.contrib.auth.models import AnonymousUser
from typing_extensions import override

from strawberry.channels import ChannelsConsumer, ChannelsRequest
from strawberry.channels import GraphQLHTTPConsumer as BaseGraphQLHTTPConsumer
from strawberry.channels import GraphQLWSConsumer as BaseGraphQLWSConsumer
from strawberry.http.temporal_response import TemporalResponse


@dataclass
class ChannelsContext:
    request: ChannelsRequest
    response: TemporalResponse

    @property
    def user(self):
        # Depends on Channels' AuthMiddlewareStack
        if "user" in self.request.consumer.scope:
            return self.request.consumer.scope["user"]

        return AnonymousUser()

    @property
    def session(self):
        # Depends on Channels' SessionMiddleware / AuthMiddlewareStack
        if "session" in self.request.consumer.scope:
            return self.request.consumer.scope["session"]

        return None


@dataclass
class ChannelsWSContext:
    request: ChannelsConsumer
    connection_params: Optional[Dict[str, Any]] = None

    @property
    def ws(self) -> ChannelsConsumer:
        return self.request


class GraphQLHTTPConsumer(BaseGraphQLHTTPConsumer):
    @override
    async def get_context(
        self, request: ChannelsRequest, response: TemporalResponse
    ) -> ChannelsContext:
        return ChannelsContext(
            request=request,
            response=response,
        )


class GraphQLWSConsumer(BaseGraphQLWSConsumer):
    @override
    async def get_context(
        self, request: ChannelsConsumer, connection_params: Any
    ) -> ChannelsWSContext:
        return ChannelsWSContext(
            request=request,
            connection_params=connection_params,
        )
You can import and use the extended GraphQLHTTPConsumer and GraphQLWSConsumer
classes in your myproject.asgi.py file as shown before.
🍓
API
GraphQLProtocolTypeRouter
A helper for creating a common strawberry-django ProtocolTypeRouter implementation.
Example usage:
from strawberry.channels import GraphQLProtocolTypeRouter
from django.core.asgi import get_asgi_application

django_asgi = get_asgi_application()

from myapi import schema

application = GraphQLProtocolTypeRouter(
    schema,
    django_application=django_asgi,
)
This will route all requests to /graphql, on either HTTP or WebSockets, to Strawberry, and everything else to the Django application.
ChannelsConsumer
Strawberry's extended AsyncConsumer.
Every GraphQL session will have an instance of this class inside
info.context["ws"] (WebSockets) or info.context["request"].consumer (HTTP).
properties
@contextlib.asynccontextmanager
async def listen_to_channel(
    self,
    type: str,
    *,
    timeout: float | None = None,
    groups: Sequence[str] | None = None,
) -> AsyncGenerator[Any, None]: ...
- type: The type of the message to wait for, equivalent to scope['type'].
- timeout: An optional timeout to wait for each subsequent message.
- groups: A list of groups to yield messages from through the channel layer.
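The per-message meaning of timeout can be sketched with the standard library alone. The listen function below is a hypothetical stand-in, not Strawberry's implementation, and it assumes that the generator simply ends once no message arrives within the timeout; this page does not state what happens on expiry, so check the behaviour of your installed version.

```python
import asyncio
from typing import Any, AsyncGenerator, List, Optional


async def listen(
    queue: "asyncio.Queue[Any]", timeout: Optional[float] = None
) -> AsyncGenerator[Any, None]:
    """Yield queued messages; stop once none arrives within `timeout`."""
    while True:
        try:
            # The timeout restarts for every message; it is not a total budget.
            yield await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            # Assumption for this sketch: expiry ends the stream quietly.
            return


async def main() -> List[str]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    queue.put_nowait({"type": "chat.message", "message": "first"})
    queue.put_nowait({"type": "chat.message", "message": "second"})
    return [m["message"] async for m in listen(queue, timeout=0.05)]


messages = asyncio.run(main())
print(messages)
```

With timeout=None the loop would wait indefinitely for the next message, which matches a long-lived subscription.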