| // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of '../pubsub.dart'; |
| |
/// [PubSub] implementation that talks to Cloud Pub/Sub through the generated
/// [pubsub.PubsubApi] client.
///
/// Topic and subscription names that do not start with `projects/` are treated
/// as relative and are expanded under [project] before being sent to the API.
class _PubSubImpl implements PubSub {
  @override
  final String project;

  /// Generated low-level API client that every request goes through.
  final pubsub.PubsubApi _api;

  /// `projects/<project>/topics/`; prepended to relative topic names.
  final String _topicPrefix;

  /// `projects/<project>/subscriptions/`; prepended to relative subscription
  /// names.
  final String _subscriptionPrefix;

  /// Creates an instance for [project] that sends its requests through
  /// [client], using the API client's default root URL.
  _PubSubImpl(http.Client client, this.project)
      : _api = pubsub.PubsubApi(client),
        _topicPrefix = 'projects/$project/topics/',
        _subscriptionPrefix = 'projects/$project/subscriptions/';

  /// Same as the unnamed constructor, but the API client is pointed at
  /// [rootUrl].
  _PubSubImpl.rootUrl(http.Client client, this.project, String rootUrl)
      : _api = pubsub.PubsubApi(client, rootUrl: rootUrl),
        _topicPrefix = 'projects/$project/topics/',
        _subscriptionPrefix = 'projects/$project/subscriptions/';

  /// Returns [name] unchanged when it already starts with `projects/`,
  /// otherwise returns it prefixed with [_topicPrefix].
  String _fullTopicName(String name) =>
      name.startsWith('projects/') ? name : '$_topicPrefix$name';

  /// Returns [name] unchanged when it already starts with `projects/`,
  /// otherwise returns it prefixed with [_subscriptionPrefix].
  String _fullSubscriptionName(String name) =>
      name.startsWith('projects/') ? name : '$_subscriptionPrefix$name';

  /// Creates the topic called [name] (passed to the API as given).
  Future<pubsub.Topic> _createTopic(String name) =>
      _api.projects.topics.create(pubsub.Topic(), name);

  /// Deletes the topic called [name]; the returned future completes with
  /// `null`.
  Future _deleteTopic(String name) async {
    // The Pub/Sub delete API returns an instance of Empty, which is dropped.
    await _api.projects.topics.delete(name);
  }

  /// Fetches the topic called [name] (passed to the API as given).
  Future<pubsub.Topic> _getTopic(String name) => _api.projects.topics.get(name);

  /// Lists one page of at most [pageSize] topics in [project], continuing
  /// from [nextPageToken] when it is non-null.
  Future<pubsub.ListTopicsResponse> _listTopics(
      int pageSize, String? nextPageToken) {
    return _api.projects.topics.list(
      'projects/$project',
      pageSize: pageSize,
      pageToken: nextPageToken,
    );
  }

  /// Creates the subscription [name] on [topic].
  ///
  /// A push configuration is attached only when [endpoint] is non-null.
  Future<pubsub.Subscription> _createSubscription(
      String name, String topic, Uri? endpoint) {
    final subscription = pubsub.Subscription()..topic = topic;
    if (endpoint != null) {
      subscription.pushConfig = pubsub.PushConfig()
        ..pushEndpoint = endpoint.toString();
    }
    return _api.projects.subscriptions.create(subscription, name);
  }

  /// Deletes the subscription called [name]; the returned future completes
  /// with `null`.
  ///
  /// [name] is run through [_fullSubscriptionName], so relative names are
  /// accepted here as well.
  Future _deleteSubscription(String name) async {
    // The Pub/Sub delete API returns an instance of Empty, which is dropped.
    await _api.projects.subscriptions.delete(_fullSubscriptionName(name));
  }

  /// Fetches the subscription called [name] (passed to the API as given).
  Future<pubsub.Subscription> _getSubscription(String name) =>
      _api.projects.subscriptions.get(name);

  /// Lists one page of at most [pageSize] subscriptions in [project],
  /// continuing from [nextPageToken] when it is non-null.
  ///
  /// NOTE(review): [topic] is accepted but not forwarded to the API, so the
  /// listing is not restricted to a topic.
  Future<pubsub.ListSubscriptionsResponse> _listSubscriptions(
      String? topic, int pageSize, String? nextPageToken) {
    return _api.projects.subscriptions.list(
      'projects/$project',
      pageSize: pageSize,
      pageToken: nextPageToken,
    );
  }

  /// Sends a modify-push-config request for [subscription].
  ///
  /// The push endpoint in the request is `null` when [endpoint] is `null`.
  Future _modifyPushConfig(String subscription, Uri? endpoint) {
    final pushConfig = pubsub.PushConfig()
      ..pushEndpoint = endpoint?.toString();
    final request = pubsub.ModifyPushConfigRequest()..pushConfig = pushConfig;
    return _api.projects.subscriptions.modifyPushConfig(request, subscription);
  }

  /// Publishes a single message with body [message] to [topic].
  ///
  /// An empty [attributes] map is sent as `null`. The returned future
  /// completes with `null`.
  Future _publish(
      String topic, List<int> message, Map<String, String> attributes) async {
    final pubsubMessage = pubsub.PubsubMessage()
      ..dataAsBytes = message
      ..attributes = attributes.isEmpty ? null : attributes;
    final request = pubsub.PublishRequest()..messages = [pubsubMessage];
    // TODO(sgjesse): Handle PublishResponse containing message ids.
    await _api.projects.topics.publish(request, topic);
  }

  /// Pulls at most one message from [subscription].
  Future<pubsub.PullResponse> _pull(
      String subscription, bool returnImmediately) {
    final request = pubsub.PullRequest()..maxMessages = 1;
    // ignore: deprecated_member_use
    request.returnImmediately = returnImmediately;
    return _api.projects.subscriptions.pull(request, subscription);
  }

  /// Acknowledges the message identified by [ackId] on [subscription]; the
  /// returned future completes with `null`.
  Future _ack(String ackId, String subscription) async {
    final request = pubsub.AcknowledgeRequest()..ackIds = [ackId];
    // The Pub/Sub acknowledge API returns an instance of Empty, which is
    // dropped.
    await _api.projects.subscriptions.acknowledge(request, subscription);
  }

  /// Throws an [ArgumentError] when [name] starts with `projects/` but does
  /// not contain `/topics/`, or when it ends with `/topics/`.
  void _checkTopicName(String name) {
    final isAbsolute = name.startsWith('projects/');
    if (isAbsolute && !name.contains('/topics/')) {
      throw ArgumentError(
          'Illegal topic name. Absolute topic names must have the form '
          "'projects/[project-id]/topics/[topic-name]'");
    }
    if (name.endsWith('/topics/')) {
      throw ArgumentError(
          'Illegal topic name. Relative part of the name cannot be empty');
    }
  }

  /// Throws an [ArgumentError] when [name] starts with `projects/` but does
  /// not contain `/subscriptions/`, or when it ends with `/subscriptions/`.
  void _checkSubscriptionName(String name) {
    final isAbsolute = name.startsWith('projects/');
    if (isAbsolute && !name.contains('/subscriptions/')) {
      throw ArgumentError(
          'Illegal subscription name. Absolute subscription names must have '
          "the form 'projects/[project-id]/subscriptions/[subscription-name]'");
    }
    if (name.endsWith('/subscriptions/')) {
      throw ArgumentError(
          'Illegal subscription name. Relative part of the name cannot be '
          'empty');
    }
  }

  // The public methods below are deliberately not `async`: name validation
  // throws synchronously, before any request is issued.

  @override
  Future<Topic> createTopic(String name) {
    _checkTopicName(name);
    final fullName = _fullTopicName(name);
    return _createTopic(fullName).then((created) => _TopicImpl(this, created));
  }

  @override
  Future deleteTopic(String name) {
    _checkTopicName(name);
    final fullName = _fullTopicName(name);
    return _deleteTopic(fullName);
  }

  @override
  Future<Topic> lookupTopic(String name) {
    _checkTopicName(name);
    final fullName = _fullTopicName(name);
    return _getTopic(fullName).then((found) => _TopicImpl(this, found));
  }

  @override
  Stream<Topic> listTopics() {
    // Page loader handed to StreamFromPages; always starts at the first page.
    Future<Page<Topic>> firstPage(int pageSize) {
      return _listTopics(pageSize, null)
          .then((response) => _TopicPageImpl(this, pageSize, response));
    }

    return StreamFromPages<Topic>(firstPage).stream;
  }

  @override
  Future<Page<Topic>> pageTopics({int pageSize = 50}) {
    return _listTopics(pageSize, null)
        .then((response) => _TopicPageImpl(this, pageSize, response));
  }

  @override
  Future<Subscription> createSubscription(String name, String topic,
      {Uri? endpoint}) {
    _checkSubscriptionName(name);
    _checkTopicName(topic);
    final fullSubscription = _fullSubscriptionName(name);
    final fullTopic = _fullTopicName(topic);
    return _createSubscription(fullSubscription, fullTopic, endpoint)
        .then((created) => _SubscriptionImpl(this, created));
  }

  @override
  Future deleteSubscription(String name) {
    _checkSubscriptionName(name);
    final fullName = _fullSubscriptionName(name);
    return _deleteSubscription(fullName);
  }

  @override
  Future<Subscription> lookupSubscription(String name) {
    _checkSubscriptionName(name);
    final fullName = _fullSubscriptionName(name);
    return _getSubscription(fullName)
        .then((found) => _SubscriptionImpl(this, found));
  }

  @override
  Stream<Subscription> listSubscriptions([String? query]) {
    // Page loader handed to StreamFromPages; always starts at the first page.
    Future<Page<Subscription>> firstPage(int pageSize) {
      return _listSubscriptions(query, pageSize, null).then(
          (response) => _SubscriptionPageImpl(this, query, pageSize, response));
    }

    return StreamFromPages<Subscription>(firstPage).stream;
  }

  @override
  Future<Page<Subscription>> pageSubscriptions(
      {String? topic, int pageSize = 50}) {
    return _listSubscriptions(topic, pageSize, null).then(
        (response) => _SubscriptionPageImpl(this, topic, pageSize, response));
  }
}
| |
/// [Message] implementation for messages built locally by the user.
///
/// The body is stored in the form it was supplied in (a `String` or a list of
/// bytes) and converted to the other form on demand using UTF-8.
class _MessageImpl implements Message {
  /// The body when constructed with [_MessageImpl.withString]; `null` when
  /// constructed with [_MessageImpl.withBytes].
  final String? _stringMessage;

  /// The body when constructed with [_MessageImpl.withBytes]; `null` when
  /// constructed with [_MessageImpl.withString].
  final List<int>? _bytesMessage;

  @override
  final Map<String, String> attributes;

  /// Creates a message whose body is the given string.
  ///
  /// A fresh empty map is used when [attributes] is omitted.
  _MessageImpl.withString(
    this._stringMessage, {
    Map<String, String>? attributes,
  })  : attributes = attributes ?? <String, String>{},
        _bytesMessage = null;

  /// Creates a message whose body is the given list of bytes.
  ///
  /// A fresh empty map is used when [attributes] is omitted.
  _MessageImpl.withBytes(
    this._bytesMessage, {
    Map<String, String>? attributes,
  })  : attributes = attributes ?? <String, String>{},
        _stringMessage = null;

  @override
  List<int> get asBytes {
    final bytes = _bytesMessage;
    if (bytes != null) {
      return bytes;
    }
    // No byte body was supplied: encode the string body as UTF-8.
    return utf8.encode(_stringMessage!);
  }

  @override
  String get asString {
    final text = _stringMessage;
    if (text != null) {
      return text;
    }
    // No string body was supplied: decode the byte body as UTF-8.
    return utf8.decode(_bytesMessage!);
  }
}
| |
/// Message received using [Subscription.pull].
///
/// Wraps the [pubsub.PubsubMessage] received from Pub/Sub. The byte and string
/// forms of the body are computed on first access and then cached.
class _PullMessage implements Message {
  /// The underlying message as returned by the API.
  final pubsub.PubsubMessage _message;

  /// Cached result of [asBytes]; `null` until first requested.
  List<int>? _bytes;

  /// Cached result of [asString]; `null` until first requested.
  String? _string;

  _PullMessage(this._message);

  @override
  List<int> get asBytes => _bytes ??= _message.dataAsBytes;

  @override
  String get asString => _string ??= utf8.decode(_message.dataAsBytes);

  /// The attributes of the underlying message, or a new empty map when the
  /// message carries none.
  @override
  Map<String, String> get attributes {
    final messageAttributes = _message.attributes;
    return messageAttributes ?? <String, String>{};
  }
}
| |
/// Message received through Pub/Sub push delivery.
///
/// Keeps the body as the Base64 encoded string it was constructed with and
/// decodes it each time the body is requested. The attributes are supplied
/// already decoded into a map.
class _PushMessage implements Message {
  /// The Base64 encoded message body.
  final String _base64Message;

  @override
  final Map<String, String> attributes;

  _PushMessage(this._base64Message, this.attributes);

  /// The Base64-decoded body.
  @override
  List<int> get asBytes {
    return base64.decode(_base64Message);
  }

  /// The Base64-decoded body interpreted as UTF-8.
  @override
  String get asString {
    final decoded = asBytes;
    return utf8.decode(decoded);
  }
}
| |
/// Pull event received from Pub/Sub pull delivery.
///
/// Holds on to the pull response and exposes the first received message of
/// that response as [message].
class _PullEventImpl implements PullEvent {
  /// Pub/Sub API object used to acknowledge the message.
  final _PubSubImpl _api;

  /// Name of the subscription the response was pulled from.
  final String _subscriptionName;

  /// Low level response received from Pub/Sub.
  final pubsub.PullResponse _response;

  @override
  final Message message;

  /// Wraps the first received message of the given response.
  ///
  /// The response must contain at least one received message carrying a
  /// message payload; the null assertions and the index access below fail
  /// otherwise.
  _PullEventImpl(this._api, this._subscriptionName, this._response)
      : message = _PullMessage(_response.receivedMessages![0].message!);

  /// Acknowledges the first received message of the response on the
  /// subscription it was pulled from.
  @override
  Future acknowledge() {
    final received = _response.receivedMessages![0];
    return _api._ack(received.ackId!, _subscriptionName);
  }
}
| |
/// Push event received from Pub/Sub push delivery.
///
/// Decoded from the JSON encoded push HTTP request body.
class _PushEventImpl implements PushEvent {
  /// Prefix that [subscriptionName] is made to start with by [fromJson].
  static const _prefix = '/subscriptions/';

  final Message _message;
  final String _subscriptionName;

  @override
  Message get message => _message;

  @override
  String get subscriptionName => _subscriptionName;

  _PushEventImpl(this._message, this._subscriptionName);

  /// Decodes a push request body.
  ///
  /// [json] must be a JSON object with a `message` object containing a
  /// Base64 string `data`, and a string `subscription`. Message attributes
  /// are collected from either or both of:
  ///
  ///  * `message.labels`: a list of objects with a `key` and a `strValue` or
  ///    `numValue`;
  ///  * `message.attributes`: an object mapping attribute names to values.
  ///
  /// Both are optional. When a key appears in both, the value from
  /// `attributes` wins. Values are stored using their `toString()` form.
  ///
  /// Throws (from `jsonDecode` or from a failed cast) when the body does not
  /// have the shape described above.
  factory _PushEventImpl.fromJson(String json) {
    final body = jsonDecode(json) as Map<String, dynamic>;
    final message = body['message'] as Map;
    final data = message['data'] as String;

    final Map<String, String> attributes = HashMap();

    // Label-list format. Absent or null `labels` means "no labels" instead
    // of a failed cast.
    final labels = message['labels'] as List?;
    if (labels != null) {
      for (final label in labels) {
        final entry = label as Map;
        final key = entry['key'] as String;
        final value = entry['strValue'] ?? entry['numValue'];
        attributes[key] = value.toString();
      }
    }

    // Attribute-map format used by Pub/Sub push requests.
    final attributeMap = message['attributes'] as Map?;
    if (attributeMap != null) {
      attributeMap.forEach((key, value) {
        attributes[key as String] = value.toString();
      });
    }

    var subscription = body['subscription'] as String;
    // TODO(#1): Remove this when the push event subscription name is prefixed
    // with '/subscriptions/'.
    if (!subscription.startsWith(_prefix)) {
      subscription = _prefix + subscription;
    }
    return _PushEventImpl(_PushMessage(data, attributes), subscription);
  }
}
| |
/// [Topic] implementation wrapping a low-level [pubsub.Topic] together with
/// the [_PubSubImpl] it was obtained from.
class _TopicImpl implements Topic {
  /// API object used to publish to and delete this topic.
  final _PubSubImpl _api;

  /// The underlying topic resource; its `name` holds the absolute name.
  final pubsub.Topic _topic;

  _TopicImpl(this._api, this._topic);

  /// The topic name with the API object's topic prefix removed.
  @override
  String get name {
    final prefix = _api._topicPrefix;
    assert(_topic.name!.startsWith(prefix));
    return _topic.name!.substring(prefix.length);
  }

  /// The project of the API object this topic was obtained from.
  @override
  String get project {
    // The name is only inspected inside the assert, so nothing is checked
    // when asserts are disabled.
    assert(_topic.name!.startsWith(_api._topicPrefix));
    return _api.project;
  }

  @override
  String get absoluteName {
    return _topic.name!;
  }

  /// Publishes the bytes and attributes of [message] to this topic.
  @override
  Future publish(Message message) =>
      _api._publish(_topic.name!, message.asBytes, message.attributes);

  @override
  Future delete() {
    return _api._deleteTopic(_topic.name!);
  }

  /// Publishes [message] encoded as UTF-8; omitted [attributes] are treated
  /// as an empty map.
  @override
  Future publishString(String message, {Map<String, String>? attributes}) {
    final effectiveAttributes = attributes ?? <String, String>{};
    return _api._publish(
        _topic.name!, utf8.encode(message), effectiveAttributes);
  }

  /// Publishes [message] as is; omitted [attributes] are treated as an empty
  /// map.
  @override
  Future publishBytes(List<int> message, {Map<String, String>? attributes}) {
    final effectiveAttributes = attributes ?? <String, String>{};
    return _api._publish(_topic.name!, message, effectiveAttributes);
  }
}
| |
/// [Subscription] implementation wrapping a low-level [pubsub.Subscription]
/// together with the [_PubSubImpl] it was obtained from.
class _SubscriptionImpl implements Subscription {
  /// API object used to pull from, reconfigure and delete this subscription.
  final _PubSubImpl _api;

  /// The underlying subscription resource; its `name` holds the absolute
  /// name.
  final pubsub.Subscription _subscription;

  _SubscriptionImpl(this._api, this._subscription);

  /// The subscription name with the API object's subscription prefix
  /// removed.
  @override
  String get name {
    assert(_subscription.name!.startsWith(_api._subscriptionPrefix));
    return _subscription.name!.substring(_api._subscriptionPrefix.length);
  }

  /// The project of the API object this subscription was obtained from.
  @override
  String get project {
    // The name is only inspected inside the assert, so nothing is checked
    // when asserts are disabled.
    assert(_subscription.name!.startsWith(_api._subscriptionPrefix));
    return _api.project;
  }

  @override
  String get absoluteName => _subscription.name!;

  /// A [Topic] carrying the topic name recorded on this subscription.
  @override
  Topic get topic {
    var topic = pubsub.Topic()..name = _subscription.topic;
    return _TopicImpl(_api, topic);
  }

  @override
  Future delete() => _api._deleteSubscription(_subscription.name!);

  /// Pulls at most one message from this subscription.
  ///
  /// Completes with `null` when the response holds no received messages, and
  /// also when the request fails with a [pubsub.DetailedApiRequestError]
  /// whose status is 400. Any other error is propagated.
  @override
  Future<PullEvent?> pull({
    @Deprecated('returnImmediately has been deprecated from pubsub')
    bool wait = true,
  }) async {
    try {
      final response = await _api._pull(_subscription.name!, !wait);
      // The documentation says 'Returns an empty list if there are no
      // messages available in the backlog'. However the receivedMessages
      // property can also be null in that case.
      final received = response.receivedMessages;
      if (received == null || received.isEmpty) {
        return null;
      }
      return _PullEventImpl(_api, _subscription.name!, response);
    } on pubsub.DetailedApiRequestError catch (e) {
      if (e.status == 400) {
        return null;
      }
      rethrow;
    }
  }

  /// The push endpoint recorded on the underlying subscription resource.
  ///
  /// `null` when the resource has no push configuration, when its push
  /// endpoint is absent or empty, or when the endpoint cannot be parsed as a
  /// URI. The value comes from the resource held by this object, so it is
  /// not refreshed by [updatePushConfiguration].
  @override
  Uri? get endpoint {
    final pushEndpoint = _subscription.pushConfig?.pushEndpoint;
    if (pushEndpoint == null || pushEndpoint.isEmpty) {
      return null;
    }
    return Uri.tryParse(pushEndpoint);
  }

  @override
  bool get isPull => endpoint == null;

  @override
  bool get isPush => endpoint != null;

  /// Asks the service to deliver this subscription's messages to [endpoint].
  @override
  Future updatePushConfiguration(Uri endpoint) {
    return _api._modifyPushConfig(_subscription.name!, endpoint);
  }
}
| |
/// One page of a topic listing, built from a [pubsub.ListTopicsResponse].
class _TopicPageImpl implements Page<Topic> {
  /// API object used to fetch the following page.
  final _PubSubImpl _api;

  /// Page size used when [next] is called without one.
  final int _pageSize;

  /// Token for the following page; `null` when the response had none.
  final String? _nextPageToken;

  @override
  final List<Topic> items = [];

  /// Wraps every topic in [response] (if any) in a [_TopicImpl].
  _TopicPageImpl(this._api, this._pageSize, pubsub.ListTopicsResponse response)
      : _nextPageToken = response.nextPageToken {
    for (final topic in response.topics ?? const <pubsub.Topic>[]) {
      items.add(_TopicImpl(_api, topic));
    }
  }

  @override
  bool get isLast => _nextPageToken == null;

  /// Fetches the page following this one, using [pageSize] when given and
  /// this page's size otherwise.
  ///
  /// `throwIfIsLast()` is called first; because this method is `async`,
  /// anything it throws is delivered through the returned future.
  @override
  Future<Page<Topic>> next({int? pageSize}) async {
    throwIfIsLast();
    final effectivePageSize = pageSize ?? _pageSize;
    final response = await _api._listTopics(effectivePageSize, _nextPageToken);
    return _TopicPageImpl(_api, effectivePageSize, response);
  }
}
| |
/// One page of a subscription listing, built from a
/// [pubsub.ListSubscriptionsResponse].
class _SubscriptionPageImpl implements Page<Subscription> {
  /// API object used to fetch the following page.
  final _PubSubImpl _api;

  /// Topic argument of the listing; passed along unchanged when fetching the
  /// following page.
  final String? _topic;

  /// Page size used when [next] is called without one.
  final int _pageSize;

  /// Token for the following page; `null` when the response had none.
  final String? _nextPageToken;

  @override
  final List<Subscription> items = [];

  /// Wraps every subscription in [response] (if any) in a
  /// [_SubscriptionImpl].
  _SubscriptionPageImpl(this._api, this._topic, this._pageSize,
      pubsub.ListSubscriptionsResponse response)
      : _nextPageToken = response.nextPageToken {
    for (final subscription
        in response.subscriptions ?? const <pubsub.Subscription>[]) {
      items.add(_SubscriptionImpl(_api, subscription));
    }
  }

  @override
  bool get isLast => _nextPageToken == null;

  /// Fetches the page following this one, using [pageSize] when given and
  /// this page's size otherwise.
  ///
  /// `throwIfIsLast()` is called first; this method is not `async`, so
  /// anything it throws is thrown synchronously to the caller.
  @override
  Future<Page<Subscription>> next({int? pageSize}) {
    throwIfIsLast();
    final effectivePageSize = pageSize ?? _pageSize;
    final pending =
        _api._listSubscriptions(_topic, effectivePageSize, _nextPageToken);
    return pending.then((response) =>
        _SubscriptionPageImpl(_api, _topic, effectivePageSize, response));
  }
}