Skip to content

Core

tangram_core

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
44
45
46
47
48
@dataclass
class BackendState:
    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config

redis_client instance-attribute

redis_client: Redis

http_client instance-attribute

http_client: AsyncClient

config instance-attribute

config: Config

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config

core class-attribute instance-attribute

core: CoreConfig = field(default_factory=CoreConfig)

server class-attribute instance-attribute

server: ServerConfig = field(default_factory=ServerConfig)

channel class-attribute instance-attribute

channel: ChannelConfig = field(
    default_factory=ChannelConfig
)

map class-attribute instance-attribute

map: MapConfig = field(default_factory=MapConfig)

plugins class-attribute instance-attribute

plugins: dict[str, Any] = field(default_factory=dict)

cache class-attribute instance-attribute

cache: CacheConfig = field(default_factory=CacheConfig)

from_file classmethod

from_file(config_path: Path) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@classmethod
def from_file(cls, config_path: Path) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

Plugin dataclass

Source code in packages/tangram_core/src/tangram_core/plugin.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@dataclass
class Plugin:
    frontend_path: str | None = None
    routers: list[APIRouter] = field(default_factory=list)
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

frontend_path class-attribute instance-attribute

frontend_path: str | None = None

routers class-attribute instance-attribute

routers: list[APIRouter] = field(default_factory=list)

services class-attribute instance-attribute

services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)

register_service

register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]
Source code in packages/tangram_core/src/tangram_core/plugin.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

backend

logger module-attribute

logger = getLogger(__name__)

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

LOG_LEVEL_MAP module-attribute

LOG_LEVEL_MAP = {
    "TRACE": DEBUG,
    "DEBUG": DEBUG,
    "INFO": INFO,
    "WARN": WARNING,
    "ERROR": ERROR,
}

BackendState dataclass

Source code in packages/tangram_core/src/tangram_core/backend.py
44
45
46
47
48
@dataclass
class BackendState:
    redis_client: redis.Redis
    http_client: httpx.AsyncClient
    config: Config
redis_client instance-attribute
redis_client: Redis
http_client instance-attribute
http_client: AsyncClient
config instance-attribute
config: Config

get_state async

get_state(request: Request) -> BackendState
Source code in packages/tangram_core/src/tangram_core/backend.py
51
52
async def get_state(request: Request) -> BackendState:
    return request.app.state.backend_state  # type: ignore

resolve_frontend

resolve_frontend(
    *, path: str, dist_name: str
) -> Path | Traversable | None
Source code in packages/tangram_core/src/tangram_core/backend.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def resolve_frontend(*, path: str, dist_name: str) -> Path | Traversable | None:
    # always try to parse from direct url first (this is robust for editable
    # installs like `uv sync --all-packages`)
    try:
        dist = Distribution.from_name(dist_name)
        if direct_url_content := dist.read_text("direct_url.json"):
            direct_url_data = json.loads(direct_url_content)
            if (url := direct_url_data.get("url")) and (
                path1 := Path(url.removeprefix("file://")) / path
            ).is_dir():
                return path1
    except (PackageNotFoundError, json.JSONDecodeError, FileNotFoundError):
        pass

    # fallback in case it was installed via pip
    if (path2 := importlib.resources.files(dist_name) / path).is_dir():
        return path2
    return None

load_enabled_plugins

load_enabled_plugins(
    config: Config,
) -> list[tuple[DistName, Plugin]]
Source code in packages/tangram_core/src/tangram_core/backend.py
78
79
80
81
82
83
84
85
86
87
88
89
90
def load_enabled_plugins(
    config: Config,
) -> list[tuple[DistName, Plugin]]:
    loaded_plugins = []
    enabled_plugin_names = set(config.core.plugins)

    for entry_point in scan_plugins():
        if entry_point.name not in enabled_plugin_names:
            continue
        if (plugin := load_plugin(entry_point)) is not None:
            loaded_plugins.append(plugin)

    return loaded_plugins

lifespan async

lifespan(
    app: FastAPI, backend_state: BackendState
) -> AsyncGenerator[None, None]
Source code in packages/tangram_core/src/tangram_core/backend.py
 93
 94
 95
 96
 97
 98
 99
100
@asynccontextmanager
async def lifespan(
    app: FastAPI, backend_state: BackendState
) -> AsyncGenerator[None, None]:
    # we don't need to __aenter__ httpx.AsyncClient again
    async with backend_state.redis_client:
        app.state.backend_state = backend_state
        yield

make_cache_route_handler

make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]

Factory function that creates a route handler for caching and serving files. Dynamically handles URL parameters found in both serve_route and origin.

Args: entry: Cache entry configuration state: Backend state with http_client for fetching remote resources

Returns: Async function that handles the route with dynamic parameters

Source code in packages/tangram_core/src/tangram_core/backend.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def make_cache_route_handler(
    entry: CacheEntry, state: BackendState
) -> Callable[..., Awaitable[FileResponse]]:
    """
    Factory function that creates a route handler for caching and serving files.
    Dynamically handles URL parameters found in both serve_route and origin.

    Args:
        entry: Cache entry configuration
        state: Backend state with http_client for fetching remote resources

    Returns:
        Async function that handles the route with dynamic parameters
    """
    from inspect import Parameter, Signature

    # Extract parameter names from the serve_route (e.g., {fontstack}, {range})
    param_pattern = re.compile(r"\{(\w+)\}")
    params = param_pattern.findall(entry.serve_route)

    async def cache_route_handler(**kwargs: str) -> FileResponse:
        local_path = entry.local_path
        if local_path is None:
            local_path = Path(user_cache_dir("tangram_core"))
            if not local_path.exists():
                local_path.mkdir(parents=True)
        else:
            local_path = local_path.expanduser()

        # Build the local file path by replacing parameters
        local_file = local_path
        for param in params:
            if param in kwargs:
                local_file = local_file / kwargs[param]

        logger.info(f"Serving cached file from {local_file}")

        if not local_file.exists():
            assert entry.origin is not None
            # Build the remote URL by replacing parameters
            remote_url = entry.origin
            for param, value in kwargs.items():
                remote_url = remote_url.replace(f"{{{param}}}", value)

            logger.info(f"Downloading from {remote_url} to {local_file}")
            c = await state.http_client.get(remote_url)
            c.raise_for_status()
            local_file.parent.mkdir(parents=True, exist_ok=True)
            local_file.write_bytes(c.content)

        return FileResponse(path=local_file, media_type=entry.media_type)

    # Create explicit parameters for the function signature
    sig_params = [
        Parameter(
            name=param,
            kind=Parameter.POSITIONAL_OR_KEYWORD,
            annotation=str,
        )
        for param in params
    ]
    cache_route_handler.__signature__ = Signature(  # type: ignore
        parameters=sig_params,
        return_annotation=FileResponse,
    )

    return cache_route_handler

create_app

create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> FastAPI
Source code in packages/tangram_core/src/tangram_core/backend.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> FastAPI:
    app = FastAPI(lifespan=partial(lifespan, backend_state=backend_state))
    frontend_plugins = []

    for dist_name, plugin in loaded_plugins:
        for router in plugin.routers:
            app.include_router(router)

        if (p := plugin.frontend_path) is not None and (
            frontend_path_resolved := resolve_frontend(path=p, dist_name=dist_name)
        ) is not None:
            app.mount(
                f"/plugins/{dist_name}",
                StaticFiles(directory=str(frontend_path_resolved)),
                name=dist_name,
            )
            frontend_plugins.append(dist_name)

    # unlike v0.1 which uses `process.env`, v0.2 *compiles* the js so we no
    # no longer have access to it, so we selectively forward the config.
    @app.get("/config")
    async def get_frontend_config(
        state: Annotated[BackendState, Depends(get_state)],
    ) -> FrontendConfig:
        channel_cfg = state.config.channel
        if channel_cfg.public_url:
            channel_url = channel_cfg.public_url
        else:
            # for local/non-proxied setups, user must set a reachable host.
            # '0.0.0.0' is for listening, not connecting.
            host = "localhost" if channel_cfg.host == "0.0.0.0" else channel_cfg.host
            channel_url = f"http://{host}:{channel_cfg.port}"

        return FrontendConfig(
            channel=FrontendChannelConfig(url=channel_url),
            map=state.config.map,
        )

    @app.get("/manifest.json")
    async def get_manifest() -> JSONResponse:
        return JSONResponse(content={"plugins": frontend_plugins})

    # Cache mechanism - MUST be registered BEFORE the catch-all frontend mount
    for cache_entry in backend_state.config.cache.entries:
        logger.info(
            f"caching {cache_entry.origin} to {cache_entry.local_path} "
            f"and serving at {cache_entry.serve_route}"
        )
        route_handler = make_cache_route_handler(cache_entry, backend_state)

        logger.info(
            f"Registering route: GET {cache_entry.serve_route} with dynamic params"
        )
        app.add_api_route(
            cache_entry.serve_route,
            route_handler,
            methods=["GET"],
            name=f"cache-{cache_entry.serve_route.replace('/', '_')}",
        )

    # Frontend mount - this is a catch-all and must come LAST
    if (
        frontend_path := resolve_frontend(
            path="dist-frontend", dist_name="tangram_core"
        )
    ) is None:
        raise ValueError(
            "error: frontend was not found, did you run `pnpm i && pnpm run build`?"
        )
    app.mount("/", StaticFiles(directory=str(frontend_path), html=True), name="core")

    return app

run_channel_service async

run_channel_service(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
async def run_channel_service(config: Config) -> None:
    from . import _core

    _core.init_tracing_stderr(config.core.log_level)

    rust_config = _core.ChannelConfig(
        host=config.channel.host,
        port=config.channel.port,
        redis_url=config.core.redis_url,
        jwt_secret=config.channel.jwt_secret,
        jwt_expiration_secs=config.channel.jwt_expiration_secs,
        id_length=config.channel.id_length,
    )
    await _core.run(rust_config)

run_services async

run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> AsyncGenerator[Task[None], None]
Source code in packages/tangram_core/src/tangram_core/backend.py
274
275
276
277
278
279
280
281
282
283
284
285
async def run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> AsyncGenerator[asyncio.Task[None], None]:
    yield asyncio.create_task(run_channel_service(backend_state.config))

    for dist_name, plugin in loaded_plugins:
        for _, service_func in sorted(
            plugin.services, key=lambda s: (s[0], s[1].__name__)
        ):
            yield asyncio.create_task(service_func(backend_state))
            logger.info(f"started service from plugin: {dist_name}")

run_server async

run_server(
    backend_state: BackendState,
    loaded_plugins: list[tuple[DistName, Plugin]],
) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
288
289
290
291
292
293
294
295
296
297
298
299
async def run_server(
    backend_state: BackendState, loaded_plugins: list[tuple[DistName, Plugin]]
) -> None:
    app_instance = create_app(backend_state, loaded_plugins)
    server_config = uvicorn.Config(
        app_instance,
        host=backend_state.config.server.host,
        port=backend_state.config.server.port,
        log_config=get_log_config_dict(backend_state.config),
    )
    server = uvicorn.Server(server_config)
    await server.serve()

start_tasks async

start_tasks(config: Config) -> None
Source code in packages/tangram_core/src/tangram_core/backend.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def start_tasks(config: Config) -> None:
    loaded_plugins = load_enabled_plugins(config)

    async with AsyncExitStack() as stack:
        redis_client = await stack.enter_async_context(
            redis.from_url(config.core.redis_url)  # type: ignore
        )
        http_client = await stack.enter_async_context(httpx.AsyncClient(http2=True))
        state = BackendState(
            redis_client=redis_client, http_client=http_client, config=config
        )

        server_task = asyncio.create_task(run_server(state, loaded_plugins))
        service_tasks = [s async for s in run_services(state, loaded_plugins)]

        await asyncio.gather(server_task, *service_tasks)

get_log_config_dict

get_log_config_dict(config: Config) -> dict[str, Any]
Source code in packages/tangram_core/src/tangram_core/backend.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def get_log_config_dict(config: Config) -> dict[str, Any]:
    def format_time(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ ")

    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "default": {
                "class": "rich.logging.RichHandler",
                "log_time_format": format_time,
                "omit_repeated_times": False,
            },
        },
        "root": {"handlers": ["default"], "level": config.core.log_level.upper()},
    }

config

ServerConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
 9
10
11
12
@dataclass
class ServerConfig:
    host: str = "127.0.0.1"
    port: int = 2346
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2346

ChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
15
16
17
18
19
20
21
22
23
24
@dataclass
class ChannelConfig:
    # TODO: we should make it clear that host:port is for the *backend* to
    # listen on, and not to be confused with the frontend.
    host: str = "127.0.0.1"
    port: int = 2347
    public_url: str | None = None
    jwt_secret: str = "secret"
    jwt_expiration_secs: int = 315360000  # 10 years
    id_length: int = 8
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2347
public_url class-attribute instance-attribute
public_url: str | None = None
jwt_secret class-attribute instance-attribute
jwt_secret: str = 'secret'
jwt_expiration_secs class-attribute instance-attribute
jwt_expiration_secs: int = 315360000
id_length class-attribute instance-attribute
id_length: int = 8

UrlConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
27
28
29
30
@dataclass
class UrlConfig:
    url: str
    type: str = "vector"
url instance-attribute
url: str
type class-attribute instance-attribute
type: str = 'vector'

SourceSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
33
34
35
36
@dataclass
class SourceSpecification:
    carto: UrlConfig | None = None
    protomaps: UrlConfig | None = None
carto class-attribute instance-attribute
carto: UrlConfig | None = None
protomaps class-attribute instance-attribute
protomaps: UrlConfig | None = None

StyleSpecification dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
39
40
41
42
43
44
@dataclass
class StyleSpecification:
    sources: SourceSpecification | None = None
    glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
    layers: list[Any] | None = None
    version: Literal[8] = 8
sources class-attribute instance-attribute
sources: SourceSpecification | None = None
glyphs class-attribute instance-attribute
glyphs: str = "https://cdn.protomaps.com/fonts/pbf/{fontstack}/{range}.pbf"
layers class-attribute instance-attribute
layers: list[Any] | None = None
version class-attribute instance-attribute
version: Literal[8] = 8

MapConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@dataclass
class MapConfig:
    style: str | StyleSpecification = (
        "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
    )
    attribution: str = (
        '&copy; <a href="https://www.openstreetmap.org/copyright">'
        "OpenStreetMap</a> contributors &copy; "
        '<a href="https://carto.com/attributions">CARTO</a>'
    )
    center_lat: float = 48.0
    center_lon: float = 7.0
    zoom: int = 4
    pitch: float = 0
    bearing: float = 0
    lang: str = "en"
style class-attribute instance-attribute
style: str | StyleSpecification = (
    "https://basemaps.cartocdn.com/gl/voyager-gl-style/style.json"
)
attribution class-attribute instance-attribute
attribution: str = '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors &copy; <a href="https://carto.com/attributions">CARTO</a>'
center_lat class-attribute instance-attribute
center_lat: float = 48.0
center_lon class-attribute instance-attribute
center_lon: float = 7.0
zoom class-attribute instance-attribute
zoom: int = 4
pitch class-attribute instance-attribute
pitch: float = 0
bearing class-attribute instance-attribute
bearing: float = 0
lang class-attribute instance-attribute
lang: str = 'en'

CoreConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
65
66
67
68
69
@dataclass
class CoreConfig:
    redis_url: str = "redis://127.0.0.1:6379"
    plugins: list[str] = field(default_factory=list)
    log_level: str = "INFO"
redis_url class-attribute instance-attribute
redis_url: str = 'redis://127.0.0.1:6379'
plugins class-attribute instance-attribute
plugins: list[str] = field(default_factory=list)
log_level class-attribute instance-attribute
log_level: str = 'INFO'

CacheEntry dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
72
73
74
75
76
77
78
79
80
81
@dataclass
class CacheEntry:
    # origin url (if None, just serve the local file)
    origin: str | None = None
    # local path to cache the file
    local_path: Path | None = None
    # how to serve the file
    serve_route: str = ""
    # media type for the served file
    media_type: str = "application/octet-stream"
origin class-attribute instance-attribute
origin: str | None = None
local_path class-attribute instance-attribute
local_path: Path | None = None
serve_route class-attribute instance-attribute
serve_route: str = ''
media_type class-attribute instance-attribute
media_type: str = 'application/octet-stream'

CacheConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
84
85
86
@dataclass
class CacheConfig:
    entries: list[CacheEntry] = field(default_factory=list)
entries class-attribute instance-attribute
entries: list[CacheEntry] = field(default_factory=list)

Config dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    map: MapConfig = field(default_factory=MapConfig)
    plugins: dict[str, Any] = field(default_factory=dict)
    cache: CacheConfig = field(default_factory=CacheConfig)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config
core class-attribute instance-attribute
core: CoreConfig = field(default_factory=CoreConfig)
server class-attribute instance-attribute
server: ServerConfig = field(default_factory=ServerConfig)
channel class-attribute instance-attribute
channel: ChannelConfig = field(
    default_factory=ChannelConfig
)
map class-attribute instance-attribute
map: MapConfig = field(default_factory=MapConfig)
plugins class-attribute instance-attribute
plugins: dict[str, Any] = field(default_factory=dict)
cache class-attribute instance-attribute
cache: CacheConfig = field(default_factory=CacheConfig)
from_file classmethod
from_file(config_path: Path) -> Config
Source code in packages/tangram_core/src/tangram_core/config.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@classmethod
def from_file(cls, config_path: Path) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

FrontendChannelConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
114
115
116
@dataclass
class FrontendChannelConfig:
    url: str
url instance-attribute
url: str

FrontendConfig dataclass

Source code in packages/tangram_core/src/tangram_core/config.py
119
120
121
122
@dataclass
class FrontendConfig:
    channel: FrontendChannelConfig
    map: MapConfig
channel instance-attribute
map instance-attribute
map: MapConfig

plugin

RouterFunc module-attribute

RouterFunc: TypeAlias = Callable[[], APIRouter]

ServiceAsyncFunc module-attribute

ServiceAsyncFunc: TypeAlias = Callable[
    [BackendState], Awaitable[None]
]

ServiceFunc module-attribute

ServiceFunc: TypeAlias = (
    ServiceAsyncFunc | Callable[[BackendState], None]
)

Priority module-attribute

Priority: TypeAlias = int

DistName module-attribute

DistName = NewType('DistName', str)

logger module-attribute

logger = getLogger(__name__)

Plugin dataclass

Source code in packages/tangram_core/src/tangram_core/plugin.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@dataclass
class Plugin:
    frontend_path: str | None = None
    routers: list[APIRouter] = field(default_factory=list)
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator
frontend_path class-attribute instance-attribute
frontend_path: str | None = None
routers class-attribute instance-attribute
routers: list[APIRouter] = field(default_factory=list)
services class-attribute instance-attribute
services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)
register_service
register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]
Source code in packages/tangram_core/src/tangram_core/plugin.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

scan_plugins

scan_plugins() -> EntryPoints
Source code in packages/tangram_core/src/tangram_core/plugin.py
51
52
def scan_plugins() -> importlib.metadata.EntryPoints:
    return importlib.metadata.entry_points(group="tangram_core.plugins")

load_plugin

load_plugin(
    entry_point: EntryPoint,
) -> tuple[DistName, Plugin] | None
Source code in packages/tangram_core/src/tangram_core/plugin.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def load_plugin(
    entry_point: importlib.metadata.EntryPoint,
) -> tuple[DistName, Plugin] | None:
    try:
        plugin_instance = entry_point.load()
    except Exception as e:
        tb = traceback.format_exc()
        logger.error(
            f"failed to load plugin {entry_point.name}: {e}. {tb}"
            f"\n= help: does {entry_point.value} exist?"
        )
        return None
    if not isinstance(plugin_instance, Plugin):
        logger.error(f"entry point {entry_point.name} is not an instance of `Plugin`")
        return None
    return DistName(entry_point.name), plugin_instance

redis

log module-attribute

log = getLogger(__name__)

StateT module-attribute

StateT = TypeVar('StateT')

Subscriber

Bases: ABC, Generic[StateT]

Source code in packages/tangram_core/src/tangram_core/redis.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Subscriber(abc.ABC, Generic[StateT]):
    redis: Redis
    task: asyncio.Task[None]
    pubsub: PubSub

    def __init__(
        self, name: str, redis_url: str, channels: List[str], initial_state: StateT
    ):
        self.name = name
        self.redis_url: str = redis_url
        self.channels: List[str] = channels
        self.state: StateT = initial_state
        self._running = False

    async def subscribe(self) -> None:
        if self._running:
            log.warning("%s already running", self.name)
            return

        try:
            self.redis = await Redis.from_url(self.redis_url)
            self.pubsub = self.redis.pubsub()
            await self.pubsub.psubscribe(*self.channels)
        except RedisError as e:
            log.error("%s failed to connect to Redis: %s", self.name, e)
            raise

        async def listen() -> None:
            try:
                log.info("%s listening ...", self.name)
                async for message in self.pubsub.listen():
                    log.debug("message: %s", message)
                    if message["type"] == "pmessage":
                        await self.message_handler(
                            message["channel"].decode("utf-8"),
                            message["data"].decode("utf-8"),
                            message["pattern"].decode("utf-8"),
                            self.state,
                        )
            except asyncio.CancelledError:
                log.warning("%s cancelled", self.name)

        self._running = True

        self.task = asyncio.create_task(listen())
        log.info("%s task created, running ...", self.name)

    async def cleanup(self) -> None:
        if not self._running:
            return

        if self.task:
            log.debug("%s canceling task ...", self.name)
            self.task.cancel()
            try:
                log.debug("%s await task to finish ...", self.name)
                await self.task
                log.debug("%s task canceled", self.name)
            except asyncio.CancelledError as exc:
                log.error("%s task canceling error: %s", self.name, exc)
        if self.pubsub:
            await self.pubsub.unsubscribe()
        if self.redis:
            await self.redis.close()
        self._running = False

    def is_active(self) -> bool:
        """Return True if the subscriber is actively listening."""
        return self._running and self.task is not None and not self.task.done()

    @abc.abstractmethod
    async def message_handler(
        self, event: str, payload: str, pattern: str, state: StateT
    ) -> None:
        pass
redis instance-attribute
redis: Redis
task instance-attribute
task: Task[None]
pubsub instance-attribute
pubsub: PubSub
name instance-attribute
name = name
redis_url instance-attribute
redis_url: str = redis_url
channels instance-attribute
channels: List[str] = channels
state instance-attribute
state: StateT = initial_state
subscribe async
subscribe() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def subscribe(self) -> None:
    if self._running:
        log.warning("%s already running", self.name)
        return

    try:
        self.redis = await Redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
        await self.pubsub.psubscribe(*self.channels)
    except RedisError as e:
        log.error("%s failed to connect to Redis: %s", self.name, e)
        raise

    async def listen() -> None:
        try:
            log.info("%s listening ...", self.name)
            async for message in self.pubsub.listen():
                log.debug("message: %s", message)
                if message["type"] == "pmessage":
                    await self.message_handler(
                        message["channel"].decode("utf-8"),
                        message["data"].decode("utf-8"),
                        message["pattern"].decode("utf-8"),
                        self.state,
                    )
        except asyncio.CancelledError:
            log.warning("%s cancelled", self.name)

    self._running = True

    self.task = asyncio.create_task(listen())
    log.info("%s task created, running ...", self.name)
cleanup async
cleanup() -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def cleanup(self) -> None:
    if not self._running:
        return

    if self.task:
        log.debug("%s canceling task ...", self.name)
        self.task.cancel()
        try:
            log.debug("%s await task to finish ...", self.name)
            await self.task
            log.debug("%s task canceled", self.name)
        except asyncio.CancelledError as exc:
            log.error("%s task canceling error: %s", self.name, exc)
    if self.pubsub:
        await self.pubsub.unsubscribe()
    if self.redis:
        await self.redis.close()
    self._running = False
is_active
is_active() -> bool

Return True if the subscriber is actively listening.

Source code in packages/tangram_core/src/tangram_core/redis.py
81
82
83
def is_active(self) -> bool:
    """Return True if the subscriber is actively listening."""
    return self._running and self.task is not None and not self.task.done()
message_handler abstractmethod async
message_handler(
    event: str, payload: str, pattern: str, state: StateT
) -> None
Source code in packages/tangram_core/src/tangram_core/redis.py
85
86
87
88
89
@abc.abstractmethod
async def message_handler(
    self, event: str, payload: str, pattern: str, state: StateT
) -> None:
    pass

tangram_core._core

ChannelConfig

host property writable

host: str

port property writable

port: int

redis_url property writable

redis_url: str

jwt_secret property writable

jwt_secret: str

jwt_expiration_secs property writable

jwt_expiration_secs: int

id_length property writable

id_length: int

__new__

__new__(
    host: str,
    port: int,
    redis_url: str,
    jwt_secret: str,
    jwt_expiration_secs: int,
    id_length: int,
) -> ChannelConfig

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run

run(config: ChannelConfig) -> Any